Files
rikako-note/spring/webflux/Reactor.md
2025-03-24 01:11:38 +08:00

154 lines
9.0 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Reactor
## Reactive Programming
响应式编程是一种异步编程范式,关注于`数据流``状态变化的传播`。java的响应式编程接口被包含在java9的`Flow`中。
响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用`next`方法reactive stream是基于`发布/订阅`模型的。
> 迭代器模式是`pull-based`而reactive stream为`push-based`。
### 命令式迁移到响应式
#### 可组合性与可读性
“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。
reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。
> 在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为`callback heil`。
reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。
#### Assembly Line
响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于`original publisher`,最终传传递给`subscriber`
数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。
#### Operators
在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到`publisher`并且前一个publisher包装到一个新的publisher实例中。
故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。
#### Nothing Happens Until You subscribe()
当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。
通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个`reuqest signal`该信号会被传递到chain上游一直被传递到source publisher。
#### backpressure
`传递到上游的信号`该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。
reactive stream定义的机制接近于上述描述其提供两种模式
- unbounded modesource publisher可以按其最高速率不受限制的推送数据
- request mode通过`request`机制向source publisher发送信号告知其准备好处理最多`n`个元素。
中间的operator也可以在传输过程中对请求做出修改例如`buffer` operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。
通过backpressure可以将`push`模型转化为`push-pull`模型:
- 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
- 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送
#### hot & cold
对于响应式序列,其可以分为两种:
- cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
- hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal
## Subscriber和Publisher
### Publisher
对于publisher其提供了`subscribe`方法供subscriber进行注册在执行subscribe方法并向其传入`Subscriber`对象后上游publisher会调用下游的`onSubscribe`方法,并向`onSubscribe`方法传入`Subscription`对象。下游可以通过`Subscription`对象调用`request(n)`请求。
> 若中间存在operator例如map在担任publisher角色的同时还对上游进行了订阅那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。
>
> `任何变更状态的操作都只在实际subscriber执行订阅操作后被触发`。
### Subscriber
当下游调用`request(n)`方法之后,会向上游请求`n`个数据。上游会向下游发送`onNext`信号来传输生成的数据。
## Reactor Core
reactor引入了两个实现`Publisher`的类:`Mono``Flux`
- Flux代表包含`0...N`个items的reactive sequence
- Mono代表包含`0...1`个items的reactive sequence
上述两个类代表了在异步处理场景中的大致基数。
- Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行`count`操作并没有任何意义。此时,可以通过`Mono<HttpResponse>`来代表http调用的结果`Mono`中只提供了上下文中包含`0...1`个元素的对应操作
- 当执行某些`可能会改变异步处理中最大基数的操作`时,可能会导致类型的改变,例如执行`Flux`中的`count`操作将会返回`Mono<Long>`的类型
### Flux `0...n`
![alt text](image.png)
`Flux<T>`是一个标准的`Publisher<T>`,代表基数为`0...n`的异步序列,其可以被`completion signal`或异常所终止。根据reactive stream标准存在三种signal且信号会转化为对下游`onNext``onComplete``onError`的调用。
Flux是一个通用的reactive类型并且所有的event type都是可选的。
- 当没有`onNext`事件但是存在`onComplete`事件,代表一个空的有限序列
-`onNext``onComplete`事件都不存在时,代表一个空的无限序列
- 无限序列并不一定为空,例如`Flux.interval(Duration)`会产生一个`Flux<Long>`其是无限的并且发送tick
### Mono `0...1`
`Mono<T>`是一个标准的`Publisher<T>`,其通过`onNext`信号发送至多一个item然后再结束时发送`onComplete`信号结束(成功场景);或直接发送`onError`信号结束(失败场景)。
大多数Mono实现在调用完subscriber的`onNext`方法之后预计会立马调用subscriver的`onComplete`方法。但是,`Mono.never`是一个例外,其并不会发送任何信号,并且其`onNext``onError`的组合是被明确禁止的。
### 创建Mono/Flux并进行订阅的方式
#### String sequence
如果想要创建String序列可以通过如下方式
```java
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
```
#### Flux.empty
```java
Mono<String> noData = Mono.empty();
```
#### Flux.range
在下面示例中,`Flux.range`第一个参数是range的起始值第二个参数是要产生的元素个数
```java
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
```
故而,其产生的内容为`5,6,7`
#### Lambda Subscribe
在进行订阅时,`Flux``Mono`使用了lambda在调用subscribe时有如下几种重载的选择
```java
// 订阅并触发sequence的产生
subscribe();
// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer);
// 在reactive stream异常终止时对error进行处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 在sequence处理完时执行额外的complete操作
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
```
对于subscribe的使用示例如下
```java
Flux.range(5,3)
.map(x->{
if(x<7) {
return x;
}
throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
})
.subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
(e) -> {
System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
},
()->{
System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
});
```
其执行结果如下:
```
[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7
```