36 KiB
Spring Webflux
Concept
核心机制
reactive
reactive代表基于“事件响应”的编程模型,
back pressure
在spring webflux中,back pressure为反应式编程的核心机制,用于协调生产者和消费者之间的速率差异,令系统在高负载或资源受限的情况下仍能稳定运行。
- 同步场景:在同步场景下,阻塞式调用是一种天然的back pressure形式,调用方会阻塞并等待,直到被调用方执行完成
- 非阻塞场景:在非阻塞的代码中,需要关注事件速率,生产者产生事件的速率不能压过消费者消费的速率
reactive stream
reactive stream为一个小型规范,定义了异步组件和back pressure交互的规范。reactive stream的主要用途是让Subscriber控制publisher产生数据的速率。
编程模型
spring-web module包含Spring webflux的响应式基础,包括若夏内容:
- http抽象
- 对支持server的reactive stream adapters
- codecs
- 核心WebHandler API,其与servlet api兼容
Spring webflux提供了如下两种编程模型:
Annotated Controllers: 和spring mvc一致,基于相同的注解。spring mvc和webflux controllers都支持reactive return type,因此,很难区分spring mvc和webflux controllersFunctional Endpoint:基于lambda的、轻量的函数编程模型。其可以被看做是一个支持对请求进行route和handle的工具集合。
并发模型
spring mvc
在spring mvc(通常为servlet应用)中,其假设当前线程可能会被阻塞(例如,远程调用等会阻塞当前线程)。为了减少处理请求时阻塞所带来的影响,servlet容器会使用包含大量线程的线程池。
webflux
对于webflux(通常为non-blocking server),其会假设应用并不会阻塞,故而,非阻塞的server可以使用一个线程数较少且固定的线程池(event loop workers)来处理请求。
调用阻塞api
当想要在webflux中使用阻塞api时,可以按如下方式进行使用。RxJava和Reactive都支持publushOn操作,其支持在另一个线程中继续执行。
Reactive Core
spring-web对于reactive web应用具有如下支持:
- 对于server request的处理,拥有如下两种级别的支持:
HttpHandler: 基于non-blocking I/O和Reactive Stream back pressure,适配`Reactor Netty, Undertow, Tomcat, Jetty以及任一Servlet ContainerWebHandler:稍高级别的、通用的web api,用于请求处理,在此基础上构建基于annotated controllers和functional endpoints的编程模型
- 对于client端,
ClientHttpConnector用于发送http请求,同时兼容非阻塞io和Reactive Stream back pressure - codecs用于客户端和服务端的序列化和反序列化
HttpHandler
HttpHandler中只通过一个单独的方法来处理请求和相应,其对不同的http server api进行了最小化的抽象。
下表描述了支持的api:
| Server name | Server API used | Reactive Streams support |
|---|---|---|
Netty |
Netty API |
|
Undertow |
Undertow API |
spring-web: Undertow to Reactive Streams bridge |
Tomcat |
Servlet non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[] |
spring-web: Servlet non-blocking I/O to Reactive Streams bridge |
Jetty |
Servlet non-blocking I/O; Jetty API to write ByteBuffers vs byte[] |
spring-web: Servlet non-blocking I/O to Reactive Streams bridge |
Servlet container |
Servlet non-blocking I/O |
spring-web: Servlet non-blocking I/O to Reactive Streams bridge |
下表中描述了server依赖:
| Server name | Group id | Artifact name |
|---|---|---|
Reactor Netty |
io.projectreactor.netty |
reactor-netty |
Undertow |
io.undertow |
undertow-core |
Tomcat |
org.apache.tomcat.embed |
tomcat-embed-core |
Jetty |
org.eclipse.jetty |
jetty-server, jetty-servlet |
server api adapters
如下示例展示了如何使用HttpHandler Adapters来适配不同的Server Api:
Reactor Netty
HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create().host(host).port(port).handle(adapter).bindNow();
Undertow
HttpHandler handler = ...
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
Tomcat
HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);
Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
Jetty
HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);
Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();
ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();
WebHandler API
org.springframework.web.server package基于HttpHandler构建,提供了通用的Web API。Web Api由多个WebException, 多个WebFilter, 一个WebHandler组件构成,组成了一个chain。
相比于HttpHandler仅仅是对不同http server的抽象,WebHandler提供了一个更加通用、更加广泛的功能集合:
- user sessions attributes
- request attributes
- resolved
LocaleorPrincipalfor request - abstractions for multipart data
bean types for WebHttpHandlerBuilder auto-detect
在spring上下文中,WebHttpHandlerBuilder可以自动探测到如下类型的components:
| Bean name | Bean type | Count | Description |
|---|---|---|---|
<any> |
|
0..N |
Provide handling for exceptions from the chain of |
<any> |
|
0..N |
Apply interception style logic to before and after the rest of the filter chain and
the target |
|
|
1 |
The handler for the request. |
|
|
0..1 |
The manager for |
|
|
0..1 |
For access to |
|
|
0..1 |
The resolver for |
|
|
0..1 |
For processing forwarded type headers, either by extracting and removing them or by removing them only. Not used by default. |
Form Data
ServerWebExchange将会向外暴露如下方法,用于访问form data:
Mono<MultiValueMap<String, String>> getFormData();
DefaultServerWebExchange使用HttpMessageReader来对form data(application/x-www-form-urlencoded)进行parse操作,将formdata转化为MultiValueMap。
Multipart Data
ServerWebExchange向外暴露如下方法,用于访问multipart data。
Mono<MultiValueMap<String, Part>> getMultipartData();
DefaultServerWebExchange将会使用HttpMessageReader<MultiValueMap<String, Part>>来对multipart/form-data,multipart/mixed,multipart/related数据进行转换,数据将会被转化为MultiValueMap类型。
Filter
在WebHandler API中,可以使用WebFilter来实现拦截式的逻辑,当使用Webflux Config时,WebFilter的注可以通过将其注册为bean来实现。
对于WebFilter的优先级,欸可以通过使用@Order注解或实现Ordered接口来实现。
UrlHandler
在编写web程序时,可能希望controller endpoint既能够匹配末尾带/的url版本,又能匹配末尾不带/的url版本。
例如,想要让
@GetMapping("/home")既能够匹配GET /home,又能够匹配GET /home/。
对此,可以使用UrlHandlerFilter进行处理,其支持如下两种配置:
- 当接收到带有末尾带有
/的url时,向浏览器返回一个重定向状态,让浏览器重新请求末尾不带/的url - 将请求当作不带末尾
/,并对请求继续处理
实例化UrlHandlerFilter的示例如下:
UrlHandlerFilter urlHandlerFilter = UrlHandlerFilter
// will HTTP 308 redirect "/blog/my-blog-post/" -> "/blog/my-blog-post"
.trailingSlashHandler("/blog/**").redirect(HttpStatus.PERMANENT_REDIRECT)
// will mutate the request to "/admin/user/account/" and make it as "/admin/user/account"
.trailingSlashHandler("/admin/**").mutateRequest()
.build();
Exceptions
在WebHandler Api中,可以使用WebExceptionHandler对异常进行处理。当使用Webflux Config时,注册WebExceptionHandler仅需将其声明为bean即可,可以通过@Order注解或Ordered接口来指明顺序。
Logging
Log Id
在webflux中,同一个请求可能在多个线程中被执行过,故而,在定位请求日志时,无法通过线程id来关联请求。为了解决该问题,spring webflux在打印消息时前缀了一个log id,其是针对单个特定请求的。
在server端,log id存储在ServerWebExchange中的LOG_ID_ATTRIBUTE属性中。在client端,log id存储在Client Request中的LOG_ID_ATTRIBUTE属性中。
Appenders
slf4j和log4j等日志库都提供异步logger,用于避免阻塞。使用异步logger会存在部分缺点,例如丢弃无法排队的异步消息。但是,其仍然是目前非阻塞框架的最佳选择。
DispatcherHandler
和spring MVC类似,spring webflux同样按照font controller pattern进行设计,包含一个central WebHandler,即DispatcherHandler。
Dispathcer提供了一个共享的请求处理算法,将实际的处理逻辑委托给其他可配置的组件。
DispatcherHandler通过spring配置来发现委托的组件。DispatcherHandler其本身也是一个spring bean,并且实现了ApplicationContextAware接口,可以访问spring上下文。
如果Dispatcher的bean name为webHandler,那么其会被WebHttpHandlerBuilder发现,并且将其放入request-processing chain中。
webflux应用中包含的spring configuration通常包括:
- bean name为
webHandler的DispatcherHandler WebFilter和WebExceptionHandlerbeans- 被委托给
webHandler的beans - 其他
上述配置将被被WebHttpHandlerBuilder使用,用于构建process chain,示例如下所示:
ApplicationContext context = ...
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
上述示例中返回的handler可以和server adapter结合使用。
DispatcherHandler委托
DispatcherHandler会将请求的委托给特定的bean对象,bean对象会处理请求并且将结果渲染到response中。
DispatcherHandler会对如下类型的bean进行auto-detect。
| Bean type | Explanation |
|---|---|
|
Map a request to a handler. The mapping is based on some criteria, the details of
which vary by The main |
|
Help the |
|
Process the result from the handler invocation and finalize the response. See Result Handling. |
Processing
DispatcherHandler按照如下方式处理请求:
- 首先,会挨个查询
HandlerMapping用于查找匹配的handler,会匹配第一个匹配的handler - 如果匹配到handler,会通过对应的HandlerAdapter执行该handler,并且会将handler返回的值作为
HandlerResult暴露 HandlerResult将会被分配给指定的HandlerResultHandler进行处理,可能会直接向response中写入数据或渲染view
Result Handling
通过HandlerAdapter对handler进行调用,返回的结果会被包裹在HandlerResult中,随之还包含额外的上下文。
HandlerResult将会被传递给第一个支持其的上下文,下表中展示了HandlerResultHandler的实现:
| Result Handler Type | Return Values | Default Order |
|---|---|---|
|
|
0 |
|
|
0 |
|
Handle return values from |
100 |
|
See also View Resolution. |
|
Annotated Controllers
spring webflux提供了基于注解的编程模型,通过使用注解来向外暴露request mappings。通过注解标注的controllers可包含灵活的方法签名,并且无需继承任何基类或实现指定接口。
使用示例如下所示:
@RestController
public class HelloController {
@GetMapping("/hello")
public String handle() {
return "Hello WebFlux";
}
}
@Controller
在基于注解的webflux变成模型中,可以使用@Controller或@RestController来声明controller bean。
AOP
对于部分场景,需要在运行时使用aop proxy来对controller进行decorate。
对于针对controller的aop,推荐使用class-based proxy,可以使用@EnableTransactionManagement(proxyTargetClass=true)来实现。
Functional Endpoint
Webflux.fn是一个轻量级的函数式编程模型,方法用于对请求进行路由和处理。其和基于注解的编程模型一致,都基于相同的reactive core。
Overview
在Webflux.fn中,http请求被HandlerFunction处理,HandlerFunction接收ServerRequest并且返回一个delayed ServerResponse(Mono<ServerResponse>)。
Immutable
对于函数时编程模型,request对象和response对象都是不可变的。
一旦reqeust或response对象被创建,其内部状态都无法被直接修改,任何试图对其“修改”的操作都会返回一个新的对象,并且保证原有对象内容不变。
由于request和response对象是不可变的,其天然为线程安全的。
Router Function
incoming requests将会被RouterFunction路由到handler function,RouterFunction接收一个ServerRequest并且返回一个delayed HandlerFunction。
如果存在RouterFunction与request匹配时,其将返回一个handler function;如果没有任何RouterFunction与请求相匹配,将返回一个空的Mono。
RouterFunction的作用和@RequestMapping注解相同。
RouterFunctions.route()方法提供了一个便于创建router的builder,示例如下:
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);
RouterFunction<ServerResponse> route = route() (1)
.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET("/person", accept(APPLICATION_JSON), handler::listPeople)
.POST("/person", handler::createPerson)
.build();
public class PersonHandler {
// ...
public Mono<ServerResponse> listPeople(ServerRequest request) {
// ...
}
public Mono<ServerResponse> createPerson(ServerRequest request) {
// ...
}
public Mono<ServerResponse> getPerson(ServerRequest request) {
// ...
}
}
运行RouterFunction的其中一种方式是将其转化为HttpHandler,并将其指定给对应的server adapter:
RouterFunctions.toHttpHandler(RouterFunction)RouterFunctions.toHttpHandler(RouterFunction, HandlerStrategies)
HandlerFunction
ServerRequest和ServerResponse为不可变接口,两者都对body stream提供Reactive Stream back pressure。
- 其中,request body通过Reactor
Flux或Mono表示 - response body可以通过任何
Reactive Stream Publisher进行表示,包括Flux和Mono
Server Request
ServerRequest提供了对如下内容的访问:
- http method
- uri
- headers
- query params
- body
如下示例展示了如何将请求体转化为Mono/Flux类型:
Mono<String> string = request.bodyToMono(String.class);
Flux<Person> people = request.bodyToFlux(Person.class);
如下示例展示了如何访问formdata:
Mono<MultiValueMap<String, String>> map = request.formData();
如下示例展示了如何访问multipart data:
Mono<MultiValueMap<String, Part>> map = request.multipartData();
访问multipart数据内容的示例如下所示:
Flux<PartEvent> allPartEvents = request.bodyToFlux(PartEvent.class);
allPartsEvents.windowUntil(PartEvent::isLast)
.concatMap(p -> p.switchOnFirst((signal, partEvents) -> {
if (signal.hasValue()) {
PartEvent event = signal.get();
if (event instanceof FormPartEvent formEvent) {
String value = formEvent.value();
// handle form field
}
else if (event instanceof FilePartEvent fileEvent) {
String filename = fileEvent.filename();
Flux<DataBuffer> contents = partEvents.map(PartEvent::content);
// handle file upload
}
else {
return Mono.error(new RuntimeException("Unexpected event: " + event));
}
}
else {
return partEvents; // either complete or error signal
}
}));
ServerResponse
ServerResponse类提供了对http返回内容的访问,由于ServerResponse是不可变的,故而可以通过build方法来创建ServerResponse。
如下示例展示了ServerResponse的使用
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);
URI location = ...
ServerResponse.created(location).build();
同时,在构建响应体时,支持传递hint参数来控制body被序列化和反序列化的方式,示例如下:
ServerResponse.ok().hint(Jackson2CodecSupport.JSON_VIEW_HINT, MyJacksonView.class).body(...);
Handler Class
可以将HandlerFunction写为lambda的形式,示例如下:
HandlerFunction<ServerResponse> helloWorld =
request -> ServerResponse.ok().bodyValue("Hello World");
Validation
如果想要对请求体进行校验,可以使用如下逻辑:
public class PersonHandler {
private final Validator validator = new PersonValidator();
// ...
public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class).doOnNext(this::validate);
return ok().build(repository.savePerson(person));
}
private void validate(Person person) {
Errors errors = new BeanPropertyBindingResult(person, "person");
validator.validate(person, errors);
if (errors.hasErrors()) {
throw new ServerWebInputException(errors.toString());
}
}
}
RouterFunction
RouterFunction用于将请求路由到对应的HandlerFunction,通常情况下都通过RouterFunctions工具类来创建router。
RouterFunctions.route()方法提供了许多快捷创建router的方法,例如GET(String, HandlerFunction)等。
除了基于HTTP method进行映射外,router builder还提供了引入额外Predicates来进行请求路由的机制,对于每个基于http method进行路由的方法,都存在一个重载方法,用于接收predicates。
Predicates
使用者可以编写自己的RequestPredicate,但是RequestPredicates工具提供了常用的实现,可以基于request path、http method、content-type等进行判断。
在如下示例中,展示了通过Accept header进行判断的示例:
RouterFunction<ServerResponse> route = RouterFunctions.route()
.GET("/hello-world", accept(MediaType.TEXT_PLAIN),
request -> ServerResponse.ok().bodyValue("Hello World")).build();
如果想要对多个predicates进行逻辑运算,可以使用如下方法:
RequestPredicate.and(RequestPredicate):两者都满足RequestPredicate.or(RequestPredicate):两者任一满足
predicates中许多都是组合的,例如equestPredicates.GET(String)是由RequestPredicates.method(HttpMethod)和RequestPredicates.path(String)进行组合的。
在上述示例中,其实也使用到了两个predicates,builder内部使用了RequestPredicates.GET,调用方则指定了accept。
Routes
Router function将会按顺序被调用,如果前一个route没有匹配,将会调用下一个route。
在使用router function builder时,所有定义的routes都被整合在一个RouterFunction中,并且通过build方法进行返回。
除了使用router builder之外,还可以将多个router function整合在一起:
- 调用
RouterFunctions.route()中的add(RouterFunction)方法 RouterFunction.and(RouterFunction)方法- and方法代表前面的方法没有匹配时,后调用后面的RouterFunction
RouterFunction.andRoute(RequestPredicate, HandlerFunction)方法
使用示例如下所示:
PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);
RouterFunction<ServerResponse> otherRoute = ...
RouterFunction<ServerResponse> route = route()
.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET("/person", accept(APPLICATION_JSON), handler::listPeople)
.POST("/person", handler::createPerson)
.add(otherRoute)
.build();
Nested Routes
通常情况下,很有可能许多路由都包含相同的predicate,例如共享路径。
如下示例展示了3个routes共用/preson路径的场景:
RouterFunction<ServerResponse> route = route()
.path("/person", builder -> builder
.GET("/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET(accept(APPLICATION_JSON), handler::listPeople)
.POST(handler::createPerson))
.build();
在上述示例中,route1和route2都接收json格式参数,故而还能继续嵌套,示例如下:
RouterFunction<ServerResponse> route = route()
.path("/person", b1 -> b1
.nest(accept(APPLICATION_JSON), b2 -> b2
.GET("/{id}", handler::getPerson)
.GET(handler::listPeople))
.POST(handler::createPerson))
.build();
Resource Redirect
RounterFunction的Builder支持对resource进行重定向,示例如下:
Resource location = new FileUrlResource("public-resources/");
RouterFunction<ServerResponse> resources = RouterFunctions.resources("/resources/**", location);
Filter Handler Functions
可以针对routing function builder添加filter,builder支持如下方法:
- before
- after
- filter
对builder添加的filter,会应用到buidler中所有的routes,示例如下所示:
RouterFunction<ServerResponse> route = route()
.path("/person", b1 -> b1
.nest(accept(APPLICATION_JSON), b2 -> b2
.GET("/{id}", handler::getPerson)
.GET(handler::listPeople)
.before(request -> ServerRequest.from(request)
.header("X-RequestHeader", "Value")
.build()))
.POST(handler::createPerson))
.after((request, response) -> logResponse(response))
.build();
上述示例中,filter的范围如下:
before:before只会应用到GET请求after:after会应用到所有请求,包括nested routes
对于builder.filter方法,其接收HandlerFilterFunction类型的参数,使用示例如下:
SecurityManager securityManager = ...
RouterFunction<ServerResponse> route = route()
.path("/person", b1 -> b1
.nest(accept(APPLICATION_JSON), b2 -> b2
.GET("/{id}", handler::getPerson)
.GET(handler::listPeople))
.POST(handler::createPerson))
.filter((request, next) -> {
if (securityManager.allowAccessTo(request.path())) {
return next.handle(request);
}
else {
return ServerResponse.status(UNAUTHORIZED).build();
}
})
.build();
除了可以向builder添加filter外,还可以向已经存在的router function中添加filter,通过RouterFunction.filter(HandlerFilterFunction)方法。