Files
rikako-note/spring/webflux/Reactor.md
2025-06-03 23:27:54 +08:00

1372 lines
67 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

- [Reactor](#reactor)
- [Reactive Programming](#reactive-programming)
- [命令式迁移到响应式](#命令式迁移到响应式)
- [可组合性与可读性](#可组合性与可读性)
- [Assembly Line](#assembly-line)
- [Operators](#operators)
- [Nothing Happens Until You subscribe()](#nothing-happens-until-you-subscribe)
- [backpressure](#backpressure)
- [hot \& cold](#hot--cold)
- [Subscriber和Publisher](#subscriber和publisher)
- [Publisher](#publisher)
- [Subscriber](#subscriber)
- [Reactor Core](#reactor-core)
- [Flux `0...n`](#flux-0n)
- [Mono `0...1`](#mono-01)
- [创建Mono/Flux并进行订阅的方式](#创建monoflux并进行订阅的方式)
- [String sequence](#string-sequence)
- [Flux.empty](#fluxempty)
- [Flux.range](#fluxrange)
- [Lambda Subscribe](#lambda-subscribe)
- [Disposable](#disposable)
- [BaseSubscriber](#basesubscriber)
- [backpressure](#backpressure-1)
- [buffer](#buffer)
- [prefetch](#prefetch)
- [初始请求](#初始请求)
- [补充优化Replenishing Optimization](#补充优化replenishing-optimization)
- [limitRate](#limitrate)
- [lowTie](#lowtie)
- [limitRequest](#limitrequest)
- [Create Sequence](#create-sequence)
- [同步生成`generate`](#同步生成generate)
- [Sink](#sink)
- [异步多线程生成`create`](#异步多线程生成create)
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
- [`Flux.create` backpressure](#fluxcreate-backpressure)
- [异步单线程生成`push`](#异步单线程生成push)
- [hybird push-pull model](#hybird-push-pull-model)
- [`sink.onRequest`](#sinkonrequest)
- [Handle](#handle)
- [`onCancel` \& `onDispose`](#oncancel--ondispose)
- [Threading and Schedulers](#threading-and-schedulers)
- [Scheduler](#scheduler)
- [createWorker](#createworker)
- [operators using default scheduler](#operators-using-default-scheduler)
- [switch thread context](#switch-thread-context)
- [publishOn](#publishon)
- [subscribeOn](#subscribeon)
- [`subscribeOn`原理](#subscribeon原理)
- [Handling Errors](#handling-errors)
- [static fallback value](#static-fallback-value)
- [catch and swallow error](#catch-and-swallow-error)
- [fallback method](#fallback-method)
- [Dynamic Fallback Value](#dynamic-fallback-value)
- [Catch and rethrow](#catch-and-rethrow)
- [log and react on the side](#log-and-react-on-the-side)
- [doOnError](#doonerror)
- [Using resources and the finally block](#using-resources-and-the-finally-block)
- [doFinally](#dofinally)
- [using](#using)
- [retry](#retry)
- [retry原理](#retry原理)
- [retryWhen](#retrywhen)
- [retry helper](#retry-helper)
- [retrying with transient errors](#retrying-with-transient-errors)
- [handling Exceptions in operators or functions](#handling-exceptions-in-operators-or-functions)
- [checked exception](#checked-exception)
- [Exceptions Utility](#exceptions-utility)
- [Sinks](#sinks)
- [在多线程环境下安全的使用`Sinks.One`和`Sinks.Many`](#在多线程环境下安全的使用sinksone和sinksmany)
- [`tryEmit` \& `emit`](#tryemit--emit)
- [Processor](#processor)
- [`asFlux`和`asMono`](#asflux和asmono)
- [`Sinks.many().unicast().onBackpressureBuffer(args?)`](#sinksmanyunicastonbackpressurebufferargs)
- [onBackpressureBuffer](#onbackpressurebuffer)
- [Sinks.many().multicast().onBackpressureBuffer(args?)](#sinksmanymulticastonbackpressurebufferargs)
- [autoCancel](#autocancel)
- [Sinks.many().multicast().directAllOrNothing()](#sinksmanymulticastdirectallornothing)
- [Sinks.many().multicast().directBestEffort()](#sinksmanymulticastdirectbesteffort)
- [Sinks.many().replay()](#sinksmanyreplay)
- [Sinks.unsafe().many()](#sinksunsafemany)
- [Sinks.one()](#sinksone)
- [Sinks.empty()](#sinksempty)
- [Advanced](#advanced)
- [Batching](#batching)
- [Grouping With `Flux<GroupedFlux<T>>`](#grouping-with-fluxgroupedfluxt)
- [windowing with `Flux<Flux<T>>`](#windowing-with-fluxfluxt)
- [windowWhile](#windowwhile)
- [windowUntil](#windowuntil)
- [Buffering with `Flux<List<T>>`](#buffering-with-fluxlistt)
- [flatmap](#flatmap)
- [Pipeline Operations](#pipeline-operations)
- [concurrency](#concurrency)
- [prefetch](#prefetch-1)
- [zip](#zip)
- [zipWith](#zipwith)
- [defer](#defer)
# Reactor
## Reactive Programming
响应式编程是一种异步编程范式,关注于`数据流``状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法reactive stream是基于`发布/订阅`模型的。
> 迭代器模式是`pull-based`而reactive stream为`push-based`。
### 命令式迁移到响应式
#### 可组合性与可读性
“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。
reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。
> 在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为`callback heil`。
reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。
#### Assembly Line
响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`
数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。
#### Operators
在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`并且前一个publisher包装到一个新的publisher实例中。
故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。
#### Nothing Happens Until You subscribe()
当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。
通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个`reuqest signal`该信号会被传递到chain上游一直被传递到source publisher。
#### backpressure
`传递到上游的信号`该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。
reactive stream定义的机制接近于上述描述其提供两种模式
- unbounded modesource publisher可以按其最高速率不受限制的推送数据
- request mode通过`request`机制向source publisher发送信号告知其准备好处理最多`n`个元素。
中间的operator也可以在传输过程中对请求做出修改例如`buffer` operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。
通过backpressure可以将`push`模型转化为`push-pull`模型:
- 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
- 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送
#### hot & cold
对于响应式序列,其可以分为两种:
- cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
- hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal
## Subscriber和Publisher
### Publisher
对于publisher其提供了`subscribe`方法供subscriber进行注册在执行subscribe方法并向其传入`Subscriber`对象后上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。
> 若中间存在operator例如map在担任publisher角色的同时还对上游进行了订阅那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
>
> `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。
### Subscriber
当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。
## Reactor Core
reactor引入了两个实现`Publisher`的类:`Mono``Flux`
- Flux代表包含`0...N`个items的reactive sequence
- Mono代表包含`0...1`个items的reactive sequence
上述两个类代表了在异步处理场景中的大致基数。
- Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono<HttpResponse>`来代表http调用的结果`Mono`中只提供了上下文中包含`0...1`个元素的对应操作
- 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono<Long>`的类型
### Flux `0...n`
![alt text](image.png)
`Flux<T>`是一个标准的`Publisher<T>`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准存在三种signal且信号会转化为对下游`onNext``onComplete``onError`的调用。
Flux是一个通用的reactive类型并且所有的event type都是可选的。
- 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列
-`onNext``onComplete`事件都不存在时,代表一个空的无限序列
- 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux<Long>`其是无限的并且发送tick
### Mono `0...1`
`Mono<T>`是一个标准的`Publisher<T>`,其通过`onNext`信号发送至多一个item然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。
大多数Mono实现在调用完subscriber的`onNext`方法之后预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext``onError`的组合是被明确禁止的。
### 创建Mono/Flux并进行订阅的方式
#### String sequence
如果想要创建String序列可以通过如下方式
```java
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
```
#### Flux.empty
```java
Mono<String> noData = Mono.empty();
```
#### Flux.range
在下面示例中,`Flux.range`第一个参数是range的起始值第二个参数是要产生的元素个数
```java
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
```
故而,其产生的内容为`5,6,7`
#### Lambda Subscribe
在进行订阅时,`Flux``Mono`使用了lambda在调用subscribe时有如下几种重载的选择
```java
// 订阅并触发sequence的产生
subscribe();
// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer);
// 在reactive stream异常终止时对error进行处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 在sequence处理完时执行额外的complete操作
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
```
对于subscribe的使用示例如下
```java
Flux.range(5,3)
.map(x->{
if(x<7) {
return x;
}
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
})
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
(e) -> {
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
},
()->{
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
});
```
其执行结果如下:
```
[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7
```
#### Disposable
上述`subscribe`方法的返回类型为`Disposable`该接口代表subscriber对publisher的订阅是可取消的如需取消订阅调用`dispose`方法即可。
对于Mono和Flux而言source publisher应该在接收到cancellation信号之后停止产生元素`并不能保证取消信号是即时的`。(`若source产生数据的速度过快可能在接收到cancel信号之前source就已经complete`)。
`Disposables`类中存在一些对`Disposable`的工具方法,例如`swap``composite`
#### BaseSubscriber
`subscribe`方法除了接收lambda外还存在更通用的重载方法接收`Subscriber`类型的参数。
在这种场景下,传参可以继承`BaseSubscriber`类。
并且,`BaseSubscriber`该类是一次性的,`其只能够订阅一个publisher如果其订阅了第二个publisher那么其对第一个publisher的订阅将会被取消`
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
示例如下:
```java
Flux.range(3, 5)
.subscribe(new BaseSubscriber<Integer>() {
private Subscription subscription;
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5);
}
@Override
protected void hookOnNext(Integer value) {
log.info("onNext called: {}", value);
// this.subscription.request(1);
}
@Override
protected void hookOnComplete() {
log.info("onComplete called");
super.hookOnComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
log.info("onError called: {}", throwable.getMessage());
super.hookOnError(throwable);
}
});
```
上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下:
```
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
```
`BaseSubscriber``hookOnSubscribe`默认实现如下:
```java
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
```
其请求的数量为`Long.MAX_VALUE`代表其publisher为`effectively unbounded`
可以通过重写`hookOnSubscribe`方法来自己指定request数量如果需要自己指定请求数量最少需要重写`hookOnSubscribe``hookOnNext`方法。
`BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。
除了上述列出的hook外`BaseSubscriber`还支持如下hooks
- hookOnComplete
- hookOnError
- hookOnCancel
- hookFinally(当sequence终止时都会被调用可以用参数来判断终止类型为complete或error)
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
#### backpressure
在reactor的backpressure实现中consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`
> `unbound request`代表尽可能快的产生数据即backpressure关闭。
在reactive chain中第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request示例如下
- `subcribe()`方法和大多数基于lambda的重载方法除了包含`Consumer<Subscription>`参数的重载)
- `block`, `blockFirst` `blockLast`
- 通过`toIterable``toStream`进行遍历
目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅并且重写onSubscribe方法`
#### buffer
在reactor中可以通过部分operator进行request的reshape。示例如下
```java
Flux.range(1, 1000)
.buffer(3)
.subscribe(new BaseSubscriber<List<Integer>>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(List<Integer> value) {
for (Integer v : value) {
log.info("item received: {}", v);
}
}
});
```
在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3``Integer`,故而总共会接收到`2 * 3 = 6`个元素。
输出如下:
```
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
```
#### prefetch
`prefetch`机制是一种backpressure的优化策略用于确保下游处理数据时上游的数据能够即时补充对吞吐量和资源利用率进行平衡。
prefetch机制通常分为如下部分
##### 初始请求
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时会向上游发送一个初始请求请求大小为32个元素。
##### 补充优化Replenishing Optimization
prefetch的补充优化通常采用75%的启发规则一旦操作符发现75%的预取元素已经被处理32 *0.75 = 24其自动会向上游发送一个新请求要求补充75%的prefetch量。该过程是动态的会在整个数据流处理过程中持续进行。
> 例如prefetch的大小为10其limit对应的值为`ceil(10 * 0.75) = 8`每当其下游被处理的元素达到8个其会重新请求8个数据并且将`被下游处理元素的个数`重置重新从0开始计数直到该值再达到8再次发送请求
> ##### 预加载数据
> 补充优化的优化点在于当预取数据还剩余25%8个未被处理时提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
> ##### 平滑处理
> 通过prefetch逐步请求新数据且每次请求固定的量可以保证处理数据速率的稳定。如果source端同时来源大量数据那么若不进行平滑处理则大量数据的同时处理可能导致竞争令性能下降。
有如下operators可以对请求的prefetch进行调整
#### limitRate
除了`prefetch`之外,还可以通过`limitRate``limitRequest`来直接针对请求进行调节。
`limitRate(N)`将来自下游的请求进行拆分当来自下游的请求被传播到上游时其会被拆分为small batches。例如如果下游调用`request(100)`,此时`limitRate(10)`将会将其拆分为10个`request(10)`再传播给上游。并且在此基础上limitRate还实现了prefetch中的补充优化。
除了`limitRate(N)`之外(当没有传递`lowTie`limit默认会取`N - N>>2`,即`ceil(N * 0.75)`limtRate还存在`limitRate(highTie, lowTie)`的重载方法。
##### lowTie
当lowTie取不同值时其补充策略如下
- `lowTie<=0`:如果`lowTie`小于或等于0则limit取值和`prefetch`值相同仅当prefetch中所有元素都被下游处理完时limtRate operator才会向上游请求数据
- `lowTie>=prefetch` 当lowTie大于或等于prefetch时limit取值为`ceil(prefetch * 0.75)`此时补充策略和prefetch默认相同当75%的数据被下游处理时limitRte会重新向上游请求75%的数据
- 若lowTie位于`(0, prefetch)`区间之间
- 若prefetch的值为`Long.MAX_VALUE`那么limit的值也为`Long.MAX_VALUE`
- 若prefetch值不为`Long.MAX_VALUE`那么limit的值为`lowTie`,即`lowTie`的值即为消费后重新拉取的限制值
#### limitRequest
`limitRequest(N)`用于限制下游请求的总个数。例如,向`limitRequest(10)`发起两次request一次请求3一次请求8那么最后下游只会接收到10个元素。
> 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成会向下游发送onComplete信号
### Create Sequence
#### 同步生成`generate`
创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer<SynchronousSink<T>>`类型的参数。
##### Sink
sink是spring reactor中的一个核心抽象概念其可以被理解为数据流的出口或`发射器``负责将元素、error或complete信号推送到下游订阅者`
sink可以分为如下种类
- 同步/异步
- 单次/多次发射
sink的核心api如下
- `next(T value)`: 向下游发送数据
- `complete`:表示数据流正常结束
- `error(throwable)`:代表数据流因为错误而终止
generate方法用于产生同步且`one by one`的emission即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下:
```java
Flux.generate(sink -> {
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
sink.next(data); // 仅调用一次
if (isEndOfData()) sink.complete();
});
```
> 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。
示例如下:
```java
Flux.generate(sink -> {
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
})
.take(10)
.subscribe(System.out::println);
```
通过generate的另一个重载方法`generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下:
- `stateSupplier`该参数用于提供一个初始状态每当有新请求进入时都会调用supplier生成初始状态
- `generator`generator会返回一个新的state
示例如下:
```java
Flux.generate(() -> 0, (s, sink) -> {
// System.out.printf("%s emitted\n", s);
sink.next(s);
return s + 1;
})
.take(5)
.subscribe(System.out::println);
```
其上游生成的整数依次增加,输出如下:
```
0
1
2
3
4
```
#### 异步多线程生成`create`
`Flux.create`针对每个round能够产生多个元素其暴露的sink类型为`FluxSink`。相对于`Flux.generate``Flux.create`方法并没有接收`state`的重载版本。
> `Flux.create`并不会将你的代码变为并行或是异步的.
>
> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法在create lambda中执行长阻塞操作仍然会阻塞item的处理因为item的source产生和下游处理都处于同一线程中但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。
##### `Flux.create`和`listener based api`进行适配
街射使用基于listener的apiapi定义如下
```java
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
```
可以使用`Flux.create`将其与Flux相桥接
```java
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
```
##### `Flux.create` backpressure
可以通过指定`OverflowStrategy`来优化backpressure行为OverflowStrategy可选值如下
- `IGNORE`: 该选项会完全无视下游的backpressure request当下游queue被填充满时会抛出`IllegalStateException`异常
> 部分操作符(例如`publishOn`其subscriber会内置queue用于存储上游通过`onNext`推送的数据。
>
> 当queue满时将会抛出`IllegalStateException`异常
- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常
> 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了`request(1)`但是source产生了两个数据调用了两次`sink.next`,那么第二次调用时`requested`已经为0会调用`onOverflow`方法发送`error`信号
- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。
- `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据
- 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3``3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖
- 其实现是通过`LatestAsyncSink`中的`queue`来实现的queue是一个`AtomicReference`类型的field当调用`sink.next`向下游发送数据时如果下游不满足requested那么将会将值`set`到queue中从而实现后面的数据覆盖前面的数据
- `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时如果下游requested条件不满足会将数据缓存在无界队列中。`使用无界队列进行缓存可能会导致OOM`
#### 异步单线程生成`push`
`push`介于`generate``create`之间,其用于对单个生产者产生的事件进行处理,`push``generate``create`的相似点如下:
-`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure
-`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用
其对异步事件的桥接示例如下:
```java
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
public void processError(Throwable e) {
sink.error(e);
}
});
});
```
##### hybird push-pull model
大多数reactor operators(例如create)都使用了混合推拉的模型。在混合推拉模型中大多数事件的处理都是异步的建议采用push的方法
在混合推拉模型中consumer从source处拉取数据`在consumer发送第一次request之前source并不会emit任何数据`source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理`source只会在下游请求的范围内向下游推送数据`
并且在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后首先会调用subscriber的`onSubscribe`方法然后会一直沿着reactive stream向前追溯直到找到source然后调用`Flux.create`接受的`Consumer<? super FluxSink<T>>`
##### `sink.onRequest`
sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时`consumer`都会被调用。
`sink.onRequest`的使用示例如下:
```java
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
//
for(String s : messages) {
sink.next(s);
}
}
});
sink.onRequest(n -> {
// 此处代码将会在`sink.request`被调用时触发
// 主动向processor拉取数据
List<String> messages = myMessageProcessor.getHistory(n);
for(String s : messages) {
sink.next(s);
}
});
});
```
#### Handle
`handle`为一个`instance method`非静态方法其关联了一个已经存在的source。`Mono``Flux`中都存在`handle`方法。
`generate`类似,`handle`使用`SynchronousSink`,并且只允许逐个发出。`handle`的作用类似`map``filter`的组合,可以跳过部分元素,其签名如下
```java
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
```
其使用示例如下:
```java
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
// 由于reactive steam中不允许有null
// 故而当通过`alphabet`方法映射可能返回null值时
// 可以使用handle对返回值进行过滤只有不为空时才调用`sink.next`
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
```
#### `onCancel` & `onDispose`
`sink`支持`onCancel``onDispose`回调,其区别如下:
- `onDispose`回调用于执行cleanup操作其在`complete`, `error`, `cancel`之后都会被调用。
- `onCancel`回调用于执行取消操作,其只针对`cancel`执行并且执行顺序位于cleanup之前
其使用示例如下所示:
```java
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
```
### Threading and Schedulers
reactor模型和rxjava模型类型是并发无关的并不强制要求并发模型。并且大多数operator其运行的线程都和前一个operator相同。
`除非显式指定否则最顶层的operatorsource也会运行在subscribe方法被调用的线程中`
#### Scheduler
在reactor中操作执行在哪个线程中取决于`使用的Scheduler`。Scheduler和ExectuorService类似负责对任务进行调度但是相比于ExecutorService其功能更加丰富。
`Scheudlers`类中存在如下静态方法,分别访问不同的`execution context`
- `Schedulers.immediate()`:没有执行上下文,被提交的任务会在当前线程中被立马执行
- `Schedulers.single()`:线程上下文为一个`单个、可重用`的线程上下文。该方法会对所有的调用都使用相同的线程直到scheduler被`disposed`
- `Schedulers.newSingle()`:每次调用时都使用一个专属线程
- `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。
- `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现:
- `ExecutorService-based`:会在多个任务之间重用平台线程(即使用相同工作线程执行多个任务)
- `Virtual-thread-per-task-based``jdk21+`支持该特性对每个任务都会开启一个新的虚拟线程并且实现并没有维护idle pools
#### createWorker
在Scheduler中`idleQueue`中的线程会在空闲一段时间后自动销毁,但是通过`createWorker`手动创建的worker必须手动销毁在使用完成后调用`release`
#### operators using default scheduler
通常情况下部分operators在未显式指定的前提下会使用默认的scheduler通常可显式指定一个不同的scheduler
例如,`Flux.interval(Duration.ofMillis(300))`方法会生成`间隔300ms的ticks`。默认情况下,其会使用`Schedulers.parallel`可以通过如下代码指定一个新的scheduler:
```java
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
```
#### switch thread context
reactor提供了两种方法来切换执行的上下文
- `publishOn`
- `subscribeOn`
> publishOn和subscribeOn用友如下区别
> - publishOn方法在reactive chain中的位置若发生变化会对各个节点的执行上下文造成影响
> - subscribeOn方法在reactive chain中位置若发生变化并不会对各个节点的执行上下文造成影响
#### publishOn
`publishOn`操作符和作用和其他操作符类型,都是从上游接收到信号并且将信号传递到下游,但是,在执行下游的回调时,通过`Scheduler`中的worker进行调度。
故而,`publishOn`会对reactor chain中后续operators造成如下影响
- 修改执行上下文执行的线程由scheduler决定
- 按照规范,`onNext`的调用是有序的故而publishOn后续的操作都会使用同一个worker进行调度对于所有数据都在同一线程中执行worker在subscribe时决定publishOn使用的worker
#### subscribeOn
`subscribeOn`影响订阅的过程通常推荐将其放在source之后。
##### `subscribeOn`原理
`FluxSubscribeOn#subscribeOrReturn`中,会通过`scheduler#createWorker`创建worker并通过worker来对`执行source.subscribe`的方法进行调度。
故而在consumer执行subscribe时随着reactor chain从尾到头下游subscriber都会调用上游publisher的subscribe方法。在`FluxSubscribeOn`调用上游的subscribe时则是会通过worker在进行调度实际调用在worker中执行。
subscribeOn方法使用方式如下
```java
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);
new Thread(() -> flux.subscribe(System.out::println));
```
### Handling Errors
在reactive stream中errors是终止事件一旦error被触发sequence的生成将会被停止并且将会沿着reactor chain的下游一直传递道`subscriber#onError`
error应该在应用程序的级别被处理处理方案如下
- 在ui上展示错误信息
- 向rest endpoint发送error payload
故而,`subscriber#onError`方法应该被定义。
reactor同样支持在chain的中间通过`error-handling`来处理error示例如下所示
```java
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
```
> 在reactor中任何error都代表终止事件`即使使用了error-handling operatororiginal sequence在出现error后也不会继续。
>
> 故而error-handling将`onError`信号转化为了一个新sequence的开始fallback one即通过新sequence替换了上游旧的`terminated sequence`。
> 在`FluxOnErrorReturn.ReturnSubscriber#onError`的实现中operator将`Throwable`转换为了`onNext(fallbackValue)`和`onComplete`两个调用,即在向下游发送`onNext(fallbackValue)`信号之后,也会终止序列。
通常来说,常见的异常才处理方式如下:
#### static fallback value
该场景类似于
```java
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
```
可以调用`onErrorReturn`方法,示例如下:
```java
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
```
`onErrorReturn`还存在一个接受`Predicate`的重载方法可以决定是否对异常执行recover操作示例如下
```java
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
```
#### catch and swallow error
如果不想将异常替换为fallback value而是忽视产生的error仅向下游传递正常生成的元素可以使用`onErrorComplete`方法(例如, `1,2,3...`序列在向下进行传递时,如果`2`的下游operator执行发生异常那么`onErrorComplete`会忽略`2`产生的error仅向下游传递`1`)。
`onErrorComplete`的使用示例如下:
```java
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete();
```
> onErrorComplete会将onError信号转化为onComplete信号。
类似于`onErrorReturn``onErrorComplete`同样存在一个重载方法接受`Predicate`可以通过该断言判断是否对error执行`recover`操作。
- 若执行recover操作则会调用`subscriber#onComplete`
- 若不执行recover操作则会调用`subscriber#onError`
#### fallback method
如果在处理数据时,存在多种方法,例如`m1,m2`,并且在尝试`m1`失败时想要再尝试`m2`,可以使用`onErrorResume`方法。
其等价于如下逻辑:
```java
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
```
和上述逻辑等价的是如下代码:
```java
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
```
其中,`onErrorResume`返回的都是一个`Publisher`
`onErrorResume`的底层逻辑实现如下:
- 其会根据`onErrorResume`中传入的`Function<? super Throwable, ? extends Publisher<? extends T>>`参数和`error`来构建一个新的publisher
- 然后对新的publisher执行`subscribe`操作并向下游返回新publisher的sequence
`onErrorResume`同样拥有一个重载函数接受`Predicate`决定是否执行recover逻辑。除此之外其接受`Function<? super Throwable, ? extends Publisher<? extends T>>`类型参数可以根据抛出的异常来决定fallback sequence示例如下
```java
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> {
if (error instanceof TimeoutException)
return getFromCache(k);
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error);
})
);
```
> 在上述示例中,`Flux.error`会重新抛出上述异常
#### Dynamic Fallback Value
当出现error时如果并不想调用下游的`onError`,而是想给下游一个`根据异常信息计算出的默认值`,同样可以使用`onErrorResume`方法,示例如下:
```java
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
```
上述代码等价于
```java
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
```
#### Catch and rethrow
对于捕获异常然后重新抛出的逻辑
```java
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
```
可以通过`onErrorResume`实现:
```java
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
```
或者,可以调用`onErrorMap`,其是对`onErrorResume`方法的封装:
```java
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
```
#### log and react on the side
如果想要实现类似`将异常捕获、打印然后重新抛出`的逻辑,可以使用`doOnError`
```java
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
```
上述代码等价于
```java
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
```
##### doOnError
`doOnError`和其他以`doOn`为前缀的operators其通常会被称为`副作用side effect`。通过这些operators可以在不修改sequence的情况下监控chain中的事件。
#### Using resources and the finally block
对于`在finally block中清理资源`的行为,可以通过`doFinally`来实现
```java
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
```
##### doFinally
`doFinally`是无论sequence结束onError/onComplete还是取消cancel都希望执行的副作用side effect。其接收参数类型为`Consumer<SignalType>`,可以通过`signaltype`判断`触发side-effect的terminaction类型`
`doFinally`使用示例如下:
```java
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
```
对于`try(resource)`的代码,则是可以通过`using`来实现
```java
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
```
##### using
`using`代表`Flux由资源派生并且在Flux处理完成之后都需要对资源执行操作`
```java
// resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
```
```java
// using
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
```
其中,`resource`实现了`Disposable`接口,而`Flux.using`接收的参数类型分别为
- `() -> disposableInstance`: resource supplier
- `disposable -> Flux.just(disposable.toString())`: 一个factory根据resource产生publisher
- `Disposable::dispose`: resource cleanup
#### retry
`retry`允许对产生错误的sequence进行重试。
```java
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()
.subscribe(System.out::println, System.err::println);
Thread.sleep(2100);
```
在上述示例中,`retry(1)`会对错误执行一次重试(`upstream重新开始生成`)。其产生结果如下所示:
```java
259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
```
##### retry原理
下游在上游调用`onError`会判断重试次数是否为0如果不为0会对重试次数进行减1并且重新对上游调用`subscribe`,这会令上游`重新从头开始生成序列`
直到重试次数为0之后`FluxRetry`才会将onError传递给下游。
#### retryWhen
`retryWhen`会使用`companion flux`来判断错误时是否应该回滚。`companion flux`由operator创建但是用户会通过装饰`companion flux`来自定义重试条件。
`companion flux`是一个`Flux<RetrySignal>`类型的值,该值将会被传递给`Retry`方法/策略(`retryWhen`只接受一个`Retry`类型的参数)。
使用者可以对`retry方法`进行指定,`retry方法`会返回一个`Publisher<?>`类型的值。
> Retry是一个抽象类但是提供了`Retry#from`方法通过该方法可以将lambda转换为Retry对象。
Retry周期如下所示
- 每当error发生时都会将`RetrySignal`发送到companion flux中此时`companion flux`已经被`retry方法`装饰过了
- `Flux<RetrySignal>`将会包含至今所有的重试尝试信息并且可以通过其访问error和error关联的元数据
- 如果`companion flux`发送了一个值,那么会触发重试
- 如果`companion flux`complete将会对error执行swallow操作retry cycle将会停止并且也会导致sequence complete
- 如果`companion flux`产生了一个error,retry cycle将会终止并且导致sequence按异常终止
#### retry helper
project reactor提供了`Retry` helper`RetrySpec``RetryBackoffSpec`,二者都允许进行如下所示的自定义行为:
- `filter`通过filter设置`允许触发retry的异常`
- `modifyErrorFilter` 对之前`filter`设置的异常进行修改
- `doBeforeRetry``doAfterRetry`针对retrytrigger执行side effect
- > `doBeforeRetry()`方法触发在delay发生之前`doAfterRetry()`触发在delay之后
- `onRetryExhaustedThrow(BiFunction)`:在重试数量达到上限后,通过`onRetryExhaustedThrow(BiFunction)`来自定义异常。
- 通常情况下,当重试数量达到上限后,自定义异常类型通过`Exceptions.retryExhausted(…​)`方法来构建,其返回的异常类型为`RetryExhaustedException`,可以通过`Exceptions.isRetryExhausted(Throwable)`方法来进行区分
##### retrying with transient errors
`RetrySignal`中存在如下两个方法:
- `totalRetriesInARow()`:每当`error is recovered`retry attempt导致了`onNext`而不是`onError`index都会被设置为0
- `totalRetries()``totalRetries()`方法返回的值是单调递增的,并不会被重置
当使用`RetrySpec``RetryBackoffSpec`时,可以通过`transientErrors(true)`方法来令策略使用`totalRetriesInARow()`处理transient error。
示例如下所示:
```java
AtomicInteger errorCount = new AtomicInteger();
Flux<Integer> transientFlux = httpRequest.get()
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
.blockLast();
assertThat(errorCount).hasValue(6);
```
> `transientErrors`主要是为了处理周期性发生异常,异常能够重试成功,并且发生异常后一段时间平稳运行的场景
#### handling Exceptions in operators or functions
通常来说operator的执行有可能会抛出异常根据抛出异常类型的不同存在如何区别
- `unchecked Exception`:对于抛出的`unchecked exception`,其都会通过`onError`向下游传递例如map operator中抛出的`RuntimeException`将会被转化为对下游`onError`的调用
```java
Flux.just("foo")
.map(s -> { throw new IllegalArgumentException(s); })
.subscribe(v -> System.out.println("GOT VALUE"),
e -> System.out.println("ERROR: " + e));
```
上述代码示例将会触发下游subscriber的onError输出内容如下
```console
ERROR: java.lang.IllegalArgumentException: foo
```
- `fatal exceptions`在reactor中定义了一些被视为`致命`的异常(例如`OutOfMemoryError`),具体的`fatal`异常包含范围可见`Exceptions.throwIfFatal`
- 在抛出`fatal error`时reactor operator会抛出异常而不是将其传递到下游
##### checked exception
对于`checked exception`,当调用方法签名中指定了`throws xxxException`的方法时,需要通过`try-catch block`对其进行处理。对于`checked exception`,有如下选择:
- 对异常进行捕获,并且对其执行`recover`操作sequence会继续执行
- 对异常进行捕获,并且封装到`unchecked exception`中,并将`unchecked exception`进行rethrow(这将对sequence进行打断)
- 如果需要返回一个`Flux`(例如,使用`flatMap`方法),可以使用`Flux.error(checkedException)`返回一个`error-producing Flux`sequence会终止
##### Exceptions Utility
通过reactor提供的Exceptions Utility可以保证`仅当异常为checked exception时才会封装`
- `Exceptions.propagate`在必要时会封装exception。
- 其并不会对`RuntimeException`进行封装,并且该方法首先会调用`throwIfFatal`
- 该方法会将checked exception封装到`reactor.core.Exceptions.ReactiveException`中
- `Exceptions.unwrap`
- 该方法会获取original unwrapped exception
使用示例如下所示:
```java
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
```
### Sinks
#### 在多线程环境下安全的使用`Sinks.One`和`Sinks.Many`
reactor-core提供的sinks支持多线程环境的使用并不会触犯规范或导致下游subscribers产生未知行为。
#### `tryEmit` & `emit`
当尝试通过sinks向下游发送signal时可以调用如下API
- `tryEmit*`并行调用将会导致fail fast
- `emit*`: 当调用`emit*`时,提供的`EmissionFailureHandler`,如果该接口的`onEmitFailure`方法返回为true将会在争用场景下执行重试操作例如busy loop如果onEmitFailure返回为false则sink会以error终止。
上述流程是对`Processor.onNext`的改进,`Processor.onNext`必须在外部进行同步否则将会导致下游subscribers未定义的行为。
> 例如,`Flux.create`允许多线程调用`sink.onNext`但是其使用的sink是`reactor.core.publisher.FluxCreate.SerializedFluxSink`其在next操作中通过队列和`cas`对下游的`onNext`操作进行了同步。
>
> 故而,在传统的`Processor.onNext`中,如果要在多线程环境下使用,必须在上游做好同步操作,否则会导致下游的未定义行为。
##### Processor
Processor是一种特殊的`Publisher`,其在作为`publisher`的同时也是`Subscriber`。
在使用Processor是一个很常见的问题是直接对processor向外暴露的`onNext``onComplete` `onError`方法进行调用。
实际上对于processor中`onXXX`方法的调用必须要符合reactive stream规范在外部对`onXXX`方法进行调用时,要做好同步操作。
sinks的使用示例如下所示
```java
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
```
通过sink多个线程可以并行的产生数据
```java
//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);
```
> 在使用`EmitFailureHandler.busyLooping`时,其返回的示例包含状态,并不能被重用
#### `asFlux`和`asMono`
`Sinks.Many`支持被转化为`Flux`,下游可以对`Sinks.Many`转化后的`Flux`进行订阅,实例如下所示:
```java
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
.takeWhile(i -> i < 10)
.log()
.blockLast();
```
同样的,`Sinks.One`和`Sinks.Empty`可以通过`asMono()`方法被转化为`Mono`
Sinks的类别包括
- `many().multicast()`sink只会基于subscribers的backpressure请求`将最新的数据传递到其subscribers`
- `最新的数据`代表`subscriber`订阅之后的数据
- `many().unicast()`:和`many().multicast()`类似,也是在订阅之后将`最新的数据`推送给订阅者,但是和`many().multicast()`不同之处如下:
- `many().unicast()`带有缓冲区,在第一个订阅者订阅`many().unicast()`之前,会将推送给`many().unicast()`的数据都缓冲到缓冲区中,待后续有订阅者订阅后,会将缓冲区数据发送给订阅者
- `many().multicast()`默认不带有缓冲区
- `many().replay()`: 对于新订阅的订阅者,会将一定数量的`pushed history data`进行`replay`操作,之后再推送新的数据
- `one()`sink只会向subscriberr推送一个数据
- `empty()`该sink会向subscriber推送`termianl signal`error或complete
#### `Sinks.many().unicast().onBackpressureBuffer(args?)`
一个`unicast Sinks.Many`可以通过其内置buffer处理backpressure但是其最多只能有一个subscriber。
通常,可以通过`Sinks.many().unicast().onBackpressureBuffer()`来创建`unicast sink`。但是,`Sinks.many().unicast()`中包含更多的静态方法,可以对其进行更精细的调整。
##### onBackpressureBuffer
在默认情况下,`onBackpressureBuffer()`其是无界unbounded
- 在subscriber尚未请求数据的情况下如果通过sink推送了数据那么这些数据都会被缓冲到缓冲区中缓冲区是无界的
故而,`onBackPressureBuffer`方法存在一个接收`Queue`类型参数的重载方法。可以向其传递一个有界队列,该队列将会被用作内部缓冲区。
> 在为`onBackpressureBuffer`指定了`Queue`的情况下,如果`queue已满并且下游并没有向上游发送足够的reqeust`时,`sink将会拒绝该value的推送`。
#### Sinks.many().multicast().onBackpressureBuffer(args?)
对于该方法创建的`multicast Sinks.Many`可以向多个subscribers发送数据并且对每个subscriber都能独立地接收backpressure。
> 对于每个subscriber只会接收在其subscribe之后推送到sink的signal
基础的multicast sink可以通过`Sinks.many().multicast().onBackpresuureBuffer()`来进行创建。
##### autoCancel
在默认情况下如果所有subscribers都被取消`cancelled`即取消订阅其会对internal buffer进行清空并且停止接收新的subscriber。
如果想要修改`autoCancel`的行为,可以调用`Sinks.many().multicast()`中的静态工厂方法通过autoCancel参数来调整`autoCancel`的行为。
#### Sinks.many().multicast().directAllOrNothing()
该方法创建的拥有最简单的backpressure处理策略
- 如果任何subscriber处于`too slow`状态demand为0那么对于所有的subscriber该onNext都会被丢弃。
但是slow subscribers并不会被终止`一旦slow subscribers又发送了request`所有subscribers都会重新从sinks.many接收数据。
在`Sinks.many()`终止后(通常是通过调用`emitError, emitComplete`方法其仍然允许新的subscriber对其进行订阅但是只会对新订阅者replay termination signal。
#### Sinks.many().multicast().directBestEffort()
对于该类型的`multicast Sinks.Many`,若`subscriber is too slow`(该subscriber的demand为0),那么该`onNext`信号`仅会针对该slow subscriber`进行丢弃。
但是,`slow subscribers`并不会被终止,一旦`slow subscribers`开始请求数据,其会重新开始接收新推送的数据。
当`Sinks.Many`被`emitError, emitComplete`终止后其仍然允许新的subscribers对其进行订阅但是对新订阅的订阅者只会向新订阅者发送termination signal。
#### Sinks.many().replay()
一个`replay Sinks.Many`可以将已发送元素进行缓存并且对后续的subscriber进行replay。
`replay Sinks.Many`可以通过如下方式进行创建:
- 缓存指定数量的历史数据`Sinks.many().replay().limit(int)`
- 缓存所有历史数据,没有上限限制`Sinks.many().replay().all()`
- 基于time-based window进行缓存`Sinks.many().replay().limit(Duration)`
- hisotry size limit和time window相结合`Sinks.many().replay().limit(int, Duration)`
除此之外,`Sinks.many().replay()`还包含其他的重载方法,例如可以通过`latest()`和`latestOrDefault()`对单个元素进行缓存和replay
#### Sinks.unsafe().many()
`Sinks.unsafe().many()`返回的`Sinks.Many factory`并不会提供producer thread safety在使用`Sinks.unsafe().many()`时,需要确保对`可能导致onNext, onComplete, onError方法`的调用需要保证外部的同步以确保其满足reactive stream规范。
#### Sinks.one()
该方法会简单创建一个`Sinks.One<T>`实例,该实例可以看作`Mono<T>`并且其emit方法和`many`稍有不同:
- `emitValue(T value)`: 产生一个`onNext(value)`信号,并且,在大多数实现中会产生一个`onComplete`信号
- `emptyEmpty()` 只会产生一个`onComplete`信号,其和`empty Mono`等效
- `emitError(Throwable t)`:产生一个`onError(t)`信号
#### Sinks.empty()
该方法会创建一个`Sinks.Empty<T>`实例,`Sinks.Empty`和`Sinks.One`类似,但是`Sinks.Empty`不提供`emitValue`方法。
`Sinks.Empty`无法触发onNext但是仍可以指定`<T>`泛型类型。
## Advanced
### Batching
#### Grouping With `Flux<GroupedFlux<T>>`
grouping将会把`source Flux`分割为多个batch每个batch都有一个keygrouping对应的operator是`groupBy`。
其中每个group都被表示为`GroupedFlux<T>`,可以通过`key`方法获取该group关联的key。
一旦由source产生的一个element对应一个新key那么该key对应的group就会被打开并且匹配该key的元素都会在该group中出现在同一时刻可能会有多个group处于开启状态
故而group特性如下
- group之间元素是不相交的一个source element只能属于一个group
- group不可能为空只有group对应key的相匹配元素出现后group才会被开启
groupBy使用示例如下
```java
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
.map(String::valueOf) //map to string
.startWith(g.key())) //start with the group's key
)
.expectNext("odd", "1", "3", "5", "11", "13")
.expectNext("even", "2", "4", "6", "12")
.verifyComplete();
```
> 当使用grouping时最好包含较少的`group数量`。如果group数量较多在下游进行`flatMap`时如果flatMap concurrency较小将会造成挂起。
#### windowing with `Flux<Flux<T>>`
windowing指将`Flux<T>`分割为windows根据大小、时间、boundary-defining predicates、boundary-defining publisher对flux进行分割。
`windowing`关联的操作符为`window`, `windowTimeout`, `windowUntil`, `windowWhile`, `windowWhen`。
windowXXX和groupBy不同的是 groupBy通常会基于key随机的切换group但是window却是顺序打开的。
##### windowWhile
windowWhile通过predicate判断是否window应当被关闭如果source element满足predicate条件那么window将仍然开启。`一旦predicate返回false那么windows将会被关闭并且触发的元素也会被丢丢弃`。
示例如下:
```
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowWhile产生的windows
empty window 由1产生
empty window 由3产生
empty window 由5产生
2, 4, 6 由11产生
12 由13产生
```
##### windowUntil
windowUntil通过predicate是否应该开启一个新的window如果source element满足predicate条件那么就会开启一个新的window并且之前的windows会关闭`并且之前的windows会收到触发的元素`。
示例如下:
```
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowUntil产生的windows
1, 3, 5, 2 (由2触发)
4 (由4触发)
6 由6触发
11, 12 由12触发
13
```
#### Buffering with `Flux<List<T>>`
buffering和windowing类似但是区别如下
- windows对window进行emit时类型为`Flux<T>`
- buffering对buffer进行emit类型为`Collection<T>`
对于buffer的operator和window类似`buffer, bufferTimeout, bufferUntil, bufferWhile, bufferWhen`。
相比于windowing operator开启一个新的windowbuffer operator会创建一个新的Collection并向其中添加元素。
同样的buffer也会造成元素的overlap示例如下
```java
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
```
由于`maxSize > skip`导致buffer也出现了元素overlap。
但是和windowing不同的是`bufferUntil`和`bufferWhile`并不会发送empty collection示例如下
```java
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.bufferWhile(i -> i % 2 == 0)
)
.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
.expectNext(Collections.singletonList(12)) // triggered by 13
.verifyComplete();
```
### flatmap
flatMap方法接收一个Function类型的参数该Function会将一个`input item`转化为一个`Publisher`,示例如下:
```java
Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split(""));
```
上述Function的返回类型为`Pbulisher<String>`其将字符串转为大写并对转换后的字符串进行分割并基于分割后的字符串集合创建了一个新的reactive stream。
上述function的使用如下
```java
Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.flatMap(mapper);
```
由于上游存在三个字符串故而flatMap方法基于上游的三个字符串创建了三个新的reactive stream。新建的三个stream其中元素由上游字符串分割而得到`并且三个stream中的元素会被填充到另一个新建的reactive stream中。`
对其进行subscribe之后预期结果如下
```java
List<String> output = new ArrayList<>();
outFlux.subscribe(output::add);
assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M");
```
> 注意,最后输出字符的顺序可能是无序的。
#### Pipeline Operations
flatMap会通过传递给其的`Function`和`onNext element`创建新的reactive stream并且`一旦新的stream由Publisher表示创建好后flatMap会马上对其进行订阅`。并且,`订阅操作并不是阻塞的`operator在继续下一个stream之前并不需要等待当前订阅操作终止。
> pipeline会同时处理所有基于input item派生的stream并且派生stream中的元素随时可能到达。故而original order可能会被丢失。
如果original order比较重要可以使用` flatMapSequential`操作符。
#### concurrency
concurrency用于控制在途的inner sequences上限。
#### prefetch
prefetch用于控制每个inner publisher在途的元素上限
### zip
`zip`操作会将多个resource合并为一个即`等待所有sources都发送一个元素并且将所有发送的元素整合到一个output value中output value通过提供的combinator进行构造`。
`zip` operator会持续执行上述操作直到任一source完成complete
#### zipWith
`zipWith`可以将当前flux和另一个publisher一起执行`zip`操作。
### defer
`Mono.defer`会创建一个Mono对象a每次每次Mono对象a被下游调用`subscribe`时,都会通过`supplier`生成一个新的Mono对象b下游实际订阅的是新的Mono对象b。