From 95793600ba2a9e94ce11863ed320463131b165ca Mon Sep 17 00:00:00 2001 From: asahi Date: Mon, 31 Mar 2025 12:54:04 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBspring=20reactor?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 44 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index edf4e6e..e247943 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -37,6 +37,8 @@ - [异步单线程生成`push`](#异步单线程生成push) - [hybird push-pull model](#hybird-push-pull-model) - [`sink.onRequest`](#sinkonrequest) + - [`onCancel` \& `onDispose`](#oncancel--ondispose) + - [Handle](#handle) # Reactor @@ -523,5 +525,47 @@ Flux bridge = Flux.create(sink -> { }); ``` +#### `onCancel` & `onDispose` +`sink`支持`onCancel`和`onDispose`回调,其区别如下: +- `onDispose`回调用于执行cleanup操作,其在`complete`, `error`, `cancel`之后都会被调用。 +- `onCancel`回调用于执行取消操作,其只针对`cancel`执行,并且执行顺序位于cleanup之前 +其使用示例如下所示: +```java +Flux bridge = Flux.create(sink -> { + sink.onRequest(n -> channel.poll(n)) + .onCancel(() -> channel.cancel()) + .onDispose(() -> channel.close()) + }); +``` +#### Handle +`handle`为一个`instance method`(非静态方法),其关联了一个已经存在的source。`Mono`和`Flux`中都存在`handle`方法。 + +和`generate`类似,`handle`使用`SynchronousSink`,并且只允许逐个发出。`handle`的作用类似`map`和`filter`的组合,可以跳过部分元素,其签名如下 +```java +Flux handle(BiConsumer>); +``` + +其使用示例如下: +```java +public String alphabet(int letterNumber) { + if (letterNumber < 1 || letterNumber > 26) { + return null; + } + int letterIndexAscii = 'A' + letterNumber - 1; + return "" + (char) letterIndexAscii; +} + +// 由于reactive steam中不允许有null, +// 故而当通过`alphabet`方法映射可能返回null值时, +// 可以使用handle对返回值进行过滤,只有不为空时才调用`sink.next` +Flux alphabet = Flux.just(-1, 30, 13, 9, 20) + .handle((i, sink) -> { + String letter = alphabet(i); + if (letter != null) + sink.next(letter); + }); + +alphabet.subscribe(System.out::println); +```