doc: 阅读reactor文档

This commit is contained in:
asahi
2025-03-29 16:30:27 +08:00
parent 45de3952b7
commit ffcfb75f9e

View File

@@ -34,6 +34,8 @@
- [异步多线程生成`create`](#异步多线程生成create) - [异步多线程生成`create`](#异步多线程生成create)
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配) - [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
- [`Flux.create` backpressure](#fluxcreate-backpressure) - [`Flux.create` backpressure](#fluxcreate-backpressure)
- [异步单线程生成`push`](#异步单线程生成push)
- [hybird push-pull model](#hybird-push-pull-model)
# Reactor # Reactor
@@ -452,4 +454,43 @@ Flux<String> bridge = Flux.create(sink -> {
- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常 - `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常
> 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了`request(1)`但是source产生了两个数据调用了两次`sink.next`,那么第二次调用时`requested`已经为0会调用`onOverflow`方法发送`error`信号 > 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了`request(1)`但是source产生了两个数据调用了两次`sink.next`,那么第二次调用时`requested`已经为0会调用`onOverflow`方法发送`error`信号
- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。 - `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。
- `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据
- 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3``3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖
- 其实现是通过`LatestAsyncSink`中的`queue`来实现的queue是一个`AtomicReference`类型的field当调用`sink.next`向下游发送数据时如果下游不满足requested那么将会将值`set`到queue中从而实现后面的数据覆盖前面的数据
- `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时如果下游requested条件不满足会将数据缓存在无界队列中。`使用无界队列进行缓存可能会导致OOM`
#### 异步单线程生成`push`
`push`介于`generate``create`之间,其用于对单个生产者产生的事件进行处理,`push``generate``create`的相似点如下:
-`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure
-`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用
其对异步事件的桥接示例如下:
```java
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
public void processError(Throwable e) {
sink.error(e);
}
});
});
```
#### hybird push-pull model
大多数reactor operators(例如create)都使用了混合推拉的模型。在混合推拉模型中大多数事件的处理都是异步的建议采用push的方法
在混合推拉模型中consumer从source处拉取数据`在consumer发送第一次request之前source并不会emit任何数据`source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理`source只会在下游请求的范围内向下游推送数据。`