Files
rikako-note/spring/webflux/Reactor.md
2025-04-01 18:20:38 +08:00

594 lines
32 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

- [Reactor](#reactor)
- [Reactive Programming](#reactive-programming)
- [命令式迁移到响应式](#命令式迁移到响应式)
- [可组合性与可读性](#可组合性与可读性)
- [Assembly Line](#assembly-line)
- [Operators](#operators)
- [Nothing Happens Until You subscribe()](#nothing-happens-until-you-subscribe)
- [backpressure](#backpressure)
- [hot \& cold](#hot--cold)
- [Subscriber和Publisher](#subscriber和publisher)
- [Publisher](#publisher)
- [Subscriber](#subscriber)
- [Reactor Core](#reactor-core)
- [Flux `0...n`](#flux-0n)
- [Mono `0...1`](#mono-01)
- [创建Mono/Flux并进行订阅的方式](#创建monoflux并进行订阅的方式)
- [String sequence](#string-sequence)
- [Flux.empty](#fluxempty)
- [Flux.range](#fluxrange)
- [Lambda Subscribe](#lambda-subscribe)
- [Disposable](#disposable)
- [BaseSubscriber](#basesubscriber)
- [backpressure](#backpressure-1)
- [buffer](#buffer)
- [prefetch](#prefetch)
- [初始请求](#初始请求)
- [补充优化Replenishing Optimization](#补充优化replenishing-optimization)
- [limitRate](#limitrate)
- [lowTie](#lowtie)
- [limitRequest](#limitrequest)
- [Create Sequence](#create-sequence)
- [同步生成`generate`](#同步生成generate)
- [Sink](#sink)
- [异步多线程生成`create`](#异步多线程生成create)
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
- [`Flux.create` backpressure](#fluxcreate-backpressure)
- [异步单线程生成`push`](#异步单线程生成push)
- [hybird push-pull model](#hybird-push-pull-model)
- [`sink.onRequest`](#sinkonrequest)
- [Handle](#handle)
- [`onCancel` \& `onDispose`](#oncancel--ondispose)
- [Threading and Schedulers](#threading-and-schedulers)
- [Scheduler](#scheduler)
>>>>>>> ff85f3d (doc: 阅读reactor文档)
# Reactor
## Reactive Programming
响应式编程是一种异步编程范式,关注于`数据流``状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法reactive stream是基于`发布/订阅`模型的。
> 迭代器模式是`pull-based`而reactive stream为`push-based`。
### 命令式迁移到响应式
#### 可组合性与可读性
“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。
reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。
> 在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为`callback heil`。
reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。
#### Assembly Line
响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`
数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。
#### Operators
在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`并且前一个publisher包装到一个新的publisher实例中。
故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。
#### Nothing Happens Until You subscribe()
当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。
通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个`reuqest signal`该信号会被传递到chain上游一直被传递到source publisher。
#### backpressure
`传递到上游的信号`该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。
reactive stream定义的机制接近于上述描述其提供两种模式
- unbounded modesource publisher可以按其最高速率不受限制的推送数据
- request mode通过`request`机制向source publisher发送信号告知其准备好处理最多`n`个元素。
中间的operator也可以在传输过程中对请求做出修改例如`buffer` operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。
通过backpressure可以将`push`模型转化为`push-pull`模型:
- 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
- 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送
#### hot & cold
对于响应式序列,其可以分为两种:
- cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
- hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal
## Subscriber和Publisher
### Publisher
对于publisher其提供了`subscribe`方法供subscriber进行注册在执行subscribe方法并向其传入`Subscriber`对象后上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。
> 若中间存在operator例如map在担任publisher角色的同时还对上游进行了订阅那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
>
> `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。
### Subscriber
当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。
## Reactor Core
reactor引入了两个实现`Publisher`的类:`Mono``Flux`
- Flux代表包含`0...N`个items的reactive sequence
- Mono代表包含`0...1`个items的reactive sequence
上述两个类代表了在异步处理场景中的大致基数。
- Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono<HttpResponse>`来代表http调用的结果`Mono`中只提供了上下文中包含`0...1`个元素的对应操作
- 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono<Long>`的类型
### Flux `0...n`
![alt text](image.png)
`Flux<T>`是一个标准的`Publisher<T>`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准存在三种signal且信号会转化为对下游`onNext``onComplete``onError`的调用。
Flux是一个通用的reactive类型并且所有的event type都是可选的。
- 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列
-`onNext``onComplete`事件都不存在时,代表一个空的无限序列
- 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux<Long>`其是无限的并且发送tick
### Mono `0...1`
`Mono<T>`是一个标准的`Publisher<T>`,其通过`onNext`信号发送至多一个item然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。
大多数Mono实现在调用完subscriber的`onNext`方法之后预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext``onError`的组合是被明确禁止的。
### 创建Mono/Flux并进行订阅的方式
#### String sequence
如果想要创建String序列可以通过如下方式
```java
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
```
#### Flux.empty
```java
Mono<String> noData = Mono.empty();
```
#### Flux.range
在下面示例中,`Flux.range`第一个参数是range的起始值第二个参数是要产生的元素个数
```java
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
```
故而,其产生的内容为`5,6,7`
#### Lambda Subscribe
在进行订阅时,`Flux``Mono`使用了lambda在调用subscribe时有如下几种重载的选择
```java
// 订阅并触发sequence的产生
subscribe();
// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer);
// 在reactive stream异常终止时对error进行处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 在sequence处理完时执行额外的complete操作
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
```
对于subscribe的使用示例如下
```java
Flux.range(5,3)
.map(x->{
if(x<7) {
return x;
}
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
})
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
(e) -> {
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
},
()->{
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
});
```
其执行结果如下:
```
[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7
```
#### Disposable
上述`subscribe`方法的返回类型为`Disposable`该接口代表subscriber对publisher的订阅是可取消的如需取消订阅调用`dispose`方法即可。
对于Mono和Flux而言source publisher应该在接收到cancellation信号之后停止产生元素`并不能保证取消信号是即时的`。(`若source产生数据的速度过快可能在接收到cancel信号之前source就已经complete`)。
`Disposables`类中存在一些对`Disposable`的工具方法,例如`swap``composite`
#### BaseSubscriber
`subscribe`方法除了接收lambda外还存在更通用的重载方法接收`Subscriber`类型的参数。
在这种场景下,传参可以继承`BaseSubscriber`类。
并且,`BaseSubscriber`该类是一次性的,`其只能够订阅一个publisher如果其订阅了第二个publisher那么其对第一个publisher的订阅将会被取消`
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
示例如下:
```java
Flux.range(3, 5)
.subscribe(new BaseSubscriber<Integer>() {
private Subscription subscription;
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5);
}
@Override
protected void hookOnNext(Integer value) {
log.info("onNext called: {}", value);
// this.subscription.request(1);
}
@Override
protected void hookOnComplete() {
log.info("onComplete called");
super.hookOnComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
log.info("onError called: {}", throwable.getMessage());
super.hookOnError(throwable);
}
});
```
上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下:
```
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
```
`BaseSubscriber``hookOnSubscribe`默认实现如下:
```java
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
```
其请求的数量为`Long.MAX_VALUE`代表其publisher为`effectively unbounded`
可以通过重写`hookOnSubscribe`方法来自己指定request数量如果需要自己指定请求数量最少需要重写`hookOnSubscribe``hookOnNext`方法。
`BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。
除了上述列出的hook外`BaseSubscriber`还支持如下hooks
- hookOnComplete
- hookOnError
- hookOnCancel
- hookFinally(当sequence终止时都会被调用可以用参数来判断终止类型为complete或error)
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
#### backpressure
在reactor的backpressure实现中consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`
> `unbound request`代表尽可能快的产生数据即backpressure关闭。
在reactive chain中第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request示例如下
- `subcribe()`方法和大多数基于lambda的重载方法除了包含`Consumer<Subscription>`参数的重载)
- `block`, `blockFirst` `blockLast`
- 通过`toIterable``toStream`进行遍历
目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅并且重写onSubscribe方法`
#### buffer
在reactor中可以通过部分operator进行request的reshape。示例如下
```java
Flux.range(1, 1000)
.buffer(3)
.subscribe(new BaseSubscriber<List<Integer>>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(List<Integer> value) {
for (Integer v : value) {
log.info("item received: {}", v);
}
}
});
```
在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3``Integer`,故而总共会接收到`2 * 3 = 6`个元素。
输出如下:
```
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
```
#### prefetch
`prefetch`机制是一种backpressure的优化策略用于确保下游处理数据时上游的数据能够即时补充对吞吐量和资源利用率进行平衡。
prefetch机制通常分为如下部分
##### 初始请求
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时会向上游发送一个初始请求请求大小为32个元素。
##### 补充优化Replenishing Optimization
prefetch的补充优化通常采用75%的启发规则一旦操作符发现75%的预取元素已经被处理32 *0.75 = 24其自动会向上游发送一个新请求要求补充75%的prefetch量。该过程是动态的会在整个数据流处理过程中持续进行。
> 例如prefetch的大小为10其limit对应的值为`ceil(10 * 0.75) = 8`每当其下游被处理的元素达到8个其会重新请求8个数据并且将`被下游处理元素的个数`重置重新从0开始计数直到该值再达到8再次发送请求
> ##### 预加载数据
> 补充优化的优化点在于当预取数据还剩余25%8个未被处理时提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
> ##### 平滑处理
> 通过prefetch逐步请求新数据且每次请求固定的量可以保证处理数据速率的稳定。如果source端同时来源大量数据那么若不进行平滑处理则大量数据的同时处理可能导致竞争令性能下降。
有如下operators可以对请求的prefetch进行调整
#### limitRate
除了`prefetch`之外,还可以通过`limitRate``limitRequest`来直接针对请求进行调节。
`limitRate(N)`将来自下游的请求进行拆分当来自下游的请求被传播到上游时其会被拆分为small batches。例如如果下游调用`request(100)`,此时`limitRate(10)`将会将其拆分为10个`request(10)`再传播给上游。并且在此基础上limitRate还实现了prefetch中的补充优化。
除了`limitRate(N)`之外(当没有传递`lowTie`limit默认会取`N - N>>2`,即`ceil(N * 0.75)`limtRate还存在`limitRate(highTie, lowTie)`的重载方法。
##### lowTie
当lowTie取不同值时其补充策略如下
- `lowTie<=0`:如果`lowTie`小于或等于0则limit取值和`prefetch`值相同仅当prefetch中所有元素都被下游处理完时limtRate operator才会向上游请求数据
- `lowTie>=prefetch` 当lowTie大于或等于prefetch时limit取值为`ceil(prefetch * 0.75)`此时补充策略和prefetch默认相同当75%的数据被下游处理时limitRte会重新向上游请求75%的数据
- 若lowTie位于`(0, prefetch)`区间之间
- 若prefetch的值为`Long.MAX_VALUE`那么limit的值也为`Long.MAX_VALUE`
- 若prefetch值不为`Long.MAX_VALUE`那么limit的值为`lowTie`,即`lowTie`的值即为消费后重新拉取的限制值
#### limitRequest
`limitRequest(N)`用于限制下游请求的总个数。例如,向`limitRequest(10)`发起两次request一次请求3一次请求8那么最后下游只会接收到10个元素。
> 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成会向下游发送onComplete信号
### Create Sequence
#### 同步生成`generate`
创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer<SynchronousSink<T>>`类型的参数。
##### Sink
sink是spring reactor中的一个核心抽象概念其可以被理解为数据流的出口或`发射器``负责将元素、error或complete信号推送到下游订阅者`
sink可以分为如下种类
- 同步/异步
- 单次/多次发射
sink的核心api如下
- `next(T value)`: 向下游发送数据
- `complete`:表示数据流正常结束
- `error(throwable)`:代表数据流因为错误而终止
generate方法用于产生同步且`one by one`的emission即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下:
```java
Flux.generate(sink -> {
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
sink.next(data); // 仅调用一次
if (isEndOfData()) sink.complete();
});
```
> 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。
示例如下:
```java
Flux.generate(sink -> {
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
})
.take(10)
.subscribe(System.out::println);
```
通过generate的另一个重载方法`generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下:
- `stateSupplier`该参数用于提供一个初始状态每当有新请求进入时都会调用supplier生成初始状态
- `generator`generator会返回一个新的state
示例如下:
```java
Flux.generate(() -> 0, (s, sink) -> {
// System.out.printf("%s emitted\n", s);
sink.next(s);
return s + 1;
})
.take(5)
.subscribe(System.out::println);
```
其上游生成的整数依次增加,输出如下:
```
0
1
2
3
4
```
#### 异步多线程生成`create`
`Flux.create`针对每个round能够产生多个元素其暴露的sink类型为`FluxSink`。相对于`Flux.generate``Flux.create`方法并没有接收`state`的重载版本。
> `Flux.create`并不会将你的代码变为并行或是异步的.
>
> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法在create lambda中执行长阻塞操作仍然会阻塞item的处理因为item的source产生和下游处理都处于同一线程中但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。
##### `Flux.create`和`listener based api`进行适配
街射使用基于listener的apiapi定义如下
```java
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
```
可以使用`Flux.create`将其与Flux相桥接
```java
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
```
##### `Flux.create` backpressure
可以通过指定`OverflowStrategy`来优化backpressure行为OverflowStrategy可选值如下
- `IGNORE`: 该选项会完全无视下游的backpressure request当下游queue被填充满时会抛出`IllegalStateException`异常
> 部分操作符(例如`publishOn`其subscriber会内置queue用于存储上游通过`onNext`推送的数据。
>
> 当queue满时将会抛出`IllegalStateException`异常
- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常
> 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了`request(1)`但是source产生了两个数据调用了两次`sink.next`,那么第二次调用时`requested`已经为0会调用`onOverflow`方法发送`error`信号
- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。
- `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据
- 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3``3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖
- 其实现是通过`LatestAsyncSink`中的`queue`来实现的queue是一个`AtomicReference`类型的field当调用`sink.next`向下游发送数据时如果下游不满足requested那么将会将值`set`到queue中从而实现后面的数据覆盖前面的数据
- `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时如果下游requested条件不满足会将数据缓存在无界队列中。`使用无界队列进行缓存可能会导致OOM`
#### 异步单线程生成`push`
`push`介于`generate``create`之间,其用于对单个生产者产生的事件进行处理,`push``generate``create`的相似点如下:
-`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure
-`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用
其对异步事件的桥接示例如下:
```java
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
public void processError(Throwable e) {
sink.error(e);
}
});
});
```
##### hybird push-pull model
大多数reactor operators(例如create)都使用了混合推拉的模型。在混合推拉模型中大多数事件的处理都是异步的建议采用push的方法
在混合推拉模型中consumer从source处拉取数据`在consumer发送第一次request之前source并不会emit任何数据`source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理`source只会在下游请求的范围内向下游推送数据`
并且在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后首先会调用subscriber的`onSubscribe`方法然后会一直沿着reactive stream向前追溯直到找到source然后调用`Flux.create`接受的`Consumer<? super FluxSink<T>>`
##### `sink.onRequest`
sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时`consumer`都会被调用。
`sink.onRequest`的使用示例如下:
```java
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
//
for(String s : messages) {
sink.next(s);
}
}
});
sink.onRequest(n -> {
// 此处代码将会在`sink.request`被调用时触发
// 主动向processor拉取数据
List<String> messages = myMessageProcessor.getHistory(n);
for(String s : messages) {
sink.next(s);
}
});
});
```
#### Handle
`handle`为一个`instance method`非静态方法其关联了一个已经存在的source。`Mono``Flux`中都存在`handle`方法。
`generate`类似,`handle`使用`SynchronousSink`,并且只允许逐个发出。`handle`的作用类似`map``filter`的组合,可以跳过部分元素,其签名如下
```java
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
```
其使用示例如下:
```java
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
// 由于reactive steam中不允许有null
// 故而当通过`alphabet`方法映射可能返回null值时
// 可以使用handle对返回值进行过滤只有不为空时才调用`sink.next`
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
```
#### `onCancel` & `onDispose`
`sink`支持`onCancel``onDispose`回调,其区别如下:
- `onDispose`回调用于执行cleanup操作其在`complete`, `error`, `cancel`之后都会被调用。
- `onCancel`回调用于执行取消操作,其只针对`cancel`执行并且执行顺序位于cleanup之前
其使用示例如下所示:
```java
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
```
### Threading and Schedulers
reactor模型和rxjava模型类型是并发无关的并不强制要求并发模型。并且大多数operator其运行的线程都和前一个operator相同。
`除非显式指定否则最顶层的operatorsource也会运行在subscribe方法被调用的线程中`
#### Scheduler
在reactor中操作执行在哪个线程中取决于`使用的Scheduler`。Scheduler和ExectuorService类似负责对任务进行调度但是相比于ExecutorService其功能更加丰富。
`Scheudlers`类中存在如下静态方法,分别访问不同的`execution context`
- `Schedulers.immediate()`:没有执行上下文,被提交的任务会在当前线程中被立马执行
- `Schedulers.single()`:线程上下文为一个`单个、可重用`的线程上下文。该方法会对所有的调用都使用相同的线程直到scheduler被`disposed`
- `Schedulers.newSingle()`:每次调用时都使用一个专属线程
- `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。
- `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现:
- `ExecutorService-based`会在多个tasks之间对平台线程进行重用