172 lines
10 KiB
Markdown
172 lines
10 KiB
Markdown
# Reactor
|
||
## Reactive Programming
|
||
响应式编程是一种异步编程范式,关注于`数据流`和`状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。
|
||
|
||
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法,reactive stream是基于`发布/订阅`模型的。
|
||
|
||
> 迭代器模式是`pull-based`,而reactive stream为`push-based`。
|
||
|
||
### 命令式迁移到响应式
|
||
#### 可组合性与可读性
|
||
“可组合性"代表编排多个异步任务的能力,通过“组合”,可以将前一个异步任务的输出作为后一个异步任务的输入。或者,可以按照fork-join的形式对异步任务进行编排。
|
||
|
||
reactor同样能解决“可读性”的问题,在使用传统的callback model编写程序时,随着逻辑的复杂,异步进行的层数也会增加,这将会极大降低代码的可读性和可维护性。
|
||
|
||
> 在使用call model时,通常需要在回调中执行另一个回调,回调的嵌套通通常会被称为`callback heil`。
|
||
|
||
reactor提供了复杂的“组合”选项,能够反映抽象异步进程的组织,并且,所有的内容通常都会位于同一层级。
|
||
|
||
#### Assembly Line
|
||
响应式应用中的数据处理类似于流水线,其中,reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`。
|
||
|
||
数据在从publisher传送到subscriber的过程中,可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间,受影响的workstation会向上游发送消息来控制数据的生成速率。
|
||
|
||
#### Operators
|
||
在reactor中,Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`中,并且前一个publisher包装到一个新的publisher实例中。
|
||
|
||
故而,operator将整个chain都链接起来,数据来源于第一个publisher,并随着chain移动,依次被每个链接处理,最终由subscriber结束该过程。
|
||
|
||
#### Nothing Happens Until You subscribe()
|
||
当通过reactor编写publisher chain时,数据并不会被泵入到chain中,编写chain只是创建了异步处理的抽象描述。
|
||
|
||
通过订阅行为,将publisher和subscriber绑定到了一起,订阅行为会触发chain中的数据流。该行为通过内部的signal实现,subscriber将会发送一个`reuqest signal`,该信号会被传递到chain上游,一直被传递到source publisher。
|
||
|
||
#### backpressure
|
||
`传递到上游的信号`该机制也被用于实现backpressure,在assembly line模型中,也被描述为workstation传递给上游的反馈信号,当workstation处理消息比上游workstation满时,会发送该反馈。
|
||
|
||
reactive stream定义的机制接近于上述描述,其提供两种模式:
|
||
- unbounded mode:source publisher可以按其最高速率不受限制的推送数据
|
||
- request mode:通过`request`机制向source publisher发送信号,告知其准备好处理最多`n`个元素。
|
||
|
||
中间的operator也可以在传输过程中对请求做出修改,例如`buffer` operator可以将elements分割为以10个为单位的batch,如果subscriber请求一个buffer,那么上游source publisher可以产生10个元素。
|
||
|
||
通过backpressure,可以将`push`模型转化为`push-pull`模型:
|
||
- 当上游的n个元素已经准备好时,下游可以从上游拉取n个元素
|
||
- 当上有没有准备好n个元素时,后续如果n个元素被准备好,其将会被上游推送
|
||
|
||
#### hot & cold
|
||
对于响应式序列,其可以分为两种:
|
||
- cold sequence:对于cold sequence,会为每个订阅者重新开始流程,包括source publisher。例如source中若封装了http调用,会为每个subscriber都执行一个新的http请求
|
||
- hot sequence:subscriber只有在其订阅后才收到信号,即使没有subscriber在监听,hot sequence仍然能够发送signal
|
||
|
||
## Subscriber和Publisher
|
||
### Publisher
|
||
对于publisher,其提供了`subscribe`方法供subscriber进行注册,在执行subscribe方法并向其传入`Subscriber`对象后,上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。
|
||
|
||
> 若中间存在operator(例如map)在担任publisher角色的同时,还对上游进行了订阅,那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
|
||
>
|
||
> `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。
|
||
|
||
### Subscriber
|
||
当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。
|
||
|
||
## Reactor Core
|
||
reactor引入了两个实现`Publisher`的类:`Mono`和`Flux`。
|
||
- Flux:代表包含`0...N`个items的reactive sequence
|
||
- Mono:代表包含`0...1`个items的reactive sequence
|
||
|
||
上述两个类代表了在异步处理场景中的大致基数。
|
||
- Mono:例如,对于http请求的场景,一个请求只会产生一个响应,故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono<HttpResponse>`来代表http调用的结果,`Mono`中只提供了上下文中包含`0...1`个元素的对应操作
|
||
- 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono<Long>`的类型
|
||
|
||
### Flux `0...n`
|
||

