- [Reactor](#reactor) - [Reactive Programming](#reactive-programming) - [命令式迁移到响应式](#命令式迁移到响应式) - [可组合性与可读性](#可组合性与可读性) - [Assembly Line](#assembly-line) - [Operators](#operators) - [Nothing Happens Until You subscribe()](#nothing-happens-until-you-subscribe) - [backpressure](#backpressure) - [hot \& cold](#hot--cold) - [Subscriber和Publisher](#subscriber和publisher) - [Publisher](#publisher) - [Subscriber](#subscriber) - [Reactor Core](#reactor-core) - [Flux `0...n`](#flux-0n) - [Mono `0...1`](#mono-01) - [创建Mono/Flux并进行订阅的方式](#创建monoflux并进行订阅的方式) - [String sequence](#string-sequence) - [Flux.empty](#fluxempty) - [Flux.range](#fluxrange) - [Lambda Subscribe](#lambda-subscribe) - [Disposable](#disposable) - [BaseSubscriber](#basesubscriber) - [backpressure](#backpressure-1) - [buffer](#buffer) - [prefetch](#prefetch) - [初始请求](#初始请求) - [补充优化(Replenishing Optimization)](#补充优化replenishing-optimization) - [limitRate](#limitrate) - [lowTie](#lowtie) - [limitRequest](#limitrequest) - [Create Sequence](#create-sequence) - [同步生成`generate`](#同步生成generate) - [Sink](#sink) - [异步多线程生成`create`](#异步多线程生成create) - [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配) - [`Flux.create` backpressure](#fluxcreate-backpressure) - [异步单线程生成`push`](#异步单线程生成push) - [hybird push-pull model](#hybird-push-pull-model) - [`sink.onRequest`](#sinkonrequest) - [`onCancel` \& `onDispose`](#oncancel--ondispose) - [Handle](#handle) # Reactor ## Reactive Programming 响应式编程是一种异步编程范式,关注于`数据流`和`状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。 响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法,reactive stream是基于`发布/订阅`模型的。 > 迭代器模式是`pull-based`,而reactive stream为`push-based`。 ### 命令式迁移到响应式 #### 可组合性与可读性 “可组合性"代表编排多个异步任务的能力,通过“组合”,可以将前一个异步任务的输出作为后一个异步任务的输入。或者,可以按照fork-join的形式对异步任务进行编排。 reactor同样能解决“可读性”的问题,在使用传统的callback model编写程序时,随着逻辑的复杂,异步进行的层数也会增加,这将会极大降低代码的可读性和可维护性。 > 在使用call model时,通常需要在回调中执行另一个回调,回调的嵌套通通常会被称为`callback heil`。 reactor提供了复杂的“组合”选项,能够反映抽象异步进程的组织,并且,所有的内容通常都会位于同一层级。 #### Assembly Line 响应式应用中的数据处理类似于流水线,其中,reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`。 数据在从publisher传送到subscriber的过程中,可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间,受影响的workstation会向上游发送消息来控制数据的生成速率。 #### Operators 在reactor中,Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`中,并且前一个publisher包装到一个新的publisher实例中。 故而,operator将整个chain都链接起来,数据来源于第一个publisher,并随着chain移动,依次被每个链接处理,最终由subscriber结束该过程。 #### Nothing Happens Until You subscribe() 当通过reactor编写publisher chain时,数据并不会被泵入到chain中,编写chain只是创建了异步处理的抽象描述。 通过订阅行为,将publisher和subscriber绑定到了一起,订阅行为会触发chain中的数据流。该行为通过内部的signal实现,subscriber将会发送一个`reuqest signal`,该信号会被传递到chain上游,一直被传递到source publisher。 #### backpressure `传递到上游的信号`该机制也被用于实现backpressure,在assembly line模型中,也被描述为workstation传递给上游的反馈信号,当workstation处理消息比上游workstation满时,会发送该反馈。 reactive stream定义的机制接近于上述描述,其提供两种模式: - unbounded mode:source publisher可以按其最高速率不受限制的推送数据 - request mode:通过`request`机制向source publisher发送信号,告知其准备好处理最多`n`个元素。 中间的operator也可以在传输过程中对请求做出修改,例如`buffer` operator可以将elements分割为以10个为单位的batch,如果subscriber请求一个buffer,那么上游source publisher可以产生10个元素。 通过backpressure,可以将`push`模型转化为`push-pull`模型: - 当上游的n个元素已经准备好时,下游可以从上游拉取n个元素 - 当上有没有准备好n个元素时,后续如果n个元素被准备好,其将会被上游推送 #### hot & cold 对于响应式序列,其可以分为两种: - cold sequence:对于cold sequence,会为每个订阅者重新开始流程,包括source publisher。例如source中若封装了http调用,会为每个subscriber都执行一个新的http请求 - hot sequence:subscriber只有在其订阅后才收到信号,即使没有subscriber在监听,hot sequence仍然能够发送signal ## Subscriber和Publisher ### Publisher 对于publisher,其提供了`subscribe`方法供subscriber进行注册,在执行subscribe方法并向其传入`Subscriber`对象后,上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。 > 若中间存在operator(例如map)在担任publisher角色的同时,还对上游进行了订阅,那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。 > > `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。 ### Subscriber 当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。 ## Reactor Core reactor引入了两个实现`Publisher`的类:`Mono`和`Flux`。 - Flux:代表包含`0...N`个items的reactive sequence - Mono:代表包含`0...1`个items的reactive sequence 上述两个类代表了在异步处理场景中的大致基数。 - Mono:例如,对于http请求的场景,一个请求只会产生一个响应,故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono`来代表http调用的结果,`Mono`中只提供了上下文中包含`0...1`个元素的对应操作 - 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono`的类型 ### Flux `0...n` ![alt text](image.png) `Flux`是一个标准的`Publisher`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准,存在三种signal,且信号会转化为对下游`onNext`、`onComplete`、`onError`的调用。 Flux是一个通用的reactive类型,并且,所有的event type都是可选的。 - 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列 - 当`onNext`和`onComplete`事件都不存在时,代表一个空的无限序列 - 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux`,其是无限的并且发送tick ### Mono `0...1` `Mono`是一个标准的`Publisher`,其通过`onNext`信号发送至多一个item,然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。 大多数Mono实现在调用完subscriber的`onNext`方法之后,预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext`和`onError`的组合是被明确禁止的。 ### 创建Mono/Flux并进行订阅的方式 #### String sequence 如果想要创建String序列,可以通过如下方式: ```java Flux seq1 = Flux.just("foo", "bar", "foobar"); List iterable = Arrays.asList("foo", "bar", "foobar"); Flux seq2 = Flux.fromIterable(iterable); ``` #### Flux.empty ```java Mono noData = Mono.empty(); ``` #### Flux.range 在下面示例中,`Flux.range`第一个参数是range的起始值,第二个参数是要产生的元素个数 ```java Flux numbersFromFiveToSeven = Flux.range(5, 3); ``` 故而,其产生的内容为`5,6,7`。 #### Lambda Subscribe 在进行订阅时,`Flux`和`Mono`使用了lambda,在调用subscribe时,有如下几种重载的选择: ```java // 订阅并触发sequence的产生 subscribe(); // 对每个产生的值通过consumer执行处理操作 subscribe(Consumer consumer); // 在reactive stream异常终止时,对error进行处理 subscribe(Consumer consumer, Consumer errorConsumer); // 在sequence处理完时,执行额外的complete操作 subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer); // 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作 // 该重载已废弃 subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer, Consumer subscriptionConsumer); ``` 对于subscribe的使用,示例如下 ```java Flux.range(5,3) .map(x->{ if(x<7) { return x; } throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x)); }) .subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v), (e) -> { System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage()); }, ()->{ System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended"); }); ``` 其执行结果如下: ``` [main]-5 [main]-6 [main]-Error Caught: fucking value {7} equals or greater than 7 ``` #### Disposable 上述`subscribe`方法的返回类型为`Disposable`,该接口代表subscriber对publisher的订阅是可取消的,如需取消订阅,调用`dispose`方法即可。 对于Mono和Flux而言,source publisher应该在接收到cancellation信号之后停止产生元素,`并不能保证取消信号是即时的`。(`若source产生数据的速度过快,可能在接收到cancel信号之前,source就已经complete`)。 `Disposables`类中存在一些对`Disposable`的工具方法,例如`swap`和`composite`。 #### BaseSubscriber `subscribe`方法除了接收lambda外,还存在更通用的重载方法,接收`Subscriber`类型的参数。 在这种场景下,传参可以继承`BaseSubscriber`类。 并且,`BaseSubscriber`该类是一次性的,`其只能够订阅一个publisher,如果其订阅了第二个publisher,那么其对第一个publisher的订阅将会被取消`。 > `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。 示例如下: ```java Flux.range(3, 5) .subscribe(new BaseSubscriber() { private Subscription subscription; @Override protected void hookOnSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(5); } @Override protected void hookOnNext(Integer value) { log.info("onNext called: {}", value); // this.subscription.request(1); } @Override protected void hookOnComplete() { log.info("onComplete called"); super.hookOnComplete(); } @Override protected void hookOnError(Throwable throwable) { log.info("onError called: {}", throwable.getMessage()); super.hookOnError(throwable); } }); ``` 上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下: ``` 2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3 2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4 2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5 2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6 2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7 2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called ``` `BaseSubscriber`的`hookOnSubscribe`默认实现如下: ```java protected void hookOnSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); } ``` 其请求的数量为`Long.MAX_VALUE`,代表其publisher为`effectively unbounded`。 可以通过重写`hookOnSubscribe`方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写`hookOnSubscribe`和`hookOnNext`方法。 `BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。 除了上述列出的hook外,`BaseSubscriber`还支持如下hooks: - hookOnComplete - hookOnError - hookOnCancel - hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error) - hookFinally的调用顺序位于hookOnComplete和hookOnError之后 #### backpressure 在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`,当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`。 > `unbound request`代表尽可能快的产生数据,即backpressure关闭。 在reactive chain中,第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request,示例如下: - `subcribe()`方法和大多数基于lambda的重载方法(除了包含`Consumer`参数的重载) - `block`, `blockFirst`, `blockLast` - 通过`toIterable`或`toStream`进行遍历 目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法`。 #### buffer 在reactor中,可以通过部分operator进行request的reshape。示例如下: ```java Flux.range(1, 1000) .buffer(3) .subscribe(new BaseSubscriber>() { @Override protected void hookOnSubscribe(Subscription subscription) { subscription.request(2); } @Override protected void hookOnNext(List value) { for (Integer v : value) { log.info("item received: {}", v); } } }); ``` 在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3`个`Integer`,故而总共会接收到`2 * 3 = 6`个元素。 输出如下: ``` 2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1 2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2 2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3 2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4 2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5 2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6 ``` #### prefetch `prefetch`机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。 prefetch机制通常分为如下部分: ##### 初始请求 在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。 ##### 补充优化(Replenishing Optimization) prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。 > 例如,prefetch的大小为10,其limit对应的值为`ceil(10 * 0.75) = 8`,每当其下游被处理的元素达到8个,其会重新请求8个数据,并且将`被下游处理元素的个数`重置,重新从0开始计数,直到该值再达到8,再次发送请求 > ##### 预加载数据 > 补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。 > ##### 平滑处理 > 通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。 有如下operators可以对请求的prefetch进行调整 #### limitRate 除了`prefetch`之外,还可以通过`limitRate`或`limitRequest`来直接针对请求进行调节。 `limitRate(N)`将来自下游的请求进行拆分,当来自下游的请求被传播到上游时,其会被拆分为small batches。例如,如果下游调用`request(100)`,此时`limitRate(10)`将会将其拆分为10个`request(10)`再传播给上游。并且,在此基础上,limitRate还实现了prefetch中的补充优化。 除了`limitRate(N)`之外(当没有传递`lowTie`时,limit默认会取`N - N>>2`,即`ceil(N * 0.75)`),limtRate还存在`limitRate(highTie, lowTie)`的重载方法。 ##### lowTie 当lowTie取不同值时,其补充策略如下: - `lowTie<=0`:如果`lowTie`小于或等于0,则limit取值和`prefetch`值相同,仅当prefetch中所有元素都被下游处理完时,limtRate operator才会向上游请求数据 - `lowTie>=prefetch`: 当lowTie大于或等于prefetch时,limit取值为`ceil(prefetch * 0.75)`,此时,补充策略和prefetch默认相同,当75%的数据被下游处理时,limitRte会重新向上游请求75%的数据 - 若lowTie位于`(0, prefetch)`区间之间 - 若prefetch的值为`Long.MAX_VALUE`,那么limit的值也为`Long.MAX_VALUE` - 若prefetch值不为`Long.MAX_VALUE`,那么limit的值为`lowTie`,即`lowTie`的值即为消费后重新拉取的限制值 #### limitRequest `limitRequest(N)`用于限制下游请求的总个数。例如,向`limitRequest(10)`发起两次request,一次请求3一次请求8,那么最后下游只会接收到10个元素。 > 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成,会向下游发送onComplete信号 ### Create Sequence #### 同步生成`generate` 创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer>`类型的参数。 ##### Sink sink是spring reactor中的一个核心抽象概念,其可以被理解为数据流的出口或`发射器`,`负责将元素、error或complete信号推送到下游订阅者`。 sink可以分为如下种类: - 同步/异步 - 单次/多次发射 sink的核心api如下: - `next(T value)`: 向下游发送数据 - `complete`:表示数据流正常结束 - `error(throwable)`:代表数据流因为错误而终止 generate方法用于产生同步且`one by one`的emission,即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下: ```java Flux.generate(sink -> { String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的) sink.next(data); // 仅调用一次 if (isEndOfData()) sink.complete(); }); ``` > 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。 示例如下: ```java Flux.generate(sink -> { sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())); }) .take(10) .subscribe(System.out::println); ``` 通过generate的另一个重载方法`generate(Callable stateSupplier, BiFunction, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下: - `stateSupplier`:该参数用于提供一个初始状态,每当有新请求进入时,都会调用supplier生成初始状态 - `generator`:generator会返回一个新的state 示例如下: ```java Flux.generate(() -> 0, (s, sink) -> { // System.out.printf("%s emitted\n", s); sink.next(s); return s + 1; }) .take(5) .subscribe(System.out::println); ``` 其上游生成的整数依次增加,输出如下: ``` 0 1 2 3 4 ``` #### 异步多线程生成`create` `Flux.create`针对每个round能够产生多个元素,其暴露的sink类型为`FluxSink`。相对于`Flux.generate`,`Flux.create`方法并没有接收`state`的重载版本。 > `Flux.create`并不会将你的代码变为并行或是异步的. > > 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。 ##### `Flux.create`和`listener based api`进行适配 街射使用基于listener的api,api定义如下: ```java interface MyEventListener { void onDataChunk(List chunk); void processComplete(); } ``` 可以使用`Flux.create`将其与Flux相桥接: ```java Flux bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener() { public void onDataChunk(List chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); }); ``` ##### `Flux.create` backpressure 可以通过指定`OverflowStrategy`来优化backpressure行为,OverflowStrategy可选值如下: - `IGNORE`: 该选项会完全无视下游的backpressure request,当下游queue被填充满时,会抛出`IllegalStateException`异常 > 部分操作符(例如`publishOn`),其subscriber会内置queue,用于存储上游通过`onNext`推送的数据。 > > 当queue满时,将会抛出`IllegalStateException`异常 - `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常 > 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了`request(1)`,但是source产生了两个数据,调用了两次`sink.next`,那么第二次调用时`requested`已经为0,会调用`onOverflow`方法发送`error`信号 - `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前,如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。 - `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据 - 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3`,`3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖 - 其实现是通过`LatestAsyncSink`中的`queue`来实现的,queue是一个`AtomicReference`类型的field,当调用`sink.next`向下游发送数据时,如果下游不满足requested,那么将会将值`set`到queue中,从而实现后面的数据覆盖前面的数据 - `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时,如果下游requested条件不满足,会将数据缓存在无界队列中。(`使用无界队列进行缓存可能会导致OOM`) #### 异步单线程生成`push` `push`介于`generate`和`create`之间,其用于对单个生产者产生的事件进行处理,`push`和`generate`与`create`的相似点如下: - 和`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure - 和`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用 其对异步事件的桥接示例如下: ```java Flux bridge = Flux.push(sink -> { myEventProcessor.register( new SingleThreadEventListener() { public void onDataChunk(List chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } public void processError(Throwable e) { sink.error(e); } }); }); ``` ##### hybird push-pull model 大多数reactor operators(例如create),都使用了混合推拉的模型。在混合推拉模型中,大多数事件的处理都是异步的(建议采用push的方法)。 在混合推拉模型中,consumer从source处拉取数据,`在consumer发送第一次request之前,source并不会emit任何数据`(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。`source只会在下游请求的范围内向下游推送数据`。 并且,在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后,首先会调用subscriber的`onSubscribe`方法,然后会一直沿着reactive stream向前追溯,直到找到source,然后调用`Flux.create`接受的`Consumer>`。 ##### `sink.onRequest` sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数,可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时,`consumer`都会被调用。 `sink.onRequest`的使用示例如下: ```java Flux bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener() { public void onMessage(List messages) { // for(String s : messages) { sink.next(s); } } }); sink.onRequest(n -> { // 此处代码将会在`sink.request`被调用时触发 // 主动向processor拉取数据 List messages = myMessageProcessor.getHistory(n); for(String s : messages) { sink.next(s); } }); }); ``` #### `onCancel` & `onDispose` `sink`支持`onCancel`和`onDispose`回调,其区别如下: - `onDispose`回调用于执行cleanup操作,其在`complete`, `error`, `cancel`之后都会被调用。 - `onCancel`回调用于执行取消操作,其只针对`cancel`执行,并且执行顺序位于cleanup之前 其使用示例如下所示: ```java Flux bridge = Flux.create(sink -> { sink.onRequest(n -> channel.poll(n)) .onCancel(() -> channel.cancel()) .onDispose(() -> channel.close()) }); ``` #### Handle `handle`为一个`instance method`(非静态方法),其关联了一个已经存在的source。`Mono`和`Flux`中都存在`handle`方法。 和`generate`类似,`handle`使用`SynchronousSink`,并且只允许逐个发出。`handle`的作用类似`map`和`filter`的组合,可以跳过部分元素,其签名如下 ```java Flux handle(BiConsumer>); ``` 其使用示例如下: ```java public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; } // 由于reactive steam中不允许有null, // 故而当通过`alphabet`方法映射可能返回null值时, // 可以使用handle对返回值进行过滤,只有不为空时才调用`sink.next` Flux alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println); ```