diff --git a/中间件/redis/redis.md b/中间件/redis/redis.md index b531c46..72a0844 100644 --- a/中间件/redis/redis.md +++ b/中间件/redis/redis.md @@ -92,6 +92,12 @@ - [XLEN](#xlen) - [Entry IDs](#entry-ids) - [entry ID设计](#entry-id设计) + - [Redis Stream consumer query model](#redis-stream-consumer-query-model) + - [Querying by range: XRANGE and XREVRANGE](#querying-by-range-xrange-and-xrevrange) + - [query by time range](#query-by-time-range) + - [count option](#count-option) + - [Listening for new items with XREAD](#listening-for-new-items-with-xread) + - [XREAD with STREAMS option](#xread-with-streams-option) # redis @@ -1390,3 +1396,188 @@ entry ID中包含millisecondsTime的原因是,Reids Stream支持`range queries > XADD race:usa 0-* racer Prickett 0-3 ``` +#### Redis Stream consumer query model +向redis stream中添加entry的操作可以通过`XADD`进行实现。而从redis stream从读取数据,redis stream支持如下query model: +- `Listening for new items`: 和unix command `tail -f`类似,reids stream consumer会监听`new messages that are appended to the stream`。 + - 但是,和BLPOP这类阻塞操作不同的是,对于`BLPOP`,一个给定元素只会被传递给`一个client`;而在使用stream时,我们希望`new messages appended to the stream时`,新消息对多个consumers可见。(`tail -f`同样支持新被添加到log文件中的内容被多个进程可见) + - 故而,在`Listening for new items`这种query model下,`stream is able to fan out messages to multiple clients` +- `Query by range`: 除了上述类似`tail -f`的query model外,可能还希望将redis stream以另一种方式进行使用:并不将redis stream作为messaging system,而是将其看作`time series store` + - 在将其作为`time series store`的场景下,其仍然能访问最新的messages,但是其还支持`get messages by ranges of time`的操作,或是`iterate the messages using a cursor to incrementally check all the history` +- `Consumer group`: 上述两种场景都是从`consuemrs`的视角来读取/处理消息,但是,redis stream支持另一种抽象:`a stream of messages that can be partitioned to multiple consumers that are processing such messages`。 + - 在该场景下,并非每个consumer都必须处理所有的messsages,每个consumer可以获取不同的messages并进行处理 + +redis stream通过不同的命令支持了以上三种query model。 + +#### Querying by range: XRANGE and XREVRANGE +为了根据范围查询stream,需要指定两个id:`start ID`和`end ID`。指定的范围为`inclusive`的,包含`start ID`和`end ID`。 + +`+`和`-`则是代表`greatest ID`和`smallest ID`,示例如下所示: +```redis-cli +> XRANGE race:france - + +1) 1) "1692632086370-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "30.2" + 5) "position" + 6) "1" + 7) "location_id" + 8) "1" +2) 1) "1692632094485-0" + 2) 1) "rider" + 2) "Norem" + 3) "speed" + 4) "28.8" + 5) "position" + 6) "3" + 7) "location_id" + 8) "1" +3) 1) "1692632102976-0" + 2) 1) "rider" + 2) "Prickett" + 3) "speed" + 4) "29.7" + 5) "position" + 6) "2" + 7) "location_id" + 8) "1" +4) 1) "1692632147973-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "29.9" + 5) "position" + 6) "1" + 7) "location_id" + 8) "2" +``` +如上所示,返回的每个entry都是由`ID`和`field-value pairs`组成的数组。由于entry ID和时间`currentMillisTime`相关联,故而可以通过XRANGE根据时间范围来查询records。 + +##### query by time range +`并且,在根据时间范围查询records时,可以省略sequence part的部分`: +- 在省略sequence part时,`start`的sequence part将会被设置为0,而`end`的sequence part将会被设置为最大值。 +- 故而,可以通过两个milliseconds unix time来进行时间范围内的查询,可以获取在该时间范围内生成的entries(`range is inclusive`) + +示例如下 +```redis-cli +> XRANGE race:france 1692632086369 1692632086371 +1) 1) "1692632086370-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "30.2" + 5) "position" + 6) "1" + 7) "location_id" + 8) "1" +``` + +##### count option +`XRANGE`在范围查询时,还支持指定一个COUNT选项,用于`get first N items`。如果想要进行增量查询,可以用`上次查询的最大ID + 1`作为新一轮查询的`start`。 + +增量迭代示例如下: +```redis-cli +> XRANGE race:france - + COUNT 2 +1) 1) "1692632086370-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "30.2" + 5) "position" + 6) "1" + 7) "location_id" + 8) "1" +2) 1) "1692632094485-0" + 2) 1) "rider" + 2) "Norem" + 3) "speed" + 4) "28.8" + 5) "position" + 6) "3" + 7) "location_id" + 8) "1" + +# 通过(符号指定了左开右闭的区间 +> XRANGE race:france (1692632094485-0 + COUNT 2 +1) 1) "1692632102976-0" + 2) 1) "rider" + 2) "Prickett" + 3) "speed" + 4) "29.7" + 5) "position" + 6) "2" + 7) "location_id" + 8) "1" +2) 1) "1692632147973-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "29.9" + 5) "position" + 6) "1" + 7) "location_id" + 8) "2" + +# 返回为空,迭代完成 +> XRANGE race:france (1692632147973-0 + COUNT 2 +(empty array) +``` + +`XRANGE`操作查找的时间复杂度为`O(long(N))`,切返回M个元素的时间复杂度为`O(M)`。 + +命令`XREVRANGE`和`XRANGE`集合等价,但是会按照逆序来返回元素,故而,针对`XREVRANGE`传参时参数的顺序也需要颠倒 +```redis-cli +> XREVRANGE race:france + - COUNT 1 +1) 1) "1692632147973-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "29.9" + 5) "position" + 6) "1" + 7) "location_id" + 8) "2" +``` +#### Listening for new items with XREAD +在某些场景下,我们可能希望对`new items arriving to the stream`进行订阅,如下两种场景都希望对`new items`进行订阅 +- 在`Redis Pub/Sub`场景中,通过redis stream来对channel进行订阅 +- 在Redis blocking lists场景中,从redis stream中等并并获取new element + +但是,上述两种场景在如下方面有所不同 +- 一个stream可以包含多个clients(consumers),对于new item,默认情况下`会从传递给每一个等待当前stream的consumer`。 + - 在`Pub/Sub`场景和上述行为相符,其会将new item传递给每一个consumer,即`fan out` + - 在blocking lists场景和上述行为并不相符,每个consumer都会获取到不同的element,相同的element只会被传递给一个consumer +- 其对message的处理也有不同: + - `Pub/Sub`场景下,将会对消息进行`fire and forget`操作,消息将永远不会被存储 + - 在使用`blocking lists`时,如果消息被client接收,其将会从list中移除 +- Stream Consumer Groups提供了`Pub/Sub`或`blocking lsits`都无法实现的控制:不同的groups可以针对相同stream进行订阅;并且对被处理items进行显式的ack;可以对`unprocessed messages`进行声明;只有`private past history of messages`对client可见 + +对于提供`Listening for new items arriving into stream`支持的命令,被称为`XREAD`。其使用示例如下: +```redis-cli +> XREAD COUNT 2 STREAMS race:france 0 +1) 1) "race:france" + 2) 1) 1) "1692632086370-0" + 2) 1) "rider" + 2) "Castilla" + 3) "speed" + 4) "30.2" + 5) "position" + 6) "1" + 7) "location_id" + 8) "1" + 2) 1) "1692632094485-0" + 2) 1) "rider" + 2) "Norem" + 3) "speed" + 4) "28.8" + 5) "position" + 6) "3" + 7) "location_id" + 8) "1" +``` +##### XREAD with STREAMS option +上述示例为`XREAD`命令的`non-blocking`形式,并且`COUNT option并非强制的`;唯一的强制option为`STREAMS`。 + +XREAD在和STREAMS一起使用时,并且需要指定`a list of keys`和`maximum ID already seen for each stream by the calling consumer`,故而,该命令只会向consumer返回`messages with and ID greater than the one we specified`。 + +在上述示例中,命令为`STREAMS race:france 0`,其代表`race:france流中所有ID比0-0大的消息`。故而,返回的结构中,顶层为stream的key name,`因为上述命令可以指定多个stream,针对多个stream进行监听`。