Files
rikako-note/spring/webflux/webclient.md
2025-05-24 21:29:26 +08:00

14 KiB
Raw Blame History

WebClient

WebClient基于Reactor提供了functional, fluent API

WebClient是非阻塞的其依赖的codecs和server端使用的codecs相同。

Configuration

创建WebClient最简单的方式是通过静态工厂方法:

  • WebClient.create()
  • WebClient.create(String baseUrl)

除此之外,也可以通过WebClient.builder()来指定更多选项:

  • uriBuilderFactory 自定义uriBuilderFactory用于创建UriBuilderUriBuilder包含共享的配置例如base URI等
  • defaultUriVariables: 在拓展uri templates时使用到的默认值
  • defaultHeader对每个请求都包含的headers
  • defaultCookie每个请求都包含的Cookie
  • defaultRequest 对每个请求进行自定义的Consumer
  • filter对于每个请求的client filter
  • exchangeStrategies自定义http message的reader/writer
  • clientConnectorhttp client library设置
  • observationRegistry the registry to use for enabling Observability support
  • observationConvention: an optional, custom convention to extract metadata for recorded observations.

创建WebClient的示例如下

WebClient client = WebClient.builder()
		.codecs(configurer -> ... )
		.build();

一旦被创建后WebClient是不可变的但是,可以对其进行克隆并对副本进行修改,示例如下:

WebClient client1 = WebClient.builder()
		.filter(filterA).filter(filterB).build();

WebClient client2 = client1.mutate()
		.filter(filterC).filter(filterD).build();

// client1 has filterA, filterB

// client2 has filterA, filterB, filterC, filterD

MaxInMemorySize

Codecs对于缓存在内存中的数据大小存在限制避免导致应用的内存问题。默认情况下该值被设置为256KB,如果该大小不够,那么将会见到如下报错:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer

如果想要修改该默认的codecs限制可以使用如下配置

WebClient webClient = WebClient.builder()
		.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
		.build();

reactor netty

为了自定义reactor netty配置可以提供一个预先定义的HttpClient

HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);

WebClient webClient = WebClient.builder()
		.clientConnector(new ReactorClientHttpConnector(httpClient))
		.build();

Resources

默认情况下,HttpClient将会参与使用reactor.netty.http.HttpResources中保存的全局reactor netty resources。

HttpResources中包含event loop threads和connection poolHttpClient会使用这些共享的资源。对于基于event loop的并发更倾向使用固定、共享的资源直到进程退出时这些全局资源才会被释放。

如果server的生命周期和进程相同那么对全局资源无需手动释放。但若server在进程的生命周期中可能会启动和停止那么可以声明一个ReactorResourceFactory类型的bean并设置globalResources=true默认从而确保reactor netty的global resource在ApplicationContext关闭时被释放,示例如下所示:

@Bean
public ReactorResourceFactory reactorResourceFactory() {
	return new ReactorResourceFactory();
}

也可以不使用global reactor netty resources但是在该模式下你需要确保所有reactor netty client and server使用共享的资源示例如下所示

@Bean
public ReactorResourceFactory resourceFactory() {
	ReactorResourceFactory factory = new ReactorResourceFactory();
	factory.setUseGlobalResources(false); 
	return factory;
}

@Bean
public WebClient webClient() {

	Function<HttpClient, HttpClient> mapper = client -> {
		// Further customizations...
	};

	ClientHttpConnector connector =
			new ReactorClientHttpConnector(resourceFactory(), mapper); 

	return WebClient.builder().clientConnector(connector).build(); 
}

timeout

如果要设置connection timeout示例如下

import io.netty.channel.ChannelOption;

HttpClient httpClient = HttpClient.create()
		.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

WebClient webClient = WebClient.builder()
		.clientConnector(new ReactorClientHttpConnector(httpClient))
		.build();

如果要配置read/write timeout可以按照如下示例进行配置

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
		.doOnConnected(conn -> conn
				.addHandlerLast(new ReadTimeoutHandler(10))
				.addHandlerLast(new WriteTimeoutHandler(10)));

// Create WebClient...
readTimeoutHandler

在指定的时间内如果channel没有执行读取操作将会抛出ReadTimeoutException并对channel执行关闭操作

没有数据被读取的监控,是通过IdleStateHandler来实现的。

