diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 28d131d..e528af9 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -47,6 +47,10 @@ - [publishOn](#publishon) - [subscribeOn](#subscribeon) - [`subscribeOn`原理](#subscribeon原理) + - [Handling Errors](#handling-errors) + - [static fallback value](#static-fallback-value) + - [catch and swallow error](#catch-and-swallow-error) + - [fallback method](#fallback-method) # Reactor ## Reactive Programming @@ -642,8 +646,117 @@ final Flux flux = Flux new Thread(() -> flux.subscribe(System.out::println)); ``` - - +### Handling Errors +在reactive stream中,errors是终止事件,一旦error被触发,sequence的生成将会被停止,并且将会沿着reactor chain的下游一直传递道`subscriber#onError`。 + +error应该在应用程序的级别被处理,处理方案如下: +- 在ui上展示错误信息 +- 向rest endpoint发送error payload + +故而,`subscriber#onError`方法应该被定义。 + +reactor同样支持在chain的中间通过`error-handling`来处理error,示例如下所示: +```java +Flux.just(1, 2, 0) + .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0 + .onErrorReturn("Divided by zero :("); // error handling example +``` + +> 在reactor中,任何error都代表终止事件,`即使使用了error-handling operator,original sequence在出现error后也不会继续。 +> +> 故而,error-handling将`onError`信号转化为了一个新sequence的开始(fallback one),即通过新sequence替换了上游旧的`terminated sequence`。 + +> 在`FluxOnErrorReturn.ReturnSubscriber#onError`的实现中,operator将`Throwable`转换为了`onNext(fallbackValue)`和`onComplete`两个调用,即在向下游发送`onNext(fallbackValue)`信号之后,也会终止序列。 + +通常来说,常见的异常才处理方式如下: + +#### static fallback value +该场景类似于 +```java +try { + return doSomethingDangerous(10); +} +catch (Throwable error) { + return "RECOVERED"; +} +``` +可以调用`onErrorReturn`方法,示例如下: +```java +Flux.just(10) + .map(this::doSomethingDangerous) + .onErrorReturn("RECOVERED"); +``` +`onErrorReturn`还存在一个接受`Predicate`的重载方法,可以决定是否对异常执行recover操作,示例如下: +```java +Flux.just(10) + .map(this::doSomethingDangerous) + .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); +``` + +#### catch and swallow error +如果不想将异常替换为fallback value,而是忽视产生的error,仅向下游传递正常生成的元素,可以使用`onErrorComplete`方法(例如, `1,2,3...`序列在向下进行传递时,如果`2`的下游operator执行发生异常,那么`onErrorComplete`会忽略`2`产生的error,仅向下游传递`1`)。 + +`onErrorComplete`的使用示例如下: +```java +Flux.just(10,20,30) + .map(this::doSomethingDangerousOn30) + .onErrorComplete(); +``` +> onErrorComplete会将onError信号转化为onComplete信号。 + +类似于`onErrorReturn`,`onErrorComplete`同样存在一个重载方法接受`Predicate`,可以通过该断言判断是否对error执行`recover`操作。 +- 若执行recover操作,则会调用`subscriber#onComplete` +- 若不执行recover操作,则会调用`subscriber#onError` + +#### fallback method +如果在处理数据时,存在多种方法,例如`m1,m2`,并且在尝试`m1`失败时想要再尝试`m2`,可以使用`onErrorResume`方法。 + +其等价于如下逻辑: +```java +String v1; +try { + v1 = callExternalService("key1"); +} +catch (Throwable error) { + v1 = getFromCache("key1"); +} + +String v2; +try { + v2 = callExternalService("key2"); +} +catch (Throwable error) { + v2 = getFromCache("key2"); +} +``` +和上述逻辑等价的是如下代码: +```java +Flux.just("key1", "key2") + .flatMap(k -> callExternalService(k) + .onErrorResume(e -> getFromCache(k)) + ); +``` +其中,`onErrorResume`返回的都是一个`Publisher` + +`onErrorResume`的底层逻辑实现如下: +- 其会根据`onErrorResume`中传入的`Function>`参数和`error`来构建一个新的publisher +- 然后,对新的publisher执行`subscribe`操作,并向下游返回新publisher的sequence + +`onErrorResume`同样拥有一个重载函数接受`Predicate`,决定是否执行recover逻辑。除此之外,其接受`Function>`类型参数,可以根据抛出的异常来决定fallback sequence,示例如下: +```java +Flux.just("timeout1", "unknown", "key2") + .flatMap(k -> callExternalService(k) + .onErrorResume(error -> { + if (error instanceof TimeoutException) + return getFromCache(k); + else if (error instanceof UnknownKeyException) + return registerNewEntry(k, "DEFAULT"); + else + return Flux.error(error); + }) + ); +``` +> 在上述示例中,`Flux.error`会重新抛出上述异常