diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 02730b0..ad96088 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -37,10 +37,10 @@ - [异步单线程生成`push`](#异步单线程生成push) - [hybird push-pull model](#hybird-push-pull-model) - [`sink.onRequest`](#sinkonrequest) + - [Handle](#handle) - [`onCancel` \& `onDispose`](#oncancel--ondispose) - [Threading and Schedulers](#threading-and-schedulers) - [Scheduler](#scheduler) - - [Handle](#handle) >>>>>>> ff85f3d (doc: 阅读reactor文档) @@ -528,34 +528,6 @@ Flux bridge = Flux.create(sink -> { }); ``` -#### `onCancel` & `onDispose` -`sink`支持`onCancel`和`onDispose`回调,其区别如下: -- `onDispose`回调用于执行cleanup操作,其在`complete`, `error`, `cancel`之后都会被调用。 -- `onCancel`回调用于执行取消操作,其只针对`cancel`执行,并且执行顺序位于cleanup之前 -### Threading and Schedulers -reactor模型和rxjava模型类型,是并发无关的,并不强制要求并发模型。并且,大多数operator其运行的线程都和前一个operator相同。 - -`除非显式指定,否则最顶层的operator(source)也会运行在subscribe方法被调用的线程中`。 - -#### Scheduler -在reactor中,操作执行在哪个线程中取决于`使用的Scheduler`。Scheduler和ExectuorService类似,负责对任务进行调度,但是相比于ExecutorService其功能更加丰富。 - -`Scheudlers`类中存在如下静态方法,分别访问不同的`execution context`: -- `Schedulers.immediate()`:没有执行上下文,被提交的任务会在当前线程中被立马执行 -- `Schedulers.single()`:线程上下文为一个`单个、可重用`的线程上下文。该方法会对所有的调用都使用相同的线程,直到scheduler被`disposed` -- `Schedulers.newSingle()`:每次调用时都使用一个专属线程 -- `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。 -- `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现: - - `ExecutorService-based`:会在多个tasks之间对平台线程进行重用, - -其使用示例如下所示: -```java -Flux bridge = Flux.create(sink -> { - sink.onRequest(n -> channel.poll(n)) - .onCancel(() -> channel.cancel()) - .onDispose(() -> channel.close()) - }); -``` #### Handle `handle`为一个`instance method`(非静态方法),其关联了一个已经存在的source。`Mono`和`Flux`中都存在`handle`方法。 @@ -587,3 +559,35 @@ Flux alphabet = Flux.just(-1, 30, 13, 9, 20) alphabet.subscribe(System.out::println); ``` + +#### `onCancel` & `onDispose` +`sink`支持`onCancel`和`onDispose`回调,其区别如下: +- `onDispose`回调用于执行cleanup操作,其在`complete`, `error`, `cancel`之后都会被调用。 +- `onCancel`回调用于执行取消操作,其只针对`cancel`执行,并且执行顺序位于cleanup之前 + +其使用示例如下所示: +```java +Flux bridge = Flux.create(sink -> { + sink.onRequest(n -> channel.poll(n)) + .onCancel(() -> channel.cancel()) + .onDispose(() -> channel.close()) + }); +``` + +### Threading and Schedulers +reactor模型和rxjava模型类型,是并发无关的,并不强制要求并发模型。并且,大多数operator其运行的线程都和前一个operator相同。 + +`除非显式指定,否则最顶层的operator(source)也会运行在subscribe方法被调用的线程中`。 + +#### Scheduler +在reactor中,操作执行在哪个线程中取决于`使用的Scheduler`。Scheduler和ExectuorService类似,负责对任务进行调度,但是相比于ExecutorService其功能更加丰富。 + +`Scheudlers`类中存在如下静态方法,分别访问不同的`execution context`: +- `Schedulers.immediate()`:没有执行上下文,被提交的任务会在当前线程中被立马执行 +- `Schedulers.single()`:线程上下文为一个`单个、可重用`的线程上下文。该方法会对所有的调用都使用相同的线程,直到scheduler被`disposed` +- `Schedulers.newSingle()`:每次调用时都使用一个专属线程 +- `Schedulers.elastic()`:该上下文是一个`无界、弹性的线程池`。在引入`Schedulers.boundedElastic()`方法后,该方法不再推荐被使用。 +- `Schedulers.boundedElastic()`:该上下文是一个`有界、弹性的线程池`。通常将阻塞的任务放到该线程池中,令其不会占用其他资源。根据设置,该方法能够提供两种不同的实现: + - `ExecutorService-based`:会在多个tasks之间对平台线程进行重用, + +