From cf6f069e7906c2404927bf247edb2218e9e00c5e Mon Sep 17 00:00:00 2001 From: asahi Date: Fri, 28 Mar 2025 01:34:50 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBspring=20reactor?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 43 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index f074f64..3149661 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -32,6 +32,8 @@ - [同步生成`generate`](#同步生成generate) - [Sink](#sink) - [异步多线程生成`create`](#异步多线程生成create) + - [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配) + - [`Flux.create` backpressure](#fluxcreate-backpressure) # Reactor @@ -412,6 +414,41 @@ Flux.generate(() -> 0, (s, sink) -> { > `Flux.create`并不会将你的代码变为并行或是异步的. > -> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游的消费逻辑无法被执行。 -> -> \ No newline at end of file +> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。 + + +##### `Flux.create`和`listener based api`进行适配 +街射使用基于listener的api,api定义如下: +```java +interface MyEventListener { + void onDataChunk(List chunk); + void processComplete(); +} +``` +可以使用`Flux.create`将其与Flux相桥接: +```java +Flux bridge = Flux.create(sink -> { + myEventProcessor.register( + new MyEventListener() { + + public void onDataChunk(List chunk) { + for(String s : chunk) { + sink.next(s); + } + } + + public void processComplete() { + sink.complete(); + } + }); +}); +``` +##### `Flux.create` backpressure +可以通过指定`OverflowStrategy`来优化backpressure行为,OverflowStrategy可选值如下: +- `IGNORE`: 该选项会完全无视下游的backpressure request,当下游queue被填充满时,会抛出`IllegalStateException`异常 + > 部分操作符(例如`publishOn`),其subscriber会内置queue,用于存储上游通过`onNext`推送的数据。 + > + > 当queue满时,将会抛出`IllegalStateException`异常 +- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常 + > 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了`request(1)`,但是source产生了两个数据,调用了两次`sink.next`,那么第二次调用时`requested`已经为0,会调用`onOverflow`方法发送`error`信号 +