From e5b3cfead43d8678dd92f2850c6e2beaed57b3a8 Mon Sep 17 00:00:00 2001 From: asahi Date: Mon, 7 Apr 2025 12:50:26 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBspring=20reactor?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 152 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index e528af9..1e5d89e 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -51,6 +51,13 @@ - [static fallback value](#static-fallback-value) - [catch and swallow error](#catch-and-swallow-error) - [fallback method](#fallback-method) + - [Dynamic Fallback Value](#dynamic-fallback-value) + - [Catch and rethrow](#catch-and-rethrow) + - [log and react on the side](#log-and-react-on-the-side) + - [doOnError](#doonerror) + - [Using resources and the finally block](#using-resources-and-the-finally-block) + - [doFinally](#dofinally) + - [using](#using) # Reactor ## Reactive Programming @@ -758,5 +765,150 @@ Flux.just("timeout1", "unknown", "key2") ``` > 在上述示例中,`Flux.error`会重新抛出上述异常 +#### Dynamic Fallback Value +当出现error时,如果并不想调用下游的`onError`,而是想给下游一个`根据异常信息计算出的默认值`,同样可以使用`onErrorResume`方法,示例如下: +```java +try { + Value v = erroringMethod(); + return MyWrapper.fromValue(v); +} +catch (Throwable error) { + return MyWrapper.fromError(error); +} +``` +上述代码等价于 +```java +erroringFlux.onErrorResume(error -> Mono.just( + MyWrapper.fromError(error) +)); +``` + +#### Catch and rethrow +对于捕获异常然后重新抛出的逻辑 +```java +try { + return callExternalService(k); +} +catch (Throwable error) { + throw new BusinessException("oops, SLA exceeded", error); +} +``` +可以通过`onErrorResume`实现: +```java +Flux.just("timeout1") + .flatMap(k -> callExternalService(k)) + .onErrorResume(original -> Flux.error( + new BusinessException("oops, SLA exceeded", original)) + ); +``` +或者,可以调用`onErrorMap`,其是对`onErrorResume`方法的封装: +```java +Flux.just("timeout1") + .flatMap(k -> callExternalService(k)) + .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original)); +``` + +#### log and react on the side +如果想要实现类似`将异常捕获、打印然后重新抛出`的逻辑,可以使用`doOnError` +```java +try { + return callExternalService(k); +} +catch (RuntimeException error) { + //make a record of the error + log("uh oh, falling back, service failed for key " + k); + throw error; +} +``` +上述代码等价于 +```java +LongAdder failureStat = new LongAdder(); +Flux flux = +Flux.just("unknown") + .flatMap(k -> callExternalService(k) + .doOnError(e -> { + failureStat.increment(); + log("uh oh, falling back, service failed for key " + k); + }) + + ); +``` +##### doOnError +`doOnError`和其他以`doOn`为前缀的operators,其通常会被称为`副作用(side effect)`。通过这些operators,可以在不修改sequence的情况下监控chain中的事件。 + + +#### Using resources and the finally block +对于`在finally block中清理资源`的行为,可以通过`doFinally`来实现 +```java +Stats stats = new Stats(); +stats.startTimer(); +try { + doSomethingDangerous(); +} +finally { + stats.stopTimerAndRecordTiming(); +} +``` +##### doFinally +`doFinally`是无论sequence结束(onError/onComplete)还是取消(cancel),都希望执行的副作用(side effect)。其接收参数类型为`Consumer`,可以通过`signaltype`判断`触发side-effect的terminaction类型`。 + +`doFinally`使用示例如下: +```java +Stats stats = new Stats(); +LongAdder statsCancel = new LongAdder(); + +Flux flux = +Flux.just("foo", "bar") + .doOnSubscribe(s -> stats.startTimer()) + .doFinally(type -> { + stats.stopTimerAndRecordTiming(); + if (type == SignalType.CANCEL) + statsCancel.increment(); + }) + .take(1); +``` + +对于`try(resource)`的代码,则是可以通过`using`来实现 +```java +try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { + return disposableInstance.toString(); +} +``` + + +##### using +`using`代表`Flux由资源派生,并且在Flux处理完成之后都需要对资源执行操作`。 +```java +// resource +AtomicBoolean isDisposed = new AtomicBoolean(); +Disposable disposableInstance = new Disposable() { + @Override + public void dispose() { + isDisposed.set(true); + } + + @Override + public String toString() { + return "DISPOSABLE"; + } +}; +``` + +```java +// using +Flux flux = +Flux.using( + () -> disposableInstance, + disposable -> Flux.just(disposable.toString()), + Disposable::dispose +); +``` +其中,`resource`实现了`Disposable`接口,而`Flux.using`接收的参数类型分别为 +- `() -> disposableInstance`: resource supplier +- `disposable -> Flux.just(disposable.toString())`: 一个factory,根据resource产生publisher +- `Disposable::dispose`: resource cleanup + + +