From ffcfb75f9ea3f2532da4526d3b5088e0305d5713 Mon Sep 17 00:00:00 2001 From: asahi Date: Sat, 29 Mar 2025 16:30:27 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBreactor=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 43 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 7d1988b..26d0d93 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -34,6 +34,8 @@ - [异步多线程生成`create`](#异步多线程生成create) - [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配) - [`Flux.create` backpressure](#fluxcreate-backpressure) + - [异步单线程生成`push`](#异步单线程生成push) + - [hybird push-pull model](#hybird-push-pull-model) # Reactor @@ -452,4 +454,43 @@ Flux bridge = Flux.create(sink -> { - `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常 > 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了`request(1)`,但是source产生了两个数据,调用了两次`sink.next`,那么第二次调用时`requested`已经为0,会调用`onOverflow`方法发送`error`信号 -- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前,如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。 \ No newline at end of file +- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前,如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。 +- `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据 + - 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3`,`3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖 + - 其实现是通过`LatestAsyncSink`中的`queue`来实现的,queue是一个`AtomicReference`类型的field,当调用`sink.next`向下游发送数据时,如果下游不满足requested,那么将会将值`set`到queue中,从而实现后面的数据覆盖前面的数据 +- `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时,如果下游requested条件不满足,会将数据缓存在无界队列中。(`使用无界队列进行缓存可能会导致OOM`) + +#### 异步单线程生成`push` +`push`介于`generate`和`create`之间,其用于对单个生产者产生的事件进行处理,`push`和`generate`与`create`的相似点如下: +- 和`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure +- 和`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用 + +其对异步事件的桥接示例如下: +```java +Flux bridge = Flux.push(sink -> { + myEventProcessor.register( + new SingleThreadEventListener() { + + public void onDataChunk(List chunk) { + for(String s : chunk) { + sink.next(s); + } + } + + public void processComplete() { + sink.complete(); + } + + public void processError(Throwable e) { + sink.error(e); + } + }); +}); +``` + +#### hybird push-pull model +大多数reactor operators(例如create),都使用了混合推拉的模型。在混合推拉模型中,大多数事件的处理都是异步的(建议采用push的方法)。 + +在混合推拉模型中,consumer从source处拉取数据,`在consumer发送第一次request之前,source并不会emit任何数据`(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。`source只会在下游请求的范围内向下游推送数据。` + +