doc: 阅读reactor core文档
This commit is contained in:
@@ -169,3 +169,125 @@ Flux.range(5,3)
|
|||||||
|
|
||||||
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
|
> `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。
|
||||||
|
|
||||||
|
示例如下:
|
||||||
|
```java
|
||||||
|
Flux.range(3, 5)
|
||||||
|
.subscribe(new BaseSubscriber<Integer>() {
|
||||||
|
private Subscription subscription;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnSubscribe(Subscription subscription) {
|
||||||
|
this.subscription = subscription;
|
||||||
|
subscription.request(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnNext(Integer value) {
|
||||||
|
log.info("onNext called: {}", value);
|
||||||
|
// this.subscription.request(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnComplete() {
|
||||||
|
log.info("onComplete called");
|
||||||
|
super.hookOnComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnError(Throwable throwable) {
|
||||||
|
log.info("onError called: {}", throwable.getMessage());
|
||||||
|
super.hookOnError(throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
```
|
||||||
|
上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下:
|
||||||
|
```
|
||||||
|
2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3
|
||||||
|
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4
|
||||||
|
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5
|
||||||
|
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6
|
||||||
|
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7
|
||||||
|
2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called
|
||||||
|
```
|
||||||
|
|
||||||
|
`BaseSubscriber`的`hookOnSubscribe`默认实现如下:
|
||||||
|
```java
|
||||||
|
protected void hookOnSubscribe(Subscription subscription) {
|
||||||
|
subscription.request(Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
其请求的数量为`Long.MAX_VALUE`,代表其publisher为`effectively unbounded`。
|
||||||
|
|
||||||
|
可以通过重写`hookOnSubscribe`方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写`hookOnSubscribe`和`hookOnNext`方法。
|
||||||
|
|
||||||
|
`BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。
|
||||||
|
|
||||||
|
除了上述列出的hook外,`BaseSubscriber`还支持如下hooks:
|
||||||
|
- hookOnComplete
|
||||||
|
- hookOnError
|
||||||
|
- hookOnCancel
|
||||||
|
- hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error)
|
||||||
|
- hookFinally的调用顺序位于hookOnComplete和hookOnError之后
|
||||||
|
|
||||||
|
#### backpressure
|
||||||
|
在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`,当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`。
|
||||||
|
|
||||||
|
> `unbound request`代表尽可能快的产生数据,即backpressure关闭。
|
||||||
|
|
||||||
|
在reactive chain中,第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request,示例如下:
|
||||||
|
- `subcribe()`方法和大多数基于lambda的重载方法(除了包含`Consumer<Subscription>`参数的重载)
|
||||||
|
- `block`, `blockFirst`, `blockLast`
|
||||||
|
- 通过`toIterable`或`toStream`进行遍历
|
||||||
|
|
||||||
|
目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法`。
|
||||||
|
|
||||||
|
#### buffer
|
||||||
|
在reactor中,可以通过部分operator进行request的reshape。示例如下:
|
||||||
|
```java
|
||||||
|
Flux.range(1, 1000)
|
||||||
|
.buffer(3)
|
||||||
|
.subscribe(new BaseSubscriber<List<Integer>>() {
|
||||||
|
@Override
|
||||||
|
protected void hookOnSubscribe(Subscription subscription) {
|
||||||
|
subscription.request(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnNext(List<Integer> value) {
|
||||||
|
for (Integer v : value) {
|
||||||
|
log.info("item received: {}", v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
```
|
||||||
|
在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3`个`Integer`,故而总共会接收到`2 * 3 = 6`个元素。
|
||||||
|
|
||||||
|
输出如下:
|
||||||
|
```
|
||||||
|
2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1
|
||||||
|
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2
|
||||||
|
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3
|
||||||
|
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4
|
||||||
|
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5
|
||||||
|
2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6
|
||||||
|
```
|
||||||
|
|
||||||
|
#### prefetch
|
||||||
|
`prefetch`机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。
|
||||||
|
|
||||||
|
prefetch机制通常分为如下部分:
|
||||||
|
##### 初始请求
|
||||||
|
在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。
|
||||||
|
|
||||||
|
##### 补充优化(Replenishing Optimization)
|
||||||
|
prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。
|
||||||
|
|
||||||
|
> ##### 预加载数据
|
||||||
|
> 补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。
|
||||||
|
|
||||||
|
> ##### 平滑处理
|
||||||
|
> 通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。
|
||||||
|
|
||||||
|
有如下operators可以对请求的prefetch进行调整
|
||||||
|
|
||||||
|
##### limitRate
|
||||||
|
|||||||
Reference in New Issue
Block a user