diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 9f12bc4..0dc745f 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -169,3 +169,125 @@ Flux.range(5,3) > `BaseSubscriber`只能订阅一个publisher的原因是reactive stream规范要求`onNext`方法不能被并行调用。 +示例如下: +```java +Flux.range(3, 5) + .subscribe(new BaseSubscriber() { + private Subscription subscription; + + @Override + protected void hookOnSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(5); + } + + @Override + protected void hookOnNext(Integer value) { + log.info("onNext called: {}", value); +// this.subscription.request(1); + } + + @Override + protected void hookOnComplete() { + log.info("onComplete called"); + super.hookOnComplete(); + } + + @Override + protected void hookOnError(Throwable throwable) { + log.info("onError called: {}", throwable.getMessage()); + super.hookOnError(throwable); + } + }); +``` +上述示例中,通过向`subscribe`方法中传递自定义的`BaseSubscriber`来实现对上游的订阅,执行结果如下: +``` +2025-03-24T19:21:09.818+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 3 +2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 4 +2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 5 +2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 6 +2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onNext called: 7 +2025-03-24T19:21:09.819+08:00 INFO 27440 --- [ main] cc.rikako.springdemo.runner.CmdRunner : onComplete called +``` + +`BaseSubscriber`的`hookOnSubscribe`默认实现如下: +```java +protected void hookOnSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } +``` +其请求的数量为`Long.MAX_VALUE`,代表其publisher为`effectively unbounded`。 + +可以通过重写`hookOnSubscribe`方法来自己指定request数量,如果需要自己指定请求数量,最少需要重写`hookOnSubscribe`和`hookOnNext`方法。 + +`BaseSubscriber`提供了`requestUnbounded`方法(`其方法和request(Long.MAX_VALUE)等价`)和`cancel`方法。 + +除了上述列出的hook外,`BaseSubscriber`还支持如下hooks: +- hookOnComplete +- hookOnError +- hookOnCancel +- hookFinally(当sequence终止时,都会被调用,可以用参数来判断终止类型为complete或error) + - hookFinally的调用顺序位于hookOnComplete和hookOnError之后 + +#### backpressure +在reactor的backpressure实现中,consumer pressure传播到上游source的机制是向上游operator发送`request`请求。当前已发送的请求个数之和被称为`demand`,并且`demand`的上限为`Long.MAX_VALUE`,当demand的值为`Long.MAX_VALUE`或更大时,代表`unbound request`。 + +> `unbound request`代表尽可能快的产生数据,即backpressure关闭。 + +在reactive chain中,第一个请求来自于`final subscriber`,其在订阅时(onSubscribe)会发送第一个`request`请求。目前,大多直接订阅的方法都会通过`Long.MAX_VALUE`创建一个unbounded request,示例如下: +- `subcribe()`方法和大多数基于lambda的重载方法(除了包含`Consumer`参数的重载) +- `block`, `blockFirst`, `blockLast` +- 通过`toIterable`或`toStream`进行遍历 + +目前,定义初始请求最简单的方法为`通过BaseSubscription对上游进行订阅,并且重写onSubscribe方法`。 + +#### buffer +在reactor中,可以通过部分operator进行request的reshape。示例如下: +```java +Flux.range(1, 1000) + .buffer(3) + .subscribe(new BaseSubscriber>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.request(2); + } + + @Override + protected void hookOnNext(List value) { + for (Integer v : value) { + log.info("item received: {}", v); + } + } + }); +``` +在上述示例中,`request(2)`代表请求`2个buffer`,而每个`buffer`中包含`3`个`Integer`,故而总共会接收到`2 * 3 = 6`个元素。 + +输出如下: +``` +2025-03-24T20:02:52.438+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 1 +2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 2 +2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 3 +2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 4 +2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 5 +2025-03-24T20:02:52.439+08:00 INFO 9472 --- [ main] cc.rikako.springdemo.runner.CmdRunner : item received: 6 +``` + +#### prefetch +`prefetch`机制是一种backpressure的优化策略,用于确保下游处理数据时上游的数据能够即时补充,对吞吐量和资源利用率进行平衡。 + +prefetch机制通常分为如下部分: +##### 初始请求 +在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时,会向上游发送一个初始请求,请求大小为32个元素。 + +##### 补充优化(Replenishing Optimization) +prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75%的预取元素已经被处理(32 *0.75 = 24),其自动会向上游发送一个新请求,要求补充75%的prefetch量。该过程是动态的,会在整个数据流处理过程中持续进行。 + +> ##### 预加载数据 +> 补充优化的优化点在于,当预取数据还剩余25%(8个)未被处理时,提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。 + +> ##### 平滑处理 +> 通过prefetch逐步请求新数据,且每次请求固定的量,可以保证处理数据速率的稳定。如果source端同时来源大量数据,那么若不进行平滑处理,则大量数据的同时处理可能导致竞争,令性能下降。 + +有如下operators可以对请求的prefetch进行调整 + +##### limitRate