doc: 阅读spring reactor文档

This commit is contained in:
asahi
2025-03-28 01:34:50 +08:00
parent e55fa5a32b
commit cf6f069e79

View File

@@ -32,6 +32,8 @@
- [同步生成`generate`](#同步生成generate)
- [Sink](#sink)
- [异步多线程生成`create`](#异步多线程生成create)
- [`Flux.create`和`listener based api`进行适配](#fluxcreate和listener-based-api进行适配)
- [`Flux.create` backpressure](#fluxcreate-backpressure)
# Reactor
@@ -412,6 +414,41 @@ Flux.generate(() -> 0, (s, sink) -> {
> `Flux.create`并不会将你的代码变为并行或是异步的.
>
> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法在create lambda中执行长阻塞操作仍然会阻塞item的处理因为item的source产生和下游处理都处于同一线程中但是上游对线程的阻塞可能导致下游的消费逻辑无法被执行
>
>
> 如果在`Flux.create`中存在阻塞操作,那么将存在死锁的风险。即使使用了`subscribeOn`方法在create lambda中执行长阻塞操作仍然会阻塞item的处理因为item的source产生和下游处理都处于同一线程中但是上游对线程的阻塞可能导致下游发送的request请求无法被传送到上游
##### `Flux.create`和`listener based api`进行适配
街射使用基于listener的apiapi定义如下
```java
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
```
可以使用`Flux.create`将其与Flux相桥接
```java
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
```
##### `Flux.create` backpressure
可以通过指定`OverflowStrategy`来优化backpressure行为OverflowStrategy可选值如下
- `IGNORE`: 该选项会完全无视下游的backpressure request当下游queue被填充满时会抛出`IllegalStateException`异常
> 部分操作符(例如`publishOn`其subscriber会内置queue用于存储上游通过`onNext`推送的数据。
>
> 当queue满时将会抛出`IllegalStateException`异常
- `ERROR`:当下游无法`跟上`上游的数据发送速度时,上游将会发送`IllegalStateException`异常
> 在上述描述中,`跟上`代表subscriber发送的request个数是否大于source产生的数据个数例如若下游只发送了`request(1)`但是source产生了两个数据调用了两次`sink.next`,那么第二次调用时`requested`已经为0会调用`onOverflow`方法发送`error`信号