doc: 阅读spring reactor文档

This commit is contained in:
asahi
2025-04-07 12:50:26 +08:00
parent d640bd00e8
commit e5b3cfead4

View File

@@ -51,6 +51,13 @@
- [static fallback value](#static-fallback-value)
- [catch and swallow error](#catch-and-swallow-error)
- [fallback method](#fallback-method)
- [Dynamic Fallback Value](#dynamic-fallback-value)
- [Catch and rethrow](#catch-and-rethrow)
- [log and react on the side](#log-and-react-on-the-side)
- [doOnError](#doonerror)
- [Using resources and the finally block](#using-resources-and-the-finally-block)
- [doFinally](#dofinally)
- [using](#using)
# Reactor
## Reactive Programming
@@ -758,5 +765,150 @@ Flux.just("timeout1", "unknown", "key2")
```
> 在上述示例中,`Flux.error`会重新抛出上述异常
#### Dynamic Fallback Value
当出现error时如果并不想调用下游的`onError`,而是想给下游一个`根据异常信息计算出的默认值`,同样可以使用`onErrorResume`方法,示例如下:
```java
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
```
上述代码等价于
```java
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
```
#### Catch and rethrow
对于捕获异常然后重新抛出的逻辑
```java
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
```
可以通过`onErrorResume`实现:
```java
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
```
或者,可以调用`onErrorMap`,其是对`onErrorResume`方法的封装:
```java
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
```
#### log and react on the side
如果想要实现类似`将异常捕获、打印然后重新抛出`的逻辑,可以使用`doOnError`
```java
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
```
上述代码等价于
```java
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
```
##### doOnError
`doOnError`和其他以`doOn`为前缀的operators其通常会被称为`副作用side effect`。通过这些operators可以在不修改sequence的情况下监控chain中的事件。
#### Using resources and the finally block
对于`在finally block中清理资源`的行为,可以通过`doFinally`来实现
```java
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
```
##### doFinally
`doFinally`是无论sequence结束onError/onComplete还是取消cancel都希望执行的副作用side effect。其接收参数类型为`Consumer<SignalType>`,可以通过`signaltype`判断`触发side-effect的terminaction类型`
`doFinally`使用示例如下:
```java
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
```
对于`try(resource)`的代码,则是可以通过`using`来实现
```java
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
```
##### using
`using`代表`Flux由资源派生并且在Flux处理完成之后都需要对资源执行操作`
```java
// resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
```
```java
// using
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
```
其中,`resource`实现了`Disposable`接口,而`Flux.using`接收的参数类型分别为
- `() -> disposableInstance`: resource supplier
- `disposable -> Flux.just(disposable.toString())`: 一个factory根据resource产生publisher
- `Disposable::dispose`: resource cleanup