|
||
|
||
`Flux<T>`是一个标准的`Publisher<T>`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准,存在三种signal,且信号会转化为对下游`onNext`、`onComplete`、`onError`的调用。
|
||
|
||
Flux是一个通用的reactive类型,并且,所有的event type都是可选的。
|
||
- 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列
|
||
- 当`onNext`和`onComplete`事件都不存在时,代表一个空的无限序列
|
||
- 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux<Long>`,其是无限的并且发送tick
|
||
|
||
### Mono `0...1`
|
||
`Mono<T>`是一个标准的`Publisher<T>`,其通过`onNext`信号发送至多一个item,然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。
|
||
|
||
大多数Mono实现在调用完subscriber的`onNext`方法之后,预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext`和`onError`的组合是被明确禁止的。
|
||
|
||
### 创建Mono/Flux并进行订阅的方式
|
||
#### String sequence
|
||
如果想要创建String序列,可以通过如下方式:
|
||
```java
|
||
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
|
||
|
||
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
|
||
Flux<String> seq2 = Flux.fromIterable(iterable);
|
||
```
|
||
#### Flux.empty
|
||
```java
|
||
Mono<String> noData = Mono.empty();
|
||
```
|
||
#### Flux.range
|
||
在下面示例中,`Flux.range`第一个参数是range的起始值,第二个参数是要产生的元素个数
|
||
```java
|
||
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
|
||
```
|
||
故而,其产生的内容为`5,6,7`。
|
||
|
||
#### Lambda Subscribe
|
||
在进行订阅时,`Flux`和`Mono`使用了lambda,在调用subscribe时,有如下几种重载的选择:
|
||
```java
|
||
// 订阅并触发sequence的产生
|
||
subscribe();
|
||
|
||
// 对每个产生的值通过consumer执行处理操作
|
||
subscribe(Consumer<? super T> consumer);
|
||
|
||
// 在reactive stream异常终止时,对error进行处理
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer);
|
||
|
||
// 在sequence处理完时,执行额外的complete操作
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer,
|
||
Runnable completeConsumer);
|
||
|
||
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
|
||
// 该重载已废弃
|
||
subscribe(Consumer<? super T> consumer,
|
||
Consumer<? super Throwable> errorConsumer,
|
||
Runnable completeConsumer,
|
||
Consumer<? super Subscription> subscriptionConsumer);
|
||
```
|
||
对于subscribe的使用,示例如下
|
||
```java
|
||
Flux.range(5,3)
|
||
.map(x->{
|
||
if(x<7) {
|
||
return x;
|
||
}
|
||
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
|
||
})
|
||
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
|
||
(e) -> {
|
||
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
|
||
},
|
||
()->{
|
||
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
|
||
});
|
||
```
|
||
其执行结果如下:
|
||
```
|
||
[main]-5
|
||
[main]-6
|
||
[main]-Error Caught: fucking value {7} equals or greater than 7
|
||
```
|
||
|
||
#### Disposable
|
||
上述`subscribe`方法的返回类型为`Disposable`,该接口代表subscriber对publisher的订阅是可取消的,如需取消订阅,调用`dispose`方法即可。
|
||
|
||
对于Mono和Flux而言,source publisher应该在接收到cancellation信号之后停止产生元素,`并不能保证取消信号是即时的`。(`若source产生数据的速度过快,可能在接收到cancel信号之前,source就已经complete`)。
|
||
|
||
`Disposables`类中存在一些对`Disposable`的工具方法,例如`swap`和`composite`。
|
||
|
||
#### BaseSubscriber
|
||
`subscribe`方法除了接收lambda外,还存在更通用的重载方法,接收`Subscriber`类型的参数。
|
||
|
||
在这种场景下,传参可以继承`BaseSubscriber`类。
|
||
|
||
并且,`BaseSubscriber`该类是一次性的,`其只能够订阅一个publisher,如果其订阅了第二个publisher,那么其对第一个publisher的订阅将会被取消`。
|
||
|
||
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
|
||
|