From c6be75ec7a9e23ee732148285e876994a02592cc Mon Sep 17 00:00:00 2001 From: asahi Date: Thu, 17 Apr 2025 12:49:28 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBSinks=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/webflux/Reactor.md | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/spring/webflux/Reactor.md b/spring/webflux/Reactor.md index 65d5860..8e96348 100644 --- a/spring/webflux/Reactor.md +++ b/spring/webflux/Reactor.md @@ -71,6 +71,9 @@ - [`tryEmit` \& `emit`](#tryemit--emit) - [Processor](#processor) - [`asFlux`和`asMono`](#asflux和asmono) + - [`Sinks.many().unicast().onBackpressureBuffer(args?)`](#sinksmanyunicastonbackpressurebufferargs) + - [onBackpressureBuffer](#onbackpressurebuffer) + - [Sinks.many().multicast().onBackpressureBuffer(args?)](#sinksmanymulticastonbackpressurebufferargs) >>>>>>> 8d41980 (doc: 阅读transient errors文档) # Reactor @@ -1118,8 +1121,24 @@ Sinks的类别包括: - `many().unicast()`:和`many().multicast()`类似,也是在订阅之后将`最新的数据`推送给订阅者,但是和`many().multicast()`不同之处如下: - `many().unicast()`带有缓冲区,在第一个订阅者订阅`many().unicast()`之前,会将推送给`many().unicast()`的数据都缓冲到缓冲区中,待后续有订阅者订阅后,会将缓冲区数据发送给订阅者 - `many().multicast()`默认不带有缓冲区 - - +- `many().replay()`: 对于新订阅的订阅者,会将一定数量的`pushed history data`进行`replay`操作,之后再推送新的数据 +- `one()`:sink只会向subscriberr推送一个数据 +- `empty()`:该sink会向subscriber推送`termianl signal`(error或complete) + +#### `Sinks.many().unicast().onBackpressureBuffer(args?)` +一个`unicast Sinks.Many`可以通过其内置buffer处理backpressure,但是,其最多只能有一个subscriber。 + +通常,可以通过`Sinks.many().unicast().onBackpressureBuffer()`来创建`unicast sink`。但是,`Sinks.many().unicast()`中包含更多的静态方法,可以对其进行更精细的调整。 + +##### onBackpressureBuffer +在默认情况下,`onBackpressureBuffer()`其是无界(unbounded)的: +- 在subscriber尚未请求数据的情况下,如果通过sink推送了数据,那么这些数据都会被缓冲到缓冲区中,缓冲区是无界的 + +故而,`onBackPressureBuffer`方法存在一个接收`Queue`类型参数的重载方法。可以向其传递一个有界队列,该队列将会被用作内部缓冲区。 + +> 在为`onBackpressureBuffer`指定了`Queue`的情况下,如果`queue已满并且下游并没有向上游发送足够的reqeust`时,`sink将会拒绝该value的推送`。 + +#### Sinks.many().multicast().onBackpressureBuffer(args?)