Files
rikako-note/spring/webflux/Reactor.md
2025-04-18 12:53:55 +08:00

60 KiB
Raw Blame History

Reactor

Reactive Programming

响应式编程是一种异步编程范式,关注于数据流状态变化的传播。java的响应式编程接口被包含在java9的Flow中。

响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用next方法reactive stream是基于发布/订阅模型的。

迭代器模式是pull-based而reactive stream为push-based

命令式迁移到响应式

可组合性与可读性

“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。

reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。

在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为callback heil

reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。

Assembly Line

响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于original publisher,最终传传递给subscriber

数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。

Operators

在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到publisher并且前一个publisher包装到一个新的publisher实例中。

故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。

Nothing Happens Until You subscribe()

当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。

通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个reuqest signal该信号会被传递到chain上游一直被传递到source publisher。

backpressure

传递到上游的信号该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。

reactive stream定义的机制接近于上述描述其提供两种模式

  • unbounded modesource publisher可以按其最高速率不受限制的推送数据
  • request mode通过request机制向source publisher发送信号告知其准备好处理最多n个元素。

中间的operator也可以在传输过程中对请求做出修改例如buffer operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。

通过backpressure可以将push模型转化为push-pull模型:

  • 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
  • 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送

hot & cold

对于响应式序列,其可以分为两种:

  • cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
  • hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal

Subscriber和Publisher

Publisher

对于publisher其提供了subscribe方法供subscriber进行注册在执行subscribe方法并向其传入Subscriber对象后上游publisher会调用下游的onSubscribe方法,并向onSubscribe方法传入Subscription对象。下游可以通过Subscription对象调用request(n)请求。

若中间存在operator例如map在担任publisher角色的同时还对上游进行了订阅那么对上游的实际订阅操作只会在operator被下游subscriber订阅时触发。

任何变更状态的操作都只在实际subscriber执行订阅操作后被触发

Subscriber

当下游调用request(n)方法之后,会向上游请求n个数据。上游会向下游发送onNext信号来传输生成的数据。

Reactor Core

reactor引入了两个实现Publisher的类:MonoFlux

  • Flux代表包含0...N个items的reactive sequence
  • Mono代表包含0...1个items的reactive sequence

上述两个类代表了在异步处理场景中的大致基数。

  • Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行count操作并没有任何意义。此时,可以通过Mono<HttpResponse>来代表http调用的结果Mono中只提供了上下文中包含0...1个元素的对应操作
  • 当执行某些可能会改变异步处理中最大基数的操作时,可能会导致类型的改变,例如执行Flux中的count操作将会返回Mono<Long>的类型

Flux 0...n

alt text

Flux<T>是一个标准的Publisher<T>,代表基数为0...n的异步序列,其可以被completion signal或异常所终止。根据reactive stream标准存在三种signal且信号会转化为对下游onNextonCompleteonError的调用。

Flux是一个通用的reactive类型并且所有的event type都是可选的。

  • 当没有onNext事件但是存在onComplete事件,代表一个空的有限序列
  • onNextonComplete事件都不存在时,代表一个空的无限序列
  • 无限序列并不一定为空,例如Flux.interval(Duration)会产生一个Flux<Long>其是无限的并且发送tick

Mono 0...1

Mono<T>是一个标准的Publisher<T>,其通过onNext信号发送至多一个item然后再结束时发送onComplete信号结束(成功场景);或直接发送onError信号结束(失败场景)。

大多数Mono实现在调用完subscriber的onNext方法之后预计会立马调用subscriver的onComplete方法。但是,Mono.never是一个例外,其并不会发送任何信号,并且其onNextonError的组合是被明确禁止的。

创建Mono/Flux并进行订阅的方式

String sequence

如果想要创建String序列可以通过如下方式

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Flux.empty

Mono<String> noData = Mono.empty();

Flux.range

在下面示例中,Flux.range第一个参数是range的起始值第二个参数是要产生的元素个数

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

故而,其产生的内容为5,6,7

Lambda Subscribe

在进行订阅时,FluxMono使用了lambda在调用subscribe时有如下几种重载的选择

// 订阅并触发sequence的产生
subscribe(); 

// 对每个产生的值通过consumer执行处理操作
subscribe(Consumer<? super T> consumer); 

// 在reactive stream异常终止时对error进行处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

// 在sequence处理完时执行额外的complete操作
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);

// 该方法会针对`subscribe`方法产生的`Subscription`对象执行操作
// 该重载已废弃
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

对于subscribe的使用示例如下

Flux.range(5,3)
                .map(x->{
                    if(x<7) {
                        return x;
                    }
                    throw new RuntimeException(String.format("fucking value {%s} equals or greater than 7", x));
                })
                .subscribe(v->System.out.printf("[%s]-%d\n", Thread.currentThread().getName(), v),
                        (e) -> {
                            System.out.printf("[%s]-Error Caught: %s\n", Thread.currentThread().getName(), e.getMessage());
                        },
                        ()->{
                            System.out.printf("[%s]-Complete: %s\n", Thread.currentThread().getName(), "fucking ended");
                        });

