From 78eb257c4f99a25d13a226038917d2354ff4fb32 Mon Sep 17 00:00:00 2001 From: asahi Date: Thu, 3 Apr 2025 12:54:05 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBreactor=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 46 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index ad96088..a62e034 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -41,8 +41,12 @@ - [`onCancel` \& `onDispose`](#oncancel--ondispose) - [Threading and Schedulers](#threading-and-schedulers) - [Scheduler](#scheduler) ->>>>>>> ff85f3d (doc: 阅读reactor文档) - + - [createWorker](#createworker) + - [operators using default scheduler](#operators-using-default-scheduler) + - [switch thread context](#switch-thread-context) + - [publishOn](#publishon) + - [subscribeOn](#subscribeon) + - [`subscribeOn`原理](#subscribeon原理) # Reactor ## Reactive Programming @@ -588,6 +592,42 @@ reactor模型和rxjava模型类型,是并发无关的,并不强制要求并 - `Schedulers.newSingle()`:每次调用时都使用一个专属线程 - `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。 - `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现: - - `ExecutorService-based`:会在多个tasks之间对平台线程进行重用, + - `ExecutorService-based`:会在多个任务之间重用平台线程(即使用相同工作线程执行多个任务) + - `Virtual-thread-per-task-based`:`jdk21+`支持该特性,对每个任务,都会开启一个新的虚拟线程,并且实现并没有维护idle pools + +#### createWorker +在Scheduler中,`idleQueue`中的线程会在空闲一段时间后自动销毁,但是通过`createWorker`手动创建的worker必须手动销毁,在使用完成后调用`release`。 + +#### operators using default scheduler +通常情况下,部分operators在未显式指定的前提下,会使用默认的scheduler(通常可显式指定一个不同的scheduler)。 + +例如,`Flux.interval(Duration.ofMillis(300))`方法会生成`间隔300ms的ticks`。默认情况下,其会使用`Schedulers.parallel`,可以通过如下代码指定一个新的scheduler: +```java +Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test")) +``` + +#### switch thread context +reactor提供了两种方法来切换执行的上下文: +- `publishOn` +- `subscribeOn` + +> publishOn和subscribeOn用友如下区别: +> - publishOn方法在reactive chain中的位置若发生变化,会对各个节点的执行上下文造成影响 +> - subscribeOn方法在reactive chain中位置若发生变化,并不会对各个节点的执行上下文造成影响 + +#### publishOn +`publishOn`操作符和作用和其他操作符类型,都是从上游接收到信号并且将信号传递到下游,但是,在执行下游的回调时,通过`Scheduler`中的worker进行调度。 + +故而,`publishOn`会对reactor chain中后续operators造成如下影响: +- 修改执行上下文,执行的线程由scheduler决定 +- 按照规范,`onNext`的调用是有序的,故而publishOn后续的操作都会使用同一个worker进行调度,对于所有数据,都在同一线程中执行(worker在subscribe时决定publishOn使用的worker) + +#### subscribeOn +`subscribeOn`影响订阅的过程,通常推荐将其放在source之后。 + +##### `subscribeOn`原理 +在`FluxSubscribeOn#subscribeOrReturn`中,会通过`scheduler#createWorker`创建worker + +