doc: 阅读spring reactor文档

This commit is contained in:
asahi
2025-03-31 00:34:30 +08:00
parent ffcfb75f9e
commit 28af3a569b

View File

@@ -35,7 +35,8 @@
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
- [`Flux.create` backpressure](#fluxcreate-backpressure)
- [异步单线程生成`push`](#异步单线程生成push)
- [hybird push-pull model](#hybird-push-pull-model)
- [hybird push-pull model](#hybird-push-pull-model)
- [`sink.onRequest`](#sinkonrequest)
# Reactor
@@ -488,9 +489,39 @@ Flux<String> bridge = Flux.push(sink -> {
});
```
#### hybird push-pull model
##### hybird push-pull model
大多数reactor operators(例如create)都使用了混合推拉的模型。在混合推拉模型中大多数事件的处理都是异步的建议采用push的方法
在混合推拉模型中consumer从source处拉取数据`在consumer发送第一次request之前source并不会emit任何数据`source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理`source只会在下游请求的范围内向下游推送数据`
在混合推拉模型中consumer从source处拉取数据`在consumer发送第一次request之前source并不会emit任何数据`source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理`source只会在下游请求的范围内向下游推送数据`
并且在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后首先会调用subscriber的`onSubscribe`方法然后会一直沿着reactive stream向前追溯直到找到source然后调用`Flux.create`接受的`Consumer<? super FluxSink<T>>`
##### `sink.onRequest`
sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时`consumer`都会被调用。
`sink.onRequest`的使用示例如下:
```java
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
//
for(String s : messages) {
sink.next(s);
}
}
});
sink.onRequest(n -> {
// 此处代码将会在`sink.request`被调用时触发
// 主动向processor拉取数据
List<String> messages = myMessageProcessor.getHistory(n);
for(String s : messages) {
sink.next(s);
}
});
});
```