doc: 阅读spring reactor文档

This commit is contained in:
asahi
2025-04-06 00:46:29 +08:00
parent b5f09da87d
commit d640bd00e8

View File

@@ -47,6 +47,10 @@
- [publishOn](#publishon)
- [subscribeOn](#subscribeon)
- [`subscribeOn`原理](#subscribeon原理)
- [Handling Errors](#handling-errors)
- [static fallback value](#static-fallback-value)
- [catch and swallow error](#catch-and-swallow-error)
- [fallback method](#fallback-method)
# Reactor
## Reactive Programming
@@ -642,8 +646,117 @@ final Flux<String> flux = Flux
new Thread(() -> flux.subscribe(System.out::println));
```
### Handling Errors
在reactive stream中errors是终止事件一旦error被触发sequence的生成将会被停止并且将会沿着reactor chain的下游一直传递道`subscriber#onError`
error应该在应用程序的级别被处理处理方案如下
- 在ui上展示错误信息
- 向rest endpoint发送error payload
故而,`subscriber#onError`方法应该被定义。
reactor同样支持在chain的中间通过`error-handling`来处理error示例如下所示
```java
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
```
> 在reactor中任何error都代表终止事件`即使使用了error-handling operatororiginal sequence在出现error后也不会继续。
>
> 故而error-handling将`onError`信号转化为了一个新sequence的开始fallback one即通过新sequence替换了上游旧的`terminated sequence`。
> 在`FluxOnErrorReturn.ReturnSubscriber#onError`的实现中operator将`Throwable`转换为了`onNext(fallbackValue)`和`onComplete`两个调用,即在向下游发送`onNext(fallbackValue)`信号之后,也会终止序列。
通常来说,常见的异常才处理方式如下:
#### static fallback value
该场景类似于
```java
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
```
可以调用`onErrorReturn`方法,示例如下:
```java
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
```
`onErrorReturn`还存在一个接受`Predicate`的重载方法可以决定是否对异常执行recover操作示例如下
```java
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
```
#### catch and swallow error
如果不想将异常替换为fallback value而是忽视产生的error仅向下游传递正常生成的元素可以使用`onErrorComplete`方法(例如, `1,2,3...`序列在向下进行传递时,如果`2`的下游operator执行发生异常那么`onErrorComplete`会忽略`2`产生的error仅向下游传递`1`)。
`onErrorComplete`的使用示例如下:
```java
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete();
```
> onErrorComplete会将onError信号转化为onComplete信号。
类似于`onErrorReturn``onErrorComplete`同样存在一个重载方法接受`Predicate`可以通过该断言判断是否对error执行`recover`操作。
- 若执行recover操作则会调用`subscriber#onComplete`
- 若不执行recover操作则会调用`subscriber#onError`
#### fallback method
如果在处理数据时,存在多种方法,例如`m1,m2`,并且在尝试`m1`失败时想要再尝试`m2`,可以使用`onErrorResume`方法。
其等价于如下逻辑:
```java
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
```
和上述逻辑等价的是如下代码:
```java
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
```
其中,`onErrorResume`返回的都是一个`Publisher`
`onErrorResume`的底层逻辑实现如下:
- 其会根据`onErrorResume`中传入的`Function<? super Throwable, ? extends Publisher<? extends T>>`参数和`error`来构建一个新的publisher
- 然后对新的publisher执行`subscribe`操作并向下游返回新publisher的sequence
`onErrorResume`同样拥有一个重载函数接受`Predicate`决定是否执行recover逻辑。除此之外其接受`Function<? super Throwable, ? extends Publisher<? extends T>>`类型参数可以根据抛出的异常来决定fallback sequence示例如下
```java
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> {
if (error instanceof TimeoutException)
return getFromCache(k);
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error);
})
);
```
> 在上述示例中,`Flux.error`会重新抛出上述异常