Files
rikako-note/spring/webflux/Reactor.md
2025-03-25 12:49:31 +08:00

17 KiB
Raw Blame History

Reactor

Reactive Programming

响应式编程是一种异步编程范式,关注于数据流状态变化的传播。java的响应式编程接口被包含在java9的Flow中。

响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用next方法reactive stream是基于发布/订阅模型的。

迭代器模式是pull-based而reactive stream为push-based

命令式迁移到响应式

可组合性与可读性

“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。

reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。

在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为callback heil

reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。

Assembly Line

响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于original publisher,最终传传递给subscriber

数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。

Operators

在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到publisher并且前一个publisher包装到一个新的publisher实例中。

故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。

Nothing Happens Until You subscribe()

当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。

通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个reuqest signal该信号会被传递到chain上游一直被传递到source publisher。

backpressure

传递到上游的信号该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。

reactive stream定义的机制接近于上述描述其提供两种模式

  • unbounded modesource publisher可以按其最高速率不受限制的推送数据
  • request mode通过request机制向source publisher发送信号告知其准备好处理最多n个元素。

中间的operator也可以在传输过程中对请求做出修改例如buffer operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。

通过backpressure可以将push模型转化为push-pull模型:

  • 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
  • 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送

hot & cold

对于响应式序列,其可以分为两种:

  • cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
  • hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal

Subscriber和Publisher

Publisher

对于publisher其提供了subscribe方法供subscriber进行注册在执行subscribe方法并向其传入Subscriber对象后上游publisher会调用下游的onSubscribe方法,并向onSubscribe方法传入Subscription对象。下游可以通过Subscription对象调用request(n)请求。

若中间存在operator例如map在担任publisher角色的同时还对上游进行了订阅那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。

任何变更状态的操作都只在实际subscriber执行订阅操作后被触发

Subscriber

当下游调用request(n)方法之后,会向上游请求n个数据。上游会向下游发送onNext信号来传输生成的数据。

Reactor Core

reactor引入了两个实现Publisher的类:MonoFlux

  • Flux代表包含0...N个items的reactive sequence
  • Mono代表包含0...1个items的reactive sequence

上述两个类代表了在异步处理场景中的大致基数。

  • Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行count操作并没有任何意义。此时,可以通过Mono<HttpResponse>来代表http调用的结果Mono中只提供了上下文中包含0...1个元素的对应操作
  • 当执行某些可能会改变异步处理中最大基数的操作时,可能会导致类型的改变,例如执行Flux中的count操作将会返回Mono<Long>的类型

Flux 0...n

alt text

Flux<T>是一个标准的Publisher<T>,代表基数为0...n的异步序列,其可以被completion signal或异常所终止。根据reactive stream标准存在三种signal且信号会转化为对下游onNextonCompleteonError的调用。

Flux是一个通用的reactive类型并且所有的event type都是可选的。

  • 当没有onNext事件但是存在onComplete事件,代表一个空的有限序列
  • onNextonComplete事件都不存在时,代表一个空的无限序列
  • 无限序列并不一定为空,例如Flux.interval(Duration)会产生一个Flux<Long>其是无限的并且发送tick

Mono 0...1

Mono<T>是一个标准的Publisher<T>,其通过onNext信号发送至多一个item然后再结束时发送onComplete信号结束(成功场景);或直接发送onError信号结束(失败场景)。

大多数Mono实现在调用完subscriber的onNext方法之后预计会立马调用subscriver的onComplete方法。但是,Mono.never是一个例外,其并不会发送任何信号,并且其onNextonError的组合是被明确禁止的。

创建Mono/Flux并进行订阅的方式

String sequence

如果想要创建String序列可以通过如下方式

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Flux.empty

Mono<String> noData = Mono.empty();

Flux.range

在下面示例中,Flux.range第一个参数是range的起始值第二个参数是要产生的元素个数

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

故而,其产生的内容为5,6,7

Lambda Subscribe

在进行订阅时,FluxMono使用了lambda在调用subscribe时有如下几种重载的选择

// 订阅并触发sequence的产生
subscribe(); 

// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer); 

// 在reactive stream异常终止时对error进行处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

// 在sequence处理完时执行额外的complete操作
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);

// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

对于subscribe的使用示例如下

Flux.range(5,3)
                .map(x->{
                    if(x<7) {
                        return x;
                    }
                    throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
                })
                .subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
                        (e) -> {
                            System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
                        },
                        ()->{
                            System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
                        });

其执行结果如下:

[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7

Disposable

上述subscribe方法的返回类型为Disposable该接口代表subscriber对publisher的订阅是可取消的如需取消订阅调用dispose方法即可。

对于Mono和Flux而言source publisher应该在接收到cancellation信号之后停止产生元素并不能保证取消信号是即时的。(若source产生数据的速度过快可能在接收到cancel信号之前source就已经complete)。

Disposables类中存在一些对Disposable的工具方法,例如swapcomposite

BaseSubscriber

subscribe方法除了接收lambda外还存在更通用的重载方法接收Subscriber类型的参数。

在这种场景下,传参可以继承BaseSubscriber类。

并且,BaseSubscriber该类是一次性的,其只能够订阅一个publisher如果其订阅了第二个publisher那么其对第一个publisher的订阅将会被取消

BaseSubscriber只能订阅一个publisher的原因是reactive stream规范要求onNext方法不能被并行调用。

示例如下:

Flux.range(3, 5)
                .subscribe(new BaseSubscriber<Integer>() {
                    private Subscription subscription;

                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        subscription.request(5);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        log.info("onNext called: {}", value);
//                        this.subscription.request(1);
                    }

                    @Override
                    protected void hookOnComplete() {
                        log.info("onComplete called");
                        super.hookOnComplete();
                    }

                    @Override
                    protected void hookOnError(Throwable throwable) {
                        log.info("onError called: {}", throwable.getMessage());
                        super.hookOnError(throwable);
                    }
                });

上述示例中,通过向subscribe方法中传递自定义的BaseSubscriber来实现对上游的订阅,执行结果如下:

2025-03-24T19:21:09.818+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 3
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 4
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 5
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 6
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 7
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onComplete called

BaseSubscriberhookOnSubscribe默认实现如下:

protected void hookOnSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

其请求的数量为Long.MAX_VALUE代表其publisher为effectively unbounded

可以通过重写hookOnSubscribe方法来自己指定request数量如果需要自己指定请求数量最少需要重写hookOnSubscribehookOnNext方法。

BaseSubscriber提供了requestUnbounded方法(其方法和request(Long.MAX_VALUE)等价)和cancel方法。

除了上述列出的hook外BaseSubscriber还支持如下hooks

  • hookOnComplete
  • hookOnError
  • hookOnCancel
  • hookFinally(当sequence终止时都会被调用可以用参数来判断终止类型为complete或error)
    • hookFinally的调用顺序位于hookOnComplete和hookOnError之后

backpressure

在reactor的backpressure实现中consumer pressure传播到上游source的机制是向上游operator发送request请求。当前已发送的请求个数之和被称为demand,并且demand的上限为Long.MAX_VALUE当demand的值为Long.MAX_VALUE或更大时,代表unbound request

unbound request代表尽可能快的产生数据即backpressure关闭。

在reactive chain中第一个请求来自于final subscriber,其在订阅时(onSubscribe)会发送第一个request请求。目前,大多直接订阅的方法都会通过Long.MAX_VALUE创建一个unbounded request示例如下

  • subcribe()方法和大多数基于lambda的重载方法除了包含Consumer<Subscription>参数的重载)
  • block, blockFirst blockLast
  • 通过toIterabletoStream进行遍历

目前,定义初始请求最简单的方法为通过BaseSubscription对上游进行订阅并且重写onSubscribe方法

buffer

在reactor中可以通过部分operator进行request的reshape。示例如下

Flux.range(1, 1000)
                .buffer(3)
                .subscribe(new BaseSubscriber<List<Integer>>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        subscription.request(2);
                    }

                    @Override
                    protected void hookOnNext(List<Integer> value) {
                        for (Integer v : value) {
                            log.info("item received: {}", v);
                        }
                    }
                });

在上述示例中,request(2)代表请求2个buffer,而每个buffer中包含3Integer,故而总共会接收到2 * 3 = 6个元素。

输出如下:

2025-03-24T20:02:52.438+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 1
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 2
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 3
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 4
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 5
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 6

prefetch

prefetch机制是一种backpressure的优化策略用于确保下游处理数据时上游的数据能够即时补充对吞吐量和资源利用率进行平衡。

prefetch机制通常分为如下部分

初始请求

在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时会向上游发送一个初始请求请求大小为32个元素。

补充优化Replenishing Optimization

prefetch的补充优化通常采用75%的启发规则一旦操作符发现75%的预取元素已经被处理32 *0.75 = 24其自动会向上游发送一个新请求要求补充75%的prefetch量。该过程是动态的会在整个数据流处理过程中持续进行。

例如prefetch的大小为10其limit对应的值为ceil(10 * 0.75) = 8每当其下游被处理的元素达到8个其会重新请求8个数据并且将被下游处理元素的个数重置重新从0开始计数直到该值再达到8再次发送请求

预加载数据

补充优化的优化点在于当预取数据还剩余25%8个未被处理时提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。

平滑处理

通过prefetch逐步请求新数据且每次请求固定的量可以保证处理数据速率的稳定。如果source端同时来源大量数据那么若不进行平滑处理则大量数据的同时处理可能导致竞争令性能下降。

有如下operators可以对请求的prefetch进行调整

limitRate