doc: 阅读flux分组文档

This commit is contained in:
asahi
2025-04-19 19:18:48 +08:00
parent e71cde7bb8
commit f437f84ba4

View File

@@ -81,6 +81,13 @@
- [Sinks.unsafe().many()](#sinksunsafemany) - [Sinks.unsafe().many()](#sinksunsafemany)
- [Sinks.one()](#sinksone) - [Sinks.one()](#sinksone)
- [Sinks.empty()](#sinksempty) - [Sinks.empty()](#sinksempty)
- [Advanced](#advanced)
- [Batching](#batching)
- [Grouping With `Flux<GroupedFlux<T>>`](#grouping-with-fluxgroupedfluxt)
- [windowing with `Flux<Flux<T>>`](#windowing-with-fluxfluxt)
- [windowWhile](#windowwhile)
- [windowUntil](#windowuntil)
- [Buffering with `Flux<List<T>>`](#buffering-with-fluxlistt)
# Reactor # Reactor
## Reactive Programming ## Reactive Programming
@@ -1196,6 +1203,114 @@ Sinks的类别包括
`Sinks.Empty`无法触发onNext但是仍可以指定`<T>`泛型类型。 `Sinks.Empty`无法触发onNext但是仍可以指定`<T>`泛型类型。
## Advanced
### Batching
#### Grouping With `Flux<GroupedFlux<T>>`
grouping将会把`source Flux`分割为多个batch每个batch都有一个keygrouping对应的operator是`groupBy`。
其中每个group都被表示为`GroupedFlux<T>`,可以通过`key`方法获取该group关联的key。
一旦由source产生的一个element对应一个新key那么该key对应的group就会被打开并且匹配该key的元素都会在该group中出现在同一时刻可能会有多个group处于开启状态
故而group特性如下
- group之间元素是不相交的一个source element只能属于一个group
- group不可能为空只有group对应key的相匹配元素出现后group才会被开启
groupBy使用示例如下
```java
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
.map(String::valueOf) //map to string
.startWith(g.key())) //start with the group's key
)
.expectNext("odd", "1", "3", "5", "11", "13")
.expectNext("even", "2", "4", "6", "12")
.verifyComplete();
```
> 当使用grouping时最好包含较少的`group数量`。如果group数量较多在下游进行`flatMap`时如果flatMap concurrency较小将会造成挂起。
#### windowing with `Flux<Flux<T>>`
windowing指将`Flux<T>`分割为windows根据大小、时间、boundary-defining predicates、boundary-defining publisher对flux进行分割。
`windowing`关联的操作符为`window`, `windowTimeout`, `windowUntil`, `windowWhile`, `windowWhen`。
windowXXX和groupBy不同的是 groupBy通常会基于key随机的切换group但是window却是顺序打开的。
##### windowWhile
windowWhile通过predicate判断是否window应当被关闭如果source element满足predicate条件那么window将仍然开启。`一旦predicate返回false那么windows将会被关闭并且触发的元素也会被丢丢弃`。
示例如下:
```
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowWhile产生的windows
empty window 由1产生
empty window 由3产生
empty window 由5产生
2, 4, 6 由11产生
12 由13产生
```
##### windowUntil
windowUntil通过predicate是否应该开启一个新的window如果source element满足predicate条件那么就会开启一个新的window并且之前的windows会关闭`并且之前的windows会收到触发的元素`。
示例如下:
```
// 元素sequence
1, 3, 5, 2, 4, 6, 11, 12, 13
// predicate
i -> i % 2 == 0
// windowUntil产生的windows
1, 3, 5, 2 (由2触发)
4 (由4触发)
6 由6触发
11, 12 由12触发
13
```
#### Buffering with `Flux<List<T>>`
buffering和windowing类似但是区别如下
- windows对window进行emit时类型为`Flux<T>`
- buffering对buffer进行emit类型为`Collection<T>`
对于buffer的operator和window类似`buffer, bufferTimeout, bufferUntil, bufferWhile, bufferWhen`。
相比于windowing operator开启一个新的windowbuffer operator会创建一个新的Collection并向其中添加元素。
同样的buffer也会造成元素的overlap示例如下
```java
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
```
由于`maxSize > skip`导致buffer也出现了元素overlap。
但是和windowing不同的是`bufferUntil`和`bufferWhile`并不会发送empty collection示例如下
```java
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.bufferWhile(i -> i % 2 == 0)
)
.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
.expectNext(Collections.singletonList(12)) // triggered by 13
.verifyComplete();
```