doc: 阅读redis stream文档

This commit is contained in:
asahi
2025-09-09 21:02:33 +08:00
parent a1bded7e5e
commit 172fd4c0ae

View File

@@ -92,6 +92,12 @@
- [XLEN](#xlen)
- [Entry IDs](#entry-ids)
- [entry ID设计](#entry-id设计)
- [Redis Stream consumer query model](#redis-stream-consumer-query-model)
- [Querying by range: XRANGE and XREVRANGE](#querying-by-range-xrange-and-xrevrange)
- [query by time range](#query-by-time-range)
- [count option](#count-option)
- [Listening for new items with XREAD](#listening-for-new-items-with-xread)
- [XREAD with STREAMS option](#xread-with-streams-option)
# redis
@@ -1390,3 +1396,188 @@ entry ID中包含millisecondsTime的原因是Reids Stream支持`range queries
> XADD race:usa 0-* racer Prickett
0-3
```
#### Redis Stream consumer query model
向redis stream中添加entry的操作可以通过`XADD`进行实现。而从redis stream从读取数据redis stream支持如下query model
- `Listening for new items`: 和unix command `tail -f`类似reids stream consumer会监听`new messages that are appended to the stream`。
- 但是和BLPOP这类阻塞操作不同的是对于`BLPOP`,一个给定元素只会被传递给`一个client`而在使用stream时我们希望`new messages appended to the stream时`新消息对多个consumers可见。`tail -f`同样支持新被添加到log文件中的内容被多个进程可见
- 故而,在`Listening for new items`这种query model下`stream is able to fan out messages to multiple clients`
- `Query by range`: 除了上述类似`tail -f`的query model外可能还希望将redis stream以另一种方式进行使用并不将redis stream作为messaging system而是将其看作`time series store`
- 在将其作为`time series store`的场景下其仍然能访问最新的messages但是其还支持`get messages by ranges of time`的操作,或是`iterate the messages using a cursor to incrementally check all the history`
- `Consumer group`: 上述两种场景都是从`consuemrs`的视角来读取/处理消息但是redis stream支持另一种抽象`a stream of messages that can be partitioned to multiple consumers that are processing such messages`。
- 在该场景下并非每个consumer都必须处理所有的messsages每个consumer可以获取不同的messages并进行处理
redis stream通过不同的命令支持了以上三种query model。
#### Querying by range: XRANGE and XREVRANGE
为了根据范围查询stream需要指定两个id`start ID`和`end ID`。指定的范围为`inclusive`的,包含`start ID`和`end ID`。
`+`和`-`则是代表`greatest ID`和`smallest ID`,示例如下所示:
```redis-cli
> XRANGE race:france - +
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
3) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
4) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
```
如上所示返回的每个entry都是由`ID`和`field-value pairs`组成的数组。由于entry ID和时间`currentMillisTime`相关联故而可以通过XRANGE根据时间范围来查询records。
##### query by time range
`并且在根据时间范围查询records时可以省略sequence part的部分`
- 在省略sequence part时`start`的sequence part将会被设置为0而`end`的sequence part将会被设置为最大值。
- 故而可以通过两个milliseconds unix time来进行时间范围内的查询可以获取在该时间范围内生成的entries`range is inclusive`
示例如下
```redis-cli
> XRANGE race:france 1692632086369 1692632086371
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
```
##### count option
`XRANGE`在范围查询时还支持指定一个COUNT选项用于`get first N items`。如果想要进行增量查询,可以用`上次查询的最大ID + 1`作为新一轮查询的`start`。
增量迭代示例如下:
```redis-cli
> XRANGE race:france - + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
# 通过(符号指定了左开右闭的区间
> XRANGE race:france (1692632094485-0 + COUNT 2
1) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
2) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
# 返回为空,迭代完成
> XRANGE race:france (1692632147973-0 + COUNT 2
(empty array)
```
`XRANGE`操作查找的时间复杂度为`O(long(N))`切返回M个元素的时间复杂度为`O(M)`。
命令`XREVRANGE`和`XRANGE`集合等价,但是会按照逆序来返回元素,故而,针对`XREVRANGE`传参时参数的顺序也需要颠倒
```redis-cli
> XREVRANGE race:france + - COUNT 1
1) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
```
#### Listening for new items with XREAD
在某些场景下,我们可能希望对`new items arriving to the stream`进行订阅,如下两种场景都希望对`new items`进行订阅
- 在`Redis Pub/Sub`场景中通过redis stream来对channel进行订阅
- 在Redis blocking lists场景中从redis stream中等并并获取new element
但是,上述两种场景在如下方面有所不同
- 一个stream可以包含多个clients(consumers)对于new item默认情况下`会从传递给每一个等待当前stream的consumer`。
- 在`Pub/Sub`场景和上述行为相符其会将new item传递给每一个consumer即`fan out`
- 在blocking lists场景和上述行为并不相符每个consumer都会获取到不同的element相同的element只会被传递给一个consumer
- 其对message的处理也有不同
- `Pub/Sub`场景下,将会对消息进行`fire and forget`操作,消息将永远不会被存储
- 在使用`blocking lists`时如果消息被client接收其将会从list中移除
- Stream Consumer Groups提供了`Pub/Sub`或`blocking lsits`都无法实现的控制不同的groups可以针对相同stream进行订阅并且对被处理items进行显式的ack可以对`unprocessed messages`进行声明;只有`private past history of messages`对client可见
对于提供`Listening for new items arriving into stream`支持的命令,被称为`XREAD`。其使用示例如下:
```redis-cli
> XREAD COUNT 2 STREAMS race:france 0
1) 1) "race:france"
2) 1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
```
##### XREAD with STREAMS option
上述示例为`XREAD`命令的`non-blocking`形式,并且`COUNT option并非强制的`唯一的强制option为`STREAMS`。
XREAD在和STREAMS一起使用时并且需要指定`a list of keys`和`maximum ID already seen for each stream by the calling consumer`故而该命令只会向consumer返回`messages with and ID greater than the one we specified`。
在上述示例中,命令为`STREAMS race:france 0`,其代表`race:france流中所有ID比0-0大的消息`。故而返回的结构中顶层为stream的key name`因为上述命令可以指定多个stream针对多个stream进行监听`。