ReadTimeoutHandler继承了IdleStateHandler当指定的readIdleTime过后如果channel仍然没有指定任何的读操作,那么将会触发一个IdleStateEvent事件,并且事件状态为IdleState.READER_IDLE

writeTimeoutHandler

而writeTimeoutHandler作用则如下

  • 其继承了ChannelOutboundHandlerAdapter在通过write写入数据时其会通过ctx.executor()指定一个WriteTimeoutTask该task在指定timeout后被执行
  • 如果在writeTimeoutTask被执行时写操作仍然没有处理完成那么其将会触发一个WriteTimeoutException并且channel也会被关闭。

为所有请求配置response timeout:

HttpClient httpClient = HttpClient.create()
		.responseTimeout(Duration.ofSeconds(2));

// Create WebClient...

为指定的请求配置response timeout:

WebClient.create().get()
		.uri("https://example.org/path")
		.httpRequest(httpRequest -> {
			HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
			reactorRequest.responseTimeout(Duration.ofSeconds(2));
		})
		.retrieve()
		.bodyToMono(String.class);
connection provider

connection provider会对固定的最大数量连接进行缓存和重用。其中,最大数量针对的是单个connection pool的最大数量而每个connection pool则是和指定的remote host相关联。

当连接池中的连接都被使用时,后续连接请求将会被阻塞,阻塞时间由pendingAcquireTime决定。

各个超时含义

再restTemplate等阻塞api中各个超时的含义如下所示

readTimeout

会为serverSocket和server进行通信的socket设置SocketOptions.SO_SOCKET单位为ms。

SO_SOCKET被设置为正数值时,对socket关联的inputstream阻塞式调用,将最多阻塞该指定时长。如果调用超时,那么将会抛出java. net. SocketTimeoutException但是socket仍然有效。

connectTimeout

以ms为单位指定一个超时时长当与服务端建立连接时如果超过该时长后连接仍未建立那么将会抛出java.net.SocketTimeoutException异常。

JDK HttpClient

如果要自定义jdk httpclient配置欸可以使用如下方式

HttpClient httpClient = HttpClient.newBuilder()
	.followRedirects(Redirect.NORMAL)
	.connectTimeout(Duration.ofSeconds(20))
	.build();

ClientHttpConnector connector =
		new JdkClientHttpConnector(httpClient, new DefaultDataBufferFactory());

WebClient webClient = WebClient.builder().clientConnector(connector).build();

Jetty

自定义jetty httpclient配置的示例如下所示

HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);

WebClient webClient = WebClient.builder()
		.clientConnector(new JettyClientHttpConnector(httpClient))
		.build();

HttpComponents

如下示例展示了如何定义Apache HttpComponents

HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
clientBuilder.setDefaultRequestConfig(...);
CloseableHttpAsyncClient client = clientBuilder.build();

ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client);

WebClient webClient = WebClient.builder().clientConnector(connector).build();

retrieve()

retrieve()方法用于定义如何对相应进行提取,示例如下:

WebClient client = WebClient.create("https://example.org");

Mono<ResponseEntity<Person>> result = client.get()
		.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
		.retrieve()
		.toEntity(Person.class);

上述示例获取的是ResponseEntity<Person>,如果想要直接获取Persion类型的response body可以通过如下方式进行获取:

WebClient client = WebClient.create("https://example.org");

Mono<Person> result = client.get()
		.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
		.retrieve()
		.bodyToMono(Person.class);

默认情况下,4xx5xx的http响应将会导致WebClientResponseException如果需要自定义error handling逻辑需要使用onStatus,示例如下:

Mono<Person> result = client.get()
		.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
		.retrieve()
		.onStatus(HttpStatusCode::is4xxClientError, response -> ...)
		.onStatus(HttpStatusCode::is5xxServerError, response -> ...)
		.bodyToMono(Person.class);

Exchange

exchangeToMono方法和exchangeToFlux方法可以提供更精确的控制例如在statusCode不同时使用不同的方法来进行反序列化

Mono<Person> entityMono = client.get()
		.uri("/persons/1")
		.accept(MediaType.APPLICATION_JSON)
		.exchangeToMono(response -> {
			if (response.statusCode().equals(HttpStatus.OK)) {
				return response.bodyToMono(Person.class);
			}
			else {
				// Turn to error
				return response.createError();
			}
		});