其执行结果如下:

[main]-5
[main]-6
[main]-Error Caught: fucking value {7} equals or greater than 7

Disposable

上述subscribe方法的返回类型为Disposable该接口代表subscriber对publisher的订阅是可取消的如需取消订阅调用dispose方法即可。

对于Mono和Flux而言source publisher应该在接收到cancellation信号之后停止产生元素并不能保证取消信号是即时的。(若source产生数据的速度过快可能在接收到cancel信号之前source就已经complete)。

Disposables类中存在一些对Disposable的工具方法,例如swapcomposite

BaseSubscriber

subscribe方法除了接收lambda外还存在更通用的重载方法接收Subscriber类型的参数。

在这种场景下,传参可以继承BaseSubscriber类。

并且,BaseSubscriber该类是一次性的,其只能够订阅一个publisher如果其订阅了第二个publisher那么其对第一个publisher的订阅将会被取消

BaseSubscriber只能订阅一个publisher的原因是reactive stream规范要求onNext方法不能被并行调用。

示例如下:

Flux.range(3, 5)
                .subscribe(new BaseSubscriber<Integer>() {
                    private Subscription subscription;

                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        subscription.request(5);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        log.info("onNext called: {}", value);
//                        this.subscription.request(1);
                    }

                    @Override
                    protected void hookOnComplete() {
                        log.info("onComplete called");
                        super.hookOnComplete();
                    }

                    @Override
                    protected void hookOnError(Throwable throwable) {
                        log.info("onError called: {}", throwable.getMessage());
                        super.hookOnError(throwable);
                    }
                });

上述示例中,通过向subscribe方法中传递自定义的BaseSubscriber来实现对上游的订阅,执行结果如下:

2025-03-24T19:21:09.818+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 3
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 4
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 5
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 6
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onNext called: 7
2025-03-24T19:21:09.819+08:00  INFO 27440 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : onComplete called

BaseSubscriberhookOnSubscribe默认实现如下:

protected void hookOnSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

其请求的数量为Long.MAX_VALUE代表其publisher为effectively unbounded

可以通过重写hookOnSubscribe方法来自己指定request数量如果需要自己指定请求数量最少需要重写hookOnSubscribehookOnNext方法。

BaseSubscriber提供了requestUnbounded方法(其方法和request(Long.MAX_VALUE)等价)和cancel方法。

除了上述列出的hook外BaseSubscriber还支持如下hooks

  • hookOnComplete
  • hookOnError
  • hookOnCancel
  • hookFinally(当sequence终止时都会被调用可以用参数来判断终止类型为complete或error)
    • hookFinally的调用顺序位于hookOnComplete和hookOnError之后

backpressure

在reactor的backpressure实现中consumer pressure传播到上游source的机制是向上游operator发送request请求。当前已发送的请求个数之和被称为demand,并且demand的上限为Long.MAX_VALUE当demand的值为Long.MAX_VALUE或更大时,代表unbound request

unbound request代表尽可能快的产生数据即backpressure关闭。

在reactive chain中第一个请求来自于final subscriber,其在订阅时(onSubscribe)会发送第一个request请求。目前,大多直接订阅的方法都会通过Long.MAX_VALUE创建一个unbounded request示例如下

  • subcribe()方法和大多数基于lambda的重载方法除了包含Consumer<Subscription>参数的重载)
  • block, blockFirst blockLast
  • 通过toIterabletoStream进行遍历

目前,定义初始请求最简单的方法为通过BaseSubscription对上游进行订阅并且重写onSubscribe方法

buffer

在reactor中可以通过部分operator进行request的reshape。示例如下

Flux.range(1, 1000)
                .buffer(3)
                .subscribe(new BaseSubscriber<List<Integer>>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        subscription.request(2);
                    }

                    @Override
                    protected void hookOnNext(List<Integer> value) {
                        for (Integer v : value) {
                            log.info("item received: {}", v);
                        }
                    }
                });

在上述示例中,request(2)代表请求2个buffer,而每个buffer中包含3Integer,故而总共会接收到2 * 3 = 6个元素。

输出如下:

2025-03-24T20:02:52.438+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 1
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 2
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 3
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 4
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 5
2025-03-24T20:02:52.439+08:00  INFO 9472 --- [           main] cc.rikako.springdemo.runner.CmdRunner    : item received: 6

prefetch

prefetch机制是一种backpressure的优化策略用于确保下游处理数据时上游的数据能够即时补充对吞吐量和资源利用率进行平衡。

prefetch机制通常分为如下部分

初始请求

在未显式指定的前提下,大多数操作符(例如flatMap、concatMap)在处理内部数据流时会向上游发送一个初始请求请求大小为32个元素。

补充优化Replenishing Optimization

