doc: 阅读Flux.generate文档

This commit is contained in:
asahi
2025-03-26 01:15:21 +08:00
parent a74f948934
commit 25b226d1a6

View File

@@ -28,6 +28,10 @@
- [limitRate](#limitrate)
- [lowTie](#lowtie)
- [limitRequest](#limitrequest)
- [Create Sequence](#create-sequence)
- [同步生成`generate`](#同步生成generate)
- [Sink](#sink)
- [异步多线程生成`create`](#异步多线程生成create)
# Reactor
@@ -344,3 +348,63 @@ prefetch的补充优化通常采用75%的启发规则一旦操作符发现75%
> 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成会向下游发送onComplete信号
### Create Sequence
#### 同步生成`generate`
创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer<SynchronousSink<T>>`类型的参数。
##### Sink
sink是spring reactor中的一个核心抽象概念其可以被理解为数据流的出口或`发射器``负责将元素、error或complete信号推送到下游订阅者`
sink可以分为如下种类
- 同步/异步
- 单次/多次发射
sink的核心api如下
- `next(T value)`: 向下游发送数据
- `complete`:表示数据流正常结束
- `error(throwable)`:代表数据流因为错误而终止
generate方法用于产生同步且`one by one`的emission即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下:
```java
Flux.generate(sink -> {
String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的)
sink.next(data); // 仅调用一次
if (isEndOfData()) sink.complete();
});
```
> 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。
示例如下:
```java
Flux.generate(sink -> {
sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
})
.take(10)
.subscribe(System.out::println);
```
通过generate的另一个重载方法`generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下:
- `stateSupplier`该参数用于提供一个初始状态每当有新请求进入时都会调用supplier生成初始状态
- `generator`generator会返回一个新的state
示例如下:
```java
Flux.generate(() -> 0, (s, sink) -> {
// System.out.printf("%s emitted\n", s);
sink.next(s);
return s + 1;
})
.take(5)
.subscribe(System.out::println);
```
其上游生成的整数依次增加,输出如下:
```
0
1
2
3
4
```
#### 异步多线程生成`create`