Files
rikako-note/spring/webflux/Reactor.md
2025-03-21 00:33:06 +08:00

4.6 KiB
Raw Blame History

Reactor

Reactive Programming

响应式编程是一种异步编程范式,关注于数据流状态变化的传播。java的响应式编程接口被包含在java9的Flow中。

响应式编程范式和观察者设计模式类似,相比于迭代器模式,用户可以选择何时调用next方法reactive stream是基于发布/订阅模型的。

迭代器模式是pull-based而reactive stream为push-based

命令式迁移到响应式

可组合性与可读性

“可组合性"代表编排多个异步任务的能力通过“组合”可以将前一个异步任务的输出作为后一个异步任务的输入。或者可以按照fork-join的形式对异步任务进行编排。

reactor同样能解决“可读性”的问题在使用传统的callback model编写程序时随着逻辑的复杂异步进行的层数也会增加这将会极大降低代码的可读性和可维护性。

在使用call model时通常需要在回调中执行另一个回调回调的嵌套通通常会被称为callback heil

reactor提供了复杂的“组合”选项能够反映抽象异步进程的组织并且所有的内容通常都会位于同一层级。

Assembly Line

响应式应用中的数据处理类似于流水线其中reactor既是传送带又是工作站。数据来源于original publisher,最终传传递给subscriber

数据在从publisher传送到subscriber的过程中可以经过各种转换和其他中间步骤。如果在数据处理的过程中耗费了较多时间受影响的workstation会向上游发送消息来控制数据的生成速率。

Operators

在reactor中Operator即是Assembly Line中的Workstation。每个operator都会将新的行为添加到publisher并且前一个publisher包装到一个新的publisher实例中。

故而operator将整个chain都链接起来数据来源于第一个publisher并随着chain移动依次被每个链接处理最终由subscriber结束该过程。

Nothing Happens Until You subscribe()

当通过reactor编写publisher chain时数据并不会被泵入到chain中编写chain只是创建了异步处理的抽象描述。

通过订阅行为将publisher和subscriber绑定到了一起订阅行为会触发chain中的数据流。该行为通过内部的signal实现subscriber将会发送一个reuqest signal该信号会被传递到chain上游一直被传递到source publisher。

backpressure

传递到上游的信号该机制也被用于实现backpressure在assembly line模型中也被描述为workstation传递给上游的反馈信号当workstation处理消息比上游workstation满时会发送该反馈。

reactive stream定义的机制接近于上述描述其提供两种模式

  • unbounded modesource publisher可以按其最高速率不受限制的推送数据
  • request mode通过request机制向source publisher发送信号告知其准备好处理最多n个元素。

中间的operator也可以在传输过程中对请求做出修改例如buffer operator可以将elements分割为以10个为单位的batch如果subscriber请求一个buffer那么上游source publisher可以产生10个元素。

通过backpressure可以将push模型转化为push-pull模型:

  • 当上游的n个元素已经准备好时下游可以从上游拉取n个元素
  • 当上有没有准备好n个元素时后续如果n个元素被准备好其将会被上游推送

hot & cold

对于响应式序列,其可以分为两种:

  • cold sequence对于cold sequence会为每个订阅者重新开始流程包括source publisher。例如source中若封装了http调用会为每个subscriber都执行一个新的http请求
  • hot sequencesubscriber只有在其订阅后才收到信号即使没有subscriber在监听hot sequence仍然能够发送signal

Reactor Core

reactor引入了两个实现Publisher的类:MonoFlux

  • Flux代表包含0...N个items的reactive sequence
  • Mono代表包含0...1个items的reactive sequence

上述两个类代表了在异步处理场景中的大致基数。

  • Mono例如对于http请求的场景一个请求只会产生一个响应故而对响应执行count操作并没有任何意义。此时,可以通过Mono<HttpResponse>来代表http调用的结果Mono中只提供了上下文中包含0...1个元素的对应操作
  • 当执行某些可能会改变异步处理中最大基数的操作时,可能会导致类型的改变,例如执行Flux中的count操作将会返回Mono<Long>的类型