prefetch的补充优化通常采用75%的启发规则一旦操作符发现75%的预取元素已经被处理32 *0.75 = 24其自动会向上游发送一个新请求要求补充75%的prefetch量。该过程是动态的会在整个数据流处理过程中持续进行。

例如prefetch的大小为10其limit对应的值为ceil(10 * 0.75) = 8每当其下游被处理的元素达到8个其会重新请求8个数据并且将被下游处理元素的个数重置重新从0开始计数直到该值再达到8再次发送请求

预加载数据

补充优化的优化点在于当预取数据还剩余25%8个未被处理时提前在请求75%的数据,可以避免在下游处理完剩余数据后,需要等待上游推送新的数据(消费速率大于生产速率造成消费者饥饿)。

平滑处理

通过prefetch逐步请求新数据且每次请求固定的量可以保证处理数据速率的稳定。如果source端同时来源大量数据那么若不进行平滑处理则大量数据的同时处理可能导致竞争令性能下降。

有如下operators可以对请求的prefetch进行调整

limitRate

除了prefetch之外,还可以通过limitRatelimitRequest来直接针对请求进行调节。

limitRate(N)将来自下游的请求进行拆分当来自下游的请求被传播到上游时其会被拆分为small batches。例如如果下游调用request(100),此时limitRate(10)将会将其拆分为10个request(10)再传播给上游。并且在此基础上limitRate还实现了prefetch中的补充优化。

除了limitRate(N)之外(当没有传递lowTielimit默认会取N - N>>2,即ceil(N * 0.75)limtRate还存在limitRate(highTie, lowTie)的重载方法。

lowTie

当lowTie取不同值时其补充策略如下

  • lowTie<=0:如果lowTie小于或等于0则limit取值和prefetch值相同仅当prefetch中所有元素都被下游处理完时limtRate operator才会向上游请求数据
  • lowTie>=prefetch 当lowTie大于或等于prefetch时limit取值为ceil(prefetch * 0.75)此时补充策略和prefetch默认相同当75%的数据被下游处理时limitRte会重新向上游请求75%的数据
  • 若lowTie位于(0, prefetch)区间之间
    • 若prefetch的值为Long.MAX_VALUE那么limit的值也为Long.MAX_VALUE
    • 若prefetch值不为Long.MAX_VALUE那么limit的值为lowTie,即lowTie的值即为消费后重新拉取的限制值

limitRequest

limitRequest(N)用于限制下游请求的总个数。例如,向limitRequest(10)发起两次request一次请求3一次请求8那么最后下游只会接收到10个元素。

一旦source发送的元素个数超过N时,limitRequest将会认为sequence已经完成会向下游发送onComplete信号

Create Sequence

同步生成generate

创建Flux的最简单形式是通过generate方法生成,其接收一个Consumer<SynchronousSink<T>>类型的参数。

Sink

sink是spring reactor中的一个核心抽象概念其可以被理解为数据流的出口或发射器负责将元素、error或complete信号推送到下游订阅者

sink可以分为如下种类

  • 同步/异步
  • 单次/多次发射

sink的核心api如下

  • next(T value): 向下游发送数据
  • complete:表示数据流正常结束
  • error(throwable):代表数据流因为错误而终止

generate方法用于产生同步且one by one的emission即sink为SynchronousSink且其单次invocation中sink.next只能够被调用一次。示例如下:

Flux.generate(sink -> {
    String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
    sink.next(data); // 仅调用一次
    if (isEndOfData()) sink.complete();
});

在调用sink.next后,可以调用sink.completesink.error,但是对completeerror的调用都是可选的。

示例如下:

Flux.generate(sink -> {
                    sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
                })
                .take(10)
                .subscribe(System.out::println);

通过generate的另一个重载方法generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)可以维护一个状态并进行管理,其中S为状态的对象类型。对于重载方法的两个参数,其含义如下:

  • stateSupplier该参数用于提供一个初始状态每当有新请求进入时都会调用supplier生成初始状态
  • generatorgenerator会返回一个新的state

示例如下:

Flux.generate(() -> 0, (s, sink) -> {
//                    System.out.printf("%s emitted\n", s);
                    sink.next(s);
                    return s + 1;
                })
                .take(5)
                .subscribe(System.out::println);

其上游生成的整数依次增加,输出如下:

0
1
2
3
4

异步多线程生成create

Flux.create针对每个round能够产生多个元素其暴露的sink类型为FluxSink。相对于Flux.generateFlux.create方法并没有接收state的重载版本。

Flux.create并不会将你的代码变为并行或是异步的.

如果在Flux.create中存在阻塞操作,那么将存在死锁的风险。即使使用了subscribeOn方法在create lambda中执行长阻塞操作仍然会阻塞item的处理因为item的source产生和下游处理都处于同一线程中但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游。

Flux.createlistener based api进行适配

街射使用基于listener的apiapi定义如下

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

