From 0822e5052f6d672dbb1b669ec57801661b9422c8 Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 16 Apr 2025 00:10:03 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBproject=20reactor?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 54 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 8818a99..75e826d 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -66,6 +66,10 @@ - [handling Exceptions in operators or functions](#handling-exceptions-in-operators-or-functions) - [checked exception](#checked-exception) - [Exceptions Utility](#exceptions-utility) + - [Sinks](#sinks) + - [在多线程环境下安全的使用`Sinks.One`和`Sinks.Many`](#在多线程环境下安全的使用sinksone和sinksmany) + - [`tryEmit` \& `emit`](#tryemit--emit) + - [Processor](#processor) >>>>>>> 8d41980 (doc: 阅读transient errors文档) # Reactor @@ -1051,5 +1055,55 @@ converted.subscribe( } ); ``` +### Sinks +#### 在多线程环境下安全的使用`Sinks.One`和`Sinks.Many` +reactor-core提供的sinks支持多线程环境的使用,并不会触犯规范或导致下游subscribers产生未知行为。 + +#### `tryEmit` & `emit` +当尝试通过sinks向下游发送signal时,可以调用如下API: +- `tryEmit*`:并行调用将会导致fail fast +- `emit*`: 当调用`emit*`时,提供的`EmissionFailureHandler`,如果该接口的`onEmitFailure`方法返回为true,将会在争用场景下执行重试操作(例如busy loop);如果onEmitFailure返回为false,则sink会以error终止。 + +上述流程是对`Processor.onNext`的改进,`Processor.onNext`必须在外部进行同步,否则将会导致下游subscribers未定义的行为。 + +> 例如,`Flux.create`允许多线程调用`sink.onNext`,但是其使用的sink是`reactor.core.publisher.FluxCreate.SerializedFluxSink`,其在next操作中通过队列和`cas`对下游的`onNext`操作进行了同步。 +> +> 故而,在传统的`Processor.onNext`中,如果要在多线程环境下使用,必须在上游做好同步操作,否则会导致下游的未定义行为。 + +##### Processor +Processor是一种特殊的`Publisher`,其在作为`publisher`的同时也是`Subscriber`。 + +在使用Processor是,一个很常见的问题是,直接对processor向外暴露的`onNext`,`onComplete`, `onError`方法进行调用。 + +实际上,对于processor中`onXXX`方法的调用必须要符合reactive stream规范,在外部对`onXXX`方法进行调用时,要做好同步操作。 + +sinks的使用示例如下所示: +```java +Sinks.Many replaySink = Sinks.many().replay().all(); +``` +通过sink,多个线程可以并行的产生数据 +```java +//thread1 +replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST); + +//thread2, later +replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST); + +//thread3, concurrently with thread 2 +//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful +replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2))); + +//thread3, concurrently with thread 2 +//would return FAIL_NON_SERIALIZED +EmitResult result = replaySink.tryEmitNext(4); +``` + +> 在使用`EmitFailureHandler.busyLooping`时,其返回的示例包含状态,并不能被重用 + + + + + +