如上所示,当exchangeToMono() and exchangeToFlux()返回的flux或mono完成时clientResponse对象将会被释放从而避免内存和连接的泄露。

因而,response不能在更下游进行反序列化,具体的反序列化过程由提供的方法来声明如何进行decode

Request Body

对于webclient可以加载任意异步类型对象到request body例如Mono

Mono<Person> personMono = ... ;

Mono<Void> result = client.post()
		.uri("/persons/{id}", id)
		.contentType(MediaType.APPLICATION_JSON)
		.body(personMono, Person.class)
		.retrieve()
		.bodyToMono(Void.class);

同样的,也可以加载多个异步对象,例如Flux类型

Flux<Person> personFlux = ... ;

Mono<Void> result = client.post()
		.uri("/persons/{id}", id)
		.contentType(MediaType.APPLICATION_STREAM_JSON)
		.body(personFlux, Person.class)
		.retrieve()
		.bodyToMono(Void.class);

当不想从异步类型的对象加载值,而是想加载已有的值时,可以使用bodyValue方法:

Person person = ... ;

Mono<Void> result = client.post()
		.uri("/persons/{id}", id)
		.contentType(MediaType.APPLICATION_JSON)
		.bodyValue(person)
		.retrieve()
		.bodyToMono(Void.class);

Form Data

如果想要发送form data可以为body指定MultiValueMap<String, String>类型的值content会被自动设置为application/x-www-form-urlencoded。使用示例如下所示:

MultiValueMap<String, String> formData = ... ;

Mono<Void> result = client.post()
		.uri("/path", id)
		.bodyValue(formData)
		.retrieve()
		.bodyToMono(Void.class);

同时,可以使用BodyInserters来构建form data示例如下所示

Mono<Void> result = client.post()
		.uri("/path", id)
		.body(fromFormData("k1", "v1").with("k2", "v2"))
		.retrieve()
		.bodyToMono(Void.class);

Multipart Data

为了发送multipart data需要提供MultiValueMap<String, ?>类型的值其中value类型为代表part内容的object对象代表part内容和headers的HttpEntity对象

MutlipartBodyBuilder提供了便捷的api使用示例如下所示

MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request

MultiValueMap<String, HttpEntity<?>> parts = builder.build();

通常情况下无需为每个part指定Content-Type,在HttpMessageWriter执行序列化操作时会自动的设置ContentType类型对于Resources类型会根据文件拓展名来决定ContentType。

MutliValueMap类型的值构建完成后,使用示例如下:

MultipartBodyBuilder builder = ...;

Mono<Void> result = client.post()
		.uri("/path", id)
		.body(builder.build())
		.retrieve()
		.bodyToMono(Void.class);

除了使用MultipartBodyBuilder之外,还可以使用BodyInserters来构建multipart body示例如下

Mono<Void> result = client.post()
		.uri("/path", id)
		.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
		.retrieve()
		.bodyToMono(Void.class);

PartEvent

如果需要提供多个multipart data可以使用PartEvent类型:

  • form fields可以通过FormPartEvent::create来创建
  • file uploads可以通过FormPartEvent::create来创建

可以通过Flux.concat方法来对其进行拼接,示例如下所示:

Resource resource = ...
Mono<String> result = webClient
	.post()
	.uri("https://example.com")
	.body(Flux.concat(
			FormPartEvent.create("field", "field value"),
			FilePartEvent.create("file", resource)
	), PartEvent.class)
	.retrieve()
	.bodyToMono(String.class);

Filters

可以通过WebClient.Builder注册client filter其可以针对请求进行拦截和修改示例如下所示

WebClient client = WebClient.builder()
		.filter((request, next) -> {

			ClientRequest filtered = ClientRequest.from(request)
					.header("foo", "bar")
					.build();

			return next.exchange(filtered);
		})
		.build();

其可被用作认证,示例如下:

WebClient client = WebClient.builder()
		.filter(basicAuthentication("user", "password"))
		.build();

通过修改WebClient对象filter可被添加和删除修改后的新webClient对象并不会影响之前的对象示例如下

WebClient client = webClient.mutate()
		.filters(filterList -> {
			filterList.add(0, basicAuthentication("user", "password"));
		})
		.build();