- [redis](#redis) - [Using Command](#using-command) - [Keys and values](#keys-and-values) - [Content of Keys](#content-of-keys) - [Hashtags](#hashtags) - [altering and querying the key space](#altering-and-querying-the-key-space) - [key expiration](#key-expiration) - [Navigating the keyspace](#navigating-the-keyspace) - [Scan](#scan) - [keys](#keys) - [pipelining](#pipelining) - [Request/Response protocols and round-trip time(RTT)](#requestresponse-protocols-and-round-trip-timertt) - [redis pipelining](#redis-pipelining) - [Performance Imporvement](#performance-imporvement) - [pipelining vs scripting](#pipelining-vs-scripting) - [Transactions](#transactions) - [Usage](#usage) - [Errors inside transaction](#errors-inside-transaction) - [rollback for redis Transaction](#rollback-for-redis-transaction) - [Discard the command queue](#discard-the-command-queue) - [Optimistic locking using check-and-set](#optimistic-locking-using-check-and-set) - [WATCH](#watch) - [UNWATCH](#unwatch) - [Using WATCH to implement ZPOP](#using-watch-to-implement-zpop) - [Data Types](#data-types) - [Redis Strings](#redis-strings) - [SET / GET](#set--get) - [set with additional arguments](#set-with-additional-arguments) - [GETSET](#getset) - [MSET / MGET](#mset--mget) - [strings as counters](#strings-as-counters) - [Limits](#limits) - [JSON](#json) - [`JSON.SET`](#jsonset) - [json数值操作](#json数值操作) - [json数组操作](#json数组操作) - [`JSON.DEL`](#jsondel) - [`JSON.ARRAPPEND`](#jsonarrappend) - [`JSON.ARRTRIM`](#jsonarrtrim) - [json object操作](#json-object操作) - [format output](#format-output) - [Limitation](#limitation) - [Redis lists](#redis-lists) - [Blocking commands](#blocking-commands) - [Queue(first in, first out)](#queuefirst-in-first-out) - [Stack(first in, last out)](#stackfirst-in-last-out) - [check length of list](#check-length-of-list) - [Atomically pop one element from one list and push to another](#atomically-pop-one-element-from-one-list-and-push-to-another) - [trim the list](#trim-the-list) - [Redis List Impl](#redis-list-impl) - [`LPUSH, RPUSH`](#lpush-rpush) - [`LRANGE`](#lrange) - [`LPOP, RPOP`](#lpop-rpop) - [Common use cases for lists](#common-use-cases-for-lists) - [Capped lists - `latest n`](#capped-lists---latest-n) - [Blocking operations on lists](#blocking-operations-on-lists) - [BRPOP](#brpop) - [Automatic Creation and removal of keys](#automatic-creation-and-removal-of-keys) - [redis sets](#redis-sets) - [basic commands](#basic-commands) - [SADD](#sadd) - [SMEMBERS](#smembers) - [SDIFF](#sdiff) - [SINTER](#sinter) - [SREM](#srem) - [SPOP](#spop) - [SRANDMEMBER](#srandmember) - [redis hashes](#redis-hashes) - [对象表示](#对象表示) - [counters](#counters) - [Field Expiration](#field-expiration) - [Common field expiration use cases](#common-field-expiration-use-cases) - [Field Expiration examples](#field-expiration-examples) - [Sorted sets](#sorted-sets) - [ZRANGEBYSCORE](#zrangebyscore) - [ZREMRANGEBYSCORE / ZREM](#zremrangebyscore--zrem) - [Inclusive and exclusive](#inclusive-and-exclusive) - [ZRANK / ZREVRANK](#zrank--zrevrank) - [`Lexicographical scores`](#lexicographical-scores) - [ZRANGEBYLEX](#zrangebylex) - [Updating the score: leaderboards](#updating-the-score-leaderboards) - [Leaderboard Example](#leaderboard-example) - [ZADD](#zadd) - [ZINCRBY](#zincrby) - [redis Streams](#redis-streams) - [Examples](#examples) - [添加stream entry](#添加stream-entry) - [从指定id开始读取stream entries](#从指定id开始读取stream-entries) - [从末尾开始读取](#从末尾开始读取) - [Stream basics](#stream-basics) - [XADD](#xadd) - [XLEN](#xlen) - [Entry IDs](#entry-ids) - [entry ID设计](#entry-id设计) - [Redis Stream consumer query model](#redis-stream-consumer-query-model) - [Querying by range: XRANGE and XREVRANGE](#querying-by-range-xrange-and-xrevrange) - [query by time range](#query-by-time-range) - [count option](#count-option) - [Listening for new items with XREAD](#listening-for-new-items-with-xread) - [XREAD with STREAMS option](#xread-with-streams-option) - [XREAD with BLOCK argument](#xread-with-block-argument) - [Consumer groups](#consumer-groups) - [Creating a consumer group](#creating-a-consumer-group) - [Create the stream automatically](#create-the-stream-automatically) - [XREADGROUP](#xreadgroup) - [Recovering from permanent failures](#recovering-from-permanent-failures) - [XPENDING](#xpending) - [checking the message content](#checking-the-message-content) - [XCLAIM](#xclaim) - [JUSTID option](#justid-option) - [Automatic claiming](#automatic-claiming) - [Claiming and the delivery counter](#claiming-and-the-delivery-counter) - [working with multiple consumer groups](#working-with-multiple-consumer-groups) - [Enhanced deletion control in Redis 8.2](#enhanced-deletion-control-in-redis-82) - [Stream Observability](#stream-observability) - [Difference with kafka partitions](#difference-with-kafka-partitions) # redis ## Using Command ### Keys and values #### Content of Keys key为data model中`拥有含义的文本名称`。Redis key在命名格式方面几乎没有限制,故而key name中可以包含空格或标点符号。`redis key并不支持namespace或categories`,故而在命名时应当避免命名冲突。 通常,会使用`:`符号来将redis的命名分割为多个sections,例如`office:London`,可以使用该命名规范来实现类似`categories`的效果。 尽管keys通常是文本的,redis中也实现了`binary-safe` key。可以使用任何`byte sequence`来作为有效的key,并且,在redis中`empty string`也可以作为有效的key。 redis key在命名时存在如下规范: - 不推荐使用长度很长的key,会在存储和key-comparisions方面带来负面影响 - 不推荐使用长度非常短的key,其会减少可读性,通常`user:1000:followers`的可读性相较`u1000flw`来说可读性要更好,并且前者带来的额外存储开销也较小 - 应在命名时使用统一的命名模式,例如`object-type:id`,在section中包含多个单词时,可以使用`.`或`-`符号来进行分隔,例如`comment:4321:reply.to`或`comment:4321:reply-to` - key size的最大允许大小为512MB #### Hashtags redis通过hashing来获取`key`关联的value。 通常,整个key都会被用作hash index的计算,但是,在部分场景下,开发者可能只希望使用key中的一部分来计算hash index。此时,可以通过`{}`包围key中`想要计算hash index的部分`,该部分被称为hash-tag`。 例如,`person:1`和`person:2`这两个key会计算出不同的hash index;但是`{persion}:1`和`{person}:2`这两个key计算出的hash index却是相同的,因为只有`person`会被用于计算hash index。 通常,hashtag的应用场景是`在集群场景下进行multi-key operations`。在集群场景下,除非所有key计算出的hash index相同,否则集群并不允许执行multi-key操作。 例如,`SINTER`命令用于查询两个不同`set values`的交集,可以接收多个key。在集群场景下: ```redis SINTER group:1 group:2 ``` 上述命名并无法成功执行,因为`group:1`和`group:2`两个key的hash index不同。 但是,如下命令在集群环境下则是可以正常执行: ```redis SINTER {group}:1 {group}:2 ``` hashtag让两个key产生相同的hash值。 虽然hashtag在上述场景下有效,但是,不应该过度的使用hashtag。因为hashtag相同的key其hash index都相同,故而会被散列到同一个slot中,当同一slot中元素过多时,会导致redis的性能下降。 #### altering and querying the key space 存在部分命令,其并不针对特定的类型,而是用于和key space进行交互,其可以被用于所有类型的key。 例如,`EXISTS`命令会返回0和1,代表给定key在redis中是否存在;而`DEL`命令则是用于删除key和key关联的value,无论value是什么类型。 示例如下: ```bash > set mykey hello OK > exists mykey (integer) 1 > del mykey (integer) 1 > exists mykey (integer) 0 ``` 在上述示例中,`DEL`命令返回的值为1或0,代表要被删除的值在redis中是否存在。 `TYPE`命令则是可以返回`key所关联value的类型`: ```bash > set mykey x OK > type mykey string > del mykey (integer) 1 > type mykey none ``` #### key expiration 在redis中,不管key对应的value为何种类型,都支持`key expiration`特性。`key exipiration`支持为key设置超时,`key expiration`也被称为`time to live`/`TTL`,当`ttl`指定的时间过去后,key将会被自动移除。 对于key expiration: - 在对key设置key expiration时,可以按照秒或毫秒的精度进行设置 - 但是,`expire time`在解析时单位永远为毫秒 - expire相关的信息会被`replicated`并存储在磁盘中,即使在redis宕机时,`time virtually passes`(即redis key其expire若为1天,宕机4小时后恢复,其expire会变为8小时,宕机并不会导致key expire停止计算) 可以通过`EXPIRE`命令来设置key expiration: ```bash > set key some-value OK > expire key 5 (integer) 1 > get key (immediately) "some-value" > get key (after some time) (nil) ``` 在第二次调用时,delay超过5s,key已经不存在。 上述示例中,`expire key 5`将key的超时时间设置为了5s,`EXPIRE`用于为key指定不同的超时时间。 类似的,可以通过`PERSIST`命令来取消key的超时设置,让key永久被保留。 除了使用`expire`来设置超时外,在创建时也能会key指定expiration: ```bash > set key 100 ex 10 OK > ttl key (integer) 9 ``` 上述示例中,使用`ttl`命令来检查key的超时时间。 如果想要按照毫秒来设置超时,可以使用`PEXPIRE`和`PTTL`命令。 #### Navigating the keyspace ##### Scan `SCAN`命令支持对redis中key的增量迭代,在每次调用时只会返回一小部分数据。该命令可以在生产中使用,并不会像`keys`或`smembers`等命令一样,在处理大量elements或keys时可能产生长时间的阻塞。 scan使用实例如下: ```bash > scan 0 1) "17" 2) 1) "key:12" 2) "key:8" 3) "key:4" 4) "key:14" 5) "key:16" 6) "key:17" 7) "key:15" 8) "key:10" 9) "key:3" 10) "key:7" 11) "key:1" > scan 17 1) "0" 2) 1) "key:5" 2) "key:18" 3) "key:0" 4) "key:2" 5) "key:19" 6) "key:13" 7) "key:6" 8) "key:9" 9) "key:11" ``` scan是一个cursor based iterator,每次在调用scan命令时,都会返回一个`update cursor`,并且在下次调用scan时需要使用上次返回的cursor。 当cursor被设置为0时,iteration将会开始,并且当server返回的cursor为0时,iteration结束。 ##### keys 除了scan外,还可以通过keys命令来迭代redis中所有的key。但是,和`scan`的增量迭代不同的是,keys会一次性返回所有的key,在返回前会阻塞redis-server。 keys命令支持glob-style pattern: - `h?llo`:`?`用于匹配单个字符 - `h*llo`: `*`用于匹配除`/`外的任何内容 - `h[ae]llo`: 匹配`hallo`和`hello` - `h[^e]llo`: [^e]匹配除`e`外的任何字符 - `h[a-b]llo`: 匹配`hallo`和`hbllo` global-style pattern中转义符为`\` ### pipelining redis pipelining支持一次发送多条命令,而非`逐条发送命令,并且发送后一条命令之前必须要等待前一条请求执行完成`。pipelining被绝大多数redis client支持,能够提高性能。 #### Request/Response protocols and round-trip time(RTT) redis是一个使用`client-server model`的tcp server,在请求完成前,会经历如下步骤: - client向server发送query,并且阻塞的从socket中读取server的响应 - server接收到client的请求后,处理命令,并且将处理结果返回给client 例如,包含4条命令的命令序列如下: 1. client: incr x 2. server: 1 3. client: incr x 4. server: 2 5. client: incr x 6. server: 3 7. client: incr x 8. server: 4 client和server之间通过网络进行连接,每次client和server的请求/响应,都需要经历`client发送请求,server发送响应`的过程,该过程会经过网络来传输,带来传输延迟。 该延迟被称为`RTT`(round trip time), 如果在一次请求中能够发送`n`条命令,那么将能够节省`n-1`次网络传输的往返时间。例如,RTT如果为`250ms`,即使server能够以`100K/s`的速度处理请求,对于同一client,其每秒也只能处理4条命令。 #### redis pipelining 在redis server处理命令时,其处理新请求前并不要求client接收到旧请求,并且client在发送多条命令后,会一次性统一读取执行结果。 该技术被称为`Pipelining`,在其他协议中也有广泛使用,例如`POP3`。 pipelining在redis的所有版本中都被支持,示例如下: ```bash $ (printf "PING\r\nPING\r\nPING\r\n"; sleep 1) | nc localhost 6379 +PONG +PONG +PONG ``` 通过pipelining,并不需要对每个命令都花费RTT用于网络传输,而是在一次网络传输时就包含3条命令。 > 当client使用pipelining发送commands时,server会在内存中对replies进行排队。故而,在client需要使用pipeline向server发送大量的请求时,其需要分批发送,每批中包含合适数量的命令。 > pipeline会积累多条命令并一次性发送给server。 #### Performance Imporvement pipeline不仅能够减少RTT的次数,其也能够增加redis server在每秒的执行操作数。 在redis server处理command时,实际的处理逻辑开销很小,但是和socket io的交互开销却很大。在进行socket io时,会进行`write`和`read`的系统调用,其涉及到用户态和内核态的切换,这将带来巨大的开销。 如果使用pipeline,多条指令只需要调用一次`read`系统调用,并且多条执行的执行结果只需要通过一次`write`系统调用即能够执行。通过使用pipeline,能够有效降低redis server的系统调用次数,这将减少socket io带来的开销,故而redis server能够在一秒内执行更多的commands。 #### pipelining vs scripting 相比于pipelining,scripting可以在`read, compute, write`的场景下带来更高的性能。pipelining并无法处理`read, compute, write`的场景,因为在执行write command之前,需要先获取read command的执行结果,故而无法将read和write命令通过pipeline同时发送给server。 ### Transactions redis transaction支持`execute a group of commands in a single step`,其涉及到`multi, exec, discard, watch`命令。 - 在redis事务中,所有命令都会被串行、按顺序执行,在redis transaction执行时,其他client发送的请求永远不会插入到redis transaction中间。在redis transaction中,所有命令都会`executed as a single siolated operation`,事务执行的过程中不会被其他命令打断 - `EXEC`命令会触发事务中所有命令的执行,故而,当client在`事务上下文中exec命令调用之前`,失去了与server的连接,事务中的命令都不会被执行。只有当exec被调用后,事务中的命令才会实际开始执行 #### Usage 可以通过`multi`命令来进入redis事务,该命令总会返回`ok`。在进入事务后,可以向事务中添加多条命令,这些命令并不会被立马执行,而是被排队。只有当发送`EXEC`命令后,之前排队的命令才会被实际执行。 `DISCARD`命令可以清空被排队的命令,并且退出事务的上下文。 如下示例展示了如何通过事务原子的执行一系列命令: ```bash > MULTI OK > INCR foo QUEUED > INCR bar QUEUED > EXEC 1) (integer) 1 2) (integer) 1 ``` `EXEC`命令会返回一个数组,数组中元素为之前QUEUED COMMANDS的返回结果,顺序和命令被添加到队列中的顺序相同。 在事务上下文中,所有命令(`EXEC`除外)都会返回`QUEUED`。 #### Errors inside transaction 在使用事务时,可能遇到如下两种errors: - 将命令添加到queue中时可能发生失败,该时机在`EXEC`被执行之前。例如,command可能存在语法错误,或是存在内存错误等,都可能导致命令添加到queue失败 - 调用`EXEC`对入队的命令实际执行时,可能发生异常,例如在实际执行command时,对string类型的value执行了list操作 对于`EXEC`时产生的错误,并没有特殊处理:`即使事务中部分命令实际执行失败,其他的命令也都会被执行`。 示例如下所示: ```redis Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. MULTI +OK SET a abc +QUEUED LPOP a +QUEUED EXEC *2 +OK -WRONGTYPE Operation against a key holding the wrong kind of value ``` 上述示例中执行了两个命令,其中命令1执行成功而命令2执行失败。 需要注意的是,`事务中即使某个命令执行失败,queue中的其他命令仍然会被执行`,redis在执行某条命令失败时,并不会对别的命令执行造成影响。 #### rollback for redis Transaction 对于redis transaction,其并不对`rollback`做支持,rollback会对redis的性能造成巨大影响,也会影响redis的易用性。 #### Discard the command queue 如果想要对事务进行abort,可以调用`DISCARD`命令,在该场景下,并不会有命令被实际执行,并且连接状态也会恢复为正常: ```redis > SET foo 1 OK > MULTI OK > INCR foo QUEUED > DISCARD OK > GET foo "1" ``` #### Optimistic locking using check-and-set 在redis transaction中,`WATCH`命令用于提供`CAS`行为。`watched keys`将会被监控,并探知其是否发生变化。 在执行`EXEC`命令前,如果存在任一key发生过修改,那么整个事务都会发生`abort`,并且会返回`NULL`,用以提示事务失败。 如下是一个`read, compute, write`的示例: ``` val = GET mykey val = val + 1 SET mykey val ``` 上述逻辑在`只存在一个客户端`的场景下工作正常,但是当存在多个客户端时,将会发生竞争。由于上述逻辑并不是原子的,故而可能出现如下场景: 1. client A read old value 2. client B read old value 3. client A compute `old value + 1` 4. client B compute `old value + 1` 5. client A set new value with `old value + 1` 6. client B set new value with `old value + 1` 故而,在多client场景下,假设old value为10,即使client A和client B都对value进行了incr,最后new value的值仍有可能为11而不是12 通过WATCH机制能够解决该问题 ``` WATCH mykey val = GET mykey val = val + 1 MULTI SET mykey $val EXEC ``` 在上述示例中,在进入事务上下文前,client对mykey进行了watch并完成新值的计算,之后,进入事务上下文后,用new value设置mykey,并调用`EXEC`命令。 如果WATCH和EXEC之间,存在其他client修改了mykey的值,那么当前事务将会失败。 只需要在发生竞争时重新执行上述流程,那么其即是乐观锁。 #### WATCH `WATCH`命令会让`EXEC`是条件的: - 只有当所有watched keys都未修改的前提下,才会让redis实际执行transaction `watched keys`可能发生如下的修改: - watched keys可能被其他client修改 - watched keys可能被redis本身修改,redis本身的修改包含如下场景 - expiration - eviction 如果在`对keys进行watch`和实际调用`exec`之间,keys发生的变化,整个transaction都会被abort。 > 在redis 6.0.9之前,expired keys并不会造成redis transaction被abort > 在本事务内的命令并不会造成WATCH condition被触发,因为WATCH机制的时间范围为keys watched的时间点到exec调用前的时间点,而queued commands在调用exec后才会实际执行 `watch`命令可以被多次调用,所有的watch命令都会生效,并且在watch被调用后就开始监控key的变化,监控一直到`EXEC`被调用后才结束。 对于`WATCH`命令,可以传递任意数量的参数。 在`EXEC`命令被调用后,所有的watched keys都会被`unwatched`,不管事务是否被aborted。并且,当client连接关闭后,所有keys都会被unwatched。 对于`discard`命令,其在调用后所有watched keys也会自动被`unwatched`。 #### UNWATCH 可以通过`UNWATCH`命令(无参数)来清空所有的watched keys。 通常,在调用`MULTI`进入事务前,会执行如下操作: - `WATCH mykey` - `GET mykey` 如果`在GET mykey`后,`调用MULTI`之前,如果在读取mykey的值后不再想执行后续事务了,那么可以直接调用`UNWATCH`,对先前监视的所有key取消监视。 #### Using WATCH to implement ZPOP 如下是一个使用WATCH的示例 ```redis WATCH zset element = ZRANGE zset 0 0 MULTI ZREM zset element EXEC ``` > 如果`EXEC失败,那么其将返回Null`,所以仅需对之前操作进行重试即可 ## Data Types ### Redis Strings Redis strings存储字节序列,包含文本、serialized objects、binary array。strings通常被用于缓存,但是支持额外的功能,例如counters和位操作。 redis keys也为strings。 string data type可用于诸多用例,例如对html fragement/html page的缓存。 #### SET / GET 通过set和get命令,可以对string value进行设置和获取。 ```redis-cli > SET bike:1 Deimos OK > GET bike:1 "Deimos" ``` 在使用`SET`命令时,如果key已存在对应的值,那么set指定的value将会对已经存在的值进行替换。`即使key对应的旧值并不是strings类型,set也会对其进行替换`。 values可以是`strings of every kind`(包含binary data),故而支持在value中存储jpeg image。value的值不能超过512MB. #### set with additional arguments 在使用`set`命令时,可以为其提供额外的参数,例如`NX | XX`. - `NX`: 仅当redis不存在对应的key时才进行设置,否则失败(返回nil) - `XX`: 仅当redis存在对应的key时的才进行设置,否则失败(返回nil) #### GETSET GETSET命令将会将key设置为指定的new value,并且返回oldvalue的值。 ```redis-cli 127.0.0.1:6379> get bike:1 (nil) 127.0.0.1:6379> GETSET bike:1 3 (nil) 127.0.0.1:6379> GET bike:1 "3" ``` #### MSET / MGET strings类型支持通过`mset, mget`命令来一次性获取和设置多个keys,这将能降低RTT带来的延迟。 ```redis-cli > mset bike:1 "Deimos" bike:2 "Ares" bike:3 "Vanth" OK > mget bike:1 bike:2 bike:3 1) "Deimos" 2) "Ares" 3) "Vanth" ``` #### strings as counters strings类型支持atomic increment: ```redis-cli > set total_crashes 0 OK > incr total_crashes (integer) 1 > incrby total_crashes 10 (integer) 11 ``` `incr`命令会将string value转化为integer,并且对其进行加一操作。类似命令还有`incrby, decr, drcrby`。 #### Limits 默认情况下,单个redis string的最大限制为`512MB`。 ### JSON redis支持对json值的存储、更新和获取。redis json可以和redis query engine进行协作,从而允许`index and query json documents`。 > 在redis 8中内置支持了RedisJSON,否则需要手动安装RedisJSON module。 #### `JSON.SET` `JSON.SET`命令支持将redis的key设置为JSON value,示例如下: ```redis-cli 127.0.0.1:6379> JSON.SET bike $ '"Hyperion"' OK 127.0.0.1:6379> JSON.GET bike $ "[\"Hyperion\"]" 127.0.0.1:6379> type bike ReJSON-RL 127.0.0.1:6379> JSON.TYPE bike $ 1) "string" ``` 在上述示例中,`$`代表的是指向json document中value的`path`: - 在上述示例中,`$`代表root 除此之外,JSON还支持其他string operation。`JSON.STRLNE`支持获取string长度,并且可以通过`JSON.STRAPPEND`来在当前字符串后追加其他字符串: ```redis-cli > JSON.STRLEN bike $ 1) (integer) 8 > JSON.STRAPPEND bike $ '" (Enduro bikes)"' 1) (integer) 23 > JSON.GET bike $ "[\"Hyperion (Enduro bikes)\"]" ``` #### json数值操作 RedisJSON支持`increment`和`multiply`操作: ```redis-cli > JSON.SET crashes $ 0 OK > JSON.NUMINCRBY crashes $ 1 "[1]" > JSON.NUMINCRBY crashes $ 1.5 "[2.5]" > JSON.NUMINCRBY crashes $ -0.75 "[1.75]" > JSON.NUMMULTBY crashes $ 24 "[42]" ``` #### json数组操作 RedisJSON支持通过`JSON.SET`将值赋值为数组,`path expression`支持数组操作 ```redis-cli > JSON.SET newbike $ '["Deimos", {"crashes": 0}, null]' OK > JSON.GET newbike $ "[[\"Deimos\",{\"crashes\":0},null]]" > JSON.GET newbike $[1].crashes "[0]" > JSON.DEL newbike $[-1] (integer) 1 > JSON.GET newbike $ "[[\"Deimos\",{\"crashes\":0}]]" ``` ##### `JSON.DEL` `JSON.DEL`支持通过`path`对json值进行删除。 ##### `JSON.ARRAPPEND` 支持向json array中追加值。 ##### `JSON.ARRTRIM` 支持对json array进行裁剪。 ```redis-cli > JSON.SET riders $ [] OK > JSON.ARRAPPEND riders $ '"Norem"' 1) (integer) 1 > JSON.GET riders $ "[[\"Norem\"]]" > JSON.ARRINSERT riders $ 1 '"Prickett"' '"Royce"' '"Castilla"' 1) (integer) 4 > JSON.GET riders $ "[[\"Norem\",\"Prickett\",\"Royce\",\"Castilla\"]]" > JSON.ARRTRIM riders $ 1 1 1) (integer) 1 > JSON.GET riders $ "[[\"Prickett\"]]" > JSON.ARRPOP riders $ 1) "\"Prickett\"" > JSON.ARRPOP riders $ 1) (nil) ``` #### json object操作 json oject操作同样有其自己的命令,示例如下: ```redis-cli > JSON.SET bike:1 $ '{"model": "Deimos", "brand": "Ergonom", "price": 4972}' OK > JSON.OBJLEN bike:1 $ 1) (integer) 3 > JSON.OBJKEYS bike:1 $ 1) 1) "model" 2) "brand" 3) "price"> JSON.SET bike:1 $ '{"model": "Deimos", "brand": "Ergonom", "price": 4972}' ``` #### format output redis-cli支持对json内容的输出进行格式化,步骤如下: - 在执行`redis-cli`时指定`--raw`选项 - 通过formatting keywords来进行格式化 - `INDENT` - `NEWLINE` - `SPACE` ```bash $ redis-cli --raw > JSON.GET obj INDENT "\t" NEWLINE "\n" SPACE " " $ [ { "name": "Leonard Cohen", "lastSeen": 1478476800, "loggedOut": true } ] ``` #### Limitation 传递给command的json值最大深度只能为128,如果嵌套深度大于128,那么command将返回错误。 ### Redis lists Redis lists为string values的链表,redis list通常用于如下场景: - 实现stack和queue - 用于backgroup worker system的队列管理 #### Blocking commands redis lists中支持阻塞命令 - `BLPOP`: 从`head of a list`移除并且返回一个element,如果list为空,该command会阻塞,直至list被填充元素或发生超时 - `BLMOVE`: 从source list中pop一个element,并且将其push到target list中。如果source list为空,那么command将会被阻塞,直到source list中出现new element > 在上述文档描述中,`阻塞`实际指的是针对客户端的阻塞。该`Blocking Command`命令的调用会实际的阻塞客户端,直到redis server返回结果。 > > 但是,`server并不会被blocking command所阻塞`。redis server为单线程模型,当用户发送blocking command给server,并且该command无法立即被执行而导致客户端阻塞时,`server会挂起该客户端的连接`,并且转而处理其他请求,直至该客户端的阻塞命令满足执行条件时,才会将挂起的连接唤醒重新执行。 > > 故而,`Blocking Command`并不会对server造成阻塞,而是会阻塞客户端的调用。 #### Queue(first in, first out) 依次调用`LPUSH`后再依次调用`RPOP`,可模拟队列行为,元素的弹出顺序和元素的添加顺序相同: ```redis-cli > LPUSH bikes:repairs bike:1 (integer) 1 > LPUSH bikes:repairs bike:2 (integer) 2 > RPOP bikes:repairs "bike:1" > RPOP bikes:repairs "bike:2" ``` #### Stack(first in, last out) 依次调用`LPUSH`后再依次调用`LPOP`,可模拟栈的行为,元素的移除顺序和添加顺序相反: ```redis-cli > LPUSH bikes:repairs bike:1 (integer) 1 > LPUSH bikes:repairs bike:2 (integer) 2 > LPOP bikes:repairs "bike:2" > LPOP bikes:repairs "bike:1" ``` #### check length of list 可通过`LLEN`命令来检查list的长度 ```redis-cli > LLEN bikes:repairs (integer) 0 ``` #### Atomically pop one element from one list and push to another 通过`lmove`命令能够实现原子的`从srclist移除并添加到dstlist`的操作 ```redis-cli > LPUSH bikes:repairs bike:1 (integer) 1 > LPUSH bikes:repairs bike:2 (integer) 2 > LMOVE bikes:repairs bikes:finished LEFT LEFT "bike:2" > LRANGE bikes:repairs 0 -1 1) "bike:1" > LRANGE bikes:finished 0 -1 1) "bike:2" ``` #### trim the list 可以通过`LTRIM`命令来完成对list的裁剪操作: ```redis-cli > LPUSH bikes:repairs bike:1 (integer) 1 > LPUSH bikes:repairs bike:2 (integer) 2 > LMOVE bikes:repairs bikes:finished LEFT LEFT "bike:2" > LRANGE bikes:repairs 0 -1 1) "bike:1" > LRANGE bikes:finished 0 -1 1) "bike:2" ``` #### Redis List Impl redis lists是通过Linked List来实现的,其元素的添加操作并开销永远是常量的,并不会和Array一样因扩容而可能导致内存的复制。 #### `LPUSH, RPUSH` `LPUSH`会将元素添加到list的最左端(头部),而`RPUSH`则会将新元素添加奥list的最右端(尾部)。 LPUSH和RPUSH接收的参数都是可变的,在单次调用中可以向list中添加多个元素。 例如,向空list中调用`lpush 1 2 3`时,其等价于`lpush 1; lpush 2; lpush3`,即调用后list中元素为`3,2,1` #### `LRANGE` `LRANGE`命令能够从list中解析范围内的数据,其接收两个indexes,为range中开始和结束元素的位置。index可以是负的,负数代表从尾端开始计数: - `-1`代表最后的元素 - `-2`代表倒数第二个元素 ```redis-cli > RPUSH bikes:repairs bike:1 (integer) 1 > RPUSH bikes:repairs bike:2 (integer) 2 > LPUSH bikes:repairs bike:important_bike (integer) 3 > LRANGE bikes:repairs 0 -1 1) "bike:important_bike" 2) "bike:1" 3) "bike:2" ``` #### `LPOP, RPOP` lists支持pop元素,从list中移除元素,并且获取元素的值。lists支持从list的左端和右端pop元素,使用示例如下所示: ```redis-cli > RPUSH bikes:repairs bike:1 bike:2 bike:3 (integer) 3 > RPOP bikes:repairs "bike:3" > LPOP bikes:repairs "bike:1" > RPOP bikes:repairs "bike:2" > RPOP bikes:repairs (nil) ``` #### Common use cases for lists redis lists拥有如下有代表性的用例场景: - 记录用户最新上传的推文 - 用于进程间的通信,使用consumer-producer pattern,其中生产者向lists中推送内容,而消费者消费lists中的内容 #### Capped lists - `latest n` 在许多用例场景下,会使用lists来存储latest items,例如`social network updates, logs`等。 redis允许将lists作为`拥有容量上限的集合使用`,可以通过`LTRIM`命令来实现`only remembering the latest N items and discarding all the oldest items`。 `LTRIM`命令和`LRANGE`类似,但是`LRANGE`用于获取获取list中指定范围内的元素,`LTRIM`会将选中的范围作为`new list value`,所有位于选中范围之外的元素都会从list中被移除。 `LTRIM`的使用示例如下所示: ```redis-cli > RPUSH bikes:repairs bike:1 bike:2 bike:3 bike:4 bike:5 (integer) 5 > LTRIM bikes:repairs 0 2 OK > LRANGE bikes:repairs 0 -1 1) "bike:1" 2) "bike:2" 3) "bike:3" ``` `LTRIM 0 2`会令redis保留index位于`[0, 2]`范围内的3个元素,并且移除其他的元素。将`push操作`和`LTRIM`操作组合,可以实现`add a new element and discard elements exceeding a limt`的操作。 例如,使用`LRANGE -3 -1`可用于实现`仅保留最近添加的三个元素`的场景 ```redis-cli > RPUSH bikes:repairs bike:1 bike:2 bike:3 bike:4 bike:5 (integer) 5 > LTRIM bikes:repairs -3 -1 OK > LRANGE bikes:repairs 0 -1 1) "bike:3" 2) "bike:4" 3) "bike:5" ``` #### Blocking operations on lists lists的`blocking operation`特性令其适合用于实现queues,并广泛用于进程间通信系统。 在通过redis lists实现进程间通信系统时,如果某些时刻list为空,并不存在任何元素,那么此时消费者client在调用`pop`操作时只会返回为空。通常来讲,consumer会等待一定的时间并且重新尝试调用pop,该操作被称为`polling`,其通常被认为是一种不好的实现: - 其会强制redis/client来处理无用的命令(`当list为空时,pop请求只会返回为空而不会任何的实际处理`) - 会增加`delay to processing of items`,因为worker在接收到redis server返回的null时,其会等待一定的时间。为了令delay更小,可以在调用POP操作之间等待更短的时间,但是其可能方法前一个问题(`当pop调用之间的时间间隔更小时,redis server可能会处理更多的无用命令`) 故而,redis实现支持`BRPOP`和`BLPOP`命令,其命令在list为空时会阻塞:`上述命令造成的阻塞会在list中被添加新元素时返回,如果直到设置的超时到达后,该操作也会返回`。 `BRPOP`的使用示例如下所示: ```redis-cli > RPUSH bikes:repairs bike:1 bike:2 (integer) 2 > BRPOP bikes:repairs 1 1) "bikes:repairs" 2) "bike:2" > BRPOP bikes:repairs 1 1) "bikes:repairs" 2) "bike:1" > BRPOP bikes:repairs 1 (nil) (2.01s) ```` 上述示例中,`BRPOP bikes:repairs 1`代表`wait for elements in the list bikes:repairs`,但是`当list中元素为空时,最多等待1s。` 当为`BRPOP`指定timeout为0时,代表会永久等待elements。并且,`可以为BRPOP`命令指定多个lists,其会`等待所有的list,并且当任何一个list中接收到元素时,当前BRPOP命令会立刻返回`。 示例如下: ```redis-client # client 1 等待 event-queue:1, event-queue:2, event-queue:3三个list client 1> brpop event-queue:1 event-queue:2 event-queue:3 1000 # client 2 向event-queue:2 中追加元素 client 2> rpush event-queue:2 baka (integer) 1 # client 1 立刻返回,返回结果如下 1) "event-queue:2" 2) "baka" (19.55s) ``` ##### BRPOP - 对于`BRPOP`命令造成的阻塞,其处理是按照顺序的:`the first client that blocked waiting for a list, is served first when an element is pushed by some other client, and so forth` - `BRPOP`命令的返回结果其结构和`RPOP`命令不同:`BRPOP`返回的是一个包含两个元素的array,`arrary[0]`为list对应的key,`array[1]`为弹出的元素(因为BRPOP可以等待多个list) - 如果超时后list中仍然没有可获取的元素,那么将会返回null #### Automatic Creation and removal of keys 在先前示例中,向list中添加元素时,并没有预先创建空的list,或是在list中没有元素时将list手动移除。 在redis中,`list的创建和删除都是redis的职责`: - 当list中不再包含元素时,redis会自动删除list对应的key - 当想要对不存在的key中添加元素时,redis会自动创建一个empty list 故而,可以整理除如下准则: - 当将元素添加到一个聚合数据类型时,如果target key不存在,那么在添加元素前一个empty aggregate data type将会被自动创建 - 当从aggregate data type中移除元素时,如果移除后该aggregate data type中为空,那么key将会被自动销毁(`stream data type`除外) - 调用一个read-only command(例如`LLEN`)或write command(用于移除元素)对一个`empty key`做操作时,其返回结果和针对`an key holding an empty aggregate type of type the command expects to find`的操作一直 上述三个准则的示例如下: 准则1示例如下所示,当new_bikes不存在时,通过`LPUSH`命令向其中添加元素,一个empty list在添加前会自动创建 ```redis-cli > DEL new_bikes (integer) 0 > LPUSH new_bikes bike:1 bike:2 bike:3 (integer) 3 ``` 准则2示例如下所示,当pop出所有的元素后,key将会被自动销毁,通过`EXISTS`命令返回的结果为0: ```redis-cli > LPUSH bikes:repairs bike:1 bike:2 bike:3 (integer) 3 > EXISTS bikes:repairs (integer) 1 > LPOP bikes:repairs "bike:3" > LPOP bikes:repairs "bike:2" > LPOP bikes:repairs "bike:1" > EXISTS bikes:repairs (integer) 0 ``` 准则3的示例如下所示,当key不存在时,对该key进行`read-only`操作和`remove element`操作所返回的结果,和对`empty aggregated data type`操作所返回的结果一致: ```redis-cli > DEL bikes:repairs (integer) 0 > LLEN bikes:repairs (integer) 0 > LPOP bikes:repairs (nil) ``` ### redis sets redis set是一个unordered collection of unique strings,通过redis sets可以高效进行如下操作: - track unique items - represent relations - perform common set operations such as intersection, unions, and differences #### basic commands - `SADD`: 向set中添加new member - `SREM`: 从set中移除指定member - `SISMEMBER`: 检查给定的string是否位于set中 - `SINTER`: 返回两个或更多set的交集 - `SCARD`: 返回set的大小(cardinality) #### SADD `SADD`命令会向set中添加新的元素,示例如下: ```redis-cli > SADD bikes:racing:france bike:1 bike:2 bike:3 (integer) 3 > SMEMBERS bikes:racing:france 1) bike:3 2) bike:1 3) bike:2 ``` #### SMEMBERS 在上述示例中,`SMEMBERS`命令会返回set中所有的元素。`redis`并不保证元素的返回顺序,每次调用`SMEMBERS`命令都可能以任何顺序返回set中的元素。 #### SDIFF 可以通过SDIFF来返回两个sets的差异(差集)。例如,可以通过`SDIFF`命令查看有哪些元素位于`set1`中但是不位于`set2`中,示例如下: ```redis-cli > SADD bikes:racing:usa bike:1 bike:4 (integer) 2 > SDIFF bikes:racing:france bikes:racing:usa 1) "bike:3" 2) "bike:2" ``` 上述示例中,则通过`SDIFF`命令展示了`bikes:racing:france`和`bikes:racing:usa`两个set的差集。 SDIFF命令在`difference between all sets is empty`时,会返回一个empty array。 #### SINTER 可以通过SINTER命令来取多个sets的交集。 ```redis-cli > SADD bikes:racing:france bike:1 bike:2 bike:3 (integer) 3 > SADD bikes:racing:usa bike:1 bike:4 (integer) 2 > SADD bikes:racing:italy bike:1 bike:2 bike:3 bike:4 (integer) 4 > SINTER bikes:racing:france bikes:racing:usa bikes:racing:italy 1) "bike:1" > SUNION bikes:racing:france bikes:racing:usa bikes:racing:italy 1) "bike:2" 2) "bike:1" 3) "bike:4" 4) "bike:3" > SDIFF bikes:racing:france bikes:racing:usa bikes:racing:italy (empty array) > SDIFF bikes:racing:france bikes:racing:usa 1) "bike:3" 2) "bike:2" > SDIFF bikes:racing:usa bikes:racing:france 1) "bike:4" ``` #### SREM 可以通过`SREM`命令来移除set中的元素,可以一次性移除一个或多个。 #### SPOP `SPOP`命令支持随机移除一个element。 #### SRANDMEMBER `SRANDMEMBER`命令支持随机返回一个set中的元素,但是不对其实际移除 上述命令的使用示例如下所示: ```redis-cli > SADD bikes:racing:france bike:1 bike:2 bike:3 bike:4 bike:5 (integer) 5 > SREM bikes:racing:france bike:1 (integer) 1 > SPOP bikes:racing:france "bike:3" > SMEMBERS bikes:racing:france 1) "bike:2" 2) "bike:4" 3) "bike:5" > SRANDMEMBER bikes:racing:france "bike:2" ``` ### redis hashes redis hashes为记录`field-value pair`集合的数据结构,可以使用hashes来表示基本对象或存储counter的分组,示例如下: #### 对象表示 ```redis-cli > HSET bike:1 model Deimos brand Ergonom type 'Enduro bikes' price 4972 (integer) 4 > HGET bike:1 model "Deimos" > HGET bike:1 price "4972" > HGETALL bike:1 1) "model" 2) "Deimos" 3) "brand" 4) "Ergonom" 5) "type" 6) "Enduro bikes" 7) "price" 8) "4972" ``` 通常来讲,可以存储在hash中的fields数量并没有限制。 命令`HSET`可用于向hash中设置多个fields,而命令`HGET`可以用于获取一个field,`HMGET`可以用于获取多个field。 ```redis-cli > HMGET bike:1 model price no-such-field 1) "Deimos" 2) "4972" 3) (nil) ``` 同样的,hash结构支持对单个field进行操作,例如`HINCRBY` ```redis-cli > HINCRBY bike:1 price 100 (integer) 5072 > HINCRBY bike:1 price -100 (integer) 4972 ``` #### counters 将hash用于存储counters分组的示例如下所示: ```redis-cli > HINCRBY bike:1:stats rides 1 (integer) 1 > HINCRBY bike:1:stats rides 1 (integer) 2 > HINCRBY bike:1:stats rides 1 (integer) 3 > HINCRBY bike:1:stats crashes 1 (integer) 1 > HINCRBY bike:1:stats owners 1 (integer) 1 > HGET bike:1:stats rides "3" > HMGET bike:1:stats owners crashes 1) "1" 2) "1" ``` #### Field Expiration 在`redis open source 7.4`中,支持为独立的hash field指定超时: - `HEXPIRE`: set the remaining TTL in seconds - `HPEXPIRE`: set the remaining TTL in milliseconds - `HEXPIREAT`: set expiration time to a timestamp specified in seconds - `HPEXPIREAT`: set the expiration time to a timestamp specified in milliseconds 如上所示,在指定超时时,可以通过时间戳来指定,也可以通过TTL来指定。 同时,获取超时事件也可以通过`时间戳`和`TTL`来获取: - `HEXPIRETIME`: get the expiration time as timestamp in seconds - `HPEXPIRETIME`: get the expiration time as timestamp in milliseconds - `HTTL`: get the remaining ttl in seconds - `HPTTL`: get the remaining ttl in milliseconds 如果想要移除指定hash field的expration,可以通过如下方式: - `HPERSIST`: 移除hash field的超时 ##### Common field expiration use cases - `Event Tracking`:使用hash key来存储`最后一小时的事件`。其中,field为每个事件,而设置事件field的ttl为1小时,并可使用`HLEN`来统计最后一小时的事件数量。 - `Fraud Detection`:通常,用户行为进行分析时,可按小时记录用户的事件数量。可通过hash结果记录过去48小时中每小时的操作数量,其中hash field代表用户某一个小时内的操作数。每个hash field的过期时间都为48h。 - `Customer session management`: 可以通过hash来存储用户数据。为每个session创建一个hash key,并且向hash key中添加session field。当session过期时,自动对session key和session field进行expire操作。 - `Active Session Tracking`: 将所有的active sessions存储再一个hash key中。每当session变为inactive时,将session的TTL设置为过期。可以使用`HLEN`来统计活跃的sessions数量。 ##### Field Expiration examples `对于hash field ixpiration的支持在官方client libraries中尚不可用`,但是可以在`python(redis-py)`和`java(jedis)`的beta版本client libraries中使用。 如下python示例展示了如何使用field expiration: ```py event = { 'air_quality': 256, 'battery_level':89 } r.hset('sensor:sensor1', mapping=event) # set the TTL for two hash fields to 60 seconds r.hexpire('sensor:sensor1', 60, 'air_quality', 'battery_level') ttl = r.httl('sensor:sensor1', 'air_quality', 'battery_level') print(ttl) # prints [60, 60] # set the TTL of the 'air_quality' field in milliseconds r.hpexpire('sensor:sensor1', 60000, 'air_quality') # and retrieve it pttl = r.hpttl('sensor:sensor1', 'air_quality') print(pttl) # prints [59994] # your actual value may vary # set the expiration of 'air_quality' to now + 24 hours # (similar to setting the TTL to 24 hours) r.hexpireat('sensor:sensor1', datetime.now() + timedelta(hours=24), 'air_quality') # and retrieve it expire_time = r.hexpiretime('sensor:sensor1', 'air_quality') print(expire_time) # prints [1717668041] # your actual value may vary ``` ### Sorted sets redis sorted set是一个包含unique strings的集合,其中unique strings会关联一个score,并且strings会按照score进行排序。`如果多个string存在相同的score,那么拥有相同score的strings将会按照字典序进行排序`。 sorted sets的用例场景包括如下: - `Leaderboards`: 可以通过sorted sets维护一个ordered list,其可被用于实现排行榜 - `RateLimiter`: 通过sorted list,可以构建一个`sliding-window rate limiter`,从而避免过多的api调用 - 在实现`sliding-window rate limiter`时,可以将时间戳作为score,因为可以快速获取一个时间范围内的调用数量 可以将sorted sets看作是set和hash的混合, - 和set一样,sorted sets由unique strings组成 - 和hash一样,sorted sets中元素由一个关联的floating point value,被称为score sorted set的使用示例如下: ```redis-cli > ZADD racer_scores 10 "Norem" (integer) 1 > ZADD racer_scores 12 "Castilla" (integer) 1 > ZADD racer_scores 8 "Sam-Bodden" 10 "Royce" 6 "Ford" 14 "Prickett" (integer) 4 ``` `ZADD`和`SADD`类似,但是其接收一个用于表示`score`的额外参数。和`SADD`类似,可以使用`ZADD`来添加多个`score-value pairs`。 > #### Implementation > sorted sets实现的数据结构中,同时使用了`skip list`和`hash table`两种数据结构,故而每次向zset中添加元素时,其操作的复杂度为o(log(n))`。 > > 并且,在获取元素时,由于元素已经被排序,获取操作无需其他的额外开销。 `ZRANGE`的顺序为从小到大,`ZREVRANGE`的顺序则是从大到小 ```redis-cli > ZRANGE racer_scores 0 -1 1) "Ford" 2) "Sam-Bodden" 3) "Norem" 4) "Royce" 5) "Castilla" 6) "Prickett" > ZREVRANGE racer_scores 0 -1 1) "Prickett" 2) "Castilla" 3) "Royce" 4) "Norem" 5) "Sam-Bodden" 6) "Ford" ``` 在上述示例中,0和-1代表index位于`[0, len-1]`范围内的元素。(负数代表的含义和`LRANGE`命令中相同) 在`ZRANGE`命令中指定`withscores`,也能够在返回zset中value时同时返回score: ```redis-cli > ZRANGE racer_scores 0 -1 withscores 1) "Ford" 2) "6" 3) "Sam-Bodden" 4) "8" 5) "Norem" 6) "10" 7) "Royce" 8) "10" 9) "Castilla" 10) "12" 11) "Prickett" 12) "14" ``` #### ZRANGEBYSCORE 除了上述操作外,sorted sets还支持对`operate on ranges`。可以通过`ZRANGEBYSCORE`来实现`get all racers with 10 or fewer points`的操作: ```redis-cli > ZRANGEBYSCORE racer_scores -inf 10 1) "Ford" 2) "Sam-Bodden" 3) "Norem" 4) "Royce" ``` 上述示例中,`ZRANGEBYSCORE racer_score -inf 10`向redis请求`返回score位于negative infinity和10之间(both included)的元素`。 #### ZREMRANGEBYSCORE / ZREM 如果想要丛zset中移除元素,可以调用`ZREM`命令。同样的,sorted sets也支持`remove ranges of elements`的操作,可通过`ZREMRANGEBYSCORE`命令来实现。 ```redis-cli > ZREM racer_scores "Castilla" (integer) 1 > ZREMRANGEBYSCORE racer_scores -inf 9 (integer) 2 > ZRANGE racer_scores 0 -1 1) "Norem" 2) "Royce" 3) "Prickett" ``` 上述示例中,通过`ZREMRANGEBYSCORE racer_scores -inf 9`命令实现了`remove all the racers with strictly fewer than 10 points`的操作。 `ZREMRANGEBYSCORE`会返回其移除的元素个数。 ##### Inclusive and exclusive 在使用`ZRANGEBYSCORE`指定`min`和`max`时,可以分别将其指定为`-inf`和`+inf`。故而,在获取zset中比`xx`大或比`xx`小的所有元素时,可以使用`-inf`和`+inf`,此时无需知道当前zset中的最大/最小元素。 默认情况下,`interval specified by min and max is closed`(inclusive)。但是,可以将其指定为`open interval`(exclusive),只需要在score前添加`(`符号即可,示例如下: ```redis-cli ZRANGEBYSCORE zset (1 5 ``` 上述示例中,会返回位于`(1, 5]`区间内的元素。 而`ZRANGEBYSCORE zset (5 (10`命令则是会返回`(5, 10)`区间内的元素。 #### ZRANK / ZREVRANK sorted sets还支持`get-rank operation`,通过`ZRANK`可以返回`position of an element in the set of ordered elements`。 `ZREVRANK`命令的作用和`ZRANK`类似,但是`ZREVRANK`返回的值为`从大到小的降序ranking`。 使用示例如下所示: ```redis-cli > ZRANK racer_scores "Norem" (integer) 0 > ZREVRANK racer_scores "Norem" (integer) 2 ``` #### `Lexicographical scores` 自redis 2.8其,引入了`getting ranges lexicograhpically`的新特性,其假设sorted set中的所有元素都拥有相同的score。 与`lexicographical ranges`进行交互的主要命令如下: - ZRANGEBYLEX - ZREVRANGEBYLEX - ZREMRANGEBYLEX - ZLEXCOUNT 使用示例如下所示: ```redis-cli > ZADD racer_scores 0 "Norem" 0 "Sam-Bodden" 0 "Royce" 0 "Castilla" 0 "Prickett" 0 "Ford" (integer) 3 > ZRANGE racer_scores 0 -1 1) "Castilla" 2) "Ford" 3) "Norem" 4) "Prickett" 5) "Royce" 6) "Sam-Bodden" > ZRANGEBYLEX racer_scores [A [L 1) "Castilla" 2) "Ford" ``` 在上述示例中,可以通过`ZRANGEBYLEX`按照字典序对range进行请求。 ##### ZRANGEBYLEX ZRANGEBYLEX的语法如下: ``` ZRANGEBYLEX key min max [limit offset count] ``` 和`ZRANGEBYSCORE`命令不同的是,`ZRANGEBYSCORE`在指定范围时,默认是`included`的;而`ZRANGEBYLEX`必须通过`[`和`(`来显式指定inclusive或exclusive。 而在指定min和max时,`-`和`+`分别则代表negatively infinite和positive infinite。故而,`ZRANGEBYLEX myzset - +`命令代表返回zset中所有的元素。 #### Updating the score: leaderboards 支持对sorted set中元素的score进行更新。在更新sorted set中元素的score时,只需要`再次调用ZADD命令即可`,sorted set会更新score,更新操作的时间复杂度为`O(log(N))`。 #### Leaderboard Example 在通过zset实现leaderboard时,由如下两种方式对user score进行更新: - 在得知user当前score的情况下,可以直接通过`ZADD`命令来进行覆盖 - 如果想要针对当前的score进行`增加`操作时,可以使用`ZINCRBY`命令 ```redis-cli > ZADD racer_scores 100 "Wood" (integer) 1 > ZADD racer_scores 100 "Henshaw" (integer) 1 > ZADD racer_scores 150 "Henshaw" (integer) 0 > ZINCRBY racer_scores 50 "Wood" "150" > ZINCRBY racer_scores 50 "Henshaw" "200" ``` ##### ZADD 当当前添加的元素已经在sorted set中存在时,`ZADD`命令会返回`0`,否则`ZADD`命令会返回`1`。 ##### ZINCRBY 而`ZINCRBY`命令则是会返回更新后的new score。 ### redis Streams Redis Stream类型数据结构的行为类似于append-only log,但是实现了`o(1)`时间复杂度的`random access`和复杂的消费策略、consumer groups。通过redis stream,可以实时的记录事件并对事件做同步分发。 通用的redis stream用例如下: - event sourcing(e.g., tracking user actions) - sensor monitoring - notifications(e.g., storing a record of each user's notifications in a separate stream) redis会为每个stream entry生成一个unique ID。可以使用IDs来获取其关联的entries,或读取和处理stream中所有的后续entries。 redis stream支持一些trimming strategies(用于避免stream的无尽增长)。并且,redis stream也支持多种消费策略(XREAD, XREADGROUP, XRANGE)。 #### Examples ##### 添加stream entry 在如下示例中,当racers通过检查点时,将会为每个racer添加一个stream entry,stream entry中包含racer name, speed, position, location ID信息,示例如下: ```redis-cli > XADD race:france * rider Castilla speed 30.2 position 1 location_id 1 "1692632086370-0" > XADD race:france * rider Norem speed 28.8 position 3 location_id 1 "1692632094485-0" > XADD race:france * rider Prickett speed 29.7 position 2 location_id 1 "1692632102976-0" ``` #### 从指定id开始读取stream entries 在如下示例中,将从stream entry ID `1692632086370-0`开始读取两条stream entries: ```redis-cli > XRANGE race:france 1692632086370-0 + COUNT 2 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1" ``` #### 从末尾开始读取 在如下示例中,会从`end of the stream`开始读取entries,最多读取100条entries,并在没有entries被写入的情况下最多阻塞300ms ```redis-cli > XREAD COUNT 100 BLOCK 300 STREAMS race:france $ (nil) ``` #### Stream basics stream为`append-only`数据结构,其基础的write command为`XADD`,会向指定stream中添加一个新的entry。 每个stream entry都由一个或多个field-value pairs组成,类似dictionary或redis hash: ```redis-cli > XADD race:france * rider Castilla speed 29.9 position 1 location_id 2 "1692632147973-0" ``` ##### XADD 上述示例中,通过`XADD`向key为`race:france`中添加了值为`rider: Castilla, speed:29.9, position: 1, location_id: 2`的entry,并使用了auto-generated entry ID `1692632147973-0`作为返回值。 XADD命令的描述如下: ```redis-cli XADD key [NOMKSTREAM] [KEEPREF | DELREF | ACKED] [ [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...] ``` 在`XADD race:france * rider Castilla speed 29.9 position 1 location_id 2`的命令示例中, - 第一个参数`race:france`代表key name - 第二个参数为entry id,而`*`代表stream中所有的entry id - 在上述示例中,为第二个参数传入`*`代表`希望server生成一个新的ID` - 所有新生成的ID都应该单调递增,即新生成的ID将会比过去所有entries的ID都要大 - 需要显式指定ID而不是新生成的场景比较罕见 - 位于第一个和第二个参数之后的为`field-value pairs` ##### XLEN 可以通过`XLEN`命令来获取Stream中items的个数: ```redis-cli > XLEN race:france (integer) 4 ``` #### Entry IDs entry ID由`XADD`命令返回,并且可以对给定stream中的entries进行唯一标识。entry ID由两部分组成: ``` - ``` - `millisecondsTime`: 该部分代表生成stream id时,redis node的本地时间。但是,如果current milliseconds time比`previous entry time`要小,则会使用`previous entry time`而不是`current milliseconds`。 - 该设计主要是为了解决`时钟回拨`的问题,即使在redis node回拨本地时间的场景下,新生成的ID仍然能够保证单调递增 - `sequenceNumber`: 该部分主要是为了处理`同一ms内创建多条entries的问题` - sequence number的宽度为64bit ##### entry ID设计 entry ID中包含millisecondsTime的原因是,Reids Stream支持`range queries by ID`。因为`ID`关联entry的生成时间,故而可以不花费额外成本的情况下按照`time range`对entries进行查询。 当在某种场景下,用户可能需要`incremental IDs that are not related to time but are actually associated to another external system ID`,此时`XADD`则可以在第二个参数接收一个实际的ID而不是`*`通配符。 - `*`符号会触发auto-generation 手动指定entry ID的示例如下所示: ```redis-cli > XADD race:usa 0-1 racer Castilla 0-1 > XADD race:usa 0-2 racer Norem 0-2 ``` `在通过XADD手动指定entry ID时,后添加的entry ID必须大于先前指定的entry ID`,否则将会返回error。 ```redis-cli > XADD race:usa 0-1 racer Prickett (error) ERR The ID specified in XADD is equal or smaller than the target stream top item ``` 在redis 7及之后,可以仅显式指定`millisecondsTime`的部分,指定后`sequenceNumber`的部分将会自动生成并填充,示例如下所示: ```redis-cli > XADD race:usa 0-* racer Prickett 0-3 ``` #### Redis Stream consumer query model 向redis stream中添加entry的操作可以通过`XADD`进行实现。而从redis stream从读取数据,redis stream支持如下query model: - `Listening for new items`: 和unix command `tail -f`类似,reids stream consumer会监听`new messages that are appended to the stream`。 - 但是,和BLPOP这类阻塞操作不同的是,对于`BLPOP`,一个给定元素只会被传递给`一个client`;而在使用stream时,我们希望`new messages appended to the stream时`,新消息对多个consumers可见。(`tail -f`同样支持新被添加到log文件中的内容被多个进程可见) - 故而,在`Listening for new items`这种query model下,`stream is able to fan out messages to multiple clients` - `Query by range`: 除了上述类似`tail -f`的query model外,可能还希望将redis stream以另一种方式进行使用:并不将redis stream作为messaging system,而是将其看作`time series store` - 在将其作为`time series store`的场景下,其仍然能访问最新的messages,但是其还支持`get messages by ranges of time`的操作,或是`iterate the messages using a cursor to incrementally check all the history` - `Consumer group`: 上述两种场景都是从`consuemrs`的视角来读取/处理消息,但是,redis stream支持另一种抽象:`a stream of messages that can be partitioned to multiple consumers that are processing such messages`。 - 在该场景下,并非每个consumer都必须处理所有的messsages,每个consumer可以获取不同的messages并进行处理 redis stream通过不同的命令支持了以上三种query model。 #### Querying by range: XRANGE and XREVRANGE 为了根据范围查询stream,需要指定两个id:`start ID`和`end ID`。指定的范围为`inclusive`的,包含`start ID`和`end ID`。 `+`和`-`则是代表`greatest ID`和`smallest ID`,示例如下所示: ```redis-cli > XRANGE race:france - + 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1" 3) 1) "1692632102976-0" 2) 1) "rider" 2) "Prickett" 3) "speed" 4) "29.7" 5) "position" 6) "2" 7) "location_id" 8) "1" 4) 1) "1692632147973-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "29.9" 5) "position" 6) "1" 7) "location_id" 8) "2" ``` 如上所示,返回的每个entry都是由`ID`和`field-value pairs`组成的数组。由于entry ID和时间`currentMillisTime`相关联,故而可以通过XRANGE根据时间范围来查询records。 ##### query by time range `并且,在根据时间范围查询records时,可以省略sequence part的部分`: - 在省略sequence part时,`start`的sequence part将会被设置为0,而`end`的sequence part将会被设置为最大值。 - 故而,可以通过两个milliseconds unix time来进行时间范围内的查询,可以获取在该时间范围内生成的entries(`range is inclusive`) 示例如下 ```redis-cli > XRANGE race:france 1692632086369 1692632086371 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" ``` ##### count option `XRANGE`在范围查询时,还支持指定一个COUNT选项,用于`get first N items`。如果想要进行增量查询,可以用`上次查询的最大ID + 1`作为新一轮查询的`start`。 增量迭代示例如下: ```redis-cli > XRANGE race:france - + COUNT 2 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1" # 通过(符号指定了左开右闭的区间 > XRANGE race:france (1692632094485-0 + COUNT 2 1) 1) "1692632102976-0" 2) 1) "rider" 2) "Prickett" 3) "speed" 4) "29.7" 5) "position" 6) "2" 7) "location_id" 8) "1" 2) 1) "1692632147973-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "29.9" 5) "position" 6) "1" 7) "location_id" 8) "2" # 返回为空,迭代完成 > XRANGE race:france (1692632147973-0 + COUNT 2 (empty array) ``` `XRANGE`操作查找的时间复杂度为`O(long(N))`,切返回M个元素的时间复杂度为`O(M)`。 命令`XREVRANGE`和`XRANGE`集合等价,但是会按照逆序来返回元素,故而,针对`XREVRANGE`传参时参数的顺序也需要颠倒 ```redis-cli > XREVRANGE race:france + - COUNT 1 1) 1) "1692632147973-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "29.9" 5) "position" 6) "1" 7) "location_id" 8) "2" ``` #### Listening for new items with XREAD 在某些场景下,我们可能希望对`new items arriving to the stream`进行订阅,如下两种场景都希望对`new items`进行订阅 - 在`Redis Pub/Sub`场景中,通过redis stream来对channel进行订阅 - 在Redis blocking lists场景中,从redis stream中等并并获取new element 但是,上述两种场景在如下方面有所不同 - 一个stream可以包含多个clients(consumers),对于new item,默认情况下`会从传递给每一个等待当前stream的consumer`。 - 在`Pub/Sub`场景和上述行为相符,其会将new item传递给每一个consumer,即`fan out` - 在blocking lists场景和上述行为并不相符,每个consumer都会获取到不同的element,相同的element只会被传递给一个consumer - 其对message的处理也有不同: - `Pub/Sub`场景下,将会对消息进行`fire and forget`操作,消息将永远不会被存储 - 在使用`blocking lists`时,如果消息被client接收,其将会从list中移除 - Stream Consumer Groups提供了`Pub/Sub`或`blocking lsits`都无法实现的控制:不同的groups可以针对相同stream进行订阅;并且对被处理items进行显式的ack;可以对`unprocessed messages`进行声明;只有`private past history of messages`对client可见 对于提供`Listening for new items arriving into stream`支持的命令,被称为`XREAD`。其使用示例如下: ```redis-cli > XREAD COUNT 2 STREAMS race:france 0 1) 1) "race:france" 2) 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1" ``` ##### XREAD with STREAMS option 上述示例为`XREAD`命令的`non-blocking`形式,并且`COUNT option并非强制的`;唯一的强制option为`STREAMS`。 XREAD在和STREAMS一起使用时,并且需要指定`a list of keys`和`maximum ID already seen for each stream by the calling consumer`,故而,该命令只会向consumer返回`messages with and ID greater than the one we specified`。 在上述示例中,命令为`STREAMS race:france 0`,其代表`race:france流中所有ID比0-0大的消息`。故而,返回的结构中,顶层为stream的key name,`因为上述命令可以指定多个stream,针对多个stream进行监听`。 在指定`STREAMS` option时,必须后缀key names,故而STREAMS选项必须为最后一个option,任何其他option必须要放在STREAMS之前。 除了streams之外,还可以为`XREAD`指定`last ID we own`。 ##### XREAD with BLOCK argument 上述示例为`non-blocking`形式,除此之外还可以通过`BLOCK`参数将命令转化为`blocking`形式: ```redis-cli > XREAD BLOCK 0 STREAMS race:france $ ``` 在上述示例中,并没有指定`COUNT`,而是指定了`BLOCK`选项,并设置`timeout`为`0 milliseconds`(代表永远不会超时)。 并且,BLOCK版本的示例并没有传递正常的ID,而是传递了一个特殊的ID `$`,该符号代表`XREAD`应当使用`stream已经存储的最大ID`来作为last ID。故而,在指定last ID为`$`后,只会接收到`new messages`。其行为和`unix command`中的`tail -f`类似。 在指定了BLOCK选中后,其行为如下: - `if the command is able to serve our request immediately without blocking`,那么其会立马被执行 - 如果当前不满足获取entry的条件,那么client会发生阻塞 通常,在希望`从new entries`开始消费时,会从`ID $`开始,在获取到`ID $`对应的entry后,下一次消费从`last message recevied`开始。 XREAD的blocking形式也支持监听多个streams,也可以指定多个key names。如果至少存在一个stream中`存在元素的ID并命令指定的ID更大`,那么将会立马返回结果;否则,该命令将会阻塞,直到有一个stream获取到新的data。 和blocking list操作类似,`blocking stream reads`对`clients wating for data`而言是公平的,也支持FIFO。`the first client that blocked for a given stream will be the first to be unlocked when new items are available`。 #### Consumer groups 当从不同的clients消费相同stream时,可通过`XREAD`来进行读取,此时可将message `fan-out`给多个clients,在此种场景下,同一条消息会被多个clients进行处理。 在某些场景下,可能不希望上述行为,而是希望每个client都负责处理stream中的不同消息subset,特别是在消息处理耗时较长的场景下。假设拥有3个消费者`C1, C2, C3`和包含消息`1,2,3,4,5,6,7`的stream,则消费者对stream进行消费时,消费情形可能如下: ``` 1 -> C1 2 -> C2 3 -> C3 4 -> C1 5 -> C2 6 -> C3 7 -> C1 ``` 为了实现上述消费方式,redis引入了`consumer groups`的概念。`consumer group`从stream中获取数据,并且`serves multiple consumers`。consumer group提供了如下保证: - `每条消息只会被传递给一个消费者,不存在一条消息传递给多个消费者的情形` - 在consumer group内consumer根据name进行标识,name是大小写敏感的。即使在redis client断连后,stream consumer group也会保留对应的状态,当client重连后会被标识为之前的consumer - 每个consumer group都拥有`first ID never consumed`的概念,当consumer请求新消息是,其仅可向consumer提供尚未被传递过的消息 - 当消费消息时,需要通过特定命令来进行显式的ack。 - 在redis中,`ack`代表如下含义:该消息已经被正确的处理,并且该消息可在consumer group中被淘汰 - consumer group会追踪当前所有的`pending messages` - `pending messages`代表已经被传递给consumer但是尚未被ack的消息 - 由于consumer group对pending message的追踪,当访问stream的message hisory(记录message被传递给哪个consumer instance)时,每个consumer只能看到被传递给其自身的messages 在某种程度上,consumer group可以被看作是`amount of state abount a stream`: ``` +----------------------------------------+ | consumer_group_name: mygroup | | consumer_group_stream: somekey | | last_delivered_id: 1292309234234-92 | | | | consumers: | | "consumer-1" with pending messages | | 1292309234234-4 | | 1292309234232-8 | | "consumer-42" with pending messages | | ... (and so forth) | +----------------------------------------+ ``` consumer group能够为consumer instance提供`history of pending messages`,并且当consumer请求new messages时,只会向其传递`ID大于last_delivered_id的message`。 对于redis stream而言,其可以包含多个consumer groups。并且,对于同一redis stream,可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。 consumer group包含如下的基本命令: - XGROUP - XREADGROUP - XACK - XACKDEL ##### Creating a consumer group 可以通过如下方式来为stream创建一个consumer group: ```redis-cli > XGROUP CREATE race:france france_riders $ OK ``` 在上述示例中,在通过command创建consumer group时,以`$`的形式指定了一个`ID`。该ID是必要的,该ID将作为consumer group创建时的`last message ID`,在第一个consumer连接时,会根据向consumer传递`大于last message ID的消息`。 - 当将last message ID指定为`$`时,只有`new messages arriving in the stream from now on`会被传递给consumer group中的consumer instances。由于`$`代表`current greatest ID in the stream`,指定`$`代表`consuming only new messages` - 如果在此处指定了`0`,那么consumer group将会消费`all the messages in the stream history` - 此处,也可以指定其他的`valid ID`,`consumer group will start delivering messages that are greater than the ID you specify` ##### Create the stream automatically `XGROUP CREATE`命令会期望target stream存在,并且当target stream不存在时会返回异常。如果target stream不存在,可以通过在末尾指定`MKSTREAM`选项来自动创建stream。 `XGROUP CREATE`同样支持自动创建stream,可通过`MKSTREAM`的subcommand作为最后一个参数: ```redis-cli XGROUP CREATE race:italy italy_riders $ MKSTREAM ``` 在调用该命令后,consumer group会被创建,之后可以使用`XREADGROUP`命令来通过consumer group读取消息。 ##### XREADGROUP XREADGROUP命令和XREAD命令类似,并同样提供`BLOCK`选项: - `GROUP`: 该option是一个mandatory option,在使用`XREADGROUP`时必须指定该选项。 - `GROUP`包含两个arguments: - the name of consumer group - the name of consumer that is attemping to read - `COUNT`: XREADGROUP同样支持该option,和XREAD中的`COUNT` option等价 在如下示例中,向`race:italy` stream中添加riders并且并且尝试通过consumer group进行读取: ```redis-cli > XADD race:italy * rider Castilla "1692632639151-0" > XADD race:italy * rider Royce "1692632647899-0" > XADD race:italy * rider Sam-Bodden "1692632662819-0" > XADD race:italy * rider Prickett "1692632670501-0" > XADD race:italy * rider Norem "1692632678249-0" > XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy > 1) 1) "race:italy" 2) 1) 1) "1692632639151-0" 2) 1) "rider" 2) "Castilla" ``` XGROUPREAD的返回结构和XREAD类似。 在上述命令中,`STREAMS` option之后指定了`>`作为special ID: - special ID `>`只在consumer group上下文中有效,其代表`messages never delivered to other consumers so far` 除了指定`>`作为id外,还支持将其指定为`0`或其他有效ID。在指定id为`>`外的其他id时,`XREADGROUP command will just provide us with history of pending messages, in that case we will never see new messages in the group`。 XREADGROUP命令基于指定id的不同,行为拥有如下区别: - 如果指定的id为`>`, 那么`XREADGROUP`命令将只会返回`new messages never delivered to other consumers so far`,并且,作为副作用,会更新consumer group的last ID - 如果id为其他有效的`numerical ID`,`XREADGROUP command will let us access our history of pending messages`,即`set of messages that are delivered to this specified consumer and never acknowledged so far` 可以指定ID为`0`来测试该行为,并且结果如下: - we'll just see the only pending message ```redis-cli > XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0 1) 1) "race:italy" 2) 1) 1) "1692632639151-0" 2) 1) "rider" 2) "Castilla" ``` 但是,如果通过ack将该message标记为processed,那么其将不再作为`pending messages history`的一部分,故而,即使继续调用`XREADGROUP`也不再包含该message: ```redis-cli > XACK race:italy italy_riders 1692632639151-0 (integer) 1 > XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0 1) 1) "race:italy" 2) (empty array) ``` > 在上述示例中,用于指定id为非`>`后,只会返回pending messages,在对pending messages进行ack后,那么被ack的消息在下次XREADGROUP调用时将不会被返回 `上述操作都通过名为Alice的consumer进行的操作,如下示例中将通过名为Bob的consumer进行操作`。 ```redis-cli > XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy > 1) 1) "race:italy" 2) 1) 1) "1692632647899-0" 2) 1) "rider" 2) "Royce" 2) 1) "1692632662819-0" 2) 1) "rider" 2) "Sam-Bodden" ``` 在上述示例中,`group相同但是consumer为Bob而不是Alice`。并且redis仅会返回new messages,前面示例中的`Castilla`消息由于已经被传递给Alice并不会被传递给Bob。 通过上述方式,Alice, Bob和group中的其他consumer可以从相同的stream中读取不同的消息。 XREADGROUP有如下方面需要关注: - consumer并不需要显式创建,`consumers are auto-created the first time they are mentioned` - 在使用XREADGROUP时,可以同时读取多个keys。但是,想要令该操作可行,`必须create a consumer group with the same name in every stream`。该方式通常不会被使用,但是在技术上是可行的 - `XREADGROUP`为一个`write command`,其只能在master redis instance上被调用。因为该命令调用存在side effect,该side effect会对consumer group造成修改 > 在使用XREADGROUP命令时, 只有当传递的id为`>`时,才会对consumer group的last ID造成修改。否则,只会读取pending messages history,并不会实际的修改last ID。故而,当consumer实例重启进行XREADGROUP pending messages时,并不会触发side effect对其他consumers的消费造成影响。 如下示例为一个Ruby实现的consumer, ```ruby require 'redis' if ARGV.length == 0 puts "Please specify a consumer name" exit 1 end ConsumerName = ARGV[0] GroupName = "mygroup" r = Redis.new def process_message(id,msg) puts "[#{ConsumerName}] #{id} = #{msg.inspect}" end $lastid = '0-0' puts "Consumer #{ConsumerName} starting..." check_backlog = true while true # Pick the ID based on the iteration: the first time we want to # read our pending messages, in case we crashed and are recovering. # Once we consumed our history, we can start getting new messages. if check_backlog myid = $lastid else myid = '>' end items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid) if items == nil puts "Timeout!" next end # If we receive an empty reply, it means we were consuming our history # and that the history is now empty. Let's start to consume new messages. check_backlog = false if items[0][1].length == 0 items[0][1].each{|i| id,fields = i # Process the message process_message(id,fields) # Acknowledge the message as processed r.xack(:my_stream_key,GroupName,id) $lastid = id } end ``` 在上述consumer实现中,`consumer启动时会对history进行消费`,即`list of pending messages`。(因为consumer可能在之前发生过崩溃,故而在重启时可能希望对`messages delivered to us without getting acked`进行重新读取)。在该场景下,`we might process a message multiple times or one time`。 并且,在consumer实现中,一旦history被消费完成,`XREADGROUP`命令将会返回empty list,此时便可以切换到`special ID >`,从而对new messages进行消费。 ##### Recovering from permanent failures 在上述ruby实现的consumers中,多个consumers将会加入相同的consumer group,每个consumer都会消费和处理`a subset of messages`。当consumer从failure中进行恢复时,会重新读取`pending messages that were delivered just to them`。 但是,在现实场景中,可能会出现`consumer may permanently fail and never recover`的场景。 redis consumer group对`permanently fail`的场景提供了专门的特性,支持对`pending messages of a given consumer`进行认领,故而`such pending messages will change ownership and will be re-assigned to a different consumer`。 在使用上述特性时,consumer必须检查`list of pending messages`,并`claim specific messages using special command`。否则,对于permanently fail的consumer,其pending messages将会被pending forever,即pending messages一直被分配给`old consumer that fail permanently`。 ###### XPENDING 通过`XPENDING`命令,可以查看pending entries in the consumer group。该命令是`read-only`的,调用该命令并不会导致任何消息的`ownership`被变更。 如下为XPENDING命令调用的示例: ```redis-cli > XPENDING race:italy italy_riders 1) (integer) 2 2) "1692632647899-0" 3) "1692632662819-0" 4) 1) 1) "Bob" 2) "2" ``` 当调用该command时,command的输出将会包含如下内容: - `total number of pending messages in the consumer group` - `lower and higher message ID among the pending messages` - `a list of consumers and the number of pending messages they have` 在上述示例中,仅名为`Bob`的consumer存在2条`pending messages`。 `XPENDING命令支持传递更多的参数来获取更多信息`,`full command signature`如下: ```redis-cli XPENDING [[IDLE ] []] ``` 如上述示例所示,可以为`XREADGROUP`命令指定如下内容: - `start-id`和`end-id`: 可以将其指定为`-`和`+` - `count`: 用于控制该command返回information的数量 - `consumer-name`: 该选项为optional的,指定该选项后,仅会输出`messages pending for a given consumer` 为`XPENDINGS`命令指定更多选项的示例如下所示 ```redis-cli > XPENDING race:italy italy_riders - + 10 1) 1) "1692632647899-0" 2) "Bob" 3) (integer) 74642 4) (integer) 1 2) 1) "1692632662819-0" 2) "Bob" 3) (integer) 74642 4) (integer) 1 ``` 在上述示例中,会输出`details for each message`,具体包含如下信息: - `ID`: message id - `consumer name`: 该pending message对应的consumer - `idle time in milliseconds`:代表该message被传递给consumer后,经过的毫秒数 - `the number of times that a given message was delivered`: 该消息已经被传递的次数 在上述示例中,返回的两条`pending messages`都是针对`Bob`的,并且两条消息都被传递了超过一分钟,都只被传递过一次。 ###### checking the message content 在获取到message detail后,可以根据message ID来查询message的内容,只需将message ID同时作为`XRANGE`命令的`start-id`和`end-id`即可。示例如下所示: ```redis-cli > XRANGE race:italy 1692632647899-0 1692632647899-0 1) 1) "1692632647899-0" 2) 1) "rider" 2) "Royce" ``` 通过上述介绍的命令,可以制定`permanently fail`场景下的处理策略: - 当Bob存在超过一分钟都没有处理的pending messages时,Bob可能无法快速被恢复,此时Alice可以针对Bob的pending messages进行`claim`,并且代替Bob对pending messages进行处理 ###### XCLAIM `XCLAIM`命令的形式如下: ```redis-cli XCLAIM ... ``` XCLAIM命令的作用如下: - 对于``和``所指定的stream和consumer group,希望将``所指定的message都能更改ownership,将messages分配给``所指定的consumer。 - 除此之外,我们还提供了``,仅当指定message的`idle time`比指定的`min-idle-time`更大时才有效 - 指定`min-idle-time`在多个clients尝试同时对message进行claim时会起作用,当第一个client对message进行claim后,idle-time会重置,故而当第二个client尝试对message进行claim时,会因不满足min-idle-time的条件而失败 作为claim操作的side effect,`claiming a message will reset its idle time and will increment its number of deliveries counter`。 XCLAIM的使用示例如下所示: ```redis-cli > XCLAIM race:italy italy_riders Alice 60000 1692632647899-0 1) 1) "1692632647899-0" 2) 1) "rider" 2) "Royce" ``` 该消息会`claimed by Alice`,此时consumer Alice可以对该message进行处理并ack,即使original consumer恢复失败,pending messages也能够被继续处理。 ###### JUSTID option 在上述示例中,在成功对message进行`XCLAIM`后,会返回该message本身。可以通过`JUSTID` option来修改返回的结构,在指定该option后,返回内容为`just IDs of the messages successfully claimed`。 通过`JUSTID` option,可以降低client和server之间使用的带宽。 `claiming并非一定要位于consumer自身的进程中,其也可以被实现在独立的进程中`: - `可以使用独立的进程来扫描pending messages list,并且将pending messages分配给明显处于活跃状态的consumers` ##### Automatic claiming 在redis 6.2版本中,添加了`XAUTOCLAIM`命令,实现了上一章节中描述的claiming process。`XPENDING`和`XCLAIM`命令为各种不同的recovery机制提供了基础。而`XAUTOCLAIM`命令优化了通用的`claiming process`,令`claiming process`由redis来进行管理,为大多的recovery提供简单的解决方案。 `XAUTOCLAIM`命令会识别idle messages并且转移message的ownership给指定consumer。`XAUTOCLAIM`命令的形式如下所示: ```redis-cli XAUTOCLAIM [COUNT count] [JUSTID] ``` 故而,可以通过如下方式来使用automatic claiming: ```redis-cli > XAUTOCLAIM race:italy italy_riders Alice 60000 0-0 COUNT 1 1) "0-0" 2) 1) 1) "1692632662819-0" 2) 1) "rider" 2) "Sam-Bodden" ``` 和`XCLAIM`类似,其返回结果为`an array of the claimed messsages`。但是,其还会返回一个stream ID(`在上述示例中为0-0`)。返回的stream ID可以用于对pending entries进行迭代。该stream ID为一个cursor,可以将其用作下次XAUTOCLAIM调用的`start`: ```redis-cli > XAUTOCLAIM race:italy italy_riders Lora 60000 (1692632662819-0 COUNT 1 1) "1692632662819-0" 2) 1) 1) "1692632647899-0" 2) 1) "rider" 2) "Royce" ``` 当`XAUTOCLAIM`返回“0-0”作为cursor时,代表其到达了`end of the consumer group pending entries list`。其并不代表没有新的idle pending messages,可以重新从begining of the stream来调用`XAUTOCLAIM`。 ##### Claiming and the delivery counter 通过`XPENDING`命令输出的counter代表该message的被传递次数,该counter在两种场景下会增加: - when a message is successfully claimed via `XCLAIM` - when a `XREADGROUP` call is used in order to access the history of pending messages 当存在failure时,message可能会被传递多次,但是message最终会被处理并ack。但是,在处理特定的消息时,可能会在处理逻辑中抛出异常,在该类场景下consumer会持续的在处理该消息时抛出异常。故而,可以通过delivery counter来探知那些不可处理的message。`一旦delivery counter到达给定的值时,可以将该消息发送给另一个stream,并且向系统的管理员发送notification。`这就是redis stream实现`dead letter`的基础。 ##### working with multiple consumer groups redis stream可以关联多个consumer groups,每个entries都会被传递给每个consumer group。在consumer group内,每个consumer instance处理一部分entries。 当consumer对message进行处理时,其会使用`XACK`命令来对message进行确认,`并且从consumer group的Pending Entries List(PEL)中移除该entry reference`。但是,`被ack的message仍然保存在stream中`,且consumer group A中对message的ack并不会影响consumer group B的PEL,group A在对message进行ack后,message仍然位于group B的PEL中,直到group B中的consumer对message进行ack,此时message才从group B的PEL中被移除。 通常来说,如果想要从stream中删除entries,必须要等到所有的consumer groups都对entries进行了ack,应用需要实现复杂的逻辑。 ###### Enhanced deletion control in Redis 8.2 从redis 8.2开始,一些命令为`entries在多个consumer groups间的处理`提供了增强控制: - `XADD`支持KEEPREF, DELREF, ACKED模式 - `XTRIM`同样支持KEEPREF, DELREF, ACKED选项 如下选项控制consumer group references是如何被处理的: - `KEEPREF`(默认): Preserves existing references to entries in all consumer groups' PELs - `DELREF`: Removes all references to entries from consumer groups' PELs, effectively cleaning up all traces of the messages - `ACKED`: Only processes entries that have been acked by all consumer groups `ACKED` mode对于`coordinating deletion across multiple consumer groups`的复杂逻辑十分有用,确认entires在所有consumer groups在完成对其的处理后才移除。 ##### Stream Observability 缺乏可观测性的消息系统将十分难以使用,一个透明的消息系统需要令如下信息可观测: - who is consuming messages - what messages are pending - the set of consumer groups active in a given stream 在前面章节中,已经介绍了`XPENDINGS`命令,通过其可以观测处于`处理中`状态的消息,并且能够获取消息的idle time和number of deliveries。 `XINFO`命令和sub-commands一起使用,可以用于获取stream和consumer group相关的信息。 `XINFO`命令的使用示例如下: ```redis-cli > XINFO STREAM race:italy 1) "length" 2) (integer) 5 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1692632678249-0" 9) "groups" 10) (integer) 1 11) "first-entry" 12) 1) "1692632639151-0" 2) 1) "rider" 2) "Castilla" 13) "last-entry" 14) 1) "1692632678249-0" 2) 1) "rider" 2) "Norem" ``` 上述示例中,通过`XINFO`命令获取了stream本身的信息,输出展示了stream内部的编码方式,并且记录了stream中的第一条消息和最后一条消息。 如果想要获取和stream相关的consumer group消息,参照如下示例: ```redis-cli > XINFO GROUPS race:italy 1) 1) "name" 2) "italy_riders" 3) "consumers" 4) (integer) 3 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1692632662819-0" ``` 如果想要查看consumer group中注册的consumer实例,可以通过`XINFO CONSUMERS`命令来进行查看: ```redis-cli > XINFO CONSUMERS race:italy italy_riders 1) 1) "name" 2) "Alice" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 177546 2) 1) "name" 2) "Bob" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 424686 3) 1) "name" 2) "Lora" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 72241 ``` ##### Difference with kafka partitions Redis stream中的consumer group可能在某些方面类似于kafka中基于分区的consumer groups,但是仍然存在较大差别。 在redis strema中,`partition`这一概念是逻辑的,实际上stream所有的messages都存在相同的key中。故而,redis stream中consumer instance并不从实际的partition中读取信息,也不涉及partition在consumer间的分配。 > 例如,如果consumer C3在某个时刻fail permanently,redis在所有新消息到达时都会传递给C1和C2,将好像redis stream只存在2个逻辑分区。 类似的,如果某个consumer处理消息比其他consumers都快,那么该consumer将在单位时间内按比例收到更多的消息。Redis会追踪所有尚未被ack的消息,并且记录哪条消息被哪个consumer接收,`the ID of the first message never delivered to any consumer`也会被redis记录。 在redis Stream中: - 如果redis stream的数量和consumer的数量都为1,那么消息将是按照顺序被处理的 - 如果stream的数量为1,consumer的数量为n,那么可以将负载均衡给n个consumers,但是,在这种情况下消息的消费可能是无序的 - 当使用n个stream和n个consumers时,一个consumer只用于处理所有streams中的一部分,可以将`1 stream->1 consumer`拓展到`n stream->n consuimer`