1110 lines
54 KiB
Markdown
1110 lines
54 KiB
Markdown
- [Reactor](#reactor)
|
||
- [Reactive Programming](#reactive-programming)
|
||
- [命令式迁移到响应式](#命令式迁移到响应式)
|
||
- [可组合性与可读性](#可组合性与可读性)
|
||
- [Assembly Line](#assembly-line)
|
||
- [Operators](#operators)
|
||
- [Nothing Happens Until You subscribe()](#nothing-happens-until-you-subscribe)
|
||
- [backpressure](#backpressure)
|
||
- [hot \& cold](#hot--cold)
|
||
- [Subscriber和Publisher](#subscriber和publisher)
|
||
- [Publisher](#publisher)
|
||
- [Subscriber](#subscriber)
|
||
- [Reactor Core](#reactor-core)
|
||
- [Flux `0...n`](#flux-0n)
|
||
- [Mono `0...1`](#mono-01)
|
||
- [创建Mono/Flux并进行订阅的方式](#创建monoflux并进行订阅的方式)
|
||
- [String sequence](#string-sequence)
|
||
- [Flux.empty](#fluxempty)
|
||
- [Flux.range](#fluxrange)
|
||
- [Lambda Subscribe](#lambda-subscribe)
|
||
- [Disposable](#disposable)
|
||
- [BaseSubscriber](#basesubscriber)
|
||
- [backpressure](#backpressure-1)
|
||
- [buffer](#buffer)
|
||
- [prefetch](#prefetch)
|
||
- [初始请求](#初始请求)
|
||
- [补充优化(Replenishing Optimization)](#补充优化replenishing-optimization)
|
||
- [limitRate](#limitrate)
|
||
- [lowTie](#lowtie)
|
||
- [limitRequest](#limitrequest)
|
||
- [Create Sequence](#create-sequence)
|
||
- [同步生成`generate`](#同步生成generate)
|
||
- [Sink](#sink)
|
||
- [异步多线程生成`create`](#异步多线程生成create)
|
||
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
|
||
- [`Flux.create` backpressure](#fluxcreate-backpressure)
|
||
- [异步单线程生成`push`](#异步单线程生成push)
|
||
- [hybird push-pull model](#hybird-push-pull-model)
|
||
- [`sink.onRequest`](#sinkonrequest)
|
||
- [Handle](#handle)
|
||
- [`onCancel` \& `onDispose`](#oncancel--ondispose)
|
||
- [Threading and Schedulers](#threading-and-schedulers)
|
||
- [Scheduler](#scheduler)
|
||
- [createWorker](#createworker)
|
||
- [operators using default scheduler](#operators-using-default-scheduler)
|
||
- [switch thread context](#switch-thread-context)
|
||
- [publishOn](#publishon)
|
||
- [subscribeOn](#subscribeon)
|
||
- [`subscribeOn`原理](#subscribeon原理)
|
||
- [Handling Errors](#handling-errors)
|
||
- [static fallback value](#static-fallback-value)
|
||
- [catch and swallow error](#catch-and-swallow-error)
|
||
- [fallback method](#fallback-method)
|
||
- [Dynamic Fallback Value](#dynamic-fallback-value)
|
||
- [Catch and rethrow](#catch-and-rethrow)
|
||
- [log and react on the side](#log-and-react-on-the-side)
|
||
- [doOnError](#doonerror)
|
||
- [Using resources and the finally block](#using-resources-and-the-finally-block)
|
||
- [doFinally](#dofinally)
|
||
- [using](#using)
|
||
- [retry](#retry)
|
||
- [retry原理](#retry原理)
|
||
- [retryWhen](#retrywhen)
|
||
- [retry helper](#retry-helper)
|
||
- [retrying with transient errors](#retrying-with-transient-errors)
|
||
- [handling Exceptions in operators or functions](#handling-exceptions-in-operators-or-functions)
|
||
- [checked exception](#checked-exception)
|
||
- [Exceptions Utility](#exceptions-utility)
|
||
- [Sinks](#sinks)
|
||
- [在多线程环境下安全的使用`Sinks.One`和`Sinks.Many`](#在多线程环境下安全的使用sinksone和sinksmany)
|
||
- [`tryEmit` \& `emit`](#tryemit--emit)
|
||
- [Processor](#processor)
|
||
>>>>>>> 8d41980 (doc: 阅读transient errors文档)
|
||
|
||
# Reactor
|
||
## Reactive Programming
|
||
响应式编程是一种异步编程范式,关注于`数据流`和`状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。
|
||
|
||
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法,reactive stream是基于`发布/订阅`模型的。
|
||
|
||
> 迭代器模式是`pull-based`,而reactive stream为`push-based`。
|
||
|
||
### 命令式迁移到响应式
|
||
#### 可组合性与可读性
|
||
“可组合性"代表编排多个异步任务的能力,通过“组合”,可以将前一个异步任务的输出作为后一个异步任务的输入。或者,可以按照fork-join的形式对异步任务进行编排。
|
||
|
||
reactor同样能解决“可读性”的问题,在使用传统的callback model编写程序时,随着逻辑的复杂,异步进行的层数也会增加,这将会极大降低代码的可读性和可维护性。
|
||
|
||
> 在使用call model时,通常需要在回调中执行另一个回调,回调的嵌套通通常会被称为`callback heil`。
|
||
|
||
reactor提供了复杂的“组合”选项,能够反映抽象异步进程的组织,并且,所有的内容通常都会位于同一层级。
|
||
|
||
#### Assembly Line
|
||
响应式应用中的数据处理类似于流水线,其中,reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`。
|
||
|
||
数据在从publisher传送到subscriber的过程中,可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间,受影响的workstation会向上游发送消息来控制数据的生成速率。
|
||
|
||
#### Operators
|
||
在reactor中,Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`中,并且前一个publisher包装到一个新的publisher实例中。
|
||
|
||
故而,operator将整个chain都链接起来,数据来源于第一个publisher,并随着chain移动,依次被每个链接处理,最终由subscriber结束该过程。
|
||
|
||
#### Nothing Happens Until You subscribe()
|
||
当通过reactor编写publisher chain时,数据并不会被泵入到chain中,编写chain只是创建了异步处理的抽象描述。
|
||
|
||
通过订阅行为,将publisher和subscriber绑定到了一起,订阅行为会触发chain中的数据流。该行为通过内部的signal实现,subscriber将会发送一个`reuqest signal`,该信号会被传递到chain上游,一直被传递到source publisher。
|
||
|
||
#### backpressure
|
||
`传递到上游的信号`该机制也被用于实现backpressure,在assembly line模型中,也被描述为workstation传递给上游的反馈信号,当workstation处理消息比上游workstation满时,会发送该反馈。
|
||
|
||
reactive stream定义的机制接近于上述描述,其提供两种模式:
|
||
- unbounded mode:source publisher可以按其最高速率不受限制的推送数据
|
||
- request mode:通过`request`机制向source publisher发送信号,告知其准备好处理最多`n`个元素。
|
||
|
||
中间的operator也可以在传输过程中对请求做出修改,例如`buffer` operator可以将elements分割为以10个为单位的batch,如果subscriber请求一个buffer,那么上游source publisher可以产生10个元素。
|
||
|
||
通过backpressure,可以将`push`模型转化为`push-pull`模型:
|
||
- 当上游的n个元素已经准备好时,下游可以从上游拉取n个元素
|
||
- 当上有没有准备好n个元素时,后续如果n个元素被准备好,其将会被上游推送
|
||
|
||
#### hot & cold
|
||
对于响应式序列,其可以分为两种:
|
||
- cold sequence:对于cold sequence,会为每个订阅者重新开始流程,包括source publisher。例如source中若封装了http调用,会为每个subscriber都执行一个新的http请求
|
||
- hot sequence:subscriber只有在其订阅后才收到信号,即使没有subscriber在监听,hot sequence仍然能够发送signal
|
||
|
||
## Subscriber和Publisher
|
||
### Publisher
|
||
对于publisher,其提供了`subscribe`方法供subscriber进行注册,在执行subscribe方法并向其传入`Subscriber`对象后,上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。
|
||
|
||
> 若中间存在operator(例如map)在担任publisher角色的同时,还对上游进行了订阅,那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
|
||
>
|
||
> `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。
|
||
|
||
### Subscriber
|
||
当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。
|
||
|
||
## Reactor Core
|
||
reactor引入了两个实现`Publisher`的类:`Mono`和`Flux`。
|
||
- Flux:代表包含`0...N`个items的reactive sequence
|
||
- Mono:代表包含`0...1`个items的reactive sequence
|
||
|
||
上述两个类代表了在异步处理场景中的大致基数。
|
||
- Mono:例如,对于http请求的场景,一个请求只会产生一个响应,故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono<HttpResponse>`来代表http调用的结果,`Mono`中只提供了上下文中包含`0...1`个元素的对应操作
|
||
- 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono<Long>`的类型
|
||
|
||
### Flux `0...n`
|
||

|
||
|
||
`Flux<T>`是一个标准的`Publisher<T>`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准,存在三种signal,且信号会转化为对下游`onNext`、`onComplete`、`onError`的调用。
|
||
|
||
Flux是一个通用的reactive类型,并且,所有的event type都是可选的。
|
||
- 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列
|
||
- 当`onNext`和`onComplete`事件都不存在时,代表一个空的无限序列
|
||
- 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux<Long>`,其是无限的并且发送tick
|
||
|
||
### Mono `0...1`
|
||
`Mono<T>`是一个标准的`Publisher<T>`,其通过`onNext`信号发送至多一个item,然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。
|
||
|
||
大多数Mono实现在调用完subscriber的`onNext`方法之后,预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext`和`onError`的组合是被明确禁止的。
|
||
|
||
### 创建Mono/Flux并进行订阅的方式
|
||
#### String sequence
|
||
如果想要创建String序列,可以通过如下方式:
|
||
```java
|
||
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
|
||
|
||
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
|
||
Flux<String> seq2 = Flux.fromIterable(iterable);
|
||
```
|
||
#### Flux.empty
|
||
```java
|
||
Mono<String> noData = Mono.empty();
|
||
```
|
||
#### Flux.range
|
||
在下面示例中,`Flux.range`第一个参数是range的起始值,第二个参数是要产生的元素个数
|
||
```java
|
||
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
|
||
```
|
||
故而,其产生的内容为`5,6,7`。
|
||
|
||
#### Lambda Subscribe
|
||
在进行订阅时,`Flux`和`Mono`使用了lambda,在调用subscribe时,有如下几种重载的选择:
|
||
```java
|
||
// 订阅并触发sequence的产生
|
||
subscribe();
|
||
|
||
// 对每个产生的值通过consumer执行处理操作
|
||
subscribe(Consumer<? super T> consumer);
|
||
|
||
// 在reactive stream异常终止时,对error进行处理
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer);
|
||
|
||
// 在sequence处理完时,执行额外的complete操作
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer,
|
||
Runnable completeConsumer);
|
||
|
||
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
|
||
// 该重载已废弃
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer,
|
||
Runnable completeConsumer,
|
||
Consumer<? super Subscription> subscriptionConsumer);
|
||
```
|
||
对于subscribe的使用,示例如下
|
||
```java
|
||
Flux.range(5,3)
|
||
.map(x->{
|
||
if(x<7) {
|
||
return x;
|
||
}
|
||
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
|
||
})
|
||
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
|
||
(e) -> {
|
||
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
|
||
},
|
||
()->{
|
||
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
|
||
});
|
||
```
|
||
其执行结果如下:
|
||
```
|
||
[main]-5
|
||
[main]-6
|
||
[main]-Error Caught: fucking value {7} equals or greater than 7
|
||
```
|
||
|
||
#### Disposable
|
||
上述`subscribe`方法的返回类型为`Disposable`,该接口代表subscriber对publisher的订阅是可取消的,如需取消订阅,调用`dispose`方法即可。
|
||
|
||
对于Mono和Flux而言,source publisher应该在接收到cancellation信号之后停止产生元素,`并不能保证取消信号是即时的`。(`若source产生数据的速度过快,可能在接收到cancel信号之前,source就已经complete`)。
|
||
|
||
`Disposables`类中存在一些对`Disposable`的工具方法,例如`swap`和`composite`。
|
||
|
||
#### BaseSubscriber
|
||
`subscribe`方法除了接收lambda外,还存在更通用的重载方法,接收`Subscriber`类型的参数。
|
||
|
||
在这种场景下,传参可以继承`BaseSubscriber`类。
|
||
|
||
并且,`BaseSubscriber`该类是一次性的,`其只能够订阅一个publisher,如果其订阅了第二个publisher,那么其对第一个publisher的订阅将会被取消`。
|
||
|
||
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
|
||
|
||
示例如下:
|
||
```java
|
||
Flux.range(3, 5)
|
||
.subscribe(new BaseSubscriber<Integer>() {
|
||
private Subscription subscription;
|
||
|
||
@Override
|
||
protected void hookOnSubscribe(Subscription subscription) {
|
||
this.subscription = subscription;
|
||
subscription.request(5);
|
||
}
|
||
|
||
@Override
|
||
protected void hookOnNext(Integer value) {
|
||
log.info("onNext called: {}", value);
|
||
// this.subscription.request(1);
|
||
}
|
||
|
||
@Override
|
||
protected void hookOnComplete() {
|
||
log.info("onComplete called");
|
||
super.hookOnComplete();
|
||
}
|
||
|
||
@Override
|
||
protected void hookOnError(Throwable throwable) {
|
||
log.info("onError called: {}", throwable.getMessage());
|
||
super.hookOnError(throwable);
|
||
}
|
||
});
|
||
```
|
||
上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下:
|
||
```
|
||
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
|
||
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
|
||
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
|
||
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
|
||
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
|
||
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
|
||
```
|
||
|
||
`BaseSubscriber`的`hookOnSubscribe`默认实现如下:
|
||
```java
|
||
protected void hookOnSubscribe(Subscription subscription) {
|
||
subscription.request(Long.MAX_VALUE);
|
||
}
|
||
```
|
||
其请求的数量为`Long.MAX_VALUE`,代表其publisher为`effectively unbounded`。
|
||
|
||
可以通过重写`hookOnSubscribe`方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写`hookOnSubscribe`和`hookOnNext`方法。
|
||
|
||
`BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。
|
||
|
||
除了上述列出的hook外,`BaseSubscriber`还支持如下hooks:
|
||
- hookOnComplete
|
||
- hookOnError
|
||
- hookOnCancel
|
||
- hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error)
|
||
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
|
||
|
||
#### backpressure
|
||
在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`,当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`。
|
||
|
||
> `unbound request`代表尽可能快的产生数据,即backpressure关闭。
|
||
|
||
在reactive chain中,第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request,示例如下:
|
||
- `subcribe()`方法和大多数基于lambda的重载方法(除了包含`Consumer<Subscription>`参数的重载)
|
||
- `block`, `blockFirst`, `blockLast`
|
||
- 通过`toIterable`或`toStream`进行遍历
|
||
|
||
目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法`。
|
||
|
||
#### buffer
|
||
在reactor中,可以通过部分operator进行request的reshape。示例如下:
|
||
```java
|
||
Flux.range(1, 1000)
|
||
.buffer(3)
|
||
.subscribe(new BaseSubscriber<List<Integer>>() {
|
||
@Override
|
||
protected void hookOnSubscribe(Subscription subscription) {
|
||
subscription.request(2);
|
||
}
|
||
|
||
@Override
|
||
protected void hookOnNext(List<Integer> value) {
|
||
for (Integer v : value) {
|
||
log.info("item received: {}", v);
|
||
}
|
||
}
|
||
});
|
||
```
|
||
在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3`个`Integer`,故而总共会接收到`2 * 3 = 6`个元素。
|
||
|
||
输出如下:
|
||
```
|
||
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
|
||
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
|
||
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
|
||
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
|
||
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
|
||
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
|
||
```
|
||
|
||
#### prefetch
|
||
`prefetch`机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。
|
||
|
||
prefetch机制通常分为如下部分:
|
||
##### 初始请求
|
||
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。
|
||
|
||
##### 补充优化(Replenishing Optimization)
|
||
prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。
|
||
|
||
> 例如,prefetch的大小为10,其limit对应的值为`ceil(10 * 0.75) = 8`,每当其下游被处理的元素达到8个,其会重新请求8个数据,并且将`被下游处理元素的个数`重置,重新从0开始计数,直到该值再达到8,再次发送请求
|
||
|
||
> ##### 预加载数据
|
||
> 补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
|
||
|
||
> ##### 平滑处理
|
||
> 通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。
|
||
|
||
有如下operators可以对请求的prefetch进行调整
|
||
|
||
#### limitRate
|
||
除了`prefetch`之外,还可以通过`limitRate`或`limitRequest`来直接针对请求进行调节。
|
||
|
||
`limitRate(N)`将来自下游的请求进行拆分,当来自下游的请求被传播到上游时,其会被拆分为small batches。例如,如果下游调用`request(100)`,此时`limitRate(10)`将会将其拆分为10个`request(10)`再传播给上游。并且,在此基础上,limitRate还实现了prefetch中的补充优化。
|
||
|
||
除了`limitRate(N)`之外(当没有传递`lowTie`时,limit默认会取`N - N>>2`,即`ceil(N * 0.75)`),limtRate还存在`limitRate(highTie, lowTie)`的重载方法。
|
||
|
||
##### lowTie
|
||
当lowTie取不同值时,其补充策略如下:
|
||
- `lowTie<=0`:如果`lowTie`小于或等于0,则limit取值和`prefetch`值相同,仅当prefetch中所有元素都被下游处理完时,limtRate operator才会向上游请求数据
|
||
- `lowTie>=prefetch`: 当lowTie大于或等于prefetch时,limit取值为`ceil(prefetch * 0.75)`,此时,补充策略和prefetch默认相同,当75%的数据被下游处理时,limitRte会重新向上游请求75%的数据
|
||
- 若lowTie位于`(0, prefetch)`区间之间
|
||
- 若prefetch的值为`Long.MAX_VALUE`,那么limit的值也为`Long.MAX_VALUE`
|
||
- 若prefetch值不为`Long.MAX_VALUE`,那么limit的值为`lowTie`,即`lowTie`的值即为消费后重新拉取的限制值
|
||
|
||
#### limitRequest
|
||
`limitRequest(N)`用于限制下游请求的总个数。例如,向`limitRequest(10)`发起两次request,一次请求3一次请求8,那么最后下游只会接收到10个元素。
|
||
|
||
> 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成,会向下游发送onComplete信号
|
||
|
||
### Create Sequence
|
||
#### 同步生成`generate`
|
||
创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer<SynchronousSink<T>>`类型的参数。
|
||
|
||
##### Sink
|
||
sink是spring reactor中的一个核心抽象概念,其可以被理解为数据流的出口或`发射器`,`负责将元素、error或complete信号推送到下游订阅者`。
|
||
|
||
sink可以分为如下种类:
|
||
- 同步/异步
|
||
- 单次/多次发射
|
||
|
||
sink的核心api如下:
|
||
- `next(T value)`: 向下游发送数据
|
||
- `complete`:表示数据流正常结束
|
||
- `error(throwable)`:代表数据流因为错误而终止
|
||
|
||
generate方法用于产生同步且`one by one`的emission,即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下:
|
||
```java
|
||
Flux.generate(sink -> {
|
||
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
|
||
sink.next(data); // 仅调用一次
|
||
if (isEndOfData()) sink.complete();
|
||
});
|
||
```
|
||
|
||
> 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。
|
||
|
||
示例如下:
|
||
```java
|
||
Flux.generate(sink -> {
|
||
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
|
||
})
|
||
.take(10)
|
||
.subscribe(System.out::println);
|
||
```
|
||
|
||
通过generate的另一个重载方法`generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下:
|
||
- `stateSupplier`:该参数用于提供一个初始状态,每当有新请求进入时,都会调用supplier生成初始状态
|
||
- `generator`:generator会返回一个新的state
|
||
|
||
示例如下:
|
||
```java
|
||
Flux.generate(() -> 0, (s, sink) -> {
|
||
// System.out.printf("%s emitted\n", s);
|
||
sink.next(s);
|
||
return s + 1;
|
||
})
|
||
.take(5)
|
||
.subscribe(System.out::println);
|
||
```
|
||
其上游生成的整数依次增加,输出如下:
|
||
```
|
||
0
|
||
1
|
||
2
|
||
3
|
||
4
|
||
```
|
||
|
||
#### 异步多线程生成`create`
|
||
`Flux.create`针对每个round能够产生多个元素,其暴露的sink类型为`FluxSink`。相对于`Flux.generate`,`Flux.create`方法并没有接收`state`的重载版本。
|
||
|
||
> `Flux.create`并不会将你的代码变为并行或是异步的.
|
||
>
|
||
> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法,在create lambda中执行长阻塞操作仍然会阻塞item的处理,因为item的source产生和下游处理都处于同一线程中,但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。
|
||
|
||
|
||
##### `Flux.create`和`listener based api`进行适配
|
||
街射使用基于listener的api,api定义如下:
|
||
```java
|
||
interface MyEventListener<T> {
|
||
void onDataChunk(List<T> chunk);
|
||
void processComplete();
|
||
}
|
||
```
|
||
可以使用`Flux.create`将其与Flux相桥接:
|
||
```java
|
||
Flux<String> bridge = Flux.create(sink -> {
|
||
myEventProcessor.register(
|
||
new MyEventListener<String>() {
|
||
|
||
public void onDataChunk(List<String> chunk) {
|
||
for(String s : chunk) {
|
||
sink.next(s);
|
||
}
|
||
}
|
||
|
||
public void processComplete() {
|
||
sink.complete();
|
||
}
|
||
});
|
||
});
|
||
```
|
||
##### `Flux.create` backpressure
|
||
可以通过指定`OverflowStrategy`来优化backpressure行为,OverflowStrategy可选值如下:
|
||
- `IGNORE`: 该选项会完全无视下游的backpressure request,当下游queue被填充满时,会抛出`IllegalStateException`异常
|
||
> 部分操作符(例如`publishOn`),其subscriber会内置queue,用于存储上游通过`onNext`推送的数据。
|
||
>
|
||
> 当queue满时,将会抛出`IllegalStateException`异常
|
||
- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常
|
||
> 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数,例如,若下游只发送了`request(1)`,但是source产生了两个数据,调用了两次`sink.next`,那么第二次调用时`requested`已经为0,会调用`onOverflow`方法发送`error`信号
|
||
|
||
- `DROP`:当下游无法`跟上`上游的数据发送速度时(`上游发送onNext信号之前,如果下游requested配额不足`),上游将会丢弃该数据,`sink.next`不做任何操作。
|
||
- `LATEST`: 当上游想要推送数据到下游时,如果下游`requested`不足,那么上游会用最新的数据覆盖下游之前缓存的数据
|
||
- 例如,下游堆积事件`1`,此时上游推送事件`2`,则`2`会覆盖`1`,之后上游再推送数据`3`,`3`会覆盖`2`...之后,下游调用`request(1)`后,获取到的是最新的数据`3`,数据`1,2`都被覆盖
|
||
- 其实现是通过`LatestAsyncSink`中的`queue`来实现的,queue是一个`AtomicReference`类型的field,当调用`sink.next`向下游发送数据时,如果下游不满足requested,那么将会将值`set`到queue中,从而实现后面的数据覆盖前面的数据
|
||
- `BUFFER`(默认): `BufferAsyncSink`中会存在一个无界队列,如果当上游调用`sink.next`尝试向下游发送数据时,如果下游requested条件不满足,会将数据缓存在无界队列中。(`使用无界队列进行缓存可能会导致OOM`)
|
||
|
||
#### 异步单线程生成`push`
|
||
`push`介于`generate`和`create`之间,其用于对单个生产者产生的事件进行处理,`push`和`generate`与`create`的相似点如下:
|
||
- 和`create`类似,其可以是异步的,并且可以通过`OverflowStrategy`来管理backpressure
|
||
- 和`generate`类似,对于`push`,其`sink.next`,`sink.complete`,`sink.error`在指定时刻只能被一个线程调用
|
||
|
||
其对异步事件的桥接示例如下:
|
||
```java
|
||
Flux<String> bridge = Flux.push(sink -> {
|
||
myEventProcessor.register(
|
||
new SingleThreadEventListener<String>() {
|
||
|
||
public void onDataChunk(List<String> chunk) {
|
||
for(String s : chunk) {
|
||
sink.next(s);
|
||
}
|
||
}
|
||
|
||
public void processComplete() {
|
||
sink.complete();
|
||
}
|
||
|
||
public void processError(Throwable e) {
|
||
sink.error(e);
|
||
}
|
||
});
|
||
});
|
||
```
|
||
|
||
##### hybird push-pull model
|
||
大多数reactor operators(例如create),都使用了混合推拉的模型。在混合推拉模型中,大多数事件的处理都是异步的(建议采用push的方法)。
|
||
|
||
在混合推拉模型中,consumer从source处拉取数据,`在consumer发送第一次request之前,source并不会emit任何数据`(source可能会生产数据,但是在requested余量不足时,并不会调用下游的onNext,根据OverflowStrategy的不同会执行不同处理)。`source只会在下游请求的范围内向下游推送数据`。
|
||
|
||
并且,在最下游的subscriber调用上游`subscribe`方法时,`source的数据生成逻辑才会被触发`,调用`subscribe`方法后,首先会调用subscriber的`onSubscribe`方法,然后会一直沿着reactive stream向前追溯,直到找到source,然后调用`Flux.create`接受的`Consumer<? super FluxSink<T>>`。
|
||
|
||
##### `sink.onRequest`
|
||
sink还提供了`onRequest`方法,其接受一个`LongConsumer`类似的参数,可以为sink注册一个`onRequest`回调,后续针对`sink`(sink实现了`Subscription`)调用request方法时,`consumer`都会被调用。
|
||
|
||
`sink.onRequest`的使用示例如下:
|
||
```java
|
||
Flux<String> bridge = Flux.create(sink -> {
|
||
myMessageProcessor.register(
|
||
new MyMessageListener<String>() {
|
||
|
||
public void onMessage(List<String> messages) {
|
||
//
|
||
for(String s : messages) {
|
||
sink.next(s);
|
||
}
|
||
}
|
||
});
|
||
sink.onRequest(n -> {
|
||
// 此处代码将会在`sink.request`被调用时触发
|
||
// 主动向processor拉取数据
|
||
List<String> messages = myMessageProcessor.getHistory(n);
|
||
for(String s : messages) {
|
||
sink.next(s);
|
||
}
|
||
});
|
||
});
|
||
```
|
||
|
||
|
||
#### Handle
|
||
`handle`为一个`instance method`(非静态方法),其关联了一个已经存在的source。`Mono`和`Flux`中都存在`handle`方法。
|
||
|
||
和`generate`类似,`handle`使用`SynchronousSink`,并且只允许逐个发出。`handle`的作用类似`map`和`filter`的组合,可以跳过部分元素,其签名如下
|
||
```java
|
||
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
|
||
```
|
||
|
||
其使用示例如下:
|
||
```java
|
||
public String alphabet(int letterNumber) {
|
||
if (letterNumber < 1 || letterNumber > 26) {
|
||
return null;
|
||
}
|
||
int letterIndexAscii = 'A' + letterNumber - 1;
|
||
return "" + (char) letterIndexAscii;
|
||
}
|
||
|
||
// 由于reactive steam中不允许有null,
|
||
// 故而当通过`alphabet`方法映射可能返回null值时,
|
||
// 可以使用handle对返回值进行过滤,只有不为空时才调用`sink.next`
|
||
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
|
||
.handle((i, sink) -> {
|
||
String letter = alphabet(i);
|
||
if (letter != null)
|
||
sink.next(letter);
|
||
});
|
||
|
||
alphabet.subscribe(System.out::println);
|
||
```
|
||
|
||
#### `onCancel` & `onDispose`
|
||
`sink`支持`onCancel`和`onDispose`回调,其区别如下:
|
||
- `onDispose`回调用于执行cleanup操作,其在`complete`, `error`, `cancel`之后都会被调用。
|
||
- `onCancel`回调用于执行取消操作,其只针对`cancel`执行,并且执行顺序位于cleanup之前
|
||
|
||
其使用示例如下所示:
|
||
```java
|
||
Flux<String> bridge = Flux.create(sink -> {
|
||
sink.onRequest(n -> channel.poll(n))
|
||
.onCancel(() -> channel.cancel())
|
||
.onDispose(() -> channel.close())
|
||
});
|
||
```
|
||
|
||
### Threading and Schedulers
|
||
reactor模型和rxjava模型类型,是并发无关的,并不强制要求并发模型。并且,大多数operator其运行的线程都和前一个operator相同。
|
||
|
||
`除非显式指定,否则最顶层的operator(source)也会运行在subscribe方法被调用的线程中`。
|
||
|
||
#### Scheduler
|
||
在reactor中,操作执行在哪个线程中取决于`使用的Scheduler`。Scheduler和ExectuorService类似,负责对任务进行调度,但是相比于ExecutorService其功能更加丰富。
|
||
|
||
`Scheudlers`类中存在如下静态方法,分别访问不同的`execution context`:
|
||
- `Schedulers.immediate()`:没有执行上下文,被提交的任务会在当前线程中被立马执行
|
||
- `Schedulers.single()`:线程上下文为一个`单个、可重用`的线程上下文。该方法会对所有的调用都使用相同的线程,直到scheduler被`disposed`
|
||
- `Schedulers.newSingle()`:每次调用时都使用一个专属线程
|
||
- `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。
|
||
- `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现:
|
||
- `ExecutorService-based`:会在多个任务之间重用平台线程(即使用相同工作线程执行多个任务)
|
||
- `Virtual-thread-per-task-based`:`jdk21+`支持该特性,对每个任务,都会开启一个新的虚拟线程,并且实现并没有维护idle pools
|
||
|
||
#### createWorker
|
||
在Scheduler中,`idleQueue`中的线程会在空闲一段时间后自动销毁,但是通过`createWorker`手动创建的worker必须手动销毁,在使用完成后调用`release`。
|
||
|
||
#### operators using default scheduler
|
||
通常情况下,部分operators在未显式指定的前提下,会使用默认的scheduler(通常可显式指定一个不同的scheduler)。
|
||
|
||
例如,`Flux.interval(Duration.ofMillis(300))`方法会生成`间隔300ms的ticks`。默认情况下,其会使用`Schedulers.parallel`,可以通过如下代码指定一个新的scheduler:
|
||
```java
|
||
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
|
||
```
|
||
|
||
#### switch thread context
|
||
reactor提供了两种方法来切换执行的上下文:
|
||
- `publishOn`
|
||
- `subscribeOn`
|
||
|
||
> publishOn和subscribeOn用友如下区别:
|
||
> - publishOn方法在reactive chain中的位置若发生变化,会对各个节点的执行上下文造成影响
|
||
> - subscribeOn方法在reactive chain中位置若发生变化,并不会对各个节点的执行上下文造成影响
|
||
|
||
#### publishOn
|
||
`publishOn`操作符和作用和其他操作符类型,都是从上游接收到信号并且将信号传递到下游,但是,在执行下游的回调时,通过`Scheduler`中的worker进行调度。
|
||
|
||
故而,`publishOn`会对reactor chain中后续operators造成如下影响:
|
||
- 修改执行上下文,执行的线程由scheduler决定
|
||
- 按照规范,`onNext`的调用是有序的,故而publishOn后续的操作都会使用同一个worker进行调度,对于所有数据,都在同一线程中执行(worker在subscribe时决定publishOn使用的worker)
|
||
|
||
#### subscribeOn
|
||
`subscribeOn`影响订阅的过程,通常推荐将其放在source之后。
|
||
|
||
##### `subscribeOn`原理
|
||
在`FluxSubscribeOn#subscribeOrReturn`中,会通过`scheduler#createWorker`创建worker,并通过worker来对`执行source.subscribe`的方法进行调度。
|
||
|
||
故而,在consumer执行subscribe时,随着reactor chain从尾到头,下游subscriber都会调用上游publisher的subscribe方法。在`FluxSubscribeOn`调用上游的subscribe时,则是会通过worker在进行调度,实际调用在worker中执行。
|
||
|
||
subscribeOn方法使用方式如下:
|
||
```java
|
||
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
|
||
|
||
final Flux<String> flux = Flux
|
||
.range(1, 2)
|
||
.map(i -> 10 + i)
|
||
.subscribeOn(s)
|
||
.map(i -> "value " + i);
|
||
|
||
new Thread(() -> flux.subscribe(System.out::println));
|
||
```
|
||
### Handling Errors
|
||
在reactive stream中,errors是终止事件,一旦error被触发,sequence的生成将会被停止,并且将会沿着reactor chain的下游一直传递道`subscriber#onError`。
|
||
|
||
error应该在应用程序的级别被处理,处理方案如下:
|
||
- 在ui上展示错误信息
|
||
- 向rest endpoint发送error payload
|
||
|
||
故而,`subscriber#onError`方法应该被定义。
|
||
|
||
reactor同样支持在chain的中间通过`error-handling`来处理error,示例如下所示:
|
||
```java
|
||
Flux.just(1, 2, 0)
|
||
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
|
||
.onErrorReturn("Divided by zero :("); // error handling example
|
||
```
|
||
|
||
> 在reactor中,任何error都代表终止事件,`即使使用了error-handling operator,original sequence在出现error后也不会继续。
|
||
>
|
||
> 故而,error-handling将`onError`信号转化为了一个新sequence的开始(fallback one),即通过新sequence替换了上游旧的`terminated sequence`。
|
||
|
||
> 在`FluxOnErrorReturn.ReturnSubscriber#onError`的实现中,operator将`Throwable`转换为了`onNext(fallbackValue)`和`onComplete`两个调用,即在向下游发送`onNext(fallbackValue)`信号之后,也会终止序列。
|
||
|
||
通常来说,常见的异常才处理方式如下:
|
||
|
||
#### static fallback value
|
||
该场景类似于
|
||
```java
|
||
try {
|
||
return doSomethingDangerous(10);
|
||
}
|
||
catch (Throwable error) {
|
||
return "RECOVERED";
|
||
}
|
||
```
|
||
可以调用`onErrorReturn`方法,示例如下:
|
||
```java
|
||
Flux.just(10)
|
||
.map(this::doSomethingDangerous)
|
||
.onErrorReturn("RECOVERED");
|
||
```
|
||
`onErrorReturn`还存在一个接受`Predicate`的重载方法,可以决定是否对异常执行recover操作,示例如下:
|
||
```java
|
||
Flux.just(10)
|
||
.map(this::doSomethingDangerous)
|
||
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
|
||
```
|
||
|
||
#### catch and swallow error
|
||
如果不想将异常替换为fallback value,而是忽视产生的error,仅向下游传递正常生成的元素,可以使用`onErrorComplete`方法(例如, `1,2,3...`序列在向下进行传递时,如果`2`的下游operator执行发生异常,那么`onErrorComplete`会忽略`2`产生的error,仅向下游传递`1`)。
|
||
|
||
`onErrorComplete`的使用示例如下:
|
||
```java
|
||
Flux.just(10,20,30)
|
||
.map(this::doSomethingDangerousOn30)
|
||
.onErrorComplete();
|
||
```
|
||
> onErrorComplete会将onError信号转化为onComplete信号。
|
||
|
||
类似于`onErrorReturn`,`onErrorComplete`同样存在一个重载方法接受`Predicate`,可以通过该断言判断是否对error执行`recover`操作。
|
||
- 若执行recover操作,则会调用`subscriber#onComplete`
|
||
- 若不执行recover操作,则会调用`subscriber#onError`
|
||
|
||
#### fallback method
|
||
如果在处理数据时,存在多种方法,例如`m1,m2`,并且在尝试`m1`失败时想要再尝试`m2`,可以使用`onErrorResume`方法。
|
||
|
||
其等价于如下逻辑:
|
||
```java
|
||
String v1;
|
||
try {
|
||
v1 = callExternalService("key1");
|
||
}
|
||
catch (Throwable error) {
|
||
v1 = getFromCache("key1");
|
||
}
|
||
|
||
String v2;
|
||
try {
|
||
v2 = callExternalService("key2");
|
||
}
|
||
catch (Throwable error) {
|
||
v2 = getFromCache("key2");
|
||
}
|
||
```
|
||
和上述逻辑等价的是如下代码:
|
||
```java
|
||
Flux.just("key1", "key2")
|
||
.flatMap(k -> callExternalService(k)
|
||
.onErrorResume(e -> getFromCache(k))
|
||
);
|
||
```
|
||
其中,`onErrorResume`返回的都是一个`Publisher`
|
||
|
||
`onErrorResume`的底层逻辑实现如下:
|
||
- 其会根据`onErrorResume`中传入的`Function<? super Throwable, ? extends Publisher<? extends T>>`参数和`error`来构建一个新的publisher
|
||
- 然后,对新的publisher执行`subscribe`操作,并向下游返回新publisher的sequence
|
||
|
||
`onErrorResume`同样拥有一个重载函数接受`Predicate`,决定是否执行recover逻辑。除此之外,其接受`Function<? super Throwable, ? extends Publisher<? extends T>>`类型参数,可以根据抛出的异常来决定fallback sequence,示例如下:
|
||
```java
|
||
Flux.just("timeout1", "unknown", "key2")
|
||
.flatMap(k -> callExternalService(k)
|
||
.onErrorResume(error -> {
|
||
if (error instanceof TimeoutException)
|
||
return getFromCache(k);
|
||
else if (error instanceof UnknownKeyException)
|
||
return registerNewEntry(k, "DEFAULT");
|
||
else
|
||
return Flux.error(error);
|
||
})
|
||
);
|
||
```
|
||
> 在上述示例中,`Flux.error`会重新抛出上述异常
|
||
|
||
#### Dynamic Fallback Value
|
||
当出现error时,如果并不想调用下游的`onError`,而是想给下游一个`根据异常信息计算出的默认值`,同样可以使用`onErrorResume`方法,示例如下:
|
||
```java
|
||
try {
|
||
Value v = erroringMethod();
|
||
return MyWrapper.fromValue(v);
|
||
}
|
||
catch (Throwable error) {
|
||
return MyWrapper.fromError(error);
|
||
}
|
||
```
|
||
上述代码等价于
|
||
```java
|
||
erroringFlux.onErrorResume(error -> Mono.just(
|
||
MyWrapper.fromError(error)
|
||
));
|
||
```
|
||
|
||
#### Catch and rethrow
|
||
对于捕获异常然后重新抛出的逻辑
|
||
```java
|
||
try {
|
||
return callExternalService(k);
|
||
}
|
||
catch (Throwable error) {
|
||
throw new BusinessException("oops, SLA exceeded", error);
|
||
}
|
||
```
|
||
可以通过`onErrorResume`实现:
|
||
```java
|
||
Flux.just("timeout1")
|
||
.flatMap(k -> callExternalService(k))
|
||
.onErrorResume(original -> Flux.error(
|
||
new BusinessException("oops, SLA exceeded", original))
|
||
);
|
||
```
|
||
或者,可以调用`onErrorMap`,其是对`onErrorResume`方法的封装:
|
||
```java
|
||
Flux.just("timeout1")
|
||
.flatMap(k -> callExternalService(k))
|
||
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
|
||
```
|
||
|
||
#### log and react on the side
|
||
如果想要实现类似`将异常捕获、打印然后重新抛出`的逻辑,可以使用`doOnError`
|
||
```java
|
||
try {
|
||
return callExternalService(k);
|
||
}
|
||
catch (RuntimeException error) {
|
||
//make a record of the error
|
||
log("uh oh, falling back, service failed for key " + k);
|
||
throw error;
|
||
}
|
||
```
|
||
上述代码等价于
|
||
```java
|
||
LongAdder failureStat = new LongAdder();
|
||
Flux<String> flux =
|
||
Flux.just("unknown")
|
||
.flatMap(k -> callExternalService(k)
|
||
.doOnError(e -> {
|
||
failureStat.increment();
|
||
log("uh oh, falling back, service failed for key " + k);
|
||
})
|
||
|
||
);
|
||
```
|
||
##### doOnError
|
||
`doOnError`和其他以`doOn`为前缀的operators,其通常会被称为`副作用(side effect)`。通过这些operators,可以在不修改sequence的情况下监控chain中的事件。
|
||
|
||
|
||
#### Using resources and the finally block
|
||
对于`在finally block中清理资源`的行为,可以通过`doFinally`来实现
|
||
```java
|
||
Stats stats = new Stats();
|
||
stats.startTimer();
|
||
try {
|
||
doSomethingDangerous();
|
||
}
|
||
finally {
|
||
stats.stopTimerAndRecordTiming();
|
||
}
|
||
```
|
||
##### doFinally
|
||
`doFinally`是无论sequence结束(onError/onComplete)还是取消(cancel),都希望执行的副作用(side effect)。其接收参数类型为`Consumer<SignalType>`,可以通过`signaltype`判断`触发side-effect的terminaction类型`。
|
||
|
||
`doFinally`使用示例如下:
|
||
```java
|
||
Stats stats = new Stats();
|
||
LongAdder statsCancel = new LongAdder();
|
||
|
||
Flux<String> flux =
|
||
Flux.just("foo", "bar")
|
||
.doOnSubscribe(s -> stats.startTimer())
|
||
.doFinally(type -> {
|
||
stats.stopTimerAndRecordTiming();
|
||
if (type == SignalType.CANCEL)
|
||
statsCancel.increment();
|
||
})
|
||
.take(1);
|
||
```
|
||
|
||
对于`try(resource)`的代码,则是可以通过`using`来实现
|
||
```java
|
||
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
|
||
return disposableInstance.toString();
|
||
}
|
||
```
|
||
|
||
|
||
##### using
|
||
`using`代表`Flux由资源派生,并且在Flux处理完成之后都需要对资源执行操作`。
|
||
```java
|
||
// resource
|
||
AtomicBoolean isDisposed = new AtomicBoolean();
|
||
Disposable disposableInstance = new Disposable() {
|
||
@Override
|
||
public void dispose() {
|
||
isDisposed.set(true);
|
||
}
|
||
|
||
@Override
|
||
public String toString() {
|
||
return "DISPOSABLE";
|
||
}
|
||
};
|
||
```
|
||
|
||
```java
|
||
// using
|
||
Flux<String> flux =
|
||
Flux.using(
|
||
() -> disposableInstance,
|
||
disposable -> Flux.just(disposable.toString()),
|
||
Disposable::dispose
|
||
);
|
||
```
|
||
其中,`resource`实现了`Disposable`接口,而`Flux.using`接收的参数类型分别为
|
||
- `() -> disposableInstance`: resource supplier
|
||
- `disposable -> Flux.just(disposable.toString())`: 一个factory,根据resource产生publisher
|
||
- `Disposable::dispose`: resource cleanup
|
||
|
||
|
||
#### retry
|
||
`retry`允许对产生错误的sequence进行重试。
|
||
|
||
```java
|
||
Flux.interval(Duration.ofMillis(250))
|
||
.map(input -> {
|
||
if (input < 3) return "tick " + input;
|
||
throw new RuntimeException("boom");
|
||
})
|
||
.retry(1)
|
||
.elapsed()
|
||
.subscribe(System.out::println, System.err::println);
|
||
|
||
Thread.sleep(2100);
|
||
```
|
||
在上述示例中,`retry(1)`会对错误执行一次重试(`upstream重新开始生成`)。其产生结果如下所示:
|
||
```java
|
||
259,tick 0
|
||
249,tick 1
|
||
251,tick 2
|
||
506,tick 0
|
||
248,tick 1
|
||
253,tick 2
|
||
java.lang.RuntimeException: boom
|
||
```
|
||
|
||
##### retry原理
|
||
下游在上游调用`onError`时,会判断重试次数是否为0,如果不为0,会对重试次数进行减1,并且重新对上游调用`subscribe`,这会令上游`重新从头开始生成序列`。
|
||
|
||
直到重试次数为0之后,`FluxRetry`才会将onError传递给下游。
|
||
|
||
#### retryWhen
|
||
`retryWhen`会使用`companion flux`来判断错误时是否应该回滚。`companion flux`由operator创建,但是用户会通过装饰`companion flux`来自定义重试条件。
|
||
|
||
`companion flux`是一个`Flux<RetrySignal>`类型的值,该值将会被传递给`Retry`方法/策略(`retryWhen`只接受一个`Retry`类型的参数)。
|
||
|
||
使用者可以对`retry方法`进行指定,`retry方法`会返回一个`Publisher<?>`类型的值。
|
||
|
||
> Retry是一个抽象类,但是提供了`Retry#from`方法,通过该方法可以将lambda转换为Retry对象。
|
||
|
||
Retry周期如下所示:
|
||
- 每当error发生时,都会将`RetrySignal`发送到companion flux中,此时`companion flux`已经被`retry方法`装饰过了
|
||
- `Flux<RetrySignal>`将会包含至今所有的重试尝试信息,并且可以通过其访问error和error关联的元数据
|
||
- 如果`companion flux`发送了一个值,那么会触发重试
|
||
- 如果`companion flux`complete,将会对error执行swallow操作,retry cycle将会停止,并且也会导致sequence complete
|
||
- 如果`companion flux`产生了一个error,retry cycle将会终止,并且导致sequence按异常终止
|
||
|
||
#### retry helper
|
||
project reactor提供了`Retry` helper,如`RetrySpec`和`RetryBackoffSpec`,二者都允许进行如下所示的自定义行为:
|
||
- `filter`:通过filter设置`允许触发retry的异常`
|
||
- `modifyErrorFilter`: 对之前`filter`设置的异常进行修改
|
||
- `doBeforeRetry`和`doAfterRetry`:针对retrytrigger执行side effect
|
||
- > `doBeforeRetry()`方法触发在delay发生之前,而`doAfterRetry()`触发在delay之后
|
||
- `onRetryExhaustedThrow(BiFunction)`:在重试数量达到上限后,通过`onRetryExhaustedThrow(BiFunction)`来自定义异常。
|
||
- 通常情况下,当重试数量达到上限后,自定义异常类型通过`Exceptions.retryExhausted(…)`方法来构建,其返回的异常类型为`RetryExhaustedException`,可以通过`Exceptions.isRetryExhausted(Throwable)`方法来进行区分
|
||
##### retrying with transient errors
|
||
在`RetrySignal`中存在如下两个方法:
|
||
- `totalRetriesInARow()`:每当`error is recovered`(retry attempt导致了`onNext`而不是`onError`)时,index都会被设置为0
|
||
- `totalRetries()`:`totalRetries()`方法返回的值是单调递增的,并不会被重置
|
||
|
||
当使用`RetrySpec`和`RetryBackoffSpec`时,可以通过`transientErrors(true)`方法来令策略使用`totalRetriesInARow()`处理transient error。
|
||
|
||
示例如下所示:
|
||
```java
|
||
AtomicInteger errorCount = new AtomicInteger();
|
||
Flux<Integer> transientFlux = httpRequest.get()
|
||
.doOnError(e -> errorCount.incrementAndGet());
|
||
|
||
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
|
||
.blockLast();
|
||
assertThat(errorCount).hasValue(6);
|
||
```
|
||
|
||
> `transientErrors`主要是为了处理周期性发生异常,异常能够重试成功,并且发生异常后一段时间平稳运行的场景
|
||
|
||
#### handling Exceptions in operators or functions
|
||
通常来说,operator的执行有可能会抛出异常,根据抛出异常类型的不同,存在如何区别:
|
||
- `unchecked Exception`:对于抛出的`unchecked exception`,其都会通过`onError`向下游传递,例如,map operator中抛出的`RuntimeException`将会被转化为对下游`onError`的调用
|
||
```java
|
||
Flux.just("foo")
|
||
.map(s -> { throw new IllegalArgumentException(s); })
|
||
.subscribe(v -> System.out.println("GOT VALUE"),
|
||
e -> System.out.println("ERROR: " + e));
|
||
```
|
||
上述代码示例将会触发下游subscriber的onError,输出内容如下:
|
||
```console
|
||
ERROR: java.lang.IllegalArgumentException: foo
|
||
```
|
||
- `fatal exceptions`:在reactor中,定义了一些被视为`致命`的异常(例如`OutOfMemoryError`),具体的`fatal`异常包含范围可见`Exceptions.throwIfFatal`
|
||
- 在抛出`fatal error`时,reactor operator会抛出异常而不是将其传递到下游
|
||
|
||
|
||
##### checked exception
|
||
对于`checked exception`,当调用方法签名中指定了`throws xxxException`的方法时,需要通过`try-catch block`对其进行处理。对于`checked exception`,有如下选择:
|
||
- 对异常进行捕获,并且对其执行`recover`操作,sequence会继续执行
|
||
- 对异常进行捕获,并且封装到`unchecked exception`中,并将`unchecked exception`进行rethrow(这将对sequence进行打断)
|
||
- 如果需要返回一个`Flux`(例如,使用`flatMap`方法),可以使用`Flux.error(checkedException)`返回一个`error-producing Flux`(sequence会终止)
|
||
|
||
##### Exceptions Utility
|
||
通过reactor提供的Exceptions Utility,可以保证`仅当异常为checked exception时,才会封装`
|
||
- `Exceptions.propagate`在必要时会封装exception。
|
||
- 其并不会对`RuntimeException`进行封装,并且该方法首先会调用`throwIfFatal`
|
||
- 该方法会将checked exception封装到`reactor.core.Exceptions.ReactiveException`中
|
||
- `Exceptions.unwrap`:
|
||
- 该方法会获取original unwrapped exception
|
||
|
||
使用示例如下所示:
|
||
```java
|
||
public String convert(int i) throws IOException {
|
||
if (i > 3) {
|
||
throw new IOException("boom " + i);
|
||
}
|
||
return "OK " + i;
|
||
}
|
||
|
||
Flux<String> converted = Flux
|
||
.range(1, 10)
|
||
.map(i -> {
|
||
try { return convert(i); }
|
||
catch (IOException e) { throw Exceptions.propagate(e); }
|
||
});
|
||
|
||
converted.subscribe(
|
||
v -> System.out.println("RECEIVED: " + v),
|
||
e -> {
|
||
if (Exceptions.unwrap(e) instanceof IOException) {
|
||
System.out.println("Something bad happened with I/O");
|
||
} else {
|
||
System.out.println("Something bad happened");
|
||
}
|
||
}
|
||
);
|
||
```
|
||
### Sinks
|
||
#### 在多线程环境下安全的使用`Sinks.One`和`Sinks.Many`
|
||
reactor-core提供的sinks支持多线程环境的使用,并不会触犯规范或导致下游subscribers产生未知行为。
|
||
|
||
#### `tryEmit` & `emit`
|
||
当尝试通过sinks向下游发送signal时,可以调用如下API:
|
||
- `tryEmit*`:并行调用将会导致fail fast
|
||
- `emit*`: 当调用`emit*`时,提供的`EmissionFailureHandler`,如果该接口的`onEmitFailure`方法返回为true,将会在争用场景下执行重试操作(例如busy loop);如果onEmitFailure返回为false,则sink会以error终止。
|
||
|
||
上述流程是对`Processor.onNext`的改进,`Processor.onNext`必须在外部进行同步,否则将会导致下游subscribers未定义的行为。
|
||
|
||
> 例如,`Flux.create`允许多线程调用`sink.onNext`,但是其使用的sink是`reactor.core.publisher.FluxCreate.SerializedFluxSink`,其在next操作中通过队列和`cas`对下游的`onNext`操作进行了同步。
|
||
>
|
||
> 故而,在传统的`Processor.onNext`中,如果要在多线程环境下使用,必须在上游做好同步操作,否则会导致下游的未定义行为。
|
||
|
||
##### Processor
|
||
Processor是一种特殊的`Publisher`,其在作为`publisher`的同时也是`Subscriber`。
|
||
|
||
在使用Processor是,一个很常见的问题是,直接对processor向外暴露的`onNext`,`onComplete`, `onError`方法进行调用。
|
||
|
||
实际上,对于processor中`onXXX`方法的调用必须要符合reactive stream规范,在外部对`onXXX`方法进行调用时,要做好同步操作。
|
||
|
||
sinks的使用示例如下所示:
|
||
```java
|
||
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
|
||
```
|
||
通过sink,多个线程可以并行的产生数据
|
||
```java
|
||
//thread1
|
||
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
|
||
|
||
//thread2, later
|
||
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
|
||
|
||
//thread3, concurrently with thread 2
|
||
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
|
||
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
|
||
|
||
//thread3, concurrently with thread 2
|
||
//would return FAIL_NON_SERIALIZED
|
||
EmitResult result = replaySink.tryEmitNext(4);
|
||
```
|
||
|
||
> 在使用`EmitFailureHandler.busyLooping`时,其返回的示例包含状态,并不能被重用
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|