From 25b226d1a686ba4ad49f4c35212bafbc7cd1fab3 Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 26 Mar 2025 01:15:21 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBFlux.generate=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 7564ae7..0528767 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -28,6 +28,10 @@ - [limitRate](#limitrate) - [lowTie](#lowtie) - [limitRequest](#limitrequest) + - [Create Sequence](#create-sequence) + - [同步生成`generate`](#同步生成generate) + - [Sink](#sink) + - [异步多线程生成`create`](#异步多线程生成create) # Reactor @@ -344,3 +348,63 @@ prefetch的补充优化通常采用75%的启发规则,一旦操作符发现75% > 一旦source发送的元素个数超过`N`时,`limitRequest`将会认为sequence已经完成,会向下游发送onComplete信号 +### Create Sequence +#### 同步生成`generate` +创建`Flux`的最简单形式是通过`generate`方法生成,其接收一个`Consumer>`类型的参数。 + +##### Sink +sink是spring reactor中的一个核心抽象概念,其可以被理解为数据流的出口或`发射器`,`负责将元素、error或complete信号推送到下游订阅者`。 + +sink可以分为如下种类: +- 同步/异步 +- 单次/多次发射 + +sink的核心api如下: +- `next(T value)`: 向下游发送数据 +- `complete`:表示数据流正常结束 +- `error(throwable)`:代表数据流因为错误而终止 + +generate方法用于产生同步且`one by one`的emission,即sink为`SynchronousSink`且其单次invocation中`sink.next`只能够被调用一次。示例如下: +```java +Flux.generate(sink -> { + String data = fetchDataFromBlockingIO(); // 阻塞操作(安全,因为是同步的) + sink.next(data); // 仅调用一次 + if (isEndOfData()) sink.complete(); +}); +``` + +> 在调用`sink.next`后,可以调用`sink.complete`或`sink.error`,但是对`complete`和`error`的调用都是可选的。 + +示例如下: +```java +Flux.generate(sink -> { + sink.next(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())); + }) + .take(10) + .subscribe(System.out::println); +``` + +通过generate的另一个重载方法`generate(Callable stateSupplier, BiFunction, S> generator)`可以维护一个状态并进行管理,其中`S`为状态的对象类型。对于重载方法的两个参数,其含义如下: +- `stateSupplier`:该参数用于提供一个初始状态,每当有新请求进入时,都会调用supplier生成初始状态 +- `generator`:generator会返回一个新的state + +示例如下: +```java +Flux.generate(() -> 0, (s, sink) -> { +// System.out.printf("%s emitted\n", s); + sink.next(s); + return s + 1; + }) + .take(5) + .subscribe(System.out::println); +``` +其上游生成的整数依次增加,输出如下: +``` +0 +1 +2 +3 +4 +``` + +#### 异步多线程生成`create`