可以使用Flux.create将其与Flux相桥接

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
      new MyEventListener<String>() {

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);
          }
        }

        public void processComplete() {
            sink.complete();
        }
    });
});
Flux.create backpressure

可以通过指定OverflowStrategy来优化backpressure行为OverflowStrategy可选值如下

  • IGNORE: 该选项会完全无视下游的backpressure request当下游queue被填充满时会抛出IllegalStateException异常

    部分操作符(例如publishOn其subscriber会内置queue用于存储上游通过onNext推送的数据。

    当queue满时将会抛出IllegalStateException异常

  • ERROR:当下游无法跟上上游的数据发送速度时,上游将会发送IllegalStateException异常

    在上述描述中,跟上代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了request(1)但是source产生了两个数据调用了两次sink.next,那么第二次调用时requested已经为0会调用onOverflow方法发送error信号

  • DROP:当下游无法跟上上游的数据发送速度时(上游发送onNext信号之前如果下游requested配额不足),上游将会丢弃该数据,sink.next不做任何操作。

  • LATEST: 当上游想要推送数据到下游时,如果下游requested不足,那么上游会用最新的数据覆盖下游之前缓存的数据

    • 例如,下游堆积事件1,此时上游推送事件2,则2会覆盖1,之后上游再推送数据33会覆盖2...之后,下游调用request(1)后,获取到的是最新的数据3,数据1,2都被覆盖
    • 其实现是通过LatestAsyncSink中的queue来实现的queue是一个AtomicReference类型的field当调用sink.next向下游发送数据时如果下游不满足requested那么将会将值set到queue中从而实现后面的数据覆盖前面的数据
  • BUFFER(默认): BufferAsyncSink中会存在一个无界队列,如果当上游调用sink.next尝试向下游发送数据时如果下游requested条件不满足会将数据缓存在无界队列中。使用无界队列进行缓存可能会导致OOM

异步单线程生成push

push介于generatecreate之间,其用于对单个生产者产生的事件进行处理,pushgeneratecreate的相似点如下:

  • create类似,其可以是异步的,并且可以通过OverflowStrategy来管理backpressure
  • generate类似,对于push,其sink.next,sink.complete,sink.error在指定时刻只能被一个线程调用

其对异步事件的桥接示例如下:

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }

        public void processError(Throwable e) {
            sink.error(e); 
        }
    });
});
hybird push-pull model

大多数reactor operators(例如create)都使用了混合推拉的模型。在混合推拉模型中大多数事件的处理都是异步的建议采用push的方法

在混合推拉模型中consumer从source处拉取数据在consumer发送第一次request之前source并不会emit任何数据source可能会生产数据但是在requested余量不足时并不会调用下游的onNext根据OverflowStrategy的不同会执行不同处理source只会在下游请求的范围内向下游推送数据

并且在最下游的subscriber调用上游subscribe方法时,source的数据生成逻辑才会被触发,调用subscribe方法后首先会调用subscriber的onSubscribe方法然后会一直沿着reactive stream向前追溯直到找到source然后调用Flux.create接受的Consumer<? super FluxSink<T>>

sink.onRequest

sink还提供了onRequest方法,其接受一个LongConsumer类似的参数可以为sink注册一个onRequest回调,后续针对sink(sink实现了Subscription)调用request方法时consumer都会被调用。

sink.onRequest的使用示例如下:

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          // 
          for(String s : messages) {
            sink.next(s); 
          }
        }
    });
    sink.onRequest(n -> {
      // 此处代码将会在`sink.request`被调用时触发
      // 主动向processor拉取数据
        List<String> messages = myMessageProcessor.getHistory(n); 
        for(String s : messages) {
           sink.next(s); 
        }
    });
});

Handle

handle为一个instance method非静态方法其关联了一个已经存在的source。MonoFlux中都存在handle方法。

generate类似,handle使用SynchronousSink,并且只允许逐个发出。handle的作用类似mapfilter的组合,可以跳过部分元素,其签名如下

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

其使用示例如下:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

// 由于reactive steam中不允许有null
// 故而当通过`alphabet`方法映射可能返回null值时
// 可以使用handle对返回值进行过滤只有不为空时才调用`sink.next`
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); 
        if (letter != null) 
            sink.next(letter); 
    });

alphabet.subscribe(System.out::println);

onCancel & onDispose

sink支持onCancelonDispose回调,其区别如下:

  • onDispose回调用于执行cleanup操作其在complete, error, cancel之后都会被调用。
  • onCancel回调用于执行取消操作,其只针对cancel执行并且执行顺序位于cleanup之前

其使用示例如下所示:

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel())
        .onDispose(() -> channel.close())
    });

Threading and Schedulers

reactor模型和rxjava模型类型是并发无关的并不强制要求并发模型。并且大多数operator其运行的线程都和前一个operator相同。

除非显式指定否则最顶层的operatorsource也会运行在subscribe方法被调用的线程中

Scheduler

在reactor中操作执行在哪个线程中取决于使用的Scheduler。Scheduler和ExectuorService类似负责对任务进行调度但是相比于ExecutorService其功能更加丰富。

