Files
rikako-note/spring/webflux/spring webflux.md
2025-03-14 01:23:59 +08:00

36 KiB
Raw Blame History

Spring Webflux

Concept

核心机制

reactive

reactive代表基于“事件响应”的编程模型

back pressure

在spring webflux中back pressure为反应式编程的核心机制,用于协调生产者和消费者之间的速率差异,令系统在高负载或资源受限的情况下仍能稳定运行。

  • 同步场景在同步场景下阻塞式调用是一种天然的back pressure形式调用方会阻塞并等待直到被调用方执行完成
  • 非阻塞场景:在非阻塞的代码中,需要关注事件速率,生产者产生事件的速率不能压过消费者消费的速率
reactive stream

reactive stream为一个小型规范定义了异步组件和back pressure交互的规范。reactive stream的主要用途是让Subscriber控制publisher产生数据的速率。

编程模型

spring-web module包含Spring webflux的响应式基础包括若夏内容

  • http抽象
  • 对支持server的reactive stream adapters
  • codecs
  • 核心WebHandler API其与servlet api兼容

Spring webflux提供了如下两种编程模型

  • Annotated Controllers: 和spring mvc一致基于相同的注解。spring mvcwebflux controllers都支持reactive return type,因此,很难区分spring mvcwebflux controllers
  • Functional Endpoint基于lambda的、轻量的函数编程模型。其可以被看做是一个支持对请求进行route和handle的工具集合。

并发模型

spring mvc

spring mvc通常为servlet应用其假设当前线程可能会被阻塞例如远程调用等会阻塞当前线程。为了减少处理请求时阻塞所带来的影响servlet容器会使用包含大量线程的线程池。

webflux

对于webflux通常为non-blocking server其会假设应用并不会阻塞故而非阻塞的server可以使用一个线程数较少且固定的线程池event loop workers来处理请求。

调用阻塞api

当想要在webflux中使用阻塞api时可以按如下方式进行使用。RxJavaReactive都支持publushOn操作,其支持在另一个线程中继续执行

Reactive Core

spring-web对于reactive web应用具有如下支持

  • 对于server request的处理拥有如下两种级别的支持
    • HttpHandler 基于non-blocking I/OReactive Stream back pressure,适配`Reactor Netty, Undertow, Tomcat, Jetty以及任一Servlet Container
    • WebHandler稍高级别的、通用的web api用于请求处理在此基础上构建基于annotated controllersfunctional endpoints的编程模型
  • 对于client端ClientHttpConnector用于发送http请求同时兼容非阻塞io和Reactive Stream back pressure
  • codecs用于客户端和服务端的序列化和反序列化

HttpHandler

HttpHandler中只通过一个单独的方法来处理请求和相应其对不同的http server api进行了最小化的抽象。

下表描述了支持的api

Server name Server API used Reactive Streams support

Netty

Netty API

Reactor Netty

Undertow

Undertow API

spring-web: Undertow to Reactive Streams bridge

Tomcat

Servlet non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[]

spring-web: Servlet non-blocking I/O to Reactive Streams bridge

Jetty

Servlet non-blocking I/O; Jetty API to write ByteBuffers vs byte[]

spring-web: Servlet non-blocking I/O to Reactive Streams bridge

Servlet container

Servlet non-blocking I/O

spring-web: Servlet non-blocking I/O to Reactive Streams bridge

下表中描述了server依赖

Server name Group id Artifact name

Reactor Netty

io.projectreactor.netty

reactor-netty

Undertow

io.undertow

undertow-core

Tomcat

org.apache.tomcat.embed

tomcat-embed-core

Jetty

org.eclipse.jetty

jetty-server, jetty-servlet

server api adapters

如下示例展示了如何使用HttpHandler Adapters来适配不同的Server Api

Reactor Netty
HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create().host(host).port(port).handle(adapter).bindNow();
Undertow
HttpHandler handler = ...
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
Tomcat
HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);

Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
Jetty
HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);

Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();

ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();

WebHandler API

org.springframework.web.server package基于HttpHandler构建提供了通用的Web API。Web Api由多个WebException, 多个WebFilter, 一个WebHandler组件构成组成了一个chain。

相比于HttpHandler仅仅是对不同http server的抽象WebHandler提供了一个更加通用、更加广泛的功能集合:

  • user sessions attributes
  • request attributes
  • resolved Locale or Principal for request
  • abstractions for multipart data

bean types for WebHttpHandlerBuilder auto-detect

在spring上下文中WebHttpHandlerBuilder可以自动探测到如下类型的components:

Bean name Bean type Count Description

<any>

WebExceptionHandler

0..N

Provide handling for exceptions from the chain of WebFilter instances and the target WebHandler. For more details, see Exceptions.

<any>

WebFilter

0..N

Apply interception style logic to before and after the rest of the filter chain and the target WebHandler. For more details, see Filters.

webHandler

WebHandler

1

The handler for the request.

webSessionManager

WebSessionManager

0..1

The manager for WebSession instances exposed through a method on ServerWebExchange. DefaultWebSessionManager by default.

serverCodecConfigurer

ServerCodecConfigurer

0..1

For access to HttpMessageReader instances for parsing form data and multipart data that is then exposed through methods on ServerWebExchange. ServerCodecConfigurer.create() by default.

localeContextResolver

LocaleContextResolver

0..1

The resolver for LocaleContext exposed through a method on ServerWebExchange. AcceptHeaderLocaleContextResolver by default.

forwardedHeaderTransformer

ForwardedHeaderTransformer

0..1

For processing forwarded type headers, either by extracting and removing them or by removing them only. Not used by default.

Form Data

ServerWebExchange将会向外暴露如下方法用于访问form data

Mono<MultiValueMap<String, String>> getFormData();

DefaultServerWebExchange使用HttpMessageReader来对form dataapplication/x-www-form-urlencoded进行parse操作将formdata转化为MultiValueMap

Multipart Data

ServerWebExchange向外暴露如下方法用于访问multipart data。

Mono<MultiValueMap<String, Part>> getMultipartData();

DefaultServerWebExchange将会使用HttpMessageReader<MultiValueMap<String, Part>>来对multipart/form-datamultipart/mixedmultipart/related数据进行转换,数据将会被转化为MultiValueMap类型。

Filter

WebHandler API中,可以使用WebFilter来实现拦截式的逻辑,当使用Webflux Config时,WebFilter的注可以通过将其注册为bean来实现。

对于WebFilter的优先级,欸可以通过使用@Order注解或实现Ordered接口来实现。

UrlHandler

在编写web程序时可能希望controller endpoint既能够匹配末尾带/的url版本又能匹配末尾不带/的url版本。

例如,想要让@GetMapping("/home")既能够匹配GET /home,又能够匹配GET /home/

对此,可以使用UrlHandlerFilter进行处理,其支持如下两种配置:

  • 当接收到带有末尾带有/的url时向浏览器返回一个重定向状态让浏览器重新请求末尾不带/的url
  • 将请求当作不带末尾/,并对请求继续处理

实例化UrlHandlerFilter的示例如下

UrlHandlerFilter urlHandlerFilter = UrlHandlerFilter
		// will HTTP 308 redirect "/blog/my-blog-post/" -> "/blog/my-blog-post"
		.trailingSlashHandler("/blog/**").redirect(HttpStatus.PERMANENT_REDIRECT)
		// will mutate the request to "/admin/user/account/" and make it as "/admin/user/account"
		.trailingSlashHandler("/admin/**").mutateRequest()
		.build();

Exceptions

WebHandler Api中,可以使用WebExceptionHandler对异常进行处理。当使用Webflux Config时,注册WebExceptionHandler仅需将其声明为bean即可可以通过@Order注解或Ordered接口来指明顺序。

Logging

Log Id

在webflux中同一个请求可能在多个线程中被执行过故而在定位请求日志时无法通过线程id来关联请求。为了解决该问题spring webflux在打印消息时前缀了一个log id,其是针对单个特定请求的。

在server端log id存储在ServerWebExchange中的LOG_ID_ATTRIBUTE属性中。在client端log id存储在Client Request中的LOG_ID_ATTRIBUTE属性中。

Appenders

slf4jlog4j等日志库都提供异步logger用于避免阻塞。使用异步logger会存在部分缺点例如丢弃无法排队的异步消息。但是,其仍然是目前非阻塞框架的最佳选择。

DispatcherHandler

和spring MVC类似spring webflux同样按照font controller pattern进行设计,包含一个central WebHandler,即DispatcherHandler

Dispathcer提供了一个共享的请求处理算法将实际的处理逻辑委托给其他可配置的组件。

DispatcherHandler通过spring配置来发现委托的组件。DispatcherHandler其本身也是一个spring bean并且实现了ApplicationContextAware接口可以访问spring上下文。

如果Dispatcher的bean name为webHandler,那么其会被WebHttpHandlerBuilder发现,并且将其放入request-processing chain中。

webflux应用中包含的spring configuration通常包括

  • bean name为webHandlerDispatcherHandler
  • WebFilterWebExceptionHandler beans
  • 被委托给webHandler的beans
  • 其他

上述配置将被被WebHttpHandlerBuilder使用用于构建process chain示例如下所示

ApplicationContext context = ...
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();

上述示例中返回的handler可以和server adapter结合使用。

DispatcherHandler委托

DispatcherHandler会将请求的委托给特定的bean对象bean对象会处理请求并且将结果渲染到response中。

DispatcherHandler会对如下类型的bean进行auto-detect。

Bean type Explanation

HandlerMapping

Map a request to a handler. The mapping is based on some criteria, the details of which vary by HandlerMapping implementationannotated controllers, simple URL pattern mappings, and others.

The main HandlerMapping implementations are RequestMappingHandlerMapping for @RequestMapping annotated methods, RouterFunctionMapping for functional endpoint routes, and SimpleUrlHandlerMapping for explicit registrations of URI path patterns and WebHandler instances.

HandlerAdapter

Help the DispatcherHandler to invoke a handler mapped to a request regardless of how the handler is actually invoked. For example, invoking an annotated controller requires resolving annotations. The main purpose of a HandlerAdapter is to shield the DispatcherHandler from such details.

HandlerResultHandler

Process the result from the handler invocation and finalize the response. See Result Handling.

Processing

DispatcherHandler按照如下方式处理请求:

  • 首先,会挨个查询HandlerMapping用于查找匹配的handler会匹配第一个匹配的handler
  • 如果匹配到handler会通过对应的HandlerAdapter执行该handler并且会将handler返回的值作为HandlerResult暴露
  • HandlerResult将会被分配给指定的HandlerResultHandler进行处理可能会直接向response中写入数据或渲染view

Result Handling

通过HandlerAdapter对handler进行调用返回的结果会被包裹在HandlerResult中,随之还包含额外的上下文。

HandlerResult将会被传递给第一个支持其的上下文,下表中展示了HandlerResultHandler的实现:

Result Handler Type Return Values Default Order

ResponseEntityResultHandler

ResponseEntity, typically from @Controller instances.

0

ServerResponseResultHandler

ServerResponse, typically from functional endpoints.

0

ResponseBodyResultHandler

Handle return values from @ResponseBody methods or @RestController classes.

100

ViewResolutionResultHandler

CharSequence, View, Model, Map, Rendering, or any other Object is treated as a model attribute.

See also View Resolution.

Integer.MAX_VALUE

Annotated Controllers

spring webflux提供了基于注解的编程模型通过使用注解来向外暴露request mappings。通过注解标注的controllers可包含灵活的方法签名并且无需继承任何基类或实现指定接口。

使用示例如下所示:

@RestController
public class HelloController {

	@GetMapping("/hello")
	public String handle() {
		return "Hello WebFlux";
	}
}

@Controller

在基于注解的webflux变成模型中可以使用@Controller@RestController来声明controller bean。

AOP

对于部分场景需要在运行时使用aop proxy来对controller进行decorate。

对于针对controller的aop推荐使用class-based proxy,可以使用@EnableTransactionManagement(proxyTargetClass=true)来实现。

Functional Endpoint

Webflux.fn是一个轻量级的函数式编程模型方法用于对请求进行路由和处理。其和基于注解的编程模型一致都基于相同的reactive core。

Overview

Webflux.fnhttp请求被HandlerFunction处理,HandlerFunction接收ServerRequest并且返回一个delayed ServerResponseMono<ServerResponse>)。

Immutable

对于函数时编程模型,request对象和response对象都是不可变的

一旦reqeust或response对象被创建其内部状态都无法被直接修改任何试图对其“修改”的操作都会返回一个新的对象并且保证原有对象内容不变。

由于request和response对象是不可变的其天然为线程安全的。

Router Function

incoming requests将会被RouterFunction路由到handler functionRouterFunction接收一个ServerRequest并且返回一个delayed HandlerFunction

如果存在RouterFunction与request匹配时其将返回一个handler function如果没有任何RouterFunction与请求相匹配将返回一个空的Mono

RouterFunction的作用和@RequestMapping注解相同。

RouterFunctions.route()方法提供了一个便于创建router的builder示例如下

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> route = route() (1)
	.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
	.GET("/person", accept(APPLICATION_JSON), handler::listPeople)
	.POST("/person", handler::createPerson)
	.build();


public class PersonHandler {

	// ...

	public Mono<ServerResponse> listPeople(ServerRequest request) {
		// ...
	}

	public Mono<ServerResponse> createPerson(ServerRequest request) {
		// ...
	}

	public Mono<ServerResponse> getPerson(ServerRequest request) {
		// ...
	}
}

运行RouterFunction的其中一种方式是将其转化为HttpHandler并将其指定给对应的server adapter:

  • RouterFunctions.toHttpHandler(RouterFunction)
  • RouterFunctions.toHttpHandler(RouterFunction, HandlerStrategies)

HandlerFunction

ServerRequestServerResponse为不可变接口两者都对body stream提供Reactive Stream back pressure。

  • 其中request body通过Reactor FluxMono表示
  • response body可以通过任何Reactive Stream Publisher进行表示,包括FluxMono

Server Request

ServerRequest提供了对如下内容的访问

  • http method
  • uri
  • headers
  • query params
  • body

如下示例展示了如何将请求体转化为Mono/Flux类型:

Mono<String> string = request.bodyToMono(String.class);

Flux<Person> people = request.bodyToFlux(Person.class);

如下示例展示了如何访问formdata:

Mono<MultiValueMap<String, String>> map = request.formData();

如下示例展示了如何访问multipart data

Mono<MultiValueMap<String, Part>> map = request.multipartData();

访问multipart数据内容的示例如下所示:

Flux<PartEvent> allPartEvents = request.bodyToFlux(PartEvent.class);
allPartsEvents.windowUntil(PartEvent::isLast)
      .concatMap(p -> p.switchOnFirst((signal, partEvents) -> {
          if (signal.hasValue()) {
              PartEvent event = signal.get();
              if (event instanceof FormPartEvent formEvent) {
                  String value = formEvent.value();
                  // handle form field
              }
              else if (event instanceof FilePartEvent fileEvent) {
                  String filename = fileEvent.filename();
                  Flux<DataBuffer> contents = partEvents.map(PartEvent::content);
                  // handle file upload
              }
              else {
                  return Mono.error(new RuntimeException("Unexpected event: " + event));
              }
          }
          else {
              return partEvents; // either complete or error signal
          }
      }));

ServerResponse

ServerResponse类提供了对http返回内容的访问由于ServerResponse是不可变的故而可以通过build方法来创建ServerResponse。

如下示例展示了ServerResponse的使用

Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);

URI location = ...
ServerResponse.created(location).build();

同时在构建响应体时支持传递hint参数来控制body被序列化和反序列化的方式示例如下

ServerResponse.ok().hint(Jackson2CodecSupport.JSON_VIEW_HINT, MyJacksonView.class).body(...);

Handler Class

可以将HandlerFunction写为lambda的形式示例如下

HandlerFunction<ServerResponse> helloWorld =
  request -> ServerResponse.ok().bodyValue("Hello World");

Validation

如果想要对请求体进行校验,可以使用如下逻辑:

public class PersonHandler {

	private final Validator validator = new PersonValidator();

	// ...

	public Mono<ServerResponse> createPerson(ServerRequest request) {
		Mono<Person> person = request.bodyToMono(Person.class).doOnNext(this::validate);
		return ok().build(repository.savePerson(person));
	}

	private void validate(Person person) {
		Errors errors = new BeanPropertyBindingResult(person, "person");
		validator.validate(person, errors);
		if (errors.hasErrors()) {
			throw new ServerWebInputException(errors.toString());
		}
	}
}

RouterFunction

RouterFunction用于将请求路由到对应的HandlerFunction通常情况下都通过RouterFunctions工具类来创建router。

RouterFunctions.route()方法提供了许多快捷创建router的方法例如GET(String, HandlerFunction)等。

除了基于HTTP method进行映射外router builder还提供了引入额外Predicates来进行请求路由的机制对于每个基于http method进行路由的方法都存在一个重载方法用于接收predicates。

Predicates

使用者可以编写自己的RequestPredicate但是RequestPredicates工具提供了常用的实现可以基于request path、http method、content-type等进行判断。

在如下示例中,展示了通过Accept header进行判断的示例

RouterFunction<ServerResponse> route = RouterFunctions.route()
	.GET("/hello-world", accept(MediaType.TEXT_PLAIN),
		request -> ServerResponse.ok().bodyValue("Hello World")).build();

如果想要对多个predicates进行逻辑运算可以使用如下方法

  • RequestPredicate.and(RequestPredicate):两者都满足
  • RequestPredicate.or(RequestPredicate):两者任一满足

predicates中许多都是组合的例如equestPredicates.GET(String)是由RequestPredicates.method(HttpMethod)RequestPredicates.path(String)进行组合的。

在上述示例中其实也使用到了两个predicatesbuilder内部使用了RequestPredicates.GET,调用方则指定了accept

Routes

Router function将会按顺序被调用如果前一个route没有匹配将会调用下一个route。

在使用router function builder时所有定义的routes都被整合在一个RouterFunction中,并且通过build方法进行返回。

除了使用router builder之外还可以将多个router function整合在一起

  • 调用RouterFunctions.route()中的add(RouterFunction)方法
  • RouterFunction.and(RouterFunction)方法
    • and方法代表前面的方法没有匹配时后调用后面的RouterFunction
  • RouterFunction.andRoute(RequestPredicate, HandlerFunction)方法

使用示例如下所示:

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> otherRoute = ...

RouterFunction<ServerResponse> route = route()
	.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson) 
	.GET("/person", accept(APPLICATION_JSON), handler::listPeople) 
	.POST("/person", handler::createPerson) 
	.add(otherRoute) 
	.build();

Nested Routes

通常情况下很有可能许多路由都包含相同的predicate例如共享路径。

如下示例展示了3个routes共用/preson路径的场景:

RouterFunction<ServerResponse> route = route()
	.path("/person", builder -> builder 
		.GET("/{id}", accept(APPLICATION_JSON), handler::getPerson)
		.GET(accept(APPLICATION_JSON), handler::listPeople)
		.POST(handler::createPerson))
	.build();

在上述示例中route1和route2都接收json格式参数故而还能继续嵌套示例如下

RouterFunction<ServerResponse> route = route()
	.path("/person", b1 -> b1
		.nest(accept(APPLICATION_JSON), b2 -> b2
			.GET("/{id}", handler::getPerson)
			.GET(handler::listPeople))
		.POST(handler::createPerson))
	.build();

Resource Redirect

RounterFunction的Builder支持对resource进行重定向示例如下

Resource location = new FileUrlResource("public-resources/");
RouterFunction<ServerResponse> resources = RouterFunctions.resources("/resources/**", location);

Filter Handler Functions

可以针对routing function builder添加filterbuilder支持如下方法

  • before
  • after
  • filter

对builder添加的filter会应用到buidler中所有的routes示例如下所示

RouterFunction<ServerResponse> route = route()
	.path("/person", b1 -> b1
		.nest(accept(APPLICATION_JSON), b2 -> b2
			.GET("/{id}", handler::getPerson)
			.GET(handler::listPeople)
			.before(request -> ServerRequest.from(request)
				.header("X-RequestHeader", "Value")
				.build()))
		.POST(handler::createPerson))
	.after((request, response) -> logResponse(response))
	.build();

上述示例中filter的范围如下

  • beforebefore只会应用到GET请求
  • afterafter会应用到所有请求包括nested routes

对于builder.filter方法,其接收HandlerFilterFunction类型的参数,使用示例如下:

SecurityManager securityManager = ...

RouterFunction<ServerResponse> route = route()
	.path("/person", b1 -> b1
		.nest(accept(APPLICATION_JSON), b2 -> b2
			.GET("/{id}", handler::getPerson)
			.GET(handler::listPeople))
		.POST(handler::createPerson))
	.filter((request, next) -> {
		if (securityManager.allowAccessTo(request.path())) {
			return next.handle(request);
		}
		else {
			return ServerResponse.status(UNAUTHORIZED).build();
		}
	})
	.build();

除了可以向builder添加filter外还可以向已经存在的router function中添加filter通过RouterFunction.filter(HandlerFilterFunction)方法。