Files
rikako-note/spring/webflux/webclient.md
2025-06-03 12:55:01 +08:00

475 lines
17 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebClient
WebClient基于Reactor提供了`functional, fluent API`
WebClient是非阻塞的其依赖的codecs和server端使用的codecs相同。
## Configuration
创建`WebClient`最简单的方式是通过静态工厂方法:
- `WebClient.create()`
- `WebClient.create(String baseUrl)`
除此之外,也可以通过`WebClient.builder()`来指定更多选项:
- `uriBuilderFactory` 自定义uriBuilderFactory用于创建UriBuilder`UriBuilder`包含共享的配置例如base URI等
- `defaultUriVariables`: 在拓展uri templates时使用到的默认值
- `defaultHeader`对每个请求都包含的headers
- `defaultCookie`每个请求都包含的Cookie
- `defaultRequest` 对每个请求进行自定义的`Consumer`
- `filter`对于每个请求的client filter
- `exchangeStrategies`自定义http message的reader/writer
- `clientConnector`http client library设置
- `observationRegistry` the registry to use for enabling Observability support
- `observationConvention`: an optional, custom convention to extract metadata for recorded observations.
创建WebClient的示例如下
```java
WebClient client = WebClient.builder()
.codecs(configurer -> ... )
.build();
```
一旦被创建后WebClient是不可变的`但是,可以对其进行克隆并对副本进行修改`,示例如下:
```java
WebClient client1 = WebClient.builder()
.filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate()
.filter(filterC).filter(filterD).build();
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
```
### MaxInMemorySize
Codecs对于缓存在内存中的数据大小存在限制避免导致应用的内存问题。默认情况下该值被设置为`256KB`,如果该大小不够,那么将会见到如下报错:
```
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer
```
如果想要修改该默认的codecs限制可以使用如下配置
```java
WebClient webClient = WebClient.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
.build();
```
### reactor netty
为了自定义reactor netty配置可以提供一个预先定义的`HttpClient`
```java
HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
```
#### Resources
默认情况下,`HttpClient`将会参与使用`reactor.netty.http.HttpResources`中保存的全局reactor netty resources。
`HttpResources`中包含event loop threads和connection poolHttpClient会使用这些共享的资源。对于基于event loop的并发更倾向使用固定、共享的资源直到进程退出时这些全局资源才会被释放。
如果server的生命周期和进程相同那么对全局资源无需手动释放。但若server在进程的生命周期中可能会启动和停止那么可以声明一个`ReactorResourceFactory`类型的bean并设置`globalResources=true`默认从而确保reactor netty的global resource在`ApplicationContext`关闭时被释放,示例如下所示:
```java
@Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory();
}
```
也可以不使用global reactor netty resources但是在该模式下你需要确保所有reactor netty client and server使用共享的资源示例如下所示
```java
@Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false);
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper);
return WebClient.builder().clientConnector(connector).build();
}
```
#### timeout
如果要设置connection timeout示例如下
```java
import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
```
如果要配置read/write timeout可以按照如下示例进行配置
```java
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
// Create WebClient...
```
##### readTimeoutHandler
在指定的时间内如果channel没有执行读取操作将会抛出ReadTimeoutException并对channel执行关闭操作
> `没有数据被读取`的监控,是通过`IdleStateHandler`来实现的。
>
> `ReadTimeoutHandler`继承了`IdleStateHandler`当指定的readIdleTime过后如果`channel`仍然没有指定任何的读操作,那么将会触发一个`IdleStateEvent`事件,并且事件状态为`IdleState.READER_IDLE`
###### writeTimeoutHandler
而writeTimeoutHandler作用则如下
- 其继承了`ChannelOutboundHandlerAdapter`在通过write写入数据时其会通过`ctx.executor()`指定一个`WriteTimeoutTask`该task在指定timeout后被执行
- 如果在writeTimeoutTask被执行时写操作仍然没有处理完成那么其将会触发一个WriteTimeoutException并且channel也会被关闭。
为所有请求配置response timeout:
```java
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create WebClient...
```
为指定的请求配置response timeout:
```java
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(2));
})
.retrieve()
.bodyToMono(String.class);
```
##### connection provider
connection provider会对`固定的最大数量连接`进行缓存和重用。其中,`最大数量`针对的是单个connection pool的最大数量而每个connection pool则是和指定的remote host相关联。
当连接池中的连接都被使用时,后续连接请求将会被阻塞,阻塞时间由`pendingAcquireTime`决定。
### 各个超时含义
再restTemplate等阻塞api中各个超时的含义如下所示
#### readTimeout
会为serverSocket和server进行通信的socket设置`SocketOptions.SO_SOCKET`单位为ms。
`SO_SOCKET`被设置为正数值时,对`socket关联的inputstream`阻塞式调用,将最多阻塞该指定时长。如果调用超时,那么将会抛出`java. net. SocketTimeoutException`但是socket仍然有效。
#### connectTimeout
以ms为单位指定一个超时时长当与服务端建立连接时如果超过该时长后连接仍未建立那么将会抛出`java.net.SocketTimeoutException`异常。
### JDK HttpClient
如果要自定义jdk httpclient配置欸可以使用如下方式
```java
HttpClient httpClient = HttpClient.newBuilder()
.followRedirects(Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(20))
.build();
ClientHttpConnector connector =
new JdkClientHttpConnector(httpClient, new DefaultDataBufferFactory());
WebClient webClient = WebClient.builder().clientConnector(connector).build();
```
### Jetty
自定义jetty httpclient配置的示例如下所示
```java
HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);
WebClient webClient = WebClient.builder()
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
```
### HttpComponents
如下示例展示了如何定义Apache HttpComponents
```java
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
clientBuilder.setDefaultRequestConfig(...);
CloseableHttpAsyncClient client = clientBuilder.build();
ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client);
WebClient webClient = WebClient.builder().clientConnector(connector).build();
```
## retrieve()
`retrieve()`方法用于定义`如何对相应进行提取`,示例如下:
```java
WebClient client = WebClient.create("https://example.org");
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Person.class);
```
上述示例获取的是`ResponseEntity<Person>`,如果想要直接获取`Persion`类型的response body可以通过如下方式进行获取:
```java
WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
```
默认情况下,`4xx``5xx`的http响应将会导致`WebClientResponseException`如果需要自定义error handling逻辑需要使用`onStatus`,示例如下:
```java
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response -> ...)
.onStatus(HttpStatusCode::is5xxServerError, response -> ...)
.bodyToMono(Person.class);
```
## Exchange
`exchangeToMono`方法和`exchangeToFlux`方法可以提供更精确的控制例如在statusCode不同时使用不同的方法来进行反序列化
```java
Mono<Person> entityMono = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(Person.class);
}
else {
// Turn to error
return response.createError();
}
});
```
如上所示,当`exchangeToMono() and exchangeToFlux()`返回的flux或mono完成时clientResponse对象将会被释放从而避免内存和连接的泄露。
因而,`response`不能在更下游进行反序列化,`具体的反序列化过程由提供的方法来声明如何进行decode`
## Request Body
对于webclient可以加载任意异步类型对象到request body例如`Mono`
```java
Mono<Person> personMono = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(personMono, Person.class)
.retrieve()
.bodyToMono(Void.class);
```
同样的,也可以加载多个异步对象,例如`Flux`类型
```java
Flux<Person> personFlux = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(personFlux, Person.class)
.retrieve()
.bodyToMono(Void.class);
```
当不想从异步类型的对象加载值,而是想加载已有的值时,可以使用`bodyValue`方法:
```java
Person person = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.bodyToMono(Void.class);
```
### Form Data
如果想要发送form data可以为body指定`MultiValueMap<String, String>`类型的值content会被自动设置为`application/x-www-form-urlencoded`。使用示例如下所示:
```java
MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.bodyToMono(Void.class);
```
同时,可以使用`BodyInserters`来构建form data示例如下所示
```java
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.bodyToMono(Void.class);
```
### Multipart Data
为了发送multipart data需要提供`MultiValueMap<String, ?>`类型的值其中value类型为`代表part内容的object对象``代表part内容和headers的HttpEntity对象`
`MutlipartBodyBuilder`提供了便捷的api使用示例如下所示
```java
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request
MultiValueMap<String, HttpEntity<?>> parts = builder.build();
```
通常情况下无需为每个part指定`Content-Type`,在`HttpMessageWriter`执行序列化操作时会自动的设置ContentType类型对于`Resources`类型会根据文件拓展名来决定ContentType。
`MutliValueMap`类型的值构建完成后,使用示例如下:
```java
MultipartBodyBuilder builder = ...;
Mono<Void> result = client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.bodyToMono(Void.class);
```
除了使用`MultipartBodyBuilder`之外,还可以使用`BodyInserters`来构建multipart body示例如下
```java
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.bodyToMono(Void.class);
```
### PartEvent
如果需要提供多个multipart data可以使用`PartEvent`类型:
- form fields可以通过`FormPartEvent::create`来创建
- file uploads可以通过`FormPartEvent::create`来创建
可以通过`Flux.concat`方法来对其进行拼接,示例如下所示:
```java
Resource resource = ...
Mono<String> result = webClient
.post()
.uri("https://example.com")
.body(Flux.concat(
FormPartEvent.create("field", "field value"),
FilePartEvent.create("file", resource)
), PartEvent.class)
.retrieve()
.bodyToMono(String.class);
```
## Filters
可以通过`WebClient.Builder`注册client filter其可以针对请求进行拦截和修改示例如下所示
```java
WebClient client = WebClient.builder()
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.build();
```
其可被用作认证,示例如下:
```java
WebClient client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build();
```
通过修改WebClient对象filter可被添加和删除修改后的新webClient对象并不会影响之前的对象示例如下
```java
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
```
### filter处理response
在使用filter时需要`确保response content总是被消费`
- 当filter在处理response时需要确保filter总是
- 消费response content
- 或将response传递到下游
示例代码如下所示:
```java
public ExchangeFilterFunction renewTokenFilter() {
return (request, next) -> next.exchange(request).flatMap(response -> {
if (response.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
return response.releaseBody()
.then(renewToken())
.flatMap(token -> {
ClientRequest newRequest = ClientRequest.from(request).build();
return next.exchange(newRequest);
});
} else {
return Mono.just(response);
}
});
}
```
如下示例创建了自定义的filter用于计算POST和PUT的`multipart/form-data`请求的`Content-Length`
```java
public class MultipartExchangeFilterFunction implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
if (MediaType.MULTIPART_FORM_DATA.includes(request.headers().getContentType())
&& (request.method() == HttpMethod.PUT || request.method() == HttpMethod.POST)) {
return next.exchange(ClientRequest.from(request).body((outputMessage, context) ->
request.body().insert(new BufferingDecorator(outputMessage), context)).build()
);
} else {
return next.exchange(request);
}
}
private static final class BufferingDecorator extends ClientHttpRequestDecorator {
private BufferingDecorator(ClientHttpRequest delegate) {
super(delegate);
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return DataBufferUtils.join(body).flatMap(buffer -> {
getHeaders().setContentLength(buffer.readableByteCount());
return super.writeWith(Mono.just(buffer));
});
}
}
}
```
## Attributes
可以向request中添加attributes并且可以通过attributes来影响filter中的行为
```java
WebClient client = WebClient.builder()
.filter((request, next) -> {
Optional<Object> usr = request.attribute("myAttribute");
// ...
})
.build();
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.bodyToMono(Void.class);
}
```
除此之外,可以通过`org.springframework.web.reactive.function.client.DefaultWebClientBuilder#defaultRequest`方法为`webClient`注册一个callback回调该回调可为所有请求插入attribute。