Scheudlers类中存在如下静态方法,分别访问不同的execution context

  • Schedulers.immediate():没有执行上下文,被提交的任务会在当前线程中被立马执行
  • Schedulers.single():线程上下文为一个单个、可重用的线程上下文。该方法会对所有的调用都使用相同的线程直到scheduler被disposed
  • Schedulers.newSingle():每次调用时都使用一个专属线程
  • Schedulers.elastic():该上下文是一个无界、弹性的线程池。在引入Schedulers.boundedElastic()方法后,该方法不再推荐被使用。
  • Schedulers.boundedElastic():该上下文是一个有界、弹性的线程池。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现:
    • ExecutorService-based:会在多个任务之间重用平台线程(即使用相同工作线程执行多个任务)
    • Virtual-thread-per-task-basedjdk21+支持该特性对每个任务都会开启一个新的虚拟线程并且实现并没有维护idle pools

createWorker

在Scheduler中idleQueue中的线程会在空闲一段时间后自动销毁,但是通过createWorker手动创建的worker必须手动销毁在使用完成后调用release

operators using default scheduler

通常情况下部分operators在未显式指定的前提下会使用默认的scheduler通常可显式指定一个不同的scheduler

例如,Flux.interval(Duration.ofMillis(300))方法会生成间隔300ms的ticks。默认情况下,其会使用Schedulers.parallel可以通过如下代码指定一个新的scheduler:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

switch thread context

reactor提供了两种方法来切换执行的上下文

  • publishOn
  • subscribeOn

publishOn和subscribeOn用友如下区别

  • publishOn方法在reactive chain中的位置若发生变化会对各个节点的执行上下文造成影响
  • subscribeOn方法在reactive chain中位置若发生变化并不会对各个节点的执行上下文造成影响

publishOn

publishOn操作符和作用和其他操作符类型,都是从上游接收到信号并且将信号传递到下游,但是,在执行下游的回调时,通过Scheduler中的worker进行调度。

故而,publishOn会对reactor chain中后续operators造成如下影响

  • 修改执行上下文执行的线程由scheduler决定
  • 按照规范,onNext的调用是有序的故而publishOn后续的操作都会使用同一个worker进行调度对于所有数据都在同一线程中执行worker在subscribe时决定publishOn使用的worker

subscribeOn

subscribeOn影响订阅的过程通常推荐将其放在source之后。

subscribeOn原理

FluxSubscribeOn#subscribeOrReturn中,会通过scheduler#createWorker创建worker并通过worker来对执行source.subscribe的方法进行调度。

故而在consumer执行subscribe时随着reactor chain从尾到头下游subscriber都会调用上游publisher的subscribe方法。在FluxSubscribeOn调用上游的subscribe时则是会通过worker在进行调度实际调用在worker中执行。

subscribeOn方法使用方式如下

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  
    .subscribeOn(s)  
    .map(i -> "value " + i);  

new Thread(() -> flux.subscribe(System.out::println)); 

Handling Errors

在reactive stream中errors是终止事件一旦error被触发sequence的生成将会被停止并且将会沿着reactor chain的下游一直传递道subscriber#onError

error应该在应用程序的级别被处理处理方案如下

  • 在ui上展示错误信息
  • 向rest endpoint发送error payload

故而,subscriber#onError方法应该被定义。

reactor同样支持在chain的中间通过error-handling来处理error示例如下所示

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example

在reactor中任何error都代表终止事件`即使使用了error-handling operatororiginal sequence在出现error后也不会继续。

故而error-handling将onError信号转化为了一个新sequence的开始fallback one即通过新sequence替换了上游旧的terminated sequence

FluxOnErrorReturn.ReturnSubscriber#onError的实现中operator将Throwable转换为了onNext(fallbackValue)onComplete两个调用,即在向下游发送onNext(fallbackValue)信号之后,也会终止序列。

通常来说,常见的异常才处理方式如下:

static fallback value

该场景类似于

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

可以调用onErrorReturn方法,示例如下:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

onErrorReturn还存在一个接受Predicate的重载方法可以决定是否对异常执行recover操作示例如下

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

catch and swallow error

如果不想将异常替换为fallback value而是忽视产生的error仅向下游传递正常生成的元素可以使用onErrorComplete方法(例如, 1,2,3...序列在向下进行传递时,如果2的下游operator执行发生异常那么onErrorComplete会忽略2产生的error仅向下游传递1)。

onErrorComplete的使用示例如下:

Flux.just(10,20,30)
    .map(this::doSomethingDangerousOn30)
    .onErrorComplete();

onErrorComplete会将onError信号转化为onComplete信号。

类似于onErrorReturnonErrorComplete同样存在一个重载方法接受Predicate可以通过该断言判断是否对error执行recover操作。

  • 若执行recover操作则会调用subscriber#onComplete
  • 若不执行recover操作则会调用subscriber#onError

fallback method

如果在处理数据时,存在多种方法,例如m1,m2,并且在尝试m1失败时想要再尝试m2,可以使用onErrorResume方法。

其等价于如下逻辑:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

和上述逻辑等价的是如下代码:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(e -> getFromCache(k))
    );

其中,onErrorResume返回的都是一个Publisher

onErrorResume的底层逻辑实现如下:

  • 其会根据onErrorResume中传入的Function<? super Throwable, ? extends Publisher<? extends T>>参数和error来构建一个新的publisher
  • 然后对新的publisher执行subscribe操作并向下游返回新publisher的sequence

onErrorResume同样拥有一个重载函数接受Predicate决定是否执行recover逻辑。除此之外其接受Function<? super Throwable, ? extends Publisher<? extends T>>类型参数可以根据抛出的异常来决定fallback sequence示例如下

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> {
            if (error instanceof TimeoutException)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error);
        })
    );

在上述示例中,Flux.error会重新抛出上述异常

Dynamic Fallback Value

当出现error时如果并不想调用下游的onError,而是想给下游一个根据异常信息计算出的默认值,同样可以使用onErrorResume方法,示例如下:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}

上述代码等价于

erroringFlux.onErrorResume(error -> Mono.just( 
        MyWrapper.fromError(error) 
));

Catch and rethrow

对于捕获异常然后重新抛出的逻辑

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

可以通过onErrorResume实现:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

或者,可以调用onErrorMap,其是对onErrorResume方法的封装:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

log and react on the side

如果想要实现类似将异常捕获、打印然后重新抛出的逻辑,可以使用doOnError

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

上述代码等价于

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k);
        })

    );
doOnError

doOnError和其他以doOn为前缀的operators其通常会被称为副作用side effect。通过这些operators可以在不修改sequence的情况下监控chain中的事件。

Using resources and the finally block

对于在finally block中清理资源的行为,可以通过doFinally来实现

Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
doFinally

doFinally是无论sequence结束onError/onComplete还是取消cancel都希望执行的副作用side effect。其接收参数类型为Consumer<SignalType>,可以通过signaltype判断触发side-effect的terminaction类型

doFinally使用示例如下:

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> {
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL)
          statsCancel.increment();
    })
    .take(1);

对于try(resource)的代码,则是可以通过using来实现

try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}
using

using代表Flux由资源派生并且在Flux处理完成之后都需要对资源执行操作

// resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); 
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};
// using
Flux<String> flux =
Flux.using(
        () -> disposableInstance, 
        disposable -> Flux.just(disposable.toString()), 
        Disposable::dispose 
);

其中,resource实现了Disposable接口,而Flux.using接收的参数类型分别为

  • () -> disposableInstance: resource supplier
  • disposable -> Flux.just(disposable.toString()): 一个factory根据resource产生publisher
  • Disposable::dispose: resource cleanup

retry

retry允许对产生错误的sequence进行重试。

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() 
    .subscribe(System.out::println, System.err::println); 

Thread.sleep(2100);

在上述示例中,retry(1)会对错误执行一次重试(upstream重新开始生成)。其产生结果如下所示:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
retry原理

下游在上游调用onError会判断重试次数是否为0如果不为0会对重试次数进行减1并且重新对上游调用subscribe,这会令上游重新从头开始生成序列

直到重试次数为0之后FluxRetry才会将onError传递给下游。

retryWhen

retryWhen会使用companion flux来判断错误时是否应该回滚。companion flux由operator创建但是用户会通过装饰companion flux来自定义重试条件。

companion flux是一个Flux<RetrySignal>类型的值,该值将会被传递给Retry方法/策略(retryWhen只接受一个Retry类型的参数)。

使用者可以对retry方法进行指定,retry方法会返回一个Publisher<?>类型的值。

Retry是一个抽象类但是提供了Retry#from方法通过该方法可以将lambda转换为Retry对象。

Retry周期如下所示

  • 每当error发生时都会将RetrySignal发送到companion flux中此时companion flux已经被retry方法装饰过了
    • Flux<RetrySignal>将会包含至今所有的重试尝试信息并且可以通过其访问error和error关联的元数据
  • 如果companion flux发送了一个值,那么会触发重试
  • 如果companion fluxcomplete将会对error执行swallow操作retry cycle将会停止并且也会导致sequence complete
  • 如果companion flux产生了一个error,retry cycle将会终止并且导致sequence按异常终止

retry helper

project reactor提供了Retry helperRetrySpecRetryBackoffSpec,二者都允许进行如下所示的自定义行为:

  • filter通过filter设置允许触发retry的异常
  • modifyErrorFilter 对之前filter设置的异常进行修改
  • doBeforeRetrydoAfterRetry针对retrytrigger执行side effect
    • doBeforeRetry()方法触发在delay发生之前doAfterRetry()触发在delay之后

  • onRetryExhaustedThrow(BiFunction):在重试数量达到上限后,通过onRetryExhaustedThrow(BiFunction)来自定义异常。
    • 通常情况下,当重试数量达到上限后,自定义异常类型通过Exceptions.retryExhausted(…​)方法来构建,其返回的异常类型为RetryExhaustedException,可以通过Exceptions.isRetryExhausted(Throwable)方法来进行区分
retrying with transient errors

RetrySignal中存在如下两个方法:

  • totalRetriesInARow():每当error is recoveredretry attempt导致了onNext而不是onErrorindex都会被设置为0
  • totalRetries()totalRetries()方法返回的值是单调递增的,并不会被重置

当使用RetrySpecRetryBackoffSpec时,可以通过transientErrors(true)方法来令策略使用totalRetriesInARow()处理transient error。

示例如下所示:

AtomicInteger errorCount = new AtomicInteger();
Flux<Integer> transientFlux = httpRequest.get()
        .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))
             .blockLast();
assertThat(errorCount).hasValue(6);

transientErrors主要是为了处理周期性发生异常,异常能够重试成功,并且发生异常后一段时间平稳运行的场景

handling Exceptions in operators or functions

通常来说operator的执行有可能会抛出异常根据抛出异常类型的不同存在如何区别

  • unchecked Exception:对于抛出的unchecked exception,其都会通过onError向下游传递例如map operator中抛出的RuntimeException将会被转化为对下游onError的调用
    Flux.just("foo")
      .map(s -> { throw new IllegalArgumentException(s); })
      .subscribe(v -> System.out.println("GOT VALUE"),
                 e -> System.out.println("ERROR: " + e));
    
    上述代码示例将会触发下游subscriber的onError输出内容如下
    ERROR: java.lang.IllegalArgumentException: foo
    
  • fatal exceptions在reactor中定义了一些被视为致命的异常(例如OutOfMemoryError),具体的fatal异常包含范围可见Exceptions.throwIfFatal
    • 在抛出fatal errorreactor operator会抛出异常而不是将其传递到下游
checked exception

对于checked exception,当调用方法签名中指定了throws xxxException的方法时,需要通过try-catch block对其进行处理。对于checked exception,有如下选择:

  • 对异常进行捕获,并且对其执行recover操作sequence会继续执行
  • 对异常进行捕获,并且封装到unchecked exception中,并将unchecked exception进行rethrow(这将对sequence进行打断)
  • 如果需要返回一个Flux(例如,使用flatMap方法),可以使用Flux.error(checkedException)返回一个error-producing Fluxsequence会终止
Exceptions Utility

通过reactor提供的Exceptions Utility可以保证仅当异常为checked exception时才会封装

  • Exceptions.propagate在必要时会封装exception。
    • 其并不会对RuntimeException进行封装,并且该方法首先会调用throwIfFatal
    • 该方法会将checked exception封装到reactor.core.Exceptions.ReactiveException
  • Exceptions.unwrap
    • 该方法会获取original unwrapped exception

使用示例如下所示:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

Sinks

在多线程环境下安全的使用Sinks.OneSinks.Many

reactor-core提供的sinks支持多线程环境的使用并不会触犯规范或导致下游subscribers产生未知行为。

tryEmit & emit

当尝试通过sinks向下游发送signal时可以调用如下API

  • tryEmit*并行调用将会导致fail fast
  • emit*: 当调用emit*时,提供的EmissionFailureHandler,如果该接口的onEmitFailure方法返回为true将会在争用场景下执行重试操作例如busy loop如果onEmitFailure返回为false则sink会以error终止。

上述流程是对Processor.onNext的改进,Processor.onNext必须在外部进行同步否则将会导致下游subscribers未定义的行为。

例如,Flux.create允许多线程调用sink.onNext但是其使用的sink是reactor.core.publisher.FluxCreate.SerializedFluxSink其在next操作中通过队列和cas对下游的onNext操作进行了同步。

故而,在传统的Processor.onNext中,如果要在多线程环境下使用,必须在上游做好同步操作,否则会导致下游的未定义行为。

Processor

Processor是一种特殊的Publisher,其在作为publisher的同时也是Subscriber

在使用Processor是一个很常见的问题是直接对processor向外暴露的onNextonComplete onError方法进行调用。

实际上对于processor中onXXX方法的调用必须要符合reactive stream规范在外部对onXXX方法进行调用时,要做好同步操作。

sinks的使用示例如下所示

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();

通过sink多个线程可以并行的产生数据

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);

在使用EmitFailureHandler.busyLooping时,其返回的示例包含状态,并不能被重用

asFluxasMono

Sinks.Many支持被转化为Flux,下游可以对Sinks.Many转化后的Flux进行订阅,实例如下所示:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();

同样的,Sinks.OneSinks.Empty可以通过asMono()方法被转化为Mono

Sinks的类别包括

  • many().multicast()sink只会基于subscribers的backpressure请求将最新的数据传递到其subscribers
    • 最新的数据代表subscriber订阅之后的数据
  • many().unicast():和many().multicast()类似,也是在订阅之后将最新的数据推送给订阅者,但是和many().multicast()不同之处如下:
    • many().unicast()带有缓冲区,在第一个订阅者订阅many().unicast()之前,会将推送给many().unicast()的数据都缓冲到缓冲区中,待后续有订阅者订阅后,会将缓冲区数据发送给订阅者
    • many().multicast()默认不带有缓冲区
  • many().replay(): 对于新订阅的订阅者,会将一定数量的pushed history data进行replay操作,之后再推送新的数据
  • one()sink只会向subscriberr推送一个数据
  • empty()该sink会向subscriber推送termianl signalerror或complete

Sinks.many().unicast().onBackpressureBuffer(args?)

一个unicast Sinks.Many可以通过其内置buffer处理backpressure但是其最多只能有一个subscriber。

通常,可以通过Sinks.many().unicast().onBackpressureBuffer()来创建unicast sink。但是,Sinks.many().unicast()中包含更多的静态方法,可以对其进行更精细的调整。

onBackpressureBuffer

在默认情况下,onBackpressureBuffer()其是无界unbounded

  • 在subscriber尚未请求数据的情况下如果通过sink推送了数据那么这些数据都会被缓冲到缓冲区中缓冲区是无界的

故而,onBackPressureBuffer方法存在一个接收Queue类型参数的重载方法。可以向其传递一个有界队列,该队列将会被用作内部缓冲区。

在为onBackpressureBuffer指定了Queue的情况下,如果queue已满并且下游并没有向上游发送足够的reqeust时,sink将会拒绝该value的推送

Sinks.many().multicast().onBackpressureBuffer(args?)

对于该方法创建的multicast Sinks.Many可以向多个subscribers发送数据并且对每个subscriber都能独立地接收backpressure。

对于每个subscriber只会接收在其subscribe之后推送到sink的signal

基础的multicast sink可以通过Sinks.many().multicast().onBackpresuureBuffer()来进行创建。

autoCancel

在默认情况下如果所有subscribers都被取消cancelled即取消订阅其会对internal buffer进行清空并且停止接收新的subscriber。

如果想要修改autoCancel的行为,可以调用Sinks.many().multicast()中的静态工厂方法通过autoCancel参数来调整autoCancel的行为。

Sinks.many().multicast().directAllOrNothing()

该方法创建的拥有最简单的backpressure处理策略

  • 如果任何subscriber处于too slow状态demand为0那么对于所有的subscriber该onNext都会被丢弃。

但是slow subscribers并不会被终止一旦slow subscribers又发送了request所有subscribers都会重新从sinks.many接收数据。

Sinks.many()终止后(通常是通过调用emitError, emitComplete方法其仍然允许新的subscriber对其进行订阅但是只会对新订阅者replay termination signal。

Sinks.many().multicast().directBestEffort()

对于该类型的multicast Sinks.Many,若subscriber is too slow(该subscriber的demand为0),那么该onNext信号仅会针对该slow subscriber进行丢弃。

但是,slow subscribers并不会被终止,一旦slow subscribers开始请求数据,其会重新开始接收新推送的数据。

Sinks.ManyemitError, emitComplete终止后其仍然允许新的subscribers对其进行订阅但是对新订阅的订阅者只会向新订阅者发送termination signal。

Sinks.many().replay()

一个replay Sinks.Many可以将已发送元素进行缓存并且对后续的subscriber进行replay。

replay Sinks.Many可以通过如下方式进行创建:

  • 缓存指定数量的历史数据Sinks.many().replay().limit(int)
  • 缓存所有历史数据,没有上限限制Sinks.many().replay().all()
  • 基于time-based window进行缓存Sinks.many().replay().limit(Duration)
  • hisotry size limit和time window相结合Sinks.many().replay().limit(int, Duration)

除此之外,Sinks.many().replay()还包含其他的重载方法,例如可以通过latest()latestOrDefault()对单个元素进行缓存和replay

Sinks.unsafe().many()

Sinks.unsafe().many()返回的Sinks.Many factory并不会提供producer thread safety在使用Sinks.unsafe().many()时,需要确保对可能导致onNext, onComplete, onError方法的调用需要保证外部的同步以确保其满足reactive stream规范。

Sinks.one()

该方法会简单创建一个Sinks.One<T>实例,该实例可以看作Mono<T>并且其emit方法和many稍有不同:

  • emitValue(T value): 产生一个onNext(value)信号,并且,在大多数实现中会产生一个onComplete信号
  • emptyEmpty() 只会产生一个onComplete信号,其和empty Mono等效
  • emitError(Throwable t):产生一个onError(t)信号

Sinks.empty()

该方法会创建一个Sinks.Empty<T>实例,Sinks.EmptySinks.One类似,但是Sinks.Empty不提供emitValue方法。

Sinks.Empty无法触发onNext但是仍可以指定<T>泛型类型。