From da4093838f47968844600e50566e06ec7689e029 Mon Sep 17 00:00:00 2001 From: asahi Date: Mon, 21 Apr 2025 01:00:07 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBreactor=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 41 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index ccf1b52..0aefd05 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -88,6 +88,10 @@ - [windowWhile](#windowwhile) - [windowUntil](#windowuntil) - [Buffering with `Flux>`](#buffering-with-fluxlistt) + - [flatmap](#flatmap) + - [Pipeline Operations](#pipeline-operations) + - [concurrency](#concurrency) + - [prefetch](#prefetch-1) # Reactor ## Reactive Programming @@ -1313,4 +1317,41 @@ StepVerifier.create( .verifyComplete(); ``` +### flatmap +flatMap方法接收一个Function类型的参数,该Function会将一个`input item`转化为一个`Publisher`,示例如下: +```java +Function> mapper = s -> Flux.just(s.toUpperCase().split("")); +``` +上述Function的返回类型为`Pbulisher`,其将字符串转为大写并对转换后的字符串进行分割,并基于分割后的字符串集合创建了一个新的reactive stream。 + +上述function的使用如下: +```java +Flux inFlux = Flux.just("baeldung", ".", "com"); +Flux outFlux = inFlux.flatMap(mapper); +``` +由于上游存在三个字符串,故而flatMap方法基于上游的三个字符串创建了三个新的reactive stream。新建的三个stream,其中元素由上游字符串分割而得到,`并且三个stream中的元素会被填充到另一个新建的reactive stream中。` + +对其进行subscribe之后,预期结果如下: +```java +List output = new ArrayList<>(); +outFlux.subscribe(output::add); +assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M"); +``` +> 注意,最后输出字符的顺序可能是无序的。 + +#### Pipeline Operations +flatMap会通过传递给其的`Function`和`onNext element`创建新的reactive stream,并且,`一旦新的stream(由Publisher表示)创建好后,flatMap会马上对其进行订阅`。并且,`订阅操作并不是阻塞的`,operator在继续下一个stream之前并不需要等待当前订阅操作终止。 + +> pipeline会同时处理所有基于input item派生的stream,并且,派生stream中的元素随时可能到达。故而,original order可能会被丢失。 + + +如果original order比较重要,可以使用` flatMapSequential`操作符。 + +#### concurrency +concurrency用于控制在途的inner sequences上限。 + +#### prefetch +prefetch用于控制每个inner publisher在途的元素上限 + +