diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 44f7047..ccf1b52 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -81,6 +81,13 @@ - [Sinks.unsafe().many()](#sinksunsafemany) - [Sinks.one()](#sinksone) - [Sinks.empty()](#sinksempty) + - [Advanced](#advanced) + - [Batching](#batching) + - [Grouping With `Flux>`](#grouping-with-fluxgroupedfluxt) + - [windowing with `Flux>`](#windowing-with-fluxfluxt) + - [windowWhile](#windowwhile) + - [windowUntil](#windowuntil) + - [Buffering with `Flux>`](#buffering-with-fluxlistt) # Reactor ## Reactive Programming @@ -1196,6 +1203,114 @@ Sinks的类别包括: `Sinks.Empty`无法触发onNext,但是仍可以指定``泛型类型。 +## Advanced +### Batching +#### Grouping With `Flux>` +grouping将会把`source Flux`分割为多个batch,每个batch都有一个key,grouping对应的operator是`groupBy`。 + +其中,每个group都被表示为`GroupedFlux`,可以通过`key`方法获取该group关联的key。 + +一旦由source产生的一个element对应一个新key,那么该key对应的group就会被打开,并且,匹配该key的元素都会在该group中出现(在同一时刻,可能会有多个group处于开启状态)。 + +故而,group特性如下: +- group之间元素是不相交的(一个source element只能属于一个group) +- group不可能为空(只有group对应key的相匹配元素出现后,group才会被开启) + +groupBy使用示例如下: +```java +StepVerifier.create( + Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) + .groupBy(i -> i % 2 == 0 ? "even" : "odd") + .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them + .map(String::valueOf) //map to string + .startWith(g.key())) //start with the group's key + ) + .expectNext("odd", "1", "3", "5", "11", "13") + .expectNext("even", "2", "4", "6", "12") + .verifyComplete(); +``` +> 当使用grouping时,最好包含较少的`group数量`。如果group数量较多,在下游进行`flatMap`时,如果flatMap concurrency较小,将会造成挂起。 +#### windowing with `Flux>` +windowing指将`Flux`分割为windows,根据大小、时间、boundary-defining predicates、boundary-defining publisher对flux进行分割。 + +`windowing`关联的操作符为`window`, `windowTimeout`, `windowUntil`, `windowWhile`, `windowWhen`。 + +windowXXX和groupBy不同的是, groupBy通常会基于key随机的切换group,但是window却是顺序打开的。 + +##### windowWhile +windowWhile通过predicate判断是否window应当被关闭,如果source element满足predicate条件,那么window将仍然开启。`一旦predicate返回false,那么windows将会被关闭,并且触发的元素也会被丢丢弃`。 + +示例如下: +``` +// 元素sequence +1, 3, 5, 2, 4, 6, 11, 12, 13 + + +// predicate +i -> i % 2 == 0 + +// windowWhile产生的windows +empty window (由1产生) +empty window (由3产生) +empty window (由5产生) +2, 4, 6 (由11产生) +12 (由13产生) +``` + +##### windowUntil +windowUntil通过predicate是否应该开启一个新的window,如果source element满足predicate条件,那么就会开启一个新的window,并且之前的windows会关闭,`并且之前的windows会收到触发的元素`。 + +示例如下: +``` +// 元素sequence +1, 3, 5, 2, 4, 6, 11, 12, 13 + +// predicate +i -> i % 2 == 0 + +// windowUntil产生的windows +1, 3, 5, 2 (由2触发) +4 (由4触发) +6 (由6触发) +11, 12 (由12触发) +13 +``` + +#### Buffering with `Flux>` +buffering和windowing类似,但是区别如下: +- windows对window进行emit时,类型为`Flux` +- buffering对buffer进行emit,类型为`Collection` + +对于buffer的operator和window类似,`buffer, bufferTimeout, bufferUntil, bufferWhile, bufferWhen`。 + +相比于windowing operator开启一个新的window,buffer operator会创建一个新的Collection,并向其中添加元素。 + +同样的,buffer也会造成元素的overlap,示例如下: +```java +StepVerifier.create( + Flux.range(1, 10) + .buffer(5, 3) //overlapping buffers + ) + .expectNext(Arrays.asList(1, 2, 3, 4, 5)) + .expectNext(Arrays.asList(4, 5, 6, 7, 8)) + .expectNext(Arrays.asList(7, 8, 9, 10)) + .expectNext(Collections.singletonList(10)) + .verifyComplete(); +``` + +由于`maxSize > skip`,导致buffer也出现了元素overlap。 + +但是,和windowing不同的是,`bufferUntil`和`bufferWhile`并不会发送empty collection,示例如下: +```java +StepVerifier.create( + Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) + .bufferWhile(i -> i % 2 == 0) + ) + .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11 + .expectNext(Collections.singletonList(12)) // triggered by 13 + .verifyComplete(); +``` +