doc: 阅读redis stream文档
This commit is contained in:
@@ -82,6 +82,16 @@
|
|||||||
- [Leaderboard Example](#leaderboard-example)
|
- [Leaderboard Example](#leaderboard-example)
|
||||||
- [ZADD](#zadd)
|
- [ZADD](#zadd)
|
||||||
- [ZINCRBY](#zincrby)
|
- [ZINCRBY](#zincrby)
|
||||||
|
- [redis Streams](#redis-streams)
|
||||||
|
- [Examples](#examples)
|
||||||
|
- [添加stream entry](#添加stream-entry)
|
||||||
|
- [从指定id开始读取stream entries](#从指定id开始读取stream-entries)
|
||||||
|
- [从末尾开始读取](#从末尾开始读取)
|
||||||
|
- [Stream basics](#stream-basics)
|
||||||
|
- [XADD](#xadd)
|
||||||
|
- [XLEN](#xlen)
|
||||||
|
- [Entry IDs](#entry-ids)
|
||||||
|
- [entry ID设计](#entry-id设计)
|
||||||
|
|
||||||
|
|
||||||
# redis
|
# redis
|
||||||
@@ -1261,3 +1271,122 @@ ZRANGEBYLEX key min max [limit offset count]
|
|||||||
##### ZINCRBY
|
##### ZINCRBY
|
||||||
而`ZINCRBY`命令则是会返回更新后的new score。
|
而`ZINCRBY`命令则是会返回更新后的new score。
|
||||||
|
|
||||||
|
### redis Streams
|
||||||
|
Redis Stream类型数据结构的行为类似于append-only log,但是实现了`o(1)`时间复杂度的`random access`和复杂的消费策略、consumer groups。通过redis stream,可以实时的记录事件并对事件做同步分发。
|
||||||
|
|
||||||
|
通用的redis stream用例如下:
|
||||||
|
- event sourcing(e.g., tracking user actions)
|
||||||
|
- sensor monitoring
|
||||||
|
- notifications(e.g., storing a record of each user's notifications in a separate stream)
|
||||||
|
|
||||||
|
redis会为每个stream entry生成一个unique ID。可以使用IDs来获取其关联的entries,或读取和处理stream中所有的后续entries。
|
||||||
|
|
||||||
|
redis stream支持一些trimming strategies(用于避免stream的无尽增长)。并且,redis stream也支持多种消费策略(XREAD, XREADGROUP, XRANGE)。
|
||||||
|
|
||||||
|
#### Examples
|
||||||
|
##### 添加stream entry
|
||||||
|
在如下示例中,当racers通过检查点时,将会为每个racer添加一个stream entry,stream entry中包含racer name, speed, position, location ID信息,示例如下:
|
||||||
|
```redis-cli
|
||||||
|
> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
|
||||||
|
"1692632086370-0"
|
||||||
|
> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
|
||||||
|
"1692632094485-0"
|
||||||
|
> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
|
||||||
|
"1692632102976-0"
|
||||||
|
```
|
||||||
|
#### 从指定id开始读取stream entries
|
||||||
|
在如下示例中,将从stream entry ID `1692632086370-0`开始读取两条stream entries:
|
||||||
|
```redis-cli
|
||||||
|
> XRANGE race:france 1692632086370-0 + COUNT 2
|
||||||
|
1) 1) "1692632086370-0"
|
||||||
|
2) 1) "rider"
|
||||||
|
2) "Castilla"
|
||||||
|
3) "speed"
|
||||||
|
4) "30.2"
|
||||||
|
5) "position"
|
||||||
|
6) "1"
|
||||||
|
7) "location_id"
|
||||||
|
8) "1"
|
||||||
|
2) 1) "1692632094485-0"
|
||||||
|
2) 1) "rider"
|
||||||
|
2) "Norem"
|
||||||
|
3) "speed"
|
||||||
|
4) "28.8"
|
||||||
|
5) "position"
|
||||||
|
6) "3"
|
||||||
|
7) "location_id"
|
||||||
|
8) "1"
|
||||||
|
```
|
||||||
|
#### 从末尾开始读取
|
||||||
|
在如下示例中,会从`end of the stream`开始读取entries,最多读取100条entries,并在没有entries被写入的情况下最多阻塞300ms
|
||||||
|
```redis-cli
|
||||||
|
> XREAD COUNT 100 BLOCK 300 STREAMS race:france $
|
||||||
|
(nil)
|
||||||
|
```
|
||||||
|
#### Stream basics
|
||||||
|
stream为`append-only`数据结构,其基础的write command为`XADD`,会向指定stream中添加一个新的entry。
|
||||||
|
|
||||||
|
每个stream entry都由一个或多个field-value pairs组成,类似dictionary或redis hash:
|
||||||
|
```redis-cli
|
||||||
|
> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
|
||||||
|
"1692632147973-0"
|
||||||
|
```
|
||||||
|
|
||||||
|
##### XADD
|
||||||
|
上述示例中,通过`XADD`向key为`race:france`中添加了值为`rider: Castilla, speed:29.9, position: 1, location_id: 2`的entry,并使用了auto-generated entry ID `1692632147973-0`作为返回值。
|
||||||
|
|
||||||
|
XADD命令的描述如下:
|
||||||
|
```redis-cli
|
||||||
|
XADD key [NOMKSTREAM] [KEEPREF | DELREF | ACKED] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
|
||||||
|
```
|
||||||
|
|
||||||
|
在`XADD race:france * rider Castilla speed 29.9 position 1 location_id 2`的命令示例中,
|
||||||
|
- 第一个参数`race:france`代表key name
|
||||||
|
- 第二个参数为entry id,而`*`代表stream中所有的entry id
|
||||||
|
- 在上述示例中,为第二个参数传入`*`代表`希望server生成一个新的ID`
|
||||||
|
- 所有新生成的ID都应该单调递增,即新生成的ID将会比过去所有entries的ID都要大
|
||||||
|
- 需要显式指定ID而不是新生成的场景比较罕见
|
||||||
|
- 位于第一个和第二个参数之后的为`field-value pairs`
|
||||||
|
|
||||||
|
##### XLEN
|
||||||
|
可以通过`XLEN`命令来获取Stream中items的个数:
|
||||||
|
```redis-cli
|
||||||
|
> XLEN race:france
|
||||||
|
(integer) 4
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Entry IDs
|
||||||
|
entry ID由`XADD`命令返回,并且可以对给定stream中的entries进行唯一标识。entry ID由两部分组成:
|
||||||
|
```
|
||||||
|
<millisecondsTime>-<sequenceNumber>
|
||||||
|
```
|
||||||
|
- `millisecondsTime`: 该部分代表生成stream id时,redis node的本地时间。但是,如果current milliseconds time比`previous entry time`要小,则会使用`previous entry time`而不是`current milliseconds`。
|
||||||
|
- 该设计主要是为了解决`时钟回拨`的问题,即使在redis node回拨本地时间的场景下,新生成的ID仍然能够保证单调递增
|
||||||
|
- `sequenceNumber`: 该部分主要是为了处理`同一ms内创建多条entries的问题`
|
||||||
|
- sequence number的宽度为64bit
|
||||||
|
|
||||||
|
##### entry ID设计
|
||||||
|
entry ID中包含millisecondsTime的原因是,Reids Stream支持`range queries by ID`。因为`ID`关联entry的生成时间,故而可以不花费额外成本的情况下按照`time range`对entries进行查询。
|
||||||
|
|
||||||
|
当在某种场景下,用户可能需要`incremental IDs that are not related to time but are actually associated to another external system ID`,此时`XADD`则可以在第二个参数接收一个实际的ID而不是`*`通配符。
|
||||||
|
- `*`符号会触发auto-generation
|
||||||
|
|
||||||
|
手动指定entry ID的示例如下所示:
|
||||||
|
```redis-cli
|
||||||
|
> XADD race:usa 0-1 racer Castilla
|
||||||
|
0-1
|
||||||
|
> XADD race:usa 0-2 racer Norem
|
||||||
|
0-2
|
||||||
|
```
|
||||||
|
`在通过XADD手动指定entry ID时,后添加的entry ID必须大于先前指定的entry ID`,否则将会返回error。
|
||||||
|
|
||||||
|
```redis-cli
|
||||||
|
> XADD race:usa 0-1 racer Prickett
|
||||||
|
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
|
||||||
|
```
|
||||||
|
|
||||||
|
在redis 7及之后,可以仅显式指定`millisecondsTime`的部分,指定后`sequenceNumber`的部分将会自动生成并填充,示例如下所示:
|
||||||
|
```redis-cli
|
||||||
|
> XADD race:usa 0-* racer Prickett
|
||||||
|
0-3
|
||||||
|
```
|
||||||
|
|||||||
Reference in New Issue
Block a user