doc: 阅读redis stream文档
This commit is contained in:
@@ -99,6 +99,7 @@
|
|||||||
- [Listening for new items with XREAD](#listening-for-new-items-with-xread)
|
- [Listening for new items with XREAD](#listening-for-new-items-with-xread)
|
||||||
- [XREAD with STREAMS option](#xread-with-streams-option)
|
- [XREAD with STREAMS option](#xread-with-streams-option)
|
||||||
- [XREAD with BLOCK argument](#xread-with-block-argument)
|
- [XREAD with BLOCK argument](#xread-with-block-argument)
|
||||||
|
- [Consumer groups](#consumer-groups)
|
||||||
|
|
||||||
|
|
||||||
# redis
|
# redis
|
||||||
@@ -1606,5 +1607,51 @@ XREAD的blocking形式也支持监听多个streams,也可以指定多个key na
|
|||||||
|
|
||||||
和blocking list操作类似,`blocking stream reads`对`clients wating for data`而言是公平的,也支持FIFO。`the first client that blocked for a given stream will be the first to be unlocked when new items are available`。
|
和blocking list操作类似,`blocking stream reads`对`clients wating for data`而言是公平的,也支持FIFO。`the first client that blocked for a given stream will be the first to be unlocked when new items are available`。
|
||||||
|
|
||||||
`XREAD`除了`
|
#### Consumer groups
|
||||||
|
当从不同的clients消费相同stream时,可通过`XREAD`来进行读取,此时可将message `fan-out`给多个clients,在此种场景下,同一条消息会被多个clients进行处理。
|
||||||
|
|
||||||
|
在某些场景下,可能不希望上述行为,而是希望每个client都负责处理stream中的不同消息subset,特别是在消息处理耗时较长的场景下。假设拥有3个消费者`C1, C2, C3`和包含消息`1,2,3,4,5,6,7`的stream,则消费者对stream进行消费时,消费情形可能如下:
|
||||||
|
```
|
||||||
|
1 -> C1
|
||||||
|
2 -> C2
|
||||||
|
3 -> C3
|
||||||
|
4 -> C1
|
||||||
|
5 -> C2
|
||||||
|
6 -> C3
|
||||||
|
7 -> C1
|
||||||
|
```
|
||||||
|
为了实现上述消费方式,redis引入了`consumer groups`的概念。`consumer group`从stream中获取数据,并且`serves multiple consumers`。consumer group提供了如下保证:
|
||||||
|
- `每条消息只会被传递给一个消费者,不存在一条消息传递给多个消费者的情形`
|
||||||
|
- 在consumer group内consumer根据name进行标识,name是大小写敏感的。即使在redis client断连后,stream consumer group也会保留对应的状态,当client重连后会被标识为之前的consumer
|
||||||
|
- 每个consumer group都拥有`first ID never consumed`的概念,当consumer请求新消息是,其仅可向consumer提供尚未被传递过的消息
|
||||||
|
- 当消费消息时,需要通过特定命令来进行显式的ack。
|
||||||
|
- 在redis中,`ack`代表如下含义:该消息已经被正确的处理,并且该消息可在consumer group中被淘汰
|
||||||
|
- consumer group会追踪当前所有的`pending messages`
|
||||||
|
- `pending messages`代表已经被传递给consumer但是尚未被ack的消息
|
||||||
|
- 由于consumer group对pending message的追踪,当访问stream的message hisory(记录message被传递给哪个consumer instance)时,每个consumer只能看到被传递给其自身的messages
|
||||||
|
|
||||||
|
在某种程度上,consumer group可以被看作是`amount of state abount a stream`:
|
||||||
|
```
|
||||||
|
+----------------------------------------+
|
||||||
|
| consumer_group_name: mygroup |
|
||||||
|
| consumer_group_stream: somekey |
|
||||||
|
| last_delivered_id: 1292309234234-92 |
|
||||||
|
| |
|
||||||
|
| consumers: |
|
||||||
|
| "consumer-1" with pending messages |
|
||||||
|
| 1292309234234-4 |
|
||||||
|
| 1292309234232-8 |
|
||||||
|
| "consumer-42" with pending messages |
|
||||||
|
| ... (and so forth) |
|
||||||
|
+----------------------------------------+
|
||||||
|
```
|
||||||
|
consumer group能够为consumer instance提供`history of pending messages`,情切当consumer请求new messages时,只会向其传递`ID大于last_delivered_id的message`。
|
||||||
|
|
||||||
|
对于redis stream而言,其可以包含多个consumer groups。并且,对于同一redis stream,可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。
|
||||||
|
|
||||||
|
consumer group包含如下的基本命令:
|
||||||
|
- XGROUP
|
||||||
|
- XREADGROUP
|
||||||
|
- XACK
|
||||||
|
- XACKDEL
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user