66 KiB
- Reactor
- Reactive Programming
- Subscriber和Publisher
- Reactor Core
- Flux
0...n - Mono
0...1 - 创建Mono/Flux并进行订阅的方式
- Create Sequence
- Threading and Schedulers
- Handling Errors
- Sinks
- 在多线程环境下安全的使用
Sinks.One和Sinks.Many tryEmit&emitasFlux和asMonoSinks.many().unicast().onBackpressureBuffer(args?)- Sinks.many().multicast().onBackpressureBuffer(args?)
- Sinks.many().multicast().directAllOrNothing()
- Sinks.many().multicast().directBestEffort()
- Sinks.many().replay()
- Sinks.unsafe().many()
- Sinks.one()
- Sinks.empty()
- 在多线程环境下安全的使用
- Flux
- Advanced
Reactor
Reactive Programming
响应式编程是一种异步编程范式,关注于数据流和状态变化的传播。java的响应式编程接口被包含在java9的Flow中。
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用next方法,reactive stream是基于发布/订阅模型的。
迭代器模式是
pull-based,而reactive stream为push-based。
命令式迁移到响应式
可组合性与可读性
“可组合性"代表编排多个异步任务的能力,通过“组合”,可以将前一个异步任务的输出作为后一个异步任务的输入。或者,可以按照fork-join的形式对异步任务进行编排。
reactor同样能解决“可读性”的问题,在使用传统的callback model编写程序时,随着逻辑的复杂,异步进行的层数也会增加,这将会极大降低代码的可读性和可维护性。
在使用call model时,通常需要在回调中执行另一个回调,回调的嵌套通通常会被称为
callback heil。
reactor提供了复杂的“组合”选项,能够反映抽象异步进程的组织,并且,所有的内容通常都会位于同一层级。
Assembly Line
响应式应用中的数据处理类似于流水线,其中,reactor既是传送带又是工作站。数据来源于original publisher,最终传传递给subscriber。
数据在从publisher传送到subscriber的过程中,可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间,受影响的workstation会向上游发送消息来控制数据的生成速率。
Operators
在reactor中,Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到publisher中,并且前一个publisher包装到一个新的publisher实例中。
故而,operator将整个chain都链接起来,数据来源于第一个publisher,并随着chain移动,依次被每个链接处理,最终由subscriber结束该过程。
Nothing Happens Until You subscribe()
当通过reactor编写publisher chain时,数据并不会被泵入到chain中,编写chain只是创建了异步处理的抽象描述。
通过订阅行为,将publisher和subscriber绑定到了一起,订阅行为会触发chain中的数据流。该行为通过内部的signal实现,subscriber将会发送一个reuqest signal,该信号会被传递到chain上游,一直被传递到source publisher。
backpressure
传递到上游的信号该机制也被用于实现backpressure,在assembly line模型中,也被描述为workstation传递给上游的反馈信号,当workstation处理消息比上游workstation满时,会发送该反馈。
reactive stream定义的机制接近于上述描述,其提供两种模式:
- unbounded mode:source publisher可以按其最高速率不受限制的推送数据
- request mode:通过
request机制向source publisher发送信号,告知其准备好处理最多n个元素。
中间的operator也可以在传输过程中对请求做出修改,例如buffer operator可以将elements分割为以10个为单位的batch,如果subscriber请求一个buffer,那么上游source publisher可以产生10个元素。
通过backpressure,可以将push模型转化为push-pull模型:
- 当上游的n个元素已经准备好时,下游可以从上游拉取n个元素
- 当上有没有准备好n个元素时,后续如果n个元素被准备好,其将会被上游推送
hot & cold
对于响应式序列,其可以分为两种:
- cold sequence:对于cold sequence,会为每个订阅者重新开始流程,包括source publisher。例如source中若封装了http调用,会为每个subscriber都执行一个新的http请求
- hot sequence:subscriber只有在其订阅后才收到信号,即使没有subscriber在监听,hot sequence仍然能够发送signal
Subscriber和Publisher
Publisher
对于publisher,其提供了subscribe方法供subscriber进行注册,在执行subscribe方法并向其传入Subscriber对象后,上游publisher会调用下游的onSubscribe方法,并向onSubscribe方法传入Subscription对象。下游可以通过Subscription对象调用request(n)请求。
若中间存在operator(例如map)在担任publisher角色的同时,还对上游进行了订阅,那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
任何变更状态的操作都只在实际subscriber执行订阅操作后被触发。
Subscriber
当下游调用request(n)方法之后,会向上游请求n个数据。上游会向下游发送onNext信号来传输生成的数据。
Reactor Core
reactor引入了两个实现Publisher的类:Mono和Flux。
- Flux:代表包含
0...N个items的reactive sequence - Mono:代表包含
0...1个items的reactive sequence
上述两个类代表了在异步处理场景中的大致基数。
- Mono:例如,对于http请求的场景,一个请求只会产生一个响应,故而对响应执行
count操作并没有任何意义。此时,可以通过Mono<HttpResponse>来代表http调用的结果,Mono中只提供了上下文中包含0...1个元素的对应操作 - 当执行某些
可能会改变异步处理中最大基数的操作时,可能会导致类型的改变,例如执行Flux中的count操作将会返回Mono<Long>的类型
Flux 0...n
Flux<T>是一个标准的Publisher<T>,代表基数为0...n的异步序列,其可以被completion signal或异常所终止。根据reactive stream标准,存在三种signal,且信号会转化为对下游onNext、onComplete、onError的调用。
Flux是一个通用的reactive类型,并且,所有的event type都是可选的。
- 当没有
onNext事件但是存在onComplete事件,代表一个空的有限序列 - 当
onNext和onComplete事件都不存在时,代表一个空的无限序列 - 无限序列并不一定为空,例如
Flux.interval(Duration)会产生一个Flux<Long>,其是无限的并且发送tick
Mono 0...1
Mono<T>是一个标准的Publisher<T>,其通过onNext信号发送至多一个item,然后再结束时发送onComplete信号结束(成功场景);或直接发送onError信号结束(失败场景)。
大多数Mono实现在调用完subscriber的onNext方法之后,预计会立马调用subscriver的onComplete方法。但是,Mono.never是一个例外,其并不会发送任何信号,并且其onNext和onError的组合是被明确禁止的。
创建Mono/Flux并进行订阅的方式
String sequence
如果想要创建String序列,可以通过如下方式:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux.empty
Mono<String> noData = Mono.empty();
Flux.range
在下面示例中,Flux.range第一个参数是range的起始值,第二个参数是要产生的元素个数
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
故而,其产生的内容为5,6,7。
Lambda Subscribe
在进行订阅时,Flux和Mono使用了lambda,在调用subscribe时,有如下几种重载的选择:
// 订阅并触发sequence的产生
subscribe();
// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer);
// 在reactive stream异常终止时,对error进行处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 在sequence处理完时,执行额外的complete操作
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
对于subscribe的使用,示例如下
Flux.range(5,3)
.map(x->{
if(x<7) {
return x;
}
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
})
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
(e) -> {
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
},
()->{
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
});
其执行结果如下:
[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7
Disposable
上述subscribe方法的返回类型为Disposable,该接口代表subscriber对publisher的订阅是可取消的,如需取消订阅,调用dispose方法即可。
对于Mono和Flux而言,source publisher应该在接收到cancellation信号之后停止产生元素,并不能保证取消信号是即时的。(若source产生数据的速度过快,可能在接收到cancel信号之前,source就已经complete)。
Disposables类中存在一些对Disposable的工具方法,例如swap和composite。
BaseSubscriber
subscribe方法除了接收lambda外,还存在更通用的重载方法,接收Subscriber类型的参数。
在这种场景下,传参可以继承BaseSubscriber类。
并且,BaseSubscriber该类是一次性的,其只能够订阅一个publisher,如果其订阅了第二个publisher,那么其对第一个publisher的订阅将会被取消。
BaseSubscriber只能订阅一个publisher的原因是reactive stream规范要求onNext方法不能被并行调用。
示例如下:
Flux.range(3, 5)
.subscribe(new BaseSubscriber<Integer>() {
private Subscription subscription;
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5);
}
@Override
protected void hookOnNext(Integer value) {
log.info("onNext called: {}", value);
// this.subscription.request(1);
}
@Override
protected void hookOnComplete() {
log.info("onComplete called");
super.hookOnComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
log.info("onError called: {}", throwable.getMessage());
super.hookOnError(throwable);
}
});
上述示例中,通过向subscribe方法中传递自定义的BaseSubscriber来实现对上游的订阅,执行结果如下:
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
BaseSubscriber的hookOnSubscribe默认实现如下:
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
其请求的数量为Long.MAX_VALUE,代表其publisher为effectively unbounded。
可以通过重写hookOnSubscribe方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写hookOnSubscribe和hookOnNext方法。
BaseSubscriber提供了requestUnbounded方法(其方法和request(Long.MAX_VALUE)等价)和cancel方法。
除了上述列出的hook外,BaseSubscriber还支持如下hooks:
- hookOnComplete
- hookOnError
- hookOnCancel
- hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error)
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
backpressure
在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送request请求。当前已发送的请求个数之和被称为demand,并且demand的上限为Long.MAX_VALUE,当demand的值为Long.MAX_VALUE或更大时,代表unbound request。
unbound request代表尽可能快的产生数据,即backpressure关闭。
在reactive chain中,第一个请求来自于final subscriber,其在订阅时(onSubscribe)会发送第一个request请求。目前,大多直接订阅的方法都会通过Long.MAX_VALUE创建一个unbounded request,示例如下:
subcribe()方法和大多数基于lambda的重载方法(除了包含Consumer<Subscription>参数的重载)block,blockFirst,blockLast- 通过
toIterable或toStream进行遍历
目前,定义初始请求最简单的方法为通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法。
buffer
在reactor中,可以通过部分operator进行request的reshape。示例如下:
Flux.range(1, 1000)
.buffer(3)
.subscribe(new BaseSubscriber<List<Integer>>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(List<Integer> value) {
for (Integer v : value) {
log.info("item received: {}", v);
}
}
});
在上述示例中,request(2)代表请求2个buffer,而每个buffer中包含3个Integer,故而总共会接收到2 * 3 = 6个元素。
输出如下:
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
prefetch
prefetch机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。
prefetch机制通常分为如下部分:
初始请求
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。
补充优化(Replenishing Optimization)
prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。
例如,prefetch的大小为10,其limit对应的值为
ceil(10 * 0.75) = 8,每当其下游被处理的元素达到8个,其会重新请求8个数据,并且将被下游处理元素的个数重置,重新从0开始计数,直到该值再达到8,再次发送请求
预加载数据
补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
平滑处理
通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。
有如下operators可以对请求的prefetch进行调整
limitRate
除了prefetch之外,还可以通过limitRate或limitRequest来直接针对请求进行调节。
limitRate(N)将来自下游的请求进行拆分,当来自下游的请求被传播到上游时,其会被拆分为small batches。例如,如果下游调用request(100),此时limitRate(10)将会将其拆分为10个request(10)再传播给上游。并且,在此基础上,limitRate还实现了prefetch中的补充优化。
除了limitRate(N)之外(当没有传递lowTie时,limit默认会取N - N>>2,即ceil(N * 0.75)),limtRate还存在limitRate(highTie, lowTie)的重载方法。
lowTie
当lowTie取不同值时,其补充策略如下:
lowTie<=0:如果lowTie小于或等于0,则limit取值和prefetch值相同,仅当prefetch中所有元素都被下游处理完时,limtRate operator才会向上游请求数据lowTie>=prefetch: 当lowTie大于或等于prefetch时,limit取值为ceil(prefetch * 0.75),此时,补充策略和prefetch默认相同,当75%的数据被下游处理时,limitRte会重新向上游请求75%的数据- 若lowTie位于
(0, prefetch)区间之间- 若prefetch的值为
Long.MAX_VALUE,那么limit的值也为Long.MAX_VALUE - 若prefetch值不为
Long.MAX_VALUE,那么limit的值为lowTie,即lowTie的值即为消费后重新拉取的限制值
- 若prefetch的值为
limitRequest
limitRequest(N)用于限制下游请求的总个数。例如,向limitRequest(10)发起两次request,一次请求3一次请求8,那么最后下游只会接收到10个元素。
一旦source发送的元素个数超过
N时,limitRequest将会认为sequence已经完成,会向下游发送onComplete信号
Create Sequence
同步生成generate
创建Flux的最简单形式是通过generate方法生成,其接收一个Consumer<SynchronousSink<T>>类型的参数。
Sink
sink是spring reactor中的一个核心抽象概念,其可以被理解为数据流的出口或发射器,负责将元素、error或complete信号推送到下游订阅者。
sink可以分为如下种类:
- 同步/异步
- 单次/多次发射
sink的核心api如下:
next(T value): 向下游发送数据complete:表示数据流正常结束error(throwable):代表数据流因为错误而终止
generate方法用于产生同步且one by one的emission,即sink为SynchronousSink且其单次invocation中sink.next只能够被调用一次。示例如下:
Flux.generate(sink -> {
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
sink.next(data); // 仅调用一次
if (isEndOfData()) sink.complete();
});
在调用
sink.next后,可以调用sink.complete或sink.error,但是对complete和error的调用都是可选的。
示例如下:
Flux.generate(sink -> {
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
})
.take(10)
.subscribe(System.out::println);
通过generate的另一个重载方法generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)可以维护一个状态并进行管理,其中S为状态的对象类型。对于重载方法的两个参数,其含义如下:
stateSupplier:该参数用于提供一个初始状态,每当有新请求进入时,都会调用supplier生成初始状态generator:generator会返回一个新的state
示例如下:
Flux.generate(() -> 0, (s, sink) -> {
// System.out.printf("%s emitted\n", s);
sink.next(s);
return s + 1;
})
.take(5)
.subscribe(System.out::println);
其上游生成的整数依次增加,输出如下:
0
1
2
3
4
异步多线程生成create
Flux.create针对每个round能够产生多个元素,其暴露的sink类型为FluxSink。相对于Flux.generate,Flux.create方法并没有接收state的重载版本。
Flux.create并不会将你的代码变为并行或是异步的.如果在
Flux.create中存在阻塞操作,那么将存在死锁的风险。即使使用了subscribeOn方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。
Flux.create和listener based api进行适配
街射使用基于listener的api,api定义如下:
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
可以使用Flux.create将其与Flux相桥接:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
Flux.create backpressure
可以通过指定OverflowStrategy来优化backpressure行为,OverflowStrategy可选值如下:
-
IGNORE: 该选项会完全无视下游的backpressure request,当下游queue被填充满时,会抛出IllegalStateException异常部分操作符(例如
publishOn),其subscriber会内置queue,用于存储上游通过onNext推送的数据。当queue满时,将会抛出
IllegalStateException异常 -
ERROR:当下游无法跟上上游的数据发送速度时,上游将会发送IllegalStateException异常在上述描述中,
跟上代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了request(1),但是source产生了两个数据,调用了两次sink.next,那么第二次调用时requested已经为0,会调用onOverflow方法发送error信号 -
DROP:当下游无法跟上上游的数据发送速度时(上游发送onNext信号之前,如果下游requested配额不足),上游将会丢弃该数据,sink.next不做任何操作。 -
LATEST: 当上游想要推送数据到下游时,如果下游requested不足,那么上游会用最新的数据覆盖下游之前缓存的数据- 例如,下游堆积事件
1,此时上游推送事件2,则2会覆盖1,之后上游再推送数据3,3会覆盖2...之后,下游调用request(1)后,获取到的是最新的数据3,数据1,2都被覆盖 - 其实现是通过
LatestAsyncSink中的queue来实现的,queue是一个AtomicReference类型的field,当调用sink.next向下游发送数据时,如果下游不满足requested,那么将会将值set到queue中,从而实现后面的数据覆盖前面的数据
- 例如,下游堆积事件
-
BUFFER(默认):BufferAsyncSink中会存在一个无界队列,如果当上游调用sink.next尝试向下游发送数据时,如果下游requested条件不满足,会将数据缓存在无界队列中。(使用无界队列进行缓存可能会导致OOM)
异步单线程生成push
push介于generate和create之间,其用于对单个生产者产生的事件进行处理,push和generate与create的相似点如下:
- 和
create类似,其可以是异步的,并且可以通过OverflowStrategy来管理backpressure - 和
generate类似,对于push,其sink.next,sink.complete,sink.error在指定时刻只能被一个线程调用
其对异步事件的桥接示例如下:
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
public void processError(Throwable e) {
sink.error(e);
}
});
});
hybird push-pull model
大多数reactor operators(例如create),都使用了混合推拉的模型。在混合推拉模型中,大多数事件的处理都是异步的(建议采用push的方法)。
在混合推拉模型中,consumer从source处拉取数据,在consumer发送第一次request之前,source并不会emit任何数据(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。source只会在下游请求的范围内向下游推送数据。
并且,在最下游的subscriber调用上游subscribe方法时,source的数据生成逻辑才会被触发,调用subscribe方法后,首先会调用subscriber的onSubscribe方法,然后会一直沿着reactive stream向前追溯,直到找到source,然后调用Flux.create接受的Consumer<? super FluxSink<T>>。
sink.onRequest
sink还提供了onRequest方法,其接受一个LongConsumer类似的参数,可以为sink注册一个onRequest回调,后续针对sink(sink实现了Subscription)调用request方法时,consumer都会被调用。
sink.onRequest的使用示例如下:
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
//
for(String s : messages) {
sink.next(s);
}
}
});
sink.onRequest(n -> {
// 此处代码将会在`sink.request`被调用时触发
// 主动向processor拉取数据
List<String> messages = myMessageProcessor.getHistory(n);
for(String s : messages) {
sink.next(s);
}
});
});
Handle
handle为一个instance method(非静态方法),其关联了一个已经存在的source。Mono和Flux中都存在handle方法。
和generate类似,handle使用SynchronousSink,并且只允许逐个发出。handle的作用类似map和filter的组合,可以跳过部分元素,其签名如下
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
其使用示例如下:
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
// 由于reactive steam中不允许有null,
// 故而当通过`alphabet`方法映射可能返回null值时,
// 可以使用handle对返回值进行过滤,只有不为空时才调用`sink.next`
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
onCancel & onDispose
sink支持onCancel和onDispose回调,其区别如下:
onDispose回调用于执行cleanup操作,其在complete,error,cancel之后都会被调用。onCancel回调用于执行取消操作,其只针对cancel执行,并且执行顺序位于cleanup之前
其使用示例如下所示:
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
Threading and Schedulers
reactor模型和rxjava模型类型,是并发无关的,并不强制要求并发模型。并且,大多数operator其运行的线程都和前一个operator相同。
除非显式指定,否则最顶层的operator(source)也会运行在subscribe方法被调用的线程中。
Scheduler
在reactor中,操作执行在哪个线程中取决于使用的Scheduler。Scheduler和ExectuorService类似,负责对任务进行调度,但是相比于ExecutorService其功能更加丰富。
Scheudlers类中存在如下静态方法,分别访问不同的execution context:
Schedulers.immediate():没有执行上下文,被提交的任务会在当前线程中被立马执行Schedulers.single():线程上下文为一个单个、可重用的线程上下文。该方法会对所有的调用都使用相同的线程,直到scheduler被disposedSchedulers.newSingle():每次调用时都使用一个专属线程Schedulers.elastic():该上下文是一个无界、弹性的线程池。在引入Schedulers.boundedElastic()方法后,该方法不再推荐被使用。Schedulers.boundedElastic():该上下文是一个有界、弹性的线程池。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现:ExecutorService-based:会在多个任务之间重用平台线程(即使用相同工作线程执行多个任务)Virtual-thread-per-task-based:jdk21+支持该特性,对每个任务,都会开启一个新的虚拟线程,并且实现并没有维护idle pools
createWorker
在Scheduler中,idleQueue中的线程会在空闲一段时间后自动销毁,但是通过createWorker手动创建的worker必须手动销毁,在使用完成后调用release。
operators using default scheduler
通常情况下,部分operators在未显式指定的前提下,会使用默认的scheduler(通常可显式指定一个不同的scheduler)。
例如,Flux.interval(Duration.ofMillis(300))方法会生成间隔300ms的ticks。默认情况下,其会使用Schedulers.parallel,可以通过如下代码指定一个新的scheduler:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
switch thread context
reactor提供了两种方法来切换执行的上下文:
publishOnsubscribeOn
publishOn和subscribeOn用友如下区别:
- publishOn方法在reactive chain中的位置若发生变化,会对各个节点的执行上下文造成影响
- subscribeOn方法在reactive chain中位置若发生变化,并不会对各个节点的执行上下文造成影响
publishOn
publishOn操作符和作用和其他操作符类型,都是从上游接收到信号并且将信号传递到下游,但是,在执行下游的回调时,通过Scheduler中的worker进行调度。
故而,publishOn会对reactor chain中后续operators造成如下影响:
- 修改执行上下文,执行的线程由scheduler决定
- 按照规范,
onNext的调用是有序的,故而publishOn后续的操作都会使用同一个worker进行调度,对于所有数据,都在同一线程中执行(worker在subscribe时决定publishOn使用的worker)
subscribeOn
subscribeOn影响订阅的过程,通常推荐将其放在source之后。
subscribeOn原理
在FluxSubscribeOn#subscribeOrReturn中,会通过scheduler#createWorker创建worker,并通过worker来对执行source.subscribe的方法进行调度。
故而,在consumer执行subscribe时,随着reactor chain从尾到头,下游subscriber都会调用上游publisher的subscribe方法。在FluxSubscribeOn调用上游的subscribe时,则是会通过worker在进行调度,实际调用在worker中执行。
subscribeOn方法使用方式如下:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);
new Thread(() -> flux.subscribe(System.out::println));
Handling Errors
在reactive stream中,errors是终止事件,一旦error被触发,sequence的生成将会被停止,并且将会沿着reactor chain的下游一直传递道subscriber#onError。
error应该在应用程序的级别被处理,处理方案如下:
- 在ui上展示错误信息
- 向rest endpoint发送error payload
故而,subscriber#onError方法应该被定义。
reactor同样支持在chain的中间通过error-handling来处理error,示例如下所示:
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
在reactor中,任何error都代表终止事件,`即使使用了error-handling operator,original sequence在出现error后也不会继续。
故而,error-handling将
onError信号转化为了一个新sequence的开始(fallback one),即通过新sequence替换了上游旧的terminated sequence。
在
FluxOnErrorReturn.ReturnSubscriber#onError的实现中,operator将Throwable转换为了onNext(fallbackValue)和onComplete两个调用,即在向下游发送onNext(fallbackValue)信号之后,也会终止序列。
通常来说,常见的异常才处理方式如下:
static fallback value
该场景类似于
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
可以调用onErrorReturn方法,示例如下:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
onErrorReturn还存在一个接受Predicate的重载方法,可以决定是否对异常执行recover操作,示例如下:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
catch and swallow error
如果不想将异常替换为fallback value,而是忽视产生的error,仅向下游传递正常生成的元素,可以使用onErrorComplete方法(例如, 1,2,3...序列在向下进行传递时,如果2的下游operator执行发生异常,那么onErrorComplete会忽略2产生的error,仅向下游传递1)。
onErrorComplete的使用示例如下:
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete();
onErrorComplete会将onError信号转化为onComplete信号。
类似于onErrorReturn,onErrorComplete同样存在一个重载方法接受Predicate,可以通过该断言判断是否对error执行recover操作。
- 若执行recover操作,则会调用
subscriber#onComplete - 若不执行recover操作,则会调用
subscriber#onError
fallback method
如果在处理数据时,存在多种方法,例如m1,m2,并且在尝试m1失败时想要再尝试m2,可以使用onErrorResume方法。
其等价于如下逻辑:
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
和上述逻辑等价的是如下代码:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
其中,onErrorResume返回的都是一个Publisher
onErrorResume的底层逻辑实现如下:
- 其会根据
onErrorResume中传入的Function<? super Throwable, ? extends Publisher<? extends T>>参数和error来构建一个新的publisher - 然后,对新的publisher执行
subscribe操作,并向下游返回新publisher的sequence
onErrorResume同样拥有一个重载函数接受Predicate,决定是否执行recover逻辑。除此之外,其接受Function<? super Throwable, ? extends Publisher<? extends T>>类型参数,可以根据抛出的异常来决定fallback sequence,示例如下:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> {
if (error instanceof TimeoutException)
return getFromCache(k);
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error);
})
);
在上述示例中,
Flux.error会重新抛出上述异常
Dynamic Fallback Value
当出现error时,如果并不想调用下游的onError,而是想给下游一个根据异常信息计算出的默认值,同样可以使用onErrorResume方法,示例如下:
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
上述代码等价于
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
Catch and rethrow
对于捕获异常然后重新抛出的逻辑
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
可以通过onErrorResume实现:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
或者,可以调用onErrorMap,其是对onErrorResume方法的封装:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
log and react on the side
如果想要实现类似将异常捕获、打印然后重新抛出的逻辑,可以使用doOnError
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
上述代码等价于
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
doOnError
doOnError和其他以doOn为前缀的operators,其通常会被称为副作用(side effect)。通过这些operators,可以在不修改sequence的情况下监控chain中的事件。
Using resources and the finally block
对于在finally block中清理资源的行为,可以通过doFinally来实现
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
doFinally
doFinally是无论sequence结束(onError/onComplete)还是取消(cancel),都希望执行的副作用(side effect)。其接收参数类型为Consumer<SignalType>,可以通过signaltype判断触发side-effect的terminaction类型。
doFinally使用示例如下:
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
对于try(resource)的代码,则是可以通过using来实现
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
using
using代表Flux由资源派生,并且在Flux处理完成之后都需要对资源执行操作。
// resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
// using
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
其中,resource实现了Disposable接口,而Flux.using接收的参数类型分别为
() -> disposableInstance: resource supplierdisposable -> Flux.just(disposable.toString()): 一个factory,根据resource产生publisherDisposable::dispose: resource cleanup
retry
retry允许对产生错误的sequence进行重试。
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()
.subscribe(System.out::println, System.err::println);
Thread.sleep(2100);
在上述示例中,retry(1)会对错误执行一次重试(upstream重新开始生成)。其产生结果如下所示:
259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
retry原理
下游在上游调用onError时,会判断重试次数是否为0,如果不为0,会对重试次数进行减1,并且重新对上游调用subscribe,这会令上游重新从头开始生成序列。
直到重试次数为0之后,FluxRetry才会将onError传递给下游。
retryWhen
retryWhen会使用companion flux来判断错误时是否应该回滚。companion flux由operator创建,但是用户会通过装饰companion flux来自定义重试条件。
companion flux是一个Flux<RetrySignal>类型的值,该值将会被传递给Retry方法/策略(retryWhen只接受一个Retry类型的参数)。
使用者可以对retry方法进行指定,retry方法会返回一个Publisher<?>类型的值。
Retry是一个抽象类,但是提供了
Retry#from方法,通过该方法可以将lambda转换为Retry对象。
Retry周期如下所示:
- 每当error发生时,都会将
RetrySignal发送到companion flux中,此时companion flux已经被retry方法装饰过了Flux<RetrySignal>将会包含至今所有的重试尝试信息,并且可以通过其访问error和error关联的元数据
- 如果
companion flux发送了一个值,那么会触发重试 - 如果
companion fluxcomplete,将会对error执行swallow操作,retry cycle将会停止,并且也会导致sequence complete - 如果
companion flux产生了一个error,retry cycle将会终止,并且导致sequence按异常终止
retry helper
project reactor提供了Retry helper,如RetrySpec和RetryBackoffSpec,二者都允许进行如下所示的自定义行为:
filter:通过filter设置允许触发retry的异常modifyErrorFilter: 对之前filter设置的异常进行修改doBeforeRetry和doAfterRetry:针对retrytrigger执行side effect-
doBeforeRetry()方法触发在delay发生之前,而doAfterRetry()触发在delay之后
-
onRetryExhaustedThrow(BiFunction):在重试数量达到上限后,通过onRetryExhaustedThrow(BiFunction)来自定义异常。- 通常情况下,当重试数量达到上限后,自定义异常类型通过
Exceptions.retryExhausted(…)方法来构建,其返回的异常类型为RetryExhaustedException,可以通过Exceptions.isRetryExhausted(Throwable)方法来进行区分
- 通常情况下,当重试数量达到上限后,自定义异常类型通过
retrying with transient errors
在RetrySignal中存在如下两个方法:
totalRetriesInARow():每当error is recovered(retry attempt导致了onNext而不是onError)时,index都会被设置为0totalRetries():totalRetries()方法返回的值是单调递增的,并不会被重置
当使用RetrySpec和RetryBackoffSpec时,可以通过transientErrors(true)方法来令策略使用totalRetriesInARow()处理transient error。
示例如下所示:
AtomicInteger errorCount = new AtomicInteger();
Flux<Integer> transientFlux = httpRequest.get()
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
.blockLast();
assertThat(errorCount).hasValue(6);
transientErrors主要是为了处理周期性发生异常,异常能够重试成功,并且发生异常后一段时间平稳运行的场景
handling Exceptions in operators or functions
通常来说,operator的执行有可能会抛出异常,根据抛出异常类型的不同,存在如何区别:
unchecked Exception:对于抛出的unchecked exception,其都会通过onError向下游传递,例如,map operator中抛出的RuntimeException将会被转化为对下游onError的调用上述代码示例将会触发下游subscriber的onError,输出内容如下:Flux.just("foo") .map(s -> { throw new IllegalArgumentException(s); }) .subscribe(v -> System.out.println("GOT VALUE"), e -> System.out.println("ERROR: " + e));ERROR: java.lang.IllegalArgumentException: foofatal exceptions:在reactor中,定义了一些被视为致命的异常(例如OutOfMemoryError),具体的fatal异常包含范围可见Exceptions.throwIfFatal- 在抛出
fatal error时,reactor operator会抛出异常而不是将其传递到下游
- 在抛出
checked exception
对于checked exception,当调用方法签名中指定了throws xxxException的方法时,需要通过try-catch block对其进行处理。对于checked exception,有如下选择:
- 对异常进行捕获,并且对其执行
recover操作,sequence会继续执行 - 对异常进行捕获,并且封装到
unchecked exception中,并将unchecked exception进行rethrow(这将对sequence进行打断) - 如果需要返回一个
Flux(例如,使用flatMap方法),可以使用Flux.error(checkedException)返回一个error-producing Flux(sequence会终止)
Exceptions Utility
通过reactor提供的Exceptions Utility,可以保证仅当异常为checked exception时,才会封装
Exceptions.propagate在必要时会封装exception。- 其并不会对
RuntimeException进行封装,并且该方法首先会调用throwIfFatal - 该方法会将checked exception封装到
reactor.core.Exceptions.ReactiveException中
- 其并不会对
Exceptions.unwrap:- 该方法会获取original unwrapped exception
使用示例如下所示:
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
Sinks
在多线程环境下安全的使用Sinks.One和Sinks.Many
reactor-core提供的sinks支持多线程环境的使用,并不会触犯规范或导致下游subscribers产生未知行为。
tryEmit & emit
当尝试通过sinks向下游发送signal时,可以调用如下API:
tryEmit*:并行调用将会导致fail fastemit*: 当调用emit*时,提供的EmissionFailureHandler,如果该接口的onEmitFailure方法返回为true,将会在争用场景下执行重试操作(例如busy loop);如果onEmitFailure返回为false,则sink会以error终止。
上述流程是对Processor.onNext的改进,Processor.onNext必须在外部进行同步,否则将会导致下游subscribers未定义的行为。
例如,
Flux.create允许多线程调用sink.onNext,但是其使用的sink是reactor.core.publisher.FluxCreate.SerializedFluxSink,其在next操作中通过队列和cas对下游的onNext操作进行了同步。故而,在传统的
Processor.onNext中,如果要在多线程环境下使用,必须在上游做好同步操作,否则会导致下游的未定义行为。
Processor
Processor是一种特殊的Publisher,其在作为publisher的同时也是Subscriber。
在使用Processor是,一个很常见的问题是,直接对processor向外暴露的onNext,onComplete, onError方法进行调用。
实际上,对于processor中onXXX方法的调用必须要符合reactive stream规范,在外部对onXXX方法进行调用时,要做好同步操作。
sinks的使用示例如下所示:
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
通过sink,多个线程可以并行的产生数据
//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);
在使用
EmitFailureHandler.busyLooping时,其返回的示例包含状态,并不能被重用
asFlux和asMono
Sinks.Many支持被转化为Flux,下游可以对Sinks.Many转化后的Flux进行订阅,实例如下所示:
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
.takeWhile(i -> i < 10)
.log()
.blockLast();
同样的,Sinks.One和Sinks.Empty可以通过asMono()方法被转化为Mono
Sinks的类别包括:
many().multicast():sink只会基于subscribers的backpressure请求将最新的数据传递到其subscribers最新的数据代表subscriber订阅之后的数据
many().unicast():和many().multicast()类似,也是在订阅之后将最新的数据推送给订阅者,但是和many().multicast()不同之处如下:many().unicast()带有缓冲区,在第一个订阅者订阅many().unicast()之前,会将推送给many().unicast()的数据都缓冲到缓冲区中,待后续有订阅者订阅后,会将缓冲区数据发送给订阅者many().multicast()默认不带有缓冲区
many().replay(): 对于新订阅的订阅者,会将一定数量的pushed history data进行replay操作,之后再推送新的数据one():sink只会向subscriberr推送一个数据empty():该sink会向subscriber推送termianl signal(error或complete)
Sinks.many().unicast().onBackpressureBuffer(args?)
一个unicast Sinks.Many可以通过其内置buffer处理backpressure,但是,其最多只能有一个subscriber。
通常,可以通过Sinks.many().unicast().onBackpressureBuffer()来创建unicast sink。但是,Sinks.many().unicast()中包含更多的静态方法,可以对其进行更精细的调整。
onBackpressureBuffer
在默认情况下,onBackpressureBuffer()其是无界(unbounded)的:
- 在subscriber尚未请求数据的情况下,如果通过sink推送了数据,那么这些数据都会被缓冲到缓冲区中,缓冲区是无界的
故而,onBackPressureBuffer方法存在一个接收Queue类型参数的重载方法。可以向其传递一个有界队列,该队列将会被用作内部缓冲区。
在为
onBackpressureBuffer指定了Queue的情况下,如果queue已满并且下游并没有向上游发送足够的reqeust时,sink将会拒绝该value的推送。
Sinks.many().multicast().onBackpressureBuffer(args?)
对于该方法创建的multicast Sinks.Many可以向多个subscribers发送数据,并且对每个subscriber都能独立地接收backpressure。
对于每个subscriber,只会接收在其subscribe之后推送到sink的signal
基础的multicast sink可以通过Sinks.many().multicast().onBackpresuureBuffer()来进行创建。
autoCancel
在默认情况下,如果所有subscribers都被取消(cancelled),即取消订阅,其会对internal buffer进行清空,并且停止接收新的subscriber。
如果想要修改autoCancel的行为,可以调用Sinks.many().multicast()中的静态工厂方法,通过autoCancel参数来调整autoCancel的行为。
Sinks.many().multicast().directAllOrNothing()
该方法创建的拥有最简单的backpressure处理策略:
- 如果任何subscriber处于
too slow状态(demand为0),那么对于所有的subscriber,该onNext都会被丢弃。
但是,slow subscribers并不会被终止,一旦slow subscribers又发送了request,所有subscribers都会重新从sinks.many接收数据。
在Sinks.many()终止后(通常是通过调用emitError, emitComplete方法),其仍然允许新的subscriber对其进行订阅,但是只会对新订阅者replay termination signal。
Sinks.many().multicast().directBestEffort()
对于该类型的multicast Sinks.Many,若subscriber is too slow(该subscriber的demand为0),那么该onNext信号仅会针对该slow subscriber进行丢弃。
但是,slow subscribers并不会被终止,一旦slow subscribers开始请求数据,其会重新开始接收新推送的数据。
当Sinks.Many被emitError, emitComplete终止后,其仍然允许新的subscribers对其进行订阅,但是对新订阅的订阅者,只会向新订阅者发送termination signal。
Sinks.many().replay()
一个replay Sinks.Many可以将已发送元素进行缓存,并且对后续的subscriber进行replay。
replay Sinks.Many可以通过如下方式进行创建:
- 缓存指定数量的历史数据
Sinks.many().replay().limit(int) - 缓存所有历史数据,没有上限限制
Sinks.many().replay().all() - 基于time-based window进行缓存
Sinks.many().replay().limit(Duration) - hisotry size limit和time window相结合
Sinks.many().replay().limit(int, Duration)
除此之外,Sinks.many().replay()还包含其他的重载方法,例如可以通过latest()和latestOrDefault()对单个元素进行缓存和replay
Sinks.unsafe().many()
Sinks.unsafe().many()返回的Sinks.Many factory并不会提供producer thread safety,在使用Sinks.unsafe().many()时,需要确保对可能导致onNext, onComplete, onError方法的调用需要保证外部的同步,以确保其满足reactive stream规范。
Sinks.one()
该方法会简单创建一个Sinks.One<T>实例,该实例可以看作Mono<T>,并且其emit方法和many稍有不同:
emitValue(T value): 产生一个onNext(value)信号,并且,在大多数实现中会产生一个onComplete信号emptyEmpty(): 只会产生一个onComplete信号,其和empty Mono等效emitError(Throwable t):产生一个onError(t)信号
Sinks.empty()
该方法会创建一个Sinks.Empty<T>实例,Sinks.Empty和Sinks.One类似,但是Sinks.Empty不提供emitValue方法。
Sinks.Empty无法触发onNext,但是仍可以指定<T>泛型类型。
Advanced
Batching
Grouping With Flux<GroupedFlux<T>>
grouping将会把source Flux分割为多个batch,每个batch都有一个key,grouping对应的operator是groupBy。
其中,每个group都被表示为GroupedFlux<T>,可以通过key方法获取该group关联的key。
一旦由source产生的一个element对应一个新key,那么该key对应的group就会被打开,并且,匹配该key的元素都会在该group中出现(在同一时刻,可能会有多个group处于开启状态)。
故而,group特性如下:
- group之间元素是不相交的(一个source element只能属于一个group)
- group不可能为空(只有group对应key的相匹配元素出现后,group才会被开启)
groupBy使用示例如下:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
.map(String::valueOf) //map to string
.startWith(g.key())) //start with the group's key
)
.expectNext("odd", "1", "3", "5", "11", "13")
.expectNext("even", "2", "4", "6", "12")
.verifyComplete();
当使用grouping时,最好包含较少的
group数量。如果group数量较多,在下游进行flatMap时,如果flatMap concurrency较小,将会造成挂起。
windowing with Flux<Flux<T>>
windowing指将Flux<T>分割为windows,根据大小、时间、boundary-defining predicates、boundary-defining publisher对flux进行分割。
windowing关联的操作符为window, windowTimeout, windowUntil, windowWhile, windowWhen。
windowXXX和groupBy不同的是, groupBy通常会基于key随机的切换group,但是window却是顺序打开的。
windowWhile
windowWhile通过predicate判断是否window应当被关闭,如果source element满足predicate条件,那么window将仍然开启。一旦predicate返回false,那么windows将会被关闭,并且触发的元素也会被丢丢弃。
示例如下:
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowWhile产生的windows
empty window (由1产生)
empty window (由3产生)
empty window (由5产生)
2, 4, 6 (由11产生)
12 (由13产生)
windowUntil
windowUntil通过predicate是否应该开启一个新的window,如果source element满足predicate条件,那么就会开启一个新的window,并且之前的windows会关闭,并且之前的windows会收到触发的元素。
示例如下:
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowUntil产生的windows
1, 3, 5, 2 (由2触发)
4 (由4触发)
6 (由6触发)
11, 12 (由12触发)
13
Buffering with Flux<List<T>>
buffering和windowing类似,但是区别如下:
- windows对window进行emit时,类型为
Flux<T> - buffering对buffer进行emit,类型为
Collection<T>
对于buffer的operator和window类似,buffer, bufferTimeout, bufferUntil, bufferWhile, bufferWhen。
相比于windowing operator开启一个新的window,buffer operator会创建一个新的Collection,并向其中添加元素。
同样的,buffer也会造成元素的overlap,示例如下:
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
由于maxSize > skip,导致buffer也出现了元素overlap。
但是,和windowing不同的是,bufferUntil和bufferWhile并不会发送empty collection,示例如下:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.bufferWhile(i -> i % 2 == 0)
)
.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
.expectNext(Collections.singletonList(12)) // triggered by 13
.verifyComplete();
flatmap
flatMap方法接收一个Function类型的参数,该Function会将一个input item转化为一个Publisher,示例如下:
Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split(""));
上述Function的返回类型为Pbulisher<String>,其将字符串转为大写并对转换后的字符串进行分割,并基于分割后的字符串集合创建了一个新的reactive stream。
上述function的使用如下:
Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.flatMap(mapper);
由于上游存在三个字符串,故而flatMap方法基于上游的三个字符串创建了三个新的reactive stream。新建的三个stream,其中元素由上游字符串分割而得到,并且三个stream中的元素会被填充到另一个新建的reactive stream中。
对其进行subscribe之后,预期结果如下:
List<String> output = new ArrayList<>();
outFlux.subscribe(output::add);
assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M");
注意,最后输出字符的顺序可能是无序的。
Pipeline Operations
flatMap会通过传递给其的Function和onNext element创建新的reactive stream,并且,一旦新的stream(由Publisher表示)创建好后,flatMap会马上对其进行订阅。并且,订阅操作并不是阻塞的,operator在继续下一个stream之前并不需要等待当前订阅操作终止。
pipeline会同时处理所有基于input item派生的stream,并且,派生stream中的元素随时可能到达。故而,original order可能会被丢失。
如果original order比较重要,可以使用 flatMapSequential操作符。
concurrency
concurrency用于控制在途的inner sequences上限。
prefetch
prefetch用于控制每个inner publisher在途的元素上限
