diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 26d0d93..edf4e6e 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -35,7 +35,8 @@ - [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配) - [`Flux.create` backpressure](#fluxcreate-backpressure) - [异步单线程生成`push`](#异步单线程生成push) - - [hybird push-pull model](#hybird-push-pull-model) + - [hybird push-pull model](#hybird-push-pull-model) + - [`sink.onRequest`](#sinkonrequest) # Reactor @@ -488,9 +489,39 @@ Flux bridge = Flux.push(sink -> { }); ``` -#### hybird push-pull model +##### hybird push-pull model 大多数reactor operators(例如create),都使用了混合推拉的模型。在混合推拉模型中,大多数事件的处理都是异步的(建议采用push的方法)。 -在混合推拉模型中,consumer从source处拉取数据,`在consumer发送第一次request之前,source并不会emit任何数据`(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。`source只会在下游请求的范围内向下游推送数据。` +在混合推拉模型中,consumer从source处拉取数据,`在consumer发送第一次request之前,source并不会emit任何数据`(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。`source只会在下游请求的范围内向下游推送数据`。 + +并且,在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后,首先会调用subscriber的`onSubscribe`方法,然后会一直沿着reactive stream向前追溯,直到找到source,然后调用`Flux.create`接受的`Consumer>`。 + +##### `sink.onRequest` +sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数,可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时,`consumer`都会被调用。 + +`sink.onRequest`的使用示例如下: +```java +Flux bridge = Flux.create(sink -> { + myMessageProcessor.register( + new MyMessageListener() { + + public void onMessage(List messages) { + // + for(String s : messages) { + sink.next(s); + } + } + }); + sink.onRequest(n -> { + // 此处代码将会在`sink.request`被调用时触发 + // 主动向processor拉取数据 + List messages = myMessageProcessor.getHistory(n); + for(String s : messages) { + sink.next(s); + } + }); +}); +``` +