25 KiB
- Reactor
Reactor
Reactive Programming
响应式编程是一种异步编程范式,关注于数据流和状态变化的传播。java的响应式编程接口被包含在java9的Flow中。
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用next方法,reactive stream是基于发布/订阅模型的。
迭代器模式是
pull-based,而reactive stream为push-based。
命令式迁移到响应式
可组合性与可读性
“可组合性"代表编排多个异步任务的能力,通过“组合”,可以将前一个异步任务的输出作为后一个异步任务的输入。或者,可以按照fork-join的形式对异步任务进行编排。
reactor同样能解决“可读性”的问题,在使用传统的callback model编写程序时,随着逻辑的复杂,异步进行的层数也会增加,这将会极大降低代码的可读性和可维护性。
在使用call model时,通常需要在回调中执行另一个回调,回调的嵌套通通常会被称为
callback heil。
reactor提供了复杂的“组合”选项,能够反映抽象异步进程的组织,并且,所有的内容通常都会位于同一层级。
Assembly Line
响应式应用中的数据处理类似于流水线,其中,reactor既是传送带又是工作站。数据来源于original publisher,最终传传递给subscriber。
数据在从publisher传送到subscriber的过程中,可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间,受影响的workstation会向上游发送消息来控制数据的生成速率。
Operators
在reactor中,Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到publisher中,并且前一个publisher包装到一个新的publisher实例中。
故而,operator将整个chain都链接起来,数据来源于第一个publisher,并随着chain移动,依次被每个链接处理,最终由subscriber结束该过程。
Nothing Happens Until You subscribe()
当通过reactor编写publisher chain时,数据并不会被泵入到chain中,编写chain只是创建了异步处理的抽象描述。
通过订阅行为,将publisher和subscriber绑定到了一起,订阅行为会触发chain中的数据流。该行为通过内部的signal实现,subscriber将会发送一个reuqest signal,该信号会被传递到chain上游,一直被传递到source publisher。
backpressure
传递到上游的信号该机制也被用于实现backpressure,在assembly line模型中,也被描述为workstation传递给上游的反馈信号,当workstation处理消息比上游workstation满时,会发送该反馈。
reactive stream定义的机制接近于上述描述,其提供两种模式:
- unbounded mode:source publisher可以按其最高速率不受限制的推送数据
- request mode:通过
request机制向source publisher发送信号,告知其准备好处理最多n个元素。
中间的operator也可以在传输过程中对请求做出修改,例如buffer operator可以将elements分割为以10个为单位的batch,如果subscriber请求一个buffer,那么上游source publisher可以产生10个元素。
通过backpressure,可以将push模型转化为push-pull模型:
- 当上游的n个元素已经准备好时,下游可以从上游拉取n个元素
- 当上有没有准备好n个元素时,后续如果n个元素被准备好,其将会被上游推送
hot & cold
对于响应式序列,其可以分为两种:
- cold sequence:对于cold sequence,会为每个订阅者重新开始流程,包括source publisher。例如source中若封装了http调用,会为每个subscriber都执行一个新的http请求
- hot sequence:subscriber只有在其订阅后才收到信号,即使没有subscriber在监听,hot sequence仍然能够发送signal
Subscriber和Publisher
Publisher
对于publisher,其提供了subscribe方法供subscriber进行注册,在执行subscribe方法并向其传入Subscriber对象后,上游publisher会调用下游的onSubscribe方法,并向onSubscribe方法传入Subscription对象。下游可以通过Subscription对象调用request(n)请求。
若中间存在operator(例如map)在担任publisher角色的同时,还对上游进行了订阅,那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
任何变更状态的操作都只在实际subscriber执行订阅操作后被触发。
Subscriber
当下游调用request(n)方法之后,会向上游请求n个数据。上游会向下游发送onNext信号来传输生成的数据。
Reactor Core
reactor引入了两个实现Publisher的类:Mono和Flux。
- Flux:代表包含
0...N个items的reactive sequence - Mono:代表包含
0...1个items的reactive sequence
上述两个类代表了在异步处理场景中的大致基数。
- Mono:例如,对于http请求的场景,一个请求只会产生一个响应,故而对响应执行
count操作并没有任何意义。此时,可以通过Mono<HttpResponse>来代表http调用的结果,Mono中只提供了上下文中包含0...1个元素的对应操作 - 当执行某些
可能会改变异步处理中最大基数的操作时,可能会导致类型的改变,例如执行Flux中的count操作将会返回Mono<Long>的类型
Flux 0...n
Flux<T>是一个标准的Publisher<T>,代表基数为0...n的异步序列,其可以被completion signal或异常所终止。根据reactive stream标准,存在三种signal,且信号会转化为对下游onNext、onComplete、onError的调用。
Flux是一个通用的reactive类型,并且,所有的event type都是可选的。
- 当没有
onNext事件但是存在onComplete事件,代表一个空的有限序列 - 当
onNext和onComplete事件都不存在时,代表一个空的无限序列 - 无限序列并不一定为空,例如
Flux.interval(Duration)会产生一个Flux<Long>,其是无限的并且发送tick
Mono 0...1
Mono<T>是一个标准的Publisher<T>,其通过onNext信号发送至多一个item,然后再结束时发送onComplete信号结束(成功场景);或直接发送onError信号结束(失败场景)。
大多数Mono实现在调用完subscriber的onNext方法之后,预计会立马调用subscriver的onComplete方法。但是,Mono.never是一个例外,其并不会发送任何信号,并且其onNext和onError的组合是被明确禁止的。
创建Mono/Flux并进行订阅的方式
String sequence
如果想要创建String序列,可以通过如下方式:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux.empty
Mono<String> noData = Mono.empty();
Flux.range
在下面示例中,Flux.range第一个参数是range的起始值,第二个参数是要产生的元素个数
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
故而,其产生的内容为5,6,7。
Lambda Subscribe
在进行订阅时,Flux和Mono使用了lambda,在调用subscribe时,有如下几种重载的选择:
// 订阅并触发sequence的产生
subscribe();
// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer);
// 在reactive stream异常终止时,对error进行处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 在sequence处理完时,执行额外的complete操作
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
对于subscribe的使用,示例如下
Flux.range(5,3)
.map(x->{
if(x<7) {
return x;
}
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
})
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
(e) -> {
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
},
()->{
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
});
其执行结果如下:
[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7
Disposable
上述subscribe方法的返回类型为Disposable,该接口代表subscriber对publisher的订阅是可取消的,如需取消订阅,调用dispose方法即可。
对于Mono和Flux而言,source publisher应该在接收到cancellation信号之后停止产生元素,并不能保证取消信号是即时的。(若source产生数据的速度过快,可能在接收到cancel信号之前,source就已经complete)。
Disposables类中存在一些对Disposable的工具方法,例如swap和composite。
BaseSubscriber
subscribe方法除了接收lambda外,还存在更通用的重载方法,接收Subscriber类型的参数。
在这种场景下,传参可以继承BaseSubscriber类。
并且,BaseSubscriber该类是一次性的,其只能够订阅一个publisher,如果其订阅了第二个publisher,那么其对第一个publisher的订阅将会被取消。
BaseSubscriber只能订阅一个publisher的原因是reactive stream规范要求onNext方法不能被并行调用。
示例如下:
Flux.range(3, 5)
.subscribe(new BaseSubscriber<Integer>() {
private Subscription subscription;
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5);
}
@Override
protected void hookOnNext(Integer value) {
log.info("onNext called: {}", value);
// this.subscription.request(1);
}
@Override
protected void hookOnComplete() {
log.info("onComplete called");
super.hookOnComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
log.info("onError called: {}", throwable.getMessage());
super.hookOnError(throwable);
}
});
上述示例中,通过向subscribe方法中传递自定义的BaseSubscriber来实现对上游的订阅,执行结果如下:
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
BaseSubscriber的hookOnSubscribe默认实现如下:
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
其请求的数量为Long.MAX_VALUE,代表其publisher为effectively unbounded。
可以通过重写hookOnSubscribe方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写hookOnSubscribe和hookOnNext方法。
BaseSubscriber提供了requestUnbounded方法(其方法和request(Long.MAX_VALUE)等价)和cancel方法。
除了上述列出的hook外,BaseSubscriber还支持如下hooks:
- hookOnComplete
- hookOnError
- hookOnCancel
- hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error)
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
backpressure
在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送request请求。当前已发送的请求个数之和被称为demand,并且demand的上限为Long.MAX_VALUE,当demand的值为Long.MAX_VALUE或更大时,代表unbound request。
unbound request代表尽可能快的产生数据,即backpressure关闭。
在reactive chain中,第一个请求来自于final subscriber,其在订阅时(onSubscribe)会发送第一个request请求。目前,大多直接订阅的方法都会通过Long.MAX_VALUE创建一个unbounded request,示例如下:
subcribe()方法和大多数基于lambda的重载方法(除了包含Consumer<Subscription>参数的重载)block,blockFirst,blockLast- 通过
toIterable或toStream进行遍历
目前,定义初始请求最简单的方法为通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法。
buffer
在reactor中,可以通过部分operator进行request的reshape。示例如下:
Flux.range(1, 1000)
.buffer(3)
.subscribe(new BaseSubscriber<List<Integer>>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(List<Integer> value) {
for (Integer v : value) {
log.info("item received: {}", v);
}
}
});
在上述示例中,request(2)代表请求2个buffer,而每个buffer中包含3个Integer,故而总共会接收到2 * 3 = 6个元素。
输出如下:
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
prefetch
prefetch机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。
prefetch机制通常分为如下部分:
初始请求
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。
补充优化(Replenishing Optimization)
prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。
例如,prefetch的大小为10,其limit对应的值为
ceil(10 * 0.75) = 8,每当其下游被处理的元素达到8个,其会重新请求8个数据,并且将被下游处理元素的个数重置,重新从0开始计数,直到该值再达到8,再次发送请求
预加载数据
补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
平滑处理
通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。
有如下operators可以对请求的prefetch进行调整
limitRate
除了prefetch之外,还可以通过limitRate或limitRequest来直接针对请求进行调节。
limitRate(N)将来自下游的请求进行拆分,当来自下游的请求被传播到上游时,其会被拆分为small batches。例如,如果下游调用request(100),此时limitRate(10)将会将其拆分为10个request(10)再传播给上游。并且,在此基础上,limitRate还实现了prefetch中的补充优化。
除了limitRate(N)之外(当没有传递lowTie时,limit默认会取N - N>>2,即ceil(N * 0.75)),limtRate还存在limitRate(highTie, lowTie)的重载方法。
lowTie
当lowTie取不同值时,其补充策略如下:
lowTie<=0:如果lowTie小于或等于0,则limit取值和prefetch值相同,仅当prefetch中所有元素都被下游处理完时,limtRate operator才会向上游请求数据lowTie>=prefetch: 当lowTie大于或等于prefetch时,limit取值为ceil(prefetch * 0.75),此时,补充策略和prefetch默认相同,当75%的数据被下游处理时,limitRte会重新向上游请求75%的数据- 若lowTie位于
(0, prefetch)区间之间- 若prefetch的值为
Long.MAX_VALUE,那么limit的值也为Long.MAX_VALUE - 若prefetch值不为
Long.MAX_VALUE,那么limit的值为lowTie,即lowTie的值即为消费后重新拉取的限制值
- 若prefetch的值为
limitRequest
limitRequest(N)用于限制下游请求的总个数。例如,向limitRequest(10)发起两次request,一次请求3一次请求8,那么最后下游只会接收到10个元素。
一旦source发送的元素个数超过
N时,limitRequest将会认为sequence已经完成,会向下游发送onComplete信号
Create Sequence
同步生成generate
创建Flux的最简单形式是通过generate方法生成,其接收一个Consumer<SynchronousSink<T>>类型的参数。
Sink
sink是spring reactor中的一个核心抽象概念,其可以被理解为数据流的出口或发射器,负责将元素、error或complete信号推送到下游订阅者。
sink可以分为如下种类:
- 同步/异步
- 单次/多次发射
sink的核心api如下:
next(T value): 向下游发送数据complete:表示数据流正常结束error(throwable):代表数据流因为错误而终止
generate方法用于产生同步且one by one的emission,即sink为SynchronousSink且其单次invocation中sink.next只能够被调用一次。示例如下:
Flux.generate(sink -> {
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
sink.next(data); // 仅调用一次
if (isEndOfData()) sink.complete();
});
在调用
sink.next后,可以调用sink.complete或sink.error,但是对complete和error的调用都是可选的。
示例如下:
Flux.generate(sink -> {
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
})
.take(10)
.subscribe(System.out::println);
通过generate的另一个重载方法generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)可以维护一个状态并进行管理,其中S为状态的对象类型。对于重载方法的两个参数,其含义如下:
stateSupplier:该参数用于提供一个初始状态,每当有新请求进入时,都会调用supplier生成初始状态generator:generator会返回一个新的state
示例如下:
Flux.generate(() -> 0, (s, sink) -> {
// System.out.printf("%s emitted\n", s);
sink.next(s);
return s + 1;
})
.take(5)
.subscribe(System.out::println);
其上游生成的整数依次增加,输出如下:
0
1
2
3
4
异步多线程生成create
Flux.create针对每个round能够产生多个元素,其暴露的sink类型为FluxSink。相对于Flux.generate,Flux.create方法并没有接收state的重载版本。
Flux.create并不会将你的代码变为并行或是异步的.如果在
Flux.create中存在阻塞操作,那么将存在死锁的风险。即使使用了subscribeOn方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。
Flux.create和listener based api进行适配
街射使用基于listener的api,api定义如下:
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
可以使用Flux.create将其与Flux相桥接:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
Flux.create backpressure
可以通过指定OverflowStrategy来优化backpressure行为,OverflowStrategy可选值如下:
-
IGNORE: 该选项会完全无视下游的backpressure request,当下游queue被填充满时,会抛出IllegalStateException异常部分操作符(例如
publishOn),其subscriber会内置queue,用于存储上游通过onNext推送的数据。当queue满时,将会抛出
IllegalStateException异常 -
ERROR:当下游无法跟上上游的数据发送速度时,上游将会发送IllegalStateException异常在上述描述中,
跟上代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了request(1),但是source产生了两个数据,调用了两次sink.next,那么第二次调用时requested已经为0,会调用onOverflow方法发送error信号 -
DROP:当下游无法跟上上游的数据发送速度时(上游发送onNext信号之前,如果下游requested配额不足),上游将会丢弃该数据,sink.next不做任何操作。
