Files
rikako-note/spring/webflux/webclient.md
2025-05-24 16:28:27 +08:00

236 lines
9.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebClient
WebClient基于Reactor提供了`functional, fluent API`
WebClient是非阻塞的其依赖的codecs和server端使用的codecs相同。
## Configuration
创建`WebClient`最简单的方式是通过静态工厂方法:
- `WebClient.create()`
- `WebClient.create(String baseUrl)`
除此之外,也可以通过`WebClient.builder()`来指定更多选项:
- `uriBuilderFactory` 自定义uriBuilderFactory用于创建UriBuilder`UriBuilder`包含共享的配置例如base URI等
- `defaultUriVariables`: 在拓展uri templates时使用到的默认值
- `defaultHeader`对每个请求都包含的headers
- `defaultCookie`每个请求都包含的Cookie
- `defaultRequest` 对每个请求进行自定义的`Consumer`
- `filter`对于每个请求的client filter
- `exchangeStrategies`自定义http message的reader/writer
- `clientConnector`http client library设置
- `observationRegistry` the registry to use for enabling Observability support
- `observationConvention`: an optional, custom convention to extract metadata for recorded observations.
创建WebClient的示例如下
```java
WebClient client = WebClient.builder()
.codecs(configurer -> ... )
.build();
```
一旦被创建后WebClient是不可变的`但是,可以对其进行克隆并对副本进行修改`,示例如下:
```java
WebClient client1 = WebClient.builder()
.filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate()
.filter(filterC).filter(filterD).build();
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
```
### MaxInMemorySize
Codecs对于缓存在内存中的数据大小存在限制避免导致应用的内存问题。默认情况下该值被设置为`256KB`,如果该大小不够,那么将会见到如下报错:
```
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer
```
如果想要修改该默认的codecs限制可以使用如下配置
```java
WebClient webClient = WebClient.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
.build();
```
### reactor netty
为了自定义reactor netty配置可以提供一个预先定义的`HttpClient`
```java
HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
```
#### Resources
默认情况下,`HttpClient`将会参与使用`reactor.netty.http.HttpResources`中保存的全局reactor netty resources。
`HttpResources`中包含event loop threads和connection poolHttpClient会使用这些共享的资源。对于基于event loop的并发更倾向使用固定、共享的资源直到进程退出时这些全局资源才会被释放。
如果server的生命周期和进程相同那么对全局资源无需手动释放。但若server在进程的生命周期中可能会启动和停止那么可以声明一个`ReactorResourceFactory`类型的bean并设置`globalResources=true`默认从而确保reactor netty的global resource在`ApplicationContext`关闭时被释放,示例如下所示:
```java
@Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory();
}
```
也可以不使用global reactor netty resources但是在该模式下你需要确保所有reactor netty client and server使用共享的资源示例如下所示
```java
@Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false);
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper);
return WebClient.builder().clientConnector(connector).build();
}
```
#### timeout
如果要设置connection timeout示例如下
```java
import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
```
如果要配置read/write timeout可以按照如下示例进行配置
```java
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
// Create WebClient...
```
##### readTimeoutHandler
在指定的时间内如果channel没有执行读取操作将会抛出ReadTimeoutException并对channel执行关闭操作
> `没有数据被读取`的监控,是通过`IdleStateHandler`来实现的。
>
> `ReadTimeoutHandler`继承了`IdleStateHandler`当指定的readIdleTime过后如果`channel`仍然没有指定任何的读操作,那么将会触发一个`IdleStateEvent`事件,并且事件状态为`IdleState.READER_IDLE`
###### writeTimeoutHandler
而writeTimeoutHandler作用则如下
- 其继承了`ChannelOutboundHandlerAdapter`在通过write写入数据时其会通过`ctx.executor()`指定一个`WriteTimeoutTask`该task在指定timeout后被执行
- 如果在writeTimeoutTask被执行时写操作仍然没有处理完成那么其将会触发一个WriteTimeoutException并且channel也会被关闭。
为所有请求配置response timeout:
```java
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create WebClient...
```
为指定的请求配置response timeout:
```java
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(2));
})
.retrieve()
.bodyToMono(String.class);
```
##### connection provider
connection provider会对`固定的最大数量连接`进行缓存和重用。其中,`最大数量`针对的是单个connection pool的最大数量而每个connection pool则是和指定的remote host相关联。
当连接池中的连接都被使用时,后续连接请求将会被阻塞,阻塞时间由`pendingAcquireTime`决定。
### 各个超时含义
再restTemplate等阻塞api中各个超时的含义如下所示
#### readTimeout
会为serverSocket和server进行通信的socket设置`SocketOptions.SO_SOCKET`单位为ms。
`SO_SOCKET`被设置为正数值时,对`socket关联的inputstream`阻塞式调用,将最多阻塞该指定时长。如果调用超时,那么将会抛出`java. net. SocketTimeoutException`但是socket仍然有效。
#### connectTimeout
以ms为单位指定一个超时时长当与服务端建立连接时如果超过该时长后连接仍未建立那么将会抛出`java.net.SocketTimeoutException`异常。
### JDK HttpClient
如果要自定义jdk httpclient配置欸可以使用如下方式
```java
HttpClient httpClient = HttpClient.newBuilder()
.followRedirects(Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(20))
.build();
ClientHttpConnector connector =
new JdkClientHttpConnector(httpClient, new DefaultDataBufferFactory());
WebClient webClient = WebClient.builder().clientConnector(connector).build();
```
### Jetty
自定义jetty httpclient配置的示例如下所示
```java
HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);
WebClient webClient = WebClient.builder()
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
```
### HttpComponents
如下示例展示了如何定义Apache HttpComponents
```java
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
clientBuilder.setDefaultRequestConfig(...);
CloseableHttpAsyncClient client = clientBuilder.build();
ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client);
WebClient webClient = WebClient.builder().clientConnector(connector).build();
```
## retrieve()
`retrieve()`方法用于定义`如何对相应进行提取`,示例如下:
```java
WebClient client = WebClient.create("https://example.org");
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Person.class);
```
上述示例获取的是`ResponseEntity<Person>`,如果想要直接获取`Persion`类型的response body可以通过如下方式进行获取:
```java
WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
```
默认情况下,`4xx``5xx`的http响应将会导致`WebClientResponseException`如果需要自定义error handling逻辑需要使用`onStatus`,示例如下:
```java
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response -> ...)
.onStatus(HttpStatusCode::is5xxServerError, response -> ...)
.bodyToMono(Person.class);
```