diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index a43a0a1..44f7047 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -76,7 +76,11 @@ - [Sinks.many().multicast().onBackpressureBuffer(args?)](#sinksmanymulticastonbackpressurebufferargs) - [autoCancel](#autocancel) - [Sinks.many().multicast().directAllOrNothing()](#sinksmanymulticastdirectallornothing) ->>>>>>> 8d41980 (doc: 阅读transient errors文档) + - [Sinks.many().multicast().directBestEffort()](#sinksmanymulticastdirectbesteffort) + - [Sinks.many().replay()](#sinksmanyreplay) + - [Sinks.unsafe().many()](#sinksunsafemany) + - [Sinks.one()](#sinksone) + - [Sinks.empty()](#sinksempty) # Reactor ## Reactive Programming @@ -1160,6 +1164,38 @@ Sinks的类别包括: 在`Sinks.many()`终止后(通常是通过调用`emitError, emitComplete`方法),其仍然允许新的subscriber对其进行订阅,但是只会对新订阅者replay termination signal。 +#### Sinks.many().multicast().directBestEffort() +对于该类型的`multicast Sinks.Many`,若`subscriber is too slow`(该subscriber的demand为0),那么该`onNext`信号`仅会针对该slow subscriber`进行丢弃。 + +但是,`slow subscribers`并不会被终止,一旦`slow subscribers`开始请求数据,其会重新开始接收新推送的数据。 + +当`Sinks.Many`被`emitError, emitComplete`终止后,其仍然允许新的subscribers对其进行订阅,但是对新订阅的订阅者,只会向新订阅者发送termination signal。 + +#### Sinks.many().replay() +一个`replay Sinks.Many`可以将已发送元素进行缓存,并且对后续的subscriber进行replay。 + +`replay Sinks.Many`可以通过如下方式进行创建: +- 缓存指定数量的历史数据`Sinks.many().replay().limit(int)` +- 缓存所有历史数据,没有上限限制`Sinks.many().replay().all()` +- 基于time-based window进行缓存`Sinks.many().replay().limit(Duration)` +- hisotry size limit和time window相结合`Sinks.many().replay().limit(int, Duration)` + +除此之外,`Sinks.many().replay()`还包含其他的重载方法,例如可以通过`latest()`和`latestOrDefault()`对单个元素进行缓存和replay + +#### Sinks.unsafe().many() +`Sinks.unsafe().many()`返回的`Sinks.Many factory`并不会提供producer thread safety,在使用`Sinks.unsafe().many()`时,需要确保对`可能导致onNext, onComplete, onError方法`的调用需要保证外部的同步,以确保其满足reactive stream规范。 + +#### Sinks.one() +该方法会简单创建一个`Sinks.One`实例,该实例可以看作`Mono`,并且其emit方法和`many`稍有不同: +- `emitValue(T value)`: 产生一个`onNext(value)`信号,并且,在大多数实现中会产生一个`onComplete`信号 +- `emptyEmpty()`: 只会产生一个`onComplete`信号,其和`empty Mono`等效 +- `emitError(Throwable t)`:产生一个`onError(t)`信号 + +#### Sinks.empty() +该方法会创建一个`Sinks.Empty`实例,`Sinks.Empty`和`Sinks.One`类似,但是`Sinks.Empty`不提供`emitValue`方法。 + +`Sinks.Empty`无法触发onNext,但是仍可以指定``泛型类型。 +