diff --git a/中间件/redis/redis.md b/中间件/redis/redis.md index 0fe4f90..0c7347f 100644 --- a/中间件/redis/redis.md +++ b/中间件/redis/redis.md @@ -99,6 +99,7 @@ - [Listening for new items with XREAD](#listening-for-new-items-with-xread) - [XREAD with STREAMS option](#xread-with-streams-option) - [XREAD with BLOCK argument](#xread-with-block-argument) + - [Consumer groups](#consumer-groups) # redis @@ -1606,5 +1607,51 @@ XREAD的blocking形式也支持监听多个streams,也可以指定多个key na 和blocking list操作类似,`blocking stream reads`对`clients wating for data`而言是公平的,也支持FIFO。`the first client that blocked for a given stream will be the first to be unlocked when new items are available`。 -`XREAD`除了` +#### Consumer groups +当从不同的clients消费相同stream时,可通过`XREAD`来进行读取,此时可将message `fan-out`给多个clients,在此种场景下,同一条消息会被多个clients进行处理。 + +在某些场景下,可能不希望上述行为,而是希望每个client都负责处理stream中的不同消息subset,特别是在消息处理耗时较长的场景下。假设拥有3个消费者`C1, C2, C3`和包含消息`1,2,3,4,5,6,7`的stream,则消费者对stream进行消费时,消费情形可能如下: +``` +1 -> C1 +2 -> C2 +3 -> C3 +4 -> C1 +5 -> C2 +6 -> C3 +7 -> C1 +``` +为了实现上述消费方式,redis引入了`consumer groups`的概念。`consumer group`从stream中获取数据,并且`serves multiple consumers`。consumer group提供了如下保证: +- `每条消息只会被传递给一个消费者,不存在一条消息传递给多个消费者的情形` +- 在consumer group内consumer根据name进行标识,name是大小写敏感的。即使在redis client断连后,stream consumer group也会保留对应的状态,当client重连后会被标识为之前的consumer +- 每个consumer group都拥有`first ID never consumed`的概念,当consumer请求新消息是,其仅可向consumer提供尚未被传递过的消息 +- 当消费消息时,需要通过特定命令来进行显式的ack。 + - 在redis中,`ack`代表如下含义:该消息已经被正确的处理,并且该消息可在consumer group中被淘汰 +- consumer group会追踪当前所有的`pending messages` + - `pending messages`代表已经被传递给consumer但是尚未被ack的消息 + - 由于consumer group对pending message的追踪,当访问stream的message hisory(记录message被传递给哪个consumer instance)时,每个consumer只能看到被传递给其自身的messages + +在某种程度上,consumer group可以被看作是`amount of state abount a stream`: +``` ++----------------------------------------+ +| consumer_group_name: mygroup | +| consumer_group_stream: somekey | +| last_delivered_id: 1292309234234-92 | +| | +| consumers: | +| "consumer-1" with pending messages | +| 1292309234234-4 | +| 1292309234232-8 | +| "consumer-42" with pending messages | +| ... (and so forth) | ++----------------------------------------+ +``` +consumer group能够为consumer instance提供`history of pending messages`,情切当consumer请求new messages时,只会向其传递`ID大于last_delivered_id的message`。 + +对于redis stream而言,其可以包含多个consumer groups。并且,对于同一redis stream,可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。 + +consumer group包含如下的基本命令: +- XGROUP +- XREADGROUP +- XACK +- XACKDEL