Files
rikako-note/中间件/redis/redis.md
2025-09-28 11:33:30 +08:00

163 KiB
Raw Blame History

redis

Using Command

Keys and values

Content of Keys

key为data model中拥有含义的文本名称。Redis key在命名格式方面几乎没有限制故而key name中可以包含空格或标点符号。redis key并不支持namespace或categories,故而在命名时应当避免命名冲突。

通常,会使用:符号来将redis的命名分割为多个sections例如office:London,可以使用该命名规范来实现类似categories的效果。

尽管keys通常是文本的redis中也实现了binary-safe key。可以使用任何byte sequence来作为有效的key并且在redis中empty string也可以作为有效的key。

redis key在命名时存在如下规范

  • 不推荐使用长度很长的key会在存储和key-comparisions方面带来负面影响
  • 不推荐使用长度非常短的key其会减少可读性通常user:1000:followers的可读性相较u1000flw来说可读性要更好,并且前者带来的额外存储开销也较小
  • 应在命名时使用统一的命名模式,例如object-type:id在section中包含多个单词时可以使用.-符号来进行分隔,例如comment:4321:reply.tocomment:4321:reply-to
  • key size的最大允许大小为512MB

Hashtags

redis通过hashing来获取key关联的value。

通常整个key都会被用作hash index的计算但是在部分场景下开发者可能只希望使用key中的一部分来计算hash index。此时可以通过{}包围key中想要计算hash index的部分该部分被称为hash-tag`。

例如,person:1person:2这两个key会计算出不同的hash index但是{persion}:1{person}:2这两个key计算出的hash index却是相同的因为只有person会被用于计算hash index。

通常hashtag的应用场景是在集群场景下进行multi-key operations。在集群场景下除非所有key计算出的hash index相同否则集群并不允许执行multi-key操作。

例如,SINTER命令用于查询两个不同set values的交集可以接收多个key。在集群场景下

SINTER group:1 group:2

上述命名并无法成功执行,因为group:1group:2两个key的hash index不同。

但是,如下命令在集群环境下则是可以正常执行:

SINTER {group}:1 {group}:2

hashtag让两个key产生相同的hash值。

虽然hashtag在上述场景下有效但是不应该过度的使用hashtag。因为hashtag相同的key其hash index都相同故而会被散列到同一个slot中当同一slot中元素过多时会导致redis的性能下降。

altering and querying the key space

存在部分命令其并不针对特定的类型而是用于和key space进行交互其可以被用于所有类型的key。

例如,EXISTS命令会返回0和1代表给定key在redis中是否存在DEL命令则是用于删除key和key关联的value无论value是什么类型。

示例如下:

> set mykey hello
OK
> exists mykey
(integer) 1
> del mykey
(integer) 1
> exists mykey
(integer) 0

在上述示例中,DEL命令返回的值为1或0代表要被删除的值在redis中是否存在。

TYPE命令则是可以返回key所关联value的类型:

> set mykey x
OK
> type mykey
string
> del mykey
(integer) 1
> type mykey
none

key expiration

在redis中不管key对应的value为何种类型都支持key expiration特性。key exipiration支持为key设置超时key expiration也被称为time to live/TTL,当ttl指定的时间过去后key将会被自动移除。

对于key expiration

  • 在对key设置key expiration时可以按照秒或毫秒的精度进行设置
  • 但是,expire time在解析时单位永远为毫秒
  • expire相关的信息会被replicated并存储在磁盘中即使在redis宕机时time virtually passes即redis key其expire若为1天宕机4小时后恢复其expire会变为8小时宕机并不会导致key expire停止计算

可以通过EXPIRE命令来设置key expiration

> set key some-value
OK
> expire key 5
(integer) 1
> get key (immediately)
"some-value"
> get key (after some time)
(nil)

在第二次调用时delay超过5skey已经不存在。

上述示例中,expire key 5将key的超时时间设置为了5sEXPIRE用于为key指定不同的超时时间。

类似的,可以通过PERSIST命令来取消key的超时设置让key永久被保留。

除了使用expire来设置超时外在创建时也能会key指定expiration

> set key 100 ex 10
OK
> ttl key
(integer) 9

上述示例中,使用ttl命令来检查key的超时时间。

如果想要按照毫秒来设置超时,可以使用PEXPIREPTTL命令。

Navigating the keyspace

Scan

SCAN命令支持对redis中key的增量迭代在每次调用时只会返回一小部分数据。该命令可以在生产中使用并不会像keyssmembers等命令一样在处理大量elements或keys时可能产生长时间的阻塞。

scan使用实例如下

> scan 0
1) "17"
2)  1) "key:12"
    2) "key:8"
    3) "key:4"
    4) "key:14"
    5) "key:16"
    6) "key:17"
    7) "key:15"
    8) "key:10"
    9) "key:3"
   10) "key:7"
   11) "key:1"
> scan 17
1) "0"
2) 1) "key:5"
   2) "key:18"
   3) "key:0"
   4) "key:2"
   5) "key:19"
   6) "key:13"
   7) "key:6"
   8) "key:9"
   9) "key:11"

scan是一个cursor based iterator每次在调用scan命令时都会返回一个update cursor并且在下次调用scan时需要使用上次返回的cursor。

当cursor被设置为0时iteration将会开始并且当server返回的cursor为0时iteration结束。

keys

除了scan外还可以通过keys命令来迭代redis中所有的key。但是scan的增量迭代不同的是keys会一次性返回所有的key在返回前会阻塞redis-server。

keys命令支持glob-style pattern

  • h?llo?用于匹配单个字符
  • h*llo: *用于匹配除/外的任何内容
  • h[ae]llo: 匹配hallohello
  • h[^e]llo: [^e]匹配除e外的任何字符
  • h[a-b]llo: 匹配hallohbllo

global-style pattern中转义符为\

pipelining

redis pipelining支持一次发送多条命令而非逐条发送命令,并且发送后一条命令之前必须要等待前一条请求执行完成。pipelining被绝大多数redis client支持能够提高性能。

Request/Response protocols and round-trip time(RTT)

redis是一个使用client-server model的tcp server在请求完成前会经历如下步骤

  • client向server发送query并且阻塞的从socket中读取server的响应
  • server接收到client的请求后处理命令并且将处理结果返回给client

例如包含4条命令的命令序列如下

  1. client: incr x
  2. server: 1
  3. client: incr x
  4. server: 2
  5. client: incr x
  6. server: 3
  7. client: incr x
  8. server: 4

client和server之间通过网络进行连接每次client和server的请求/响应,都需要经历client发送请求server发送响应的过程,该过程会经过网络来传输,带来传输延迟。

该延迟被称为RTT(round trip time), 如果在一次请求中能够发送n条命令,那么将能够节省n-1次网络传输的往返时间。例如RTT如果为250ms即使server能够以100K/s的速度处理请求对于同一client其每秒也只能处理4条命令。

redis pipelining

在redis server处理命令时其处理新请求前并不要求client接收到旧请求并且client在发送多条命令后会一次性统一读取执行结果。

该技术被称为Pipelining,在其他协议中也有广泛使用,例如POP3

pipelining在redis的所有版本中都被支持示例如下

$ (printf "PING\r\nPING\r\nPING\r\n"; sleep 1) | nc localhost 6379
+PONG
+PONG
+PONG

通过pipelining并不需要对每个命令都花费RTT用于网络传输而是在一次网络传输时就包含3条命令。

当client使用pipelining发送commands时server会在内存中对replies进行排队。故而在client需要使用pipeline向server发送大量的请求时其需要分批发送每批中包含合适数量的命令。

pipeline会积累多条命令并一次性发送给server。

Performance Imporvement

pipeline不仅能够减少RTT的次数其也能够增加redis server在每秒的执行操作数。

在redis server处理command时实际的处理逻辑开销很小但是和socket io的交互开销却很大。在进行socket io时会进行writeread的系统调用,其涉及到用户态和内核态的切换,这将带来巨大的开销。

如果使用pipeline多条指令只需要调用一次read系统调用,并且多条执行的执行结果只需要通过一次write系统调用即能够执行。通过使用pipeline能够有效降低redis server的系统调用次数这将减少socket io带来的开销故而redis server能够在一秒内执行更多的commands。

pipelining vs scripting

相比于pipeliningscripting可以在read, compute, write的场景下带来更高的性能。pipelining并无法处理read, compute, write的场景因为在执行write command之前需要先获取read command的执行结果故而无法将read和write命令通过pipeline同时发送给server。

Transactions

redis transaction支持execute a group of commands in a single step,其涉及到multi, exec, discard, watch命令。

  • 在redis事务中所有命令都会被串行、按顺序执行在redis transaction执行时其他client发送的请求永远不会插入到redis transaction中间。在redis transaction中所有命令都会executed as a single siolated operation,事务执行的过程中不会被其他命令打断
  • EXEC命令会触发事务中所有命令的执行故而当client在事务上下文中exec命令调用之前失去了与server的连接事务中的命令都不会被执行。只有当exec被调用后事务中的命令才会实际开始执行

Usage

可以通过multi命令来进入redis事务该命令总会返回ok。在进入事务后,可以向事务中添加多条命令,这些命令并不会被立马执行,而是被排队。只有当发送EXEC命令后,之前排队的命令才会被实际执行。

DISCARD命令可以清空被排队的命令,并且退出事务的上下文。

如下示例展示了如何通过事务原子的执行一系列命令:

> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

EXEC命令会返回一个数组数组中元素为之前QUEUED COMMANDS的返回结果顺序和命令被添加到队列中的顺序相同。

在事务上下文中,所有命令(EXEC除外)都会返回QUEUED

Errors inside transaction

在使用事务时可能遇到如下两种errors

  • 将命令添加到queue中时可能发生失败该时机在EXEC被执行之前。例如command可能存在语法错误或是存在内存错误等都可能导致命令添加到queue失败
  • 调用EXEC对入队的命令实际执行时可能发生异常例如在实际执行command时对string类型的value执行了list操作

对于EXEC时产生的错误,并没有特殊处理:即使事务中部分命令实际执行失败,其他的命令也都会被执行

示例如下所示:

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
MULTI
+OK
SET a abc
+QUEUED
LPOP a
+QUEUED
EXEC
*2
+OK
-WRONGTYPE Operation against a key holding the wrong kind of value

上述示例中执行了两个命令其中命令1执行成功而命令2执行失败。

需要注意的是,事务中即使某个命令执行失败queue中的其他命令仍然会被执行redis在执行某条命令失败时并不会对别的命令执行造成影响。

rollback for redis Transaction

对于redis transaction其并不对rollback做支持rollback会对redis的性能造成巨大影响也会影响redis的易用性。

Discard the command queue

如果想要对事务进行abort可以调用DISCARD命令,在该场景下,并不会有命令被实际执行,并且连接状态也会恢复为正常:

> SET foo 1
OK
> MULTI
OK
> INCR foo
QUEUED
> DISCARD
OK
> GET foo
"1"

Optimistic locking using check-and-set

在redis transaction中WATCH命令用于提供CAS行为。watched keys将会被监控,并探知其是否发生变化。

在执行EXEC命令前如果存在任一key发生过修改那么整个事务都会发生abort,并且会返回NULL,用以提示事务失败。

如下是一个read, compute, write的示例:

val = GET mykey
val = val + 1
SET mykey val

上述逻辑在只存在一个客户端的场景下工作正常,但是当存在多个客户端时,将会发生竞争。由于上述逻辑并不是原子的,故而可能出现如下场景:

  1. client A read old value
  2. client B read old value
  3. client A compute old value + 1
  4. client B compute old value + 1
  5. client A set new value with old value + 1
  6. client B set new value with old value + 1

故而在多client场景下假设old value为10即使client A和client B都对value进行了incr最后new value的值仍有可能为11而不是12

通过WATCH机制能够解决该问题

WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC

在上述示例中在进入事务上下文前client对mykey进行了watch并完成新值的计算之后进入事务上下文后用new value设置mykey并调用EXEC命令。

如果WATCH和EXEC之间存在其他client修改了mykey的值那么当前事务将会失败。

只需要在发生竞争时重新执行上述流程,那么其即是乐观锁。

WATCH

WATCH命令会让EXEC是条件的:

  • 只有当所有watched keys都未修改的前提下才会让redis实际执行transaction

watched keys可能发生如下的修改:

  • watched keys可能被其他client修改
  • watched keys可能被redis本身修改redis本身的修改包含如下场景
    • expiration
    • eviction

如果在对keys进行watch和实际调用exec之间keys发生的变化整个transaction都会被abort。

在redis 6.0.9之前expired keys并不会造成redis transaction被abort

在本事务内的命令并不会造成WATCH condition被触发因为WATCH机制的时间范围为keys watched的时间点到exec调用前的时间点而queued commands在调用exec后才会实际执行

watch命令可以被多次调用所有的watch命令都会生效并且在watch被调用后就开始监控key的变化监控一直到EXEC被调用后才结束。

对于WATCH命令,可以传递任意数量的参数。

EXEC命令被调用后所有的watched keys都会被unwatched不管事务是否被aborted。并且当client连接关闭后所有keys都会被unwatched。

对于discard命令其在调用后所有watched keys也会自动被unwatched

UNWATCH

可以通过UNWATCH命令无参数来清空所有的watched keys。

通常,在调用MULTI进入事务前,会执行如下操作:

  • WATCH mykey
  • GET mykey

如果在GET mykey后,调用MULTI之前如果在读取mykey的值后不再想执行后续事务了那么可以直接调用UNWATCH对先前监视的所有key取消监视。

Using WATCH to implement ZPOP

如下是一个使用WATCH的示例

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

如果EXEC失败那么其将返回Null,所以仅需对之前操作进行重试即可

Data Types

Redis Strings

Redis strings存储字节序列包含文本、serialized objects、binary array。strings通常被用于缓存但是支持额外的功能例如counters和位操作。

redis keys也为strings。

string data type可用于诸多用例例如对html fragement/html page的缓存。

SET / GET

通过set和get命令可以对string value进行设置和获取。

    > SET bike:1 Deimos
    OK
    > GET bike:1
    "Deimos"

在使用SET命令时如果key已存在对应的值那么set指定的value将会对已经存在的值进行替换。即使key对应的旧值并不是strings类型set也会对其进行替换

values可以是strings of every kind包含binary data故而支持在value中存储jpeg image。value的值不能超过512MB.

set with additional arguments

在使用set命令时,可以为其提供额外的参数,例如NX | XX.

  • NX: 仅当redis不存在对应的key时才进行设置否则失败返回nil
  • XX: 仅当redis存在对应的key时的才进行设置否则失败返回nil

GETSET

GETSET命令将会将key设置为指定的new value并且返回oldvalue的值。

127.0.0.1:6379> get bike:1
(nil)
127.0.0.1:6379> GETSET bike:1 3
(nil)
127.0.0.1:6379> GET bike:1
"3"

MSET / MGET

strings类型支持通过mset, mget命令来一次性获取和设置多个keys这将能降低RTT带来的延迟。

    > mset bike:1 "Deimos" bike:2 "Ares" bike:3 "Vanth"
    OK
    > mget bike:1 bike:2 bike:3
    1) "Deimos"
    2) "Ares"
    3) "Vanth"

strings as counters

strings类型支持atomic increment

    > set total_crashes 0
    OK
    > incr total_crashes
    (integer) 1
    > incrby total_crashes 10
    (integer) 11

incr命令会将string value转化为integer并且对其进行加一操作。类似命令还有incrby, decr, drcrby

Limits

默认情况下单个redis string的最大限制为512MB

JSON

redis支持对json值的存储、更新和获取。redis json可以和redis query engine进行协作从而允许index and query json documents

在redis 8中内置支持了RedisJSON否则需要手动安装RedisJSON module。

JSON.SET

JSON.SET命令支持将redis的key设置为JSON value示例如下

127.0.0.1:6379> JSON.SET bike $ '"Hyperion"'
OK
127.0.0.1:6379> JSON.GET bike $
"[\"Hyperion\"]"
127.0.0.1:6379> type bike
ReJSON-RL
127.0.0.1:6379> JSON.TYPE bike $
1) "string"

在上述示例中,$代表的是指向json document中value的path

  • 在上述示例中,$代表root

除此之外JSON还支持其他string operation。JSON.STRLNE支持获取string长度并且可以通过JSON.STRAPPEND来在当前字符串后追加其他字符串:

> JSON.STRLEN bike $
1) (integer) 8
> JSON.STRAPPEND bike $ '" (Enduro bikes)"'
1) (integer) 23
> JSON.GET bike $
"[\"Hyperion (Enduro bikes)\"]"

json数值操作

RedisJSON支持incrementmultiply操作:

> JSON.SET crashes $ 0
OK
> JSON.NUMINCRBY crashes $ 1
"[1]"
> JSON.NUMINCRBY crashes $ 1.5
"[2.5]"
> JSON.NUMINCRBY crashes $ -0.75
"[1.75]"
> JSON.NUMMULTBY crashes $ 24
"[42]"

json数组操作

RedisJSON支持通过JSON.SET将值赋值为数组,path expression支持数组操作

> JSON.SET newbike $ '["Deimos", {"crashes": 0}, null]'
OK
> JSON.GET newbike $
"[[\"Deimos\",{\"crashes\":0},null]]"
> JSON.GET newbike $[1].crashes
"[0]"
> JSON.DEL newbike $[-1]
(integer) 1
> JSON.GET newbike $
"[[\"Deimos\",{\"crashes\":0}]]"
JSON.DEL

JSON.DEL支持通过path对json值进行删除。

JSON.ARRAPPEND

支持向json array中追加值。

JSON.ARRTRIM

支持对json array进行裁剪。

> JSON.SET riders $ []
OK
> JSON.ARRAPPEND riders $ '"Norem"'
1) (integer) 1
> JSON.GET riders $
"[[\"Norem\"]]"
> JSON.ARRINSERT riders $ 1 '"Prickett"' '"Royce"' '"Castilla"'
1) (integer) 4
> JSON.GET riders $
"[[\"Norem\",\"Prickett\",\"Royce\",\"Castilla\"]]"
> JSON.ARRTRIM riders $ 1 1
1) (integer) 1
> JSON.GET riders $
"[[\"Prickett\"]]"
> JSON.ARRPOP riders $
1) "\"Prickett\""
> JSON.ARRPOP riders $
1) (nil)

json object操作

json oject操作同样有其自己的命令示例如下

> JSON.SET bike:1 $ '{"model": "Deimos", "brand": "Ergonom", "price": 4972}'
OK
> JSON.OBJLEN bike:1 $
1) (integer) 3
> JSON.OBJKEYS bike:1 $
1) 1) "model"
   2) "brand"
   3) "price"> JSON.SET bike:1 $ '{"model": "Deimos", "brand": "Ergonom", "price": 4972}'

format output

redis-cli支持对json内容的输出进行格式化步骤如下

  • 在执行redis-cli时指定--raw选项
  • 通过formatting keywords来进行格式化
    • INDENT
    • NEWLINE
    • SPACE
$ redis-cli --raw
> JSON.GET obj INDENT "\t" NEWLINE "\n" SPACE " " $
[
  {
    "name": "Leonard Cohen",
    "lastSeen": 1478476800,
    "loggedOut": true
  }
]

Limitation

传递给command的json值最大深度只能为128如果嵌套深度大于128那么command将返回错误。

Redis lists

Redis lists为string values的链表redis list通常用于如下场景

  • 实现stack和queue
  • 用于backgroup worker system的队列管理

Blocking commands

redis lists中支持阻塞命令

  • BLPOP: 从head of a list移除并且返回一个element如果list为空该command会阻塞直至list被填充元素或发生超时
  • BLMOVE: 从source list中pop一个element并且将其push到target list中。如果source list为空那么command将会被阻塞直到source list中出现new element

在上述文档描述中,阻塞实际指的是针对客户端的阻塞。该Blocking Command命令的调用会实际的阻塞客户端直到redis server返回结果。

但是,server并不会被blocking command所阻塞。redis server为单线程模型当用户发送blocking command给server并且该command无法立即被执行而导致客户端阻塞时server会挂起该客户端的连接,并且转而处理其他请求,直至该客户端的阻塞命令满足执行条件时,才会将挂起的连接唤醒重新执行。

故而,Blocking Command并不会对server造成阻塞而是会阻塞客户端的调用。

Queue(first in, first out)

依次调用LPUSH后再依次调用RPOP,可模拟队列行为,元素的弹出顺序和元素的添加顺序相同:

> LPUSH bikes:repairs bike:1
(integer) 1
> LPUSH bikes:repairs bike:2
(integer) 2
> RPOP bikes:repairs
"bike:1"
> RPOP bikes:repairs
"bike:2"

Stack(first in, last out)

依次调用LPUSH后再依次调用LPOP,可模拟栈的行为,元素的移除顺序和添加顺序相反:

> LPUSH bikes:repairs bike:1
(integer) 1
> LPUSH bikes:repairs bike:2
(integer) 2
> LPOP bikes:repairs
"bike:2"
> LPOP bikes:repairs
"bike:1"

check length of list

可通过LLEN命令来检查list的长度

> LLEN bikes:repairs
(integer) 0

Atomically pop one element from one list and push to another

通过lmove命令能够实现原子的从srclist移除并添加到dstlist的操作

> LPUSH bikes:repairs bike:1
(integer) 1
> LPUSH bikes:repairs bike:2
(integer) 2
> LMOVE bikes:repairs bikes:finished LEFT LEFT
"bike:2"
> LRANGE bikes:repairs 0 -1
1) "bike:1"
> LRANGE bikes:finished 0 -1
1) "bike:2"

trim the list

可以通过LTRIM命令来完成对list的裁剪操作

> LPUSH bikes:repairs bike:1
(integer) 1
> LPUSH bikes:repairs bike:2
(integer) 2
> LMOVE bikes:repairs bikes:finished LEFT LEFT
"bike:2"
> LRANGE bikes:repairs 0 -1
1) "bike:1"
> LRANGE bikes:finished 0 -1
1) "bike:2"

Redis List Impl

redis lists是通过Linked List来实现的其元素的添加操作并开销永远是常量的并不会和Array一样因扩容而可能导致内存的复制。

LPUSH, RPUSH

LPUSH会将元素添加到list的最左端头部RPUSH则会将新元素添加奥list的最右端尾部

LPUSH和RPUSH接收的参数都是可变的在单次调用中可以向list中添加多个元素。

例如向空list中调用lpush 1 2 3时,其等价于lpush 1; lpush 2; lpush3即调用后list中元素为3,2,1

LRANGE

LRANGE命令能够从list中解析范围内的数据其接收两个indexes为range中开始和结束元素的位置。index可以是负的负数代表从尾端开始计数

  • -1代表最后的元素
  • -2代表倒数第二个元素
> RPUSH bikes:repairs bike:1
(integer) 1
> RPUSH bikes:repairs bike:2
(integer) 2
> LPUSH bikes:repairs bike:important_bike
(integer) 3
> LRANGE bikes:repairs 0 -1
1) "bike:important_bike"
2) "bike:1"
3) "bike:2"

LPOP, RPOP

lists支持pop元素从list中移除元素并且获取元素的值。lists支持从list的左端和右端pop元素使用示例如下所示

> RPUSH bikes:repairs bike:1 bike:2 bike:3
(integer) 3
> RPOP bikes:repairs
"bike:3"
> LPOP bikes:repairs
"bike:1"
> RPOP bikes:repairs
"bike:2"
> RPOP bikes:repairs
(nil)

Common use cases for lists

redis lists拥有如下有代表性的用例场景

  • 记录用户最新上传的推文
  • 用于进程间的通信使用consumer-producer pattern其中生产者向lists中推送内容而消费者消费lists中的内容

Capped lists - latest n

在许多用例场景下会使用lists来存储latest items例如social network updates, logs等。

redis允许将lists作为拥有容量上限的集合使用,可以通过LTRIM命令来实现only remembering the latest N items and discarding all the oldest items

LTRIM命令和LRANGE类似,但是LRANGE用于获取获取list中指定范围内的元素LTRIM会将选中的范围作为new list value所有位于选中范围之外的元素都会从list中被移除。

LTRIM的使用示例如下所示:

> RPUSH bikes:repairs bike:1 bike:2 bike:3 bike:4 bike:5
(integer) 5
> LTRIM bikes:repairs 0 2
OK
> LRANGE bikes:repairs 0 -1
1) "bike:1"
2) "bike:2"
3) "bike:3"

LTRIM 0 2会令redis保留index位于[0, 2]范围内的3个元素并且移除其他的元素。将push操作LTRIM操作组合,可以实现add a new element and discard elements exceeding a limt的操作。

例如,使用LRANGE -3 -1可用于实现仅保留最近添加的三个元素的场景

> RPUSH bikes:repairs bike:1 bike:2 bike:3 bike:4 bike:5
(integer) 5
> LTRIM bikes:repairs -3 -1
OK
> LRANGE bikes:repairs 0 -1
1) "bike:3"
2) "bike:4"
3) "bike:5"

Blocking operations on lists

lists的blocking operation特性令其适合用于实现queues并广泛用于进程间通信系统。

在通过redis lists实现进程间通信系统时如果某些时刻list为空并不存在任何元素那么此时消费者client在调用pop操作时只会返回为空。通常来讲consumer会等待一定的时间并且重新尝试调用pop该操作被称为polling,其通常被认为是一种不好的实现:

  • 其会强制redis/client来处理无用的命令(当list为空时pop请求只会返回为空而不会任何的实际处理)
  • 会增加delay to processing of items因为worker在接收到redis server返回的null时其会等待一定的时间。为了令delay更小可以在调用POP操作之间等待更短的时间但是其可能方法前一个问题(当pop调用之间的时间间隔更小时redis server可能会处理更多的无用命令)

故而redis实现支持BRPOPBLPOP命令其命令在list为空时会阻塞上述命令造成的阻塞会在list中被添加新元素时返回如果直到设置的超时到达后该操作也会返回

BRPOP的使用示例如下所示:

> RPUSH bikes:repairs bike:1 bike:2
(integer) 2
> BRPOP bikes:repairs 1
1) "bikes:repairs"
2) "bike:2"
> BRPOP bikes:repairs 1
1) "bikes:repairs"
2) "bike:1"
> BRPOP bikes:repairs 1
(nil)
(2.01s)

上述示例中,BRPOP bikes:repairs 1代表wait for elements in the list bikes:repairs,但是当list中元素为空时最多等待1s。

当为BRPOP指定timeout为0时代表会永久等待elements。并且可以为BRPOP命令指定多个lists其会等待所有的list并且当任何一个list中接收到元素时当前BRPOP命令会立刻返回

示例如下:

# client 1 等待 event-queue:1, event-queue:2, event-queue:3三个list
client 1> brpop event-queue:1 event-queue:2 event-queue:3 1000

# client 2 向event-queue:2 中追加元素
client 2> rpush event-queue:2 baka
(integer) 1

# client 1 立刻返回,返回结果如下
1) "event-queue:2"
2) "baka"
(19.55s)
BRPOP
  • 对于BRPOP命令造成的阻塞,其处理是按照顺序的:the first client that blocked waiting for a list, is served first when an element is pushed by some other client, and so forth
  • BRPOP命令的返回结果其结构和RPOP命令不同:BRPOP返回的是一个包含两个元素的arrayarrary[0]为list对应的keyarray[1]为弹出的元素因为BRPOP可以等待多个list
  • 如果超时后list中仍然没有可获取的元素那么将会返回null

Automatic Creation and removal of keys

在先前示例中向list中添加元素时并没有预先创建空的list或是在list中没有元素时将list手动移除。

在redis中list的创建和删除都是redis的职责

  • 当list中不再包含元素时redis会自动删除list对应的key
  • 当想要对不存在的key中添加元素时redis会自动创建一个empty list

故而,可以整理除如下准则:

  • 当将元素添加到一个聚合数据类型时如果target key不存在那么在添加元素前一个empty aggregate data type将会被自动创建
  • 当从aggregate data type中移除元素时如果移除后该aggregate data type中为空那么key将会被自动销毁stream data type除外)
  • 调用一个read-only command例如LLEN或write command用于移除元素对一个empty key做操作时,其返回结果和针对an key holding an empty aggregate type of type the command expects to find的操作一直

上述三个准则的示例如下:

准则1示例如下所示当new_bikes不存在时通过LPUSH命令向其中添加元素一个empty list在添加前会自动创建

> DEL new_bikes
(integer) 0
> LPUSH new_bikes bike:1 bike:2 bike:3
(integer) 3

准则2示例如下所示当pop出所有的元素后key将会被自动销毁通过EXISTS命令返回的结果为0

> LPUSH bikes:repairs bike:1 bike:2 bike:3
(integer) 3
> EXISTS bikes:repairs
(integer) 1
> LPOP bikes:repairs
"bike:3"
> LPOP bikes:repairs
"bike:2"
> LPOP bikes:repairs
"bike:1"
> EXISTS bikes:repairs
(integer) 0

准则3的示例如下所示当key不存在时对该key进行read-only操作和remove element操作所返回的结果,和对empty aggregated data type操作所返回的结果一致:

> DEL bikes:repairs
(integer) 0
> LLEN bikes:repairs
(integer) 0
> LPOP bikes:repairs
(nil)

redis sets

redis set是一个unordered collection of unique strings通过redis sets可以高效进行如下操作

  • track unique items
  • represent relations
  • perform common set operations such as intersection, unions, and differences

basic commands

  • SADD: 向set中添加new member
  • SREM: 从set中移除指定member
  • SISMEMBER: 检查给定的string是否位于set中
  • SINTER: 返回两个或更多set的交集
  • SCARD: 返回set的大小cardinality

SADD

SADD命令会向set中添加新的元素示例如下

> SADD bikes:racing:france bike:1 bike:2 bike:3
(integer) 3
> SMEMBERS bikes:racing:france
1) bike:3
2) bike:1
3) bike:2

SMEMBERS

在上述示例中,SMEMBERS命令会返回set中所有的元素。redis并不保证元素的返回顺序,每次调用SMEMBERS命令都可能以任何顺序返回set中的元素。

SDIFF

可以通过SDIFF来返回两个sets的差异差集。例如可以通过SDIFF命令查看有哪些元素位于set1中但是不位于set2中,示例如下:

> SADD bikes:racing:usa bike:1 bike:4
(integer) 2
> SDIFF bikes:racing:france bikes:racing:usa
1) "bike:3"
2) "bike:2"

上述示例中,则通过SDIFF命令展示了bikes:racing:francebikes:racing:usa两个set的差集。

SDIFF命令在difference between all sets is empty会返回一个empty array。

SINTER

可以通过SINTER命令来取多个sets的交集。

> SADD bikes:racing:france bike:1 bike:2 bike:3
(integer) 3
> SADD bikes:racing:usa bike:1 bike:4
(integer) 2
> SADD bikes:racing:italy bike:1 bike:2 bike:3 bike:4
(integer) 4
> SINTER bikes:racing:france bikes:racing:usa bikes:racing:italy
1) "bike:1"
> SUNION bikes:racing:france bikes:racing:usa bikes:racing:italy
1) "bike:2"
2) "bike:1"
3) "bike:4"
4) "bike:3"
> SDIFF bikes:racing:france bikes:racing:usa bikes:racing:italy
(empty array)
> SDIFF bikes:racing:france bikes:racing:usa
1) "bike:3"
2) "bike:2"
> SDIFF bikes:racing:usa bikes:racing:france
1) "bike:4"

SREM

可以通过SREM命令来移除set中的元素可以一次性移除一个或多个。

SPOP

SPOP命令支持随机移除一个element。

SRANDMEMBER

SRANDMEMBER命令支持随机返回一个set中的元素但是不对其实际移除

上述命令的使用示例如下所示:

> SADD bikes:racing:france bike:1 bike:2 bike:3 bike:4 bike:5
(integer) 5
> SREM bikes:racing:france bike:1
(integer) 1
> SPOP bikes:racing:france
"bike:3"
> SMEMBERS bikes:racing:france
1) "bike:2"
2) "bike:4"
3) "bike:5"
> SRANDMEMBER bikes:racing:france
"bike:2"

redis hashes

redis hashes为记录field-value pair集合的数据结构可以使用hashes来表示基本对象或存储counter的分组示例如下

对象表示

> HSET bike:1 model Deimos brand Ergonom type 'Enduro bikes' price 4972
(integer) 4
> HGET bike:1 model
"Deimos"
> HGET bike:1 price
"4972"
> HGETALL bike:1
1) "model"
2) "Deimos"
3) "brand"
4) "Ergonom"
5) "type"
6) "Enduro bikes"
7) "price"
8) "4972"

通常来讲可以存储在hash中的fields数量并没有限制。

命令HSET可用于向hash中设置多个fields而命令HGET可以用于获取一个fieldHMGET可以用于获取多个field。

> HMGET bike:1 model price no-such-field
1) "Deimos"
2) "4972"
3) (nil)

同样的hash结构支持对单个field进行操作例如HINCRBY

> HINCRBY bike:1 price 100
(integer) 5072
> HINCRBY bike:1 price -100
(integer) 4972

counters

将hash用于存储counters分组的示例如下所示

> HINCRBY bike:1:stats rides 1
(integer) 1
> HINCRBY bike:1:stats rides 1
(integer) 2
> HINCRBY bike:1:stats rides 1
(integer) 3
> HINCRBY bike:1:stats crashes 1
(integer) 1
> HINCRBY bike:1:stats owners 1
(integer) 1
> HGET bike:1:stats rides
"3"
> HMGET bike:1:stats owners crashes
1) "1"
2) "1"

Field Expiration

redis open source 7.4支持为独立的hash field指定超时

  • HEXPIRE: set the remaining TTL in seconds
  • HPEXPIRE: set the remaining TTL in milliseconds
  • HEXPIREAT: set expiration time to a timestamp specified in seconds
  • HPEXPIREAT: set the expiration time to a timestamp specified in milliseconds

如上所示在指定超时时可以通过时间戳来指定也可以通过TTL来指定。

同时,获取超时事件也可以通过时间戳TTL来获取:

  • HEXPIRETIME: get the expiration time as timestamp in seconds
  • HPEXPIRETIME: get the expiration time as timestamp in milliseconds
  • HTTL: get the remaining ttl in seconds
  • HPTTL: get the remaining ttl in milliseconds

如果想要移除指定hash field的expration可以通过如下方式

  • HPERSIST: 移除hash field的超时
Common field expiration use cases
  • Event Tracking使用hash key来存储最后一小时的事件。其中field为每个事件而设置事件field的ttl为1小时并可使用HLEN来统计最后一小时的事件数量。
  • Fraud Detection通常用户行为进行分析时可按小时记录用户的事件数量。可通过hash结果记录过去48小时中每小时的操作数量其中hash field代表用户某一个小时内的操作数。每个hash field的过期时间都为48h。
  • Customer session management: 可以通过hash来存储用户数据。为每个session创建一个hash key并且向hash key中添加session field。当session过期时自动对session key和session field进行expire操作。
  • Active Session Tracking: 将所有的active sessions存储再一个hash key中。每当session变为inactive时将session的TTL设置为过期。可以使用HLEN来统计活跃的sessions数量。
Field Expiration examples

对于hash field ixpiration的支持在官方client libraries中尚不可用,但是可以在python(redis-py)java(jedis)的beta版本client libraries中使用。

如下python示例展示了如何使用field expiration

event = {
      'air_quality': 256,
        'battery_level':89
}

r.hset('sensor:sensor1', mapping=event)

# set the TTL for two hash fields to 60 seconds
r.hexpire('sensor:sensor1', 60, 'air_quality', 'battery_level')
ttl = r.httl('sensor:sensor1', 'air_quality', 'battery_level')
print(ttl)
# prints [60, 60]

# set the TTL of the 'air_quality' field in milliseconds
r.hpexpire('sensor:sensor1', 60000, 'air_quality')
# and retrieve it
pttl = r.hpttl('sensor:sensor1', 'air_quality')
print(pttl)
# prints [59994] # your actual value may vary

# set the expiration of 'air_quality' to now + 24 hours
# (similar to setting the TTL to 24 hours)
r.hexpireat('sensor:sensor1', 
                datetime.now() + timedelta(hours=24), 
                'air_quality')
# and retrieve it
expire_time = r.hexpiretime('sensor:sensor1', 'air_quality')
print(expire_time)
# prints [1717668041] # your actual value may vary

Sorted sets

redis sorted set是一个包含unique strings的集合其中unique strings会关联一个score并且strings会按照score进行排序。如果多个string存在相同的score那么拥有相同score的strings将会按照字典序进行排序

sorted sets的用例场景包括如下

  • Leaderboards: 可以通过sorted sets维护一个ordered list其可被用于实现排行榜
  • RateLimiter: 通过sorted list可以构建一个sliding-window rate limiter从而避免过多的api调用
    • 在实现sliding-window rate limiter可以将时间戳作为score因为可以快速获取一个时间范围内的调用数量

可以将sorted sets看作是set和hash的混合

  • 和set一样sorted sets由unique strings组成
  • 和hash一样sorted sets中元素由一个关联的floating point value被称为score

sorted set的使用示例如下

> ZADD racer_scores 10 "Norem"
(integer) 1
> ZADD racer_scores 12 "Castilla"
(integer) 1
> ZADD racer_scores 8 "Sam-Bodden" 10 "Royce" 6 "Ford" 14 "Prickett"
(integer) 4

ZADDSADD类似,但是其接收一个用于表示score的额外参数。和SADD类似,可以使用ZADD来添加多个score-value pairs

Implementation

sorted sets实现的数据结构中同时使用了skip listhash table两种数据结构故而每次向zset中添加元素时其操作的复杂度为o(log(n))`。

并且,在获取元素时,由于元素已经被排序,获取操作无需其他的额外开销。

ZRANGE的顺序为从小到大,ZREVRANGE的顺序则是从大到小

> ZRANGE racer_scores 0 -1
1) "Ford"
2) "Sam-Bodden"
3) "Norem"
4) "Royce"
5) "Castilla"
6) "Prickett"
> ZREVRANGE racer_scores 0 -1
1) "Prickett"
2) "Castilla"
3) "Royce"
4) "Norem"
5) "Sam-Bodden"
6) "Ford"

在上述示例中0和-1代表index位于[0, len-1]范围内的元素。(负数代表的含义和LRANGE命令中相同)

ZRANGE命令中指定withscores也能够在返回zset中value时同时返回score

> ZRANGE racer_scores 0 -1 withscores
 1) "Ford"
 2) "6"
 3) "Sam-Bodden"
 4) "8"
 5) "Norem"
 6) "10"
 7) "Royce"
 8) "10"
 9) "Castilla"
10) "12"
11) "Prickett"
12) "14"

ZRANGEBYSCORE

除了上述操作外sorted sets还支持对operate on ranges。可以通过ZRANGEBYSCORE来实现get all racers with 10 or fewer points的操作:

> ZRANGEBYSCORE racer_scores -inf 10
1) "Ford"
2) "Sam-Bodden"
3) "Norem"
4) "Royce"

上述示例中,ZRANGEBYSCORE racer_score -inf 10向redis请求返回score位于negative infinity和10之间both included的元素

ZREMRANGEBYSCORE / ZREM

如果想要丛zset中移除元素可以调用ZREM命令。同样的sorted sets也支持remove ranges of elements的操作,可通过ZREMRANGEBYSCORE命令来实现。

> ZREM racer_scores "Castilla"
(integer) 1
> ZREMRANGEBYSCORE racer_scores -inf 9
(integer) 2
> ZRANGE racer_scores 0 -1
1) "Norem"
2) "Royce"
3) "Prickett"

上述示例中,通过ZREMRANGEBYSCORE racer_scores -inf 9命令实现了remove all the racers with strictly fewer than 10 points的操作。

ZREMRANGEBYSCORE会返回其移除的元素个数。

Inclusive and exclusive

在使用ZRANGEBYSCORE指定minmax时,可以分别将其指定为-inf+inf。故而在获取zset中比xx大或比xx小的所有元素时,可以使用-inf+inf此时无需知道当前zset中的最大/最小元素。

默认情况下,interval specified by min and max is closed(inclusive)。但是,可以将其指定为open interval(exclusive)只需要在score前添加(符号即可,示例如下:

ZRANGEBYSCORE zset (1 5

上述示例中,会返回位于(1, 5]区间内的元素。

ZRANGEBYSCORE zset (5 (10命令则是会返回(5, 10)区间内的元素。

ZRANK / ZREVRANK

sorted sets还支持get-rank operation,通过ZRANK可以返回position of an element in the set of ordered elements

ZREVRANK命令的作用和ZRANK类似,但是ZREVRANK返回的值为从大到小的降序ranking

使用示例如下所示:

> ZRANK racer_scores "Norem"
(integer) 0
> ZREVRANK racer_scores "Norem"
(integer) 2

Lexicographical scores

自redis 2.8其,引入了getting ranges lexicograhpically的新特性其假设sorted set中的所有元素都拥有相同的score。

lexicographical ranges进行交互的主要命令如下:

  • ZRANGEBYLEX
  • ZREVRANGEBYLEX
  • ZREMRANGEBYLEX
  • ZLEXCOUNT

使用示例如下所示:

> ZADD racer_scores 0 "Norem" 0 "Sam-Bodden" 0 "Royce" 0 "Castilla" 0 "Prickett" 0 "Ford"
(integer) 3
> ZRANGE racer_scores 0 -1
1) "Castilla"
2) "Ford"
3) "Norem"
4) "Prickett"
5) "Royce"
6) "Sam-Bodden"
> ZRANGEBYLEX racer_scores [A [L
1) "Castilla"
2) "Ford"

在上述示例中,可以通过ZRANGEBYLEX按照字典序对range进行请求。

ZRANGEBYLEX

ZRANGEBYLEX的语法如下

ZRANGEBYLEX key min max [limit offset count]

ZRANGEBYSCORE命令不同的是,ZRANGEBYSCORE在指定范围时,默认是included的;而ZRANGEBYLEX必须通过[(来显式指定inclusive或exclusive。

而在指定min和max时-+分别则代表negatively infinite和positive infinite。故而ZRANGEBYLEX myzset - +命令代表返回zset中所有的元素。

Updating the score: leaderboards

支持对sorted set中元素的score进行更新。在更新sorted set中元素的score时只需要再次调用ZADD命令即可sorted set会更新score更新操作的时间复杂度为O(log(N))

Leaderboard Example

在通过zset实现leaderboard时由如下两种方式对user score进行更新

  • 在得知user当前score的情况下可以直接通过ZADD命令来进行覆盖
  • 如果想要针对当前的score进行增加操作时,可以使用ZINCRBY命令
> ZADD racer_scores 100 "Wood"
(integer) 1
> ZADD racer_scores 100 "Henshaw"
(integer) 1
> ZADD racer_scores 150 "Henshaw"
(integer) 0
> ZINCRBY racer_scores 50 "Wood"
"150"
> ZINCRBY racer_scores 50 "Henshaw"
"200"
ZADD

当当前添加的元素已经在sorted set中存在时ZADD命令会返回0,否则ZADD命令会返回1

ZINCRBY

ZINCRBY命令则是会返回更新后的new score。

redis Streams

Redis Stream类型数据结构的行为类似于append-only log但是实现了o(1)时间复杂度的random access和复杂的消费策略、consumer groups。通过redis stream可以实时的记录事件并对事件做同步分发。

通用的redis stream用例如下

  • event sourcing(e.g., tracking user actions)
  • sensor monitoring
  • notifications(e.g., storing a record of each user's notifications in a separate stream)

redis会为每个stream entry生成一个unique ID。可以使用IDs来获取其关联的entries或读取和处理stream中所有的后续entries。

redis stream支持一些trimming strategies用于避免stream的无尽增长。并且redis stream也支持多种消费策略XREAD, XREADGROUP, XRANGE

Examples

添加stream entry

在如下示例中当racers通过检查点时将会为每个racer添加一个stream entrystream entry中包含racer name, speed, position, location ID信息示例如下

> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
"1692632086370-0"
> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
"1692632094485-0"
> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
"1692632102976-0"

从指定id开始读取stream entries

在如下示例中将从stream entry ID 1692632086370-0开始读取两条stream entries

> XRANGE race:france 1692632086370-0 + COUNT 2
1) 1) "1692632086370-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "30.2"
      5) "position"
      6) "1"
      7) "location_id"
      8) "1"
2) 1) "1692632094485-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"

从末尾开始读取

在如下示例中,会从end of the stream开始读取entries最多读取100条entries并在没有entries被写入的情况下最多阻塞300ms

> XREAD COUNT 100 BLOCK 300 STREAMS race:france $
(nil)

Stream basics

stream为append-only数据结构其基础的write command为XADD会向指定stream中添加一个新的entry。

每个stream entry都由一个或多个field-value pairs组成类似dictionary或redis hash

> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"
XADD

上述示例中,通过XADD向key为race:france中添加了值为rider: Castilla, speed:29.9, position: 1, location_id: 2的entry并使用了auto-generated entry ID 1692632147973-0作为返回值。

XADD命令的描述如下

XADD key [NOMKSTREAM] [KEEPREF | DELREF | ACKED] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

XADD race:france * rider Castilla speed 29.9 position 1 location_id 2的命令示例中,

  • 第一个参数race:france代表key name
  • 第二个参数为entry id*代表stream中所有的entry id
    • 在上述示例中,为第二个参数传入*代表希望server生成一个新的ID
    • 所有新生成的ID都应该单调递增即新生成的ID将会比过去所有entries的ID都要大
    • 需要显式指定ID而不是新生成的场景比较罕见
  • 位于第一个和第二个参数之后的为field-value pairs
XLEN

可以通过XLEN命令来获取Stream中items的个数

> XLEN race:france
(integer) 4

Entry IDs

entry ID由XADD命令返回并且可以对给定stream中的entries进行唯一标识。entry ID由两部分组成

<millisecondsTime>-<sequenceNumber>
  • millisecondsTime: 该部分代表生成stream id时redis node的本地时间。但是如果current milliseconds time比previous entry time要小,则会使用previous entry time而不是current milliseconds
    • 该设计主要是为了解决时钟回拨的问题即使在redis node回拨本地时间的场景下新生成的ID仍然能够保证单调递增
  • sequenceNumber: 该部分主要是为了处理同一ms内创建多条entries的问题
    • sequence number的宽度为64bit
entry ID设计

entry ID中包含millisecondsTime的原因是Reids Stream支持range queries by ID。因为ID关联entry的生成时间故而可以不花费额外成本的情况下按照time range对entries进行查询。

当在某种场景下,用户可能需要incremental IDs that are not related to time but are actually associated to another external system ID,此时XADD则可以在第二个参数接收一个实际的ID而不是*通配符。

  • *符号会触发auto-generation

手动指定entry ID的示例如下所示

> XADD race:usa 0-1 racer Castilla
0-1
> XADD race:usa 0-2 racer Norem
0-2

在通过XADD手动指定entry ID时后添加的entry ID必须大于先前指定的entry ID否则将会返回error。

> XADD race:usa 0-1 racer Prickett
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

在redis 7及之后可以仅显式指定millisecondsTime的部分,指定后sequenceNumber的部分将会自动生成并填充,示例如下所示:

> XADD race:usa 0-* racer Prickett
0-3

Redis Stream consumer query model

向redis stream中添加entry的操作可以通过XADD进行实现。而从redis stream从读取数据redis stream支持如下query model

  • Listening for new items: 和unix command tail -f类似reids stream consumer会监听new messages that are appended to the stream
    • 但是和BLPOP这类阻塞操作不同的是对于BLPOP,一个给定元素只会被传递给一个client而在使用stream时我们希望new messages appended to the stream时新消息对多个consumers可见。tail -f同样支持新被添加到log文件中的内容被多个进程可见
    • 故而,在Listening for new items这种query model下stream is able to fan out messages to multiple clients
  • Query by range: 除了上述类似tail -f的query model外可能还希望将redis stream以另一种方式进行使用并不将redis stream作为messaging system而是将其看作time series store
    • 在将其作为time series store的场景下其仍然能访问最新的messages但是其还支持get messages by ranges of time的操作,或是iterate the messages using a cursor to incrementally check all the history
  • Consumer group: 上述两种场景都是从consuemrs的视角来读取/处理消息但是redis stream支持另一种抽象a stream of messages that can be partitioned to multiple consumers that are processing such messages
    • 在该场景下并非每个consumer都必须处理所有的messsages每个consumer可以获取不同的messages并进行处理

redis stream通过不同的命令支持了以上三种query model。

Querying by range: XRANGE and XREVRANGE

为了根据范围查询stream需要指定两个idstart IDend ID。指定的范围为inclusive的,包含start IDend ID

+-则是代表greatest IDsmallest ID,示例如下所示:

> XRANGE race:france - +
1) 1) "1692632086370-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "30.2"
      5) "position"
      6) "1"
      7) "location_id"
      8) "1"
2) 1) "1692632094485-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"
3) 1) "1692632102976-0"
   2) 1) "rider"
      2) "Prickett"
      3) "speed"
      4) "29.7"
      5) "position"
      6) "2"
      7) "location_id"
      8) "1"
4) 1) "1692632147973-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"

如上所示返回的每个entry都是由IDfield-value pairs组成的数组。由于entry ID和时间currentMillisTime相关联故而可以通过XRANGE根据时间范围来查询records。

query by time range

并且在根据时间范围查询records时可以省略sequence part的部分

  • 在省略sequence part时start的sequence part将会被设置为0end的sequence part将会被设置为最大值。
  • 故而可以通过两个milliseconds unix time来进行时间范围内的查询可以获取在该时间范围内生成的entriesrange is inclusive

示例如下

> XRANGE race:france 1692632086369 1692632086371
1) 1) "1692632086370-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "30.2"
      5) "position"
      6) "1"
      7) "location_id"
      8) "1"
count option

XRANGE在范围查询时还支持指定一个COUNT选项用于get first N items。如果想要进行增量查询,可以用上次查询的最大ID + 1作为新一轮查询的start

增量迭代示例如下:

> XRANGE race:france - + COUNT 2
1) 1) "1692632086370-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "30.2"
      5) "position"
      6) "1"
      7) "location_id"
      8) "1"
2) 1) "1692632094485-0"
   2) 1) "rider"
      2) "Norem"
      3) "speed"
      4) "28.8"
      5) "position"
      6) "3"
      7) "location_id"
      8) "1"

# 通过(符号指定了左开右闭的区间
> XRANGE race:france (1692632094485-0 + COUNT 2
1) 1) "1692632102976-0"
   2) 1) "rider"
      2) "Prickett"
      3) "speed"
      4) "29.7"
      5) "position"
      6) "2"
      7) "location_id"
      8) "1"
2) 1) "1692632147973-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"

# 返回为空,迭代完成
> XRANGE race:france (1692632147973-0 + COUNT 2
(empty array)

XRANGE操作查找的时间复杂度为O(long(N))切返回M个元素的时间复杂度为O(M)

命令XREVRANGEXRANGE集合等价,但是会按照逆序来返回元素,故而,针对XREVRANGE传参时参数的顺序也需要颠倒

> XREVRANGE race:france + - COUNT 1
1) 1) "1692632147973-0"
   2) 1) "rider"
      2) "Castilla"
      3) "speed"
      4) "29.9"
      5) "position"
      6) "1"
      7) "location_id"
      8) "2"

Listening for new items with XREAD

在某些场景下,我们可能希望对new items arriving to the stream进行订阅,如下两种场景都希望对new items进行订阅

  • Redis Pub/Sub场景中通过redis stream来对channel进行订阅
  • 在Redis blocking lists场景中从redis stream中等并并获取new element

但是,上述两种场景在如下方面有所不同

  • 一个stream可以包含多个clients(consumers)对于new item默认情况下会从传递给每一个等待当前stream的consumer
    • Pub/Sub场景和上述行为相符其会将new item传递给每一个consumerfan out
    • 在blocking lists场景和上述行为并不相符每个consumer都会获取到不同的element相同的element只会被传递给一个consumer
  • 其对message的处理也有不同
    • Pub/Sub场景下,将会对消息进行fire and forget操作,消息将永远不会被存储
    • 在使用blocking lists如果消息被client接收其将会从list中移除
  • Stream Consumer Groups提供了Pub/Subblocking lsits都无法实现的控制不同的groups可以针对相同stream进行订阅并且对被处理items进行显式的ack可以对unprocessed messages进行声明;只有private past history of messages对client可见

对于提供Listening for new items arriving into stream支持的命令,被称为XREAD。其使用示例如下:

> XREAD COUNT 2 STREAMS race:france 0
1) 1) "race:france"
   2) 1) 1) "1692632086370-0"
         2) 1) "rider"
            2) "Castilla"
            3) "speed"
            4) "30.2"
            5) "position"
            6) "1"
            7) "location_id"
            8) "1"
      2) 1) "1692632094485-0"
         2) 1) "rider"
            2) "Norem"
            3) "speed"
            4) "28.8"
            5) "position"
            6) "3"
            7) "location_id"
            8) "1"
XREAD with STREAMS option

上述示例为XREAD命令的non-blocking形式,并且COUNT option并非强制的唯一的强制option为STREAMS

XREAD在和STREAMS一起使用时并且需要指定a list of keysmaximum ID already seen for each stream by the calling consumer故而该命令只会向consumer返回messages with and ID greater than the one we specified

在上述示例中,命令为STREAMS race:france 0,其代表race:france流中所有ID比0-0大的消息。故而返回的结构中顶层为stream的key name因为上述命令可以指定多个stream针对多个stream进行监听

在指定STREAMS option时必须后缀key names故而STREAMS选项必须为最后一个option任何其他option必须要放在STREAMS之前。

除了streams之外还可以为XREAD指定last ID we own

XREAD with BLOCK argument

上述示例为non-blocking形式,除此之外还可以通过BLOCK参数将命令转化为blocking形式:

> XREAD BLOCK 0 STREAMS race:france $

在上述示例中,并没有指定COUNT,而是指定了BLOCK选项,并设置timeout0 milliseconds(代表永远不会超时)。

并且BLOCK版本的示例并没有传递正常的ID而是传递了一个特殊的ID $,该符号代表XREAD应当使用stream已经存储的最大ID来作为last ID。故而在指定last ID为$后,只会接收到new messages。其行为和unix command中的tail -f类似。

在指定了BLOCK选中后其行为如下

  • if the command is able to serve our request immediately without blocking,那么其会立马被执行
  • 如果当前不满足获取entry的条件那么client会发生阻塞

通常,在希望从new entries开始消费时,会从ID $开始,在获取到ID $对应的entry后下一次消费从last message recevied开始。

XREAD的blocking形式也支持监听多个streams也可以指定多个key names。如果至少存在一个stream中存在元素的ID并命令指定的ID更大那么将会立马返回结果否则该命令将会阻塞直到有一个stream获取到新的data。

和blocking list操作类似blocking stream readsclients wating for data而言是公平的也支持FIFO。the first client that blocked for a given stream will be the first to be unlocked when new items are available

Consumer groups

当从不同的clients消费相同stream时可通过XREAD来进行读取此时可将message fan-out给多个clients在此种场景下同一条消息会被多个clients进行处理。

在某些场景下可能不希望上述行为而是希望每个client都负责处理stream中的不同消息subset特别是在消息处理耗时较长的场景下。假设拥有3个消费者C1, C2, C3和包含消息1,2,3,4,5,6,7的stream则消费者对stream进行消费时消费情形可能如下

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现上述消费方式redis引入了consumer groups的概念。consumer group从stream中获取数据并且serves multiple consumers。consumer group提供了如下保证

  • 每条消息只会被传递给一个消费者,不存在一条消息传递给多个消费者的情形
  • 在consumer group内consumer根据name进行标识name是大小写敏感的。即使在redis client断连后stream consumer group也会保留对应的状态当client重连后会被标识为之前的consumer
  • 每个consumer group都拥有first ID never consumed的概念当consumer请求新消息是其仅可向consumer提供尚未被传递过的消息
  • 当消费消息时需要通过特定命令来进行显式的ack。
    • 在redis中ack代表如下含义该消息已经被正确的处理并且该消息可在consumer group中被淘汰
  • consumer group会追踪当前所有的pending messages
    • pending messages代表已经被传递给consumer但是尚未被ack的消息
    • 由于consumer group对pending message的追踪当访问stream的message hisory记录message被传递给哪个consumer instance每个consumer只能看到被传递给其自身的messages

在某种程度上consumer group可以被看作是amount of state abount a stream:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

consumer group能够为consumer instance提供history of pending messages并且当consumer请求new messages时只会向其传递ID大于last_delivered_id的message

对于redis stream而言其可以包含多个consumer groups。并且对于同一redis stream可以同时包含consumer reading without consumer groups via XREADconsumer reading via XREADGROUP in consumer group两种类型的consumer。

consumer group包含如下的基本命令

  • XGROUP
  • XREADGROUP
  • XACK
  • XACKDEL
Creating a consumer group

可以通过如下方式来为stream创建一个consumer group

> XGROUP CREATE race:france france_riders $
OK

在上述示例中在通过command创建consumer group时$的形式指定了一个ID。该ID是必要的该ID将作为consumer group创建时的last message ID在第一个consumer连接时会根据向consumer传递大于last message ID的消息

  • 当将last message ID指定为$时,只有new messages arriving in the stream from now on会被传递给consumer group中的consumer instances。由于$代表current greatest ID in the stream,指定$代表consuming only new messages
  • 如果在此处指定了0那么consumer group将会消费all the messages in the stream history
  • 此处,也可以指定其他的valid IDconsumer group will start delivering messages that are greater than the ID you specify
Create the stream automatically

XGROUP CREATE命令会期望target stream存在并且当target stream不存在时会返回异常。如果target stream不存在可以通过在末尾指定MKSTREAM选项来自动创建stream。

XGROUP CREATE同样支持自动创建stream可通过MKSTREAM的subcommand作为最后一个参数

XGROUP CREATE race:italy italy_riders $ MKSTREAM

在调用该命令后consumer group会被创建之后可以使用XREADGROUP命令来通过consumer group读取消息。

XREADGROUP

XREADGROUP命令和XREAD命令类似并同样提供BLOCK选项:

  • GROUP: 该option是一个mandatory option在使用XREADGROUP时必须指定该选项。
    • GROUP包含两个arguments
      • the name of consumer group
      • the name of consumer that is attemping to read
    • COUNT: XREADGROUP同样支持该option和XREAD中的COUNT option等价

在如下示例中,向race:italy stream中添加riders并且并且尝试通过consumer group进行读取

> XADD race:italy * rider Castilla
"1692632639151-0"
> XADD race:italy * rider Royce
"1692632647899-0"
> XADD race:italy * rider Sam-Bodden
"1692632662819-0"
> XADD race:italy * rider Prickett
"1692632670501-0"
> XADD race:italy * rider Norem
"1692632678249-0"
> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"
   2) 1) 1) "1692632639151-0"
         2) 1) "rider"
            2) "Castilla"

XGROUPREAD的返回结构和XREAD类似。

在上述命令中,STREAMS option之后指定了>作为special ID

  • special ID >只在consumer group上下文中有效其代表messages never delivered to other consumers so far

除了指定>作为id外还支持将其指定为0或其他有效ID。在指定id为>外的其他id时XREADGROUP command will just provide us with history of pending messages, in that case we will never see new messages in the group

XREADGROUP命令基于指定id的不同行为拥有如下区别

  • 如果指定的id为> 那么XREADGROUP命令将只会返回new messages never delivered to other consumers so far并且作为副作用会更新consumer group的last ID
  • 如果id为其他有效的numerical IDXREADGROUP command will let us access our history of pending messages,即set of messages that are delivered to this specified consumer and never acknowledged so far

可以指定ID为0来测试该行为,并且结果如下:

  • we'll just see the only pending message
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
   2) 1) 1) "1692632639151-0"
         2) 1) "rider"
            2) "Castilla"

但是如果通过ack将该message标记为processed那么其将不再作为pending messages history的一部分,故而,即使继续调用XREADGROUP也不再包含该message

> XACK race:italy italy_riders 1692632639151-0
(integer) 1
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
   2) (empty array)

在上述示例中用于指定id为非>只会返回pending messages在对pending messages进行ack后那么被ack的消息在下次XREADGROUP调用时将不会被返回

上述操作都通过名为Alice的consumer进行的操作如下示例中将通过名为Bob的consumer进行操作

> XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy >
1) 1) "race:italy"
   2) 1) 1) "1692632647899-0"
         2) 1) "rider"
            2) "Royce"
      2) 1) "1692632662819-0"
         2) 1) "rider"
            2) "Sam-Bodden"

在上述示例中,group相同但是consumer为Bob而不是Alice。并且redis仅会返回new messages前面示例中的Castilla消息由于已经被传递给Alice并不会被传递给Bob。

通过上述方式Alice, Bob和group中的其他consumer可以从相同的stream中读取不同的消息。

XREADGROUP有如下方面需要关注

  • consumer并不需要显式创建consumers are auto-created the first time they are mentioned
  • 在使用XREADGROUP时可以同时读取多个keys。但是想要令该操作可行必须create a consumer group with the same name in every stream。该方式通常不会被使用,但是在技术上是可行的
  • XREADGROUP为一个write command其只能在master redis instance上被调用。因为该命令调用存在side effect该side effect会对consumer group造成修改

在使用XREADGROUP命令时 只有当传递的id为>才会对consumer group的last ID造成修改。否则只会读取pending messages history并不会实际的修改last ID。故而当consumer实例重启进行XREADGROUP pending messages时并不会触发side effect对其他consumers的消费造成影响。

如下示例为一个Ruby实现的consumer

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

在上述consumer实现中consumer启动时会对history进行消费,即list of pending messages因为consumer可能在之前发生过崩溃故而在重启时可能希望对messages delivered to us without getting acked进行重新读取)。在该场景下,we might process a message multiple times or one time

并且在consumer实现中一旦history被消费完成XREADGROUP命令将会返回empty list此时便可以切换到special ID >从而对new messages进行消费。

Recovering from permanent failures

在上述ruby实现的consumers中多个consumers将会加入相同的consumer group每个consumer都会消费和处理a subset of messages。当consumer从failure中进行恢复时会重新读取pending messages that were delivered just to them

但是,在现实场景中,可能会出现consumer may permanently fail and never recover的场景。

redis consumer group对permanently fail的场景提供了专门的特性,支持对pending messages of a given consumer进行认领,故而such pending messages will change ownership and will be re-assigned to a different consumer

在使用上述特性时consumer必须检查list of pending messages,并claim specific messages using special command。否则对于permanently fail的consumer其pending messages将会被pending forever即pending messages一直被分配给old consumer that fail permanently

XPENDING

通过XPENDING命令可以查看pending entries in the consumer group。该命令是read-only的,调用该命令并不会导致任何消息的ownership被变更。

如下为XPENDING命令调用的示例

> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"
      2) "2"

当调用该command时command的输出将会包含如下内容

  • total number of pending messages in the consumer group
  • lower and higher message ID among the pending messages
  • a list of consumers and the number of pending messages they have

在上述示例中,仅名为Bob的consumer存在2条pending messages

XPENDING命令支持传递更多的参数来获取更多信息full command signature如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

如上述示例所示,可以为XREADGROUP命令指定如下内容:

  • start-idend-id: 可以将其指定为-+
  • count: 用于控制该command返回information的数量
  • consumer-name: 该选项为optional的指定该选项后仅会输出messages pending for a given consumer

XPENDINGS命令指定更多选项的示例如下所示

> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"
   2) "Bob"
   3) (integer) 74642
   4) (integer) 1
2) 1) "1692632662819-0"
   2) "Bob"
   3) (integer) 74642
   4) (integer) 1

在上述示例中,会输出details for each message,具体包含如下信息:

  • ID: message id
  • consumer name: 该pending message对应的consumer
  • idle time in milliseconds代表该message被传递给consumer后经过的毫秒数
  • the number of times that a given message was delivered: 该消息已经被传递的次数

在上述示例中,返回的两条pending messages都是针对Bob的,并且两条消息都被传递了超过一分钟,都只被传递过一次。

checking the message content

在获取到message detail后可以根据message ID来查询message的内容只需将message ID同时作为XRANGE命令的start-idend-id即可。示例如下所示:

> XRANGE race:italy 1692632647899-0 1692632647899-0
1) 1) "1692632647899-0"
   2) 1) "rider"
      2) "Royce"

通过上述介绍的命令,可以制定permanently fail场景下的处理策略:

  • 当Bob存在超过一分钟都没有处理的pending messages时Bob可能无法快速被恢复此时Alice可以针对Bob的pending messages进行claim并且代替Bob对pending messages进行处理
XCLAIM

XCLAIM命令的形式如下:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

XCLAIM命令的作用如下

  • 对于<key><group>所指定的stream和consumer group希望将<ID-X>所指定的message都能更改ownership将messages分配给<consumer>所指定的consumer。
  • 除此之外,我们还提供了<min-idle-time>仅当指定message的idle time比指定的min-idle-time更大时才有效
    • 指定min-idle-time在多个clients尝试同时对message进行claim时会起作用当第一个client对message进行claim后idle-time会重置故而当第二个client尝试对message进行claim时会因不满足min-idle-time的条件而失败

作为claim操作的side effectclaiming a message will reset its idle time and will increment its number of deliveries counter

XCLAIM的使用示例如下所示

> XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
1) 1) "1692632647899-0"
   2) 1) "rider"
      2) "Royce"

该消息会claimed by Alice此时consumer Alice可以对该message进行处理并ack即使original consumer恢复失败pending messages也能够被继续处理。

JUSTID option

在上述示例中在成功对message进行XCLAIM会返回该message本身。可以通过JUSTID option来修改返回的结构在指定该option后返回内容为just IDs of the messages successfully claimed

通过JUSTID option可以降低client和server之间使用的带宽。

claiming并非一定要位于consumer自身的进程中其也可以被实现在独立的进程中

  • 可以使用独立的进程来扫描pending messages list并且将pending messages分配给明显处于活跃状态的consumers
Automatic claiming

在redis 6.2版本中,添加了XAUTOCLAIM命令实现了上一章节中描述的claiming process。XPENDINGXCLAIM命令为各种不同的recovery机制提供了基础。而XAUTOCLAIM命令优化了通用的claiming process,令claiming process由redis来进行管理为大多的recovery提供简单的解决方案。

XAUTOCLAIM命令会识别idle messages并且转移message的ownership给指定consumer。XAUTOCLAIM命令的形式如下所示:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

故而可以通过如下方式来使用automatic claiming

> XAUTOCLAIM race:italy italy_riders Alice 60000 0-0 COUNT 1
1) "0-0"
2) 1) 1) "1692632662819-0"
      2) 1) "rider"
         2) "Sam-Bodden"

XCLAIM类似,其返回结果为an array of the claimed messsages。但是其还会返回一个stream ID在上述示例中为0-0。返回的stream ID可以用于对pending entries进行迭代。该stream ID为一个cursor可以将其用作下次XAUTOCLAIM调用的start

> XAUTOCLAIM race:italy italy_riders Lora 60000 (1692632662819-0 COUNT 1
1) "1692632662819-0"
2) 1) 1) "1692632647899-0"
      2) 1) "rider"
         2) "Royce"

XAUTOCLAIM返回“0-0”作为cursor时代表其到达了end of the consumer group pending entries list。其并不代表没有新的idle pending messages可以重新从begining of the stream来调用XAUTOCLAIM

Claiming and the delivery counter

通过XPENDING命令输出的counter代表该message的被传递次数该counter在两种场景下会增加

  • when a message is successfully claimed via XCLAIM
  • when a XREADGROUP call is used in order to access the history of pending messages

当存在failure时message可能会被传递多次但是message最终会被处理并ack。但是在处理特定的消息时可能会在处理逻辑中抛出异常在该类场景下consumer会持续的在处理该消息时抛出异常。故而可以通过delivery counter来探知那些不可处理的message。一旦delivery counter到达给定的值时可以将该消息发送给另一个stream并且向系统的管理员发送notification。这就是redis stream实现dead letter的基础。

working with multiple consumer groups

redis stream可以关联多个consumer groups每个entries都会被传递给每个consumer group。在consumer group内每个consumer instance处理一部分entries。

当consumer对message进行处理时其会使用XACK命令来对message进行确认并且从consumer group的Pending Entries ListPEL中移除该entry reference。但是,被ack的message仍然保存在stream中且consumer group A中对message的ack并不会影响consumer group B的PELgroup A在对message进行ack后message仍然位于group B的PEL中直到group B中的consumer对message进行ack此时message才从group B的PEL中被移除。

通常来说如果想要从stream中删除entries必须要等到所有的consumer groups都对entries进行了ack应用需要实现复杂的逻辑。

Enhanced deletion control in Redis 8.2

从redis 8.2开始,一些命令为entries在多个consumer groups间的处理提供了增强控制:

  • XADD支持KEEPREF, DELREF, ACKED模式
  • XTRIM同样支持KEEPREF, DELREF, ACKED选项

如下选项控制consumer group references是如何被处理的

  • KEEPREF(默认) Preserves existing references to entries in all consumer groups' PELs
  • DELREF: Removes all references to entries from consumer groups' PELs, effectively cleaning up all traces of the messages
  • ACKED: Only processes entries that have been acked by all consumer groups

ACKED mode对于coordinating deletion across multiple consumer groups的复杂逻辑十分有用确认entires在所有consumer groups在完成对其的处理后才移除。

Stream Observability

缺乏可观测性的消息系统将十分难以使用,一个透明的消息系统需要令如下信息可观测:

  • who is consuming messages
  • what messages are pending
  • the set of consumer groups active in a given stream

在前面章节中,已经介绍了XPENDINGS命令,通过其可以观测处于处理中状态的消息并且能够获取消息的idle time和number of deliveries。

XINFO命令和sub-commands一起使用可以用于获取stream和consumer group相关的信息。

XINFO命令的使用示例如下:

> XINFO STREAM race:italy
 1) "length"
 2) (integer) 5
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1692632678249-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1692632639151-0"
    2) 1) "rider"
       2) "Castilla"
13) "last-entry"
14) 1) "1692632678249-0"
    2) 1) "rider"
       2) "Norem"

上述示例中,通过XINFO命令获取了stream本身的信息输出展示了stream内部的编码方式并且记录了stream中的第一条消息和最后一条消息。

如果想要获取和stream相关的consumer group消息参照如下示例

> XINFO GROUPS race:italy
1) 1) "name"
   2) "italy_riders"
   3) "consumers"
   4) (integer) 3
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1692632662819-0"

如果想要查看consumer group中注册的consumer实例可以通过XINFO CONSUMERS命令来进行查看:

> XINFO CONSUMERS race:italy italy_riders
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 177546
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 424686
3) 1) "name"
   2) "Lora"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 72241
Difference with kafka partitions

Redis stream中的consumer group可能在某些方面类似于kafka中基于分区的consumer groups但是仍然存在较大差别。

在redis strema中partition这一概念是逻辑的实际上stream所有的messages都存在相同的key中。故而redis stream中consumer instance并不从实际的partition中读取信息也不涉及partition在consumer间的分配。

例如如果consumer C3在某个时刻fail permanentlyredis在所有新消息到达时都会传递给C1和C2将好像redis stream只存在2个逻辑分区。

类似的如果某个consumer处理消息比其他consumers都快那么该consumer将在单位时间内按比例收到更多的消息。Redis会追踪所有尚未被ack的消息并且记录哪条消息被哪个consumer接收the ID of the first message never delivered to any consumer也会被redis记录。

在redis Stream中

  • 如果redis stream的数量和consumer的数量都为1那么消息将是按照顺序被处理的
  • 如果stream的数量为1consumer的数量为n那么可以将负载均衡给n个consumers但是在这种情况下消息的消费可能是无序的
  • 当使用n个stream和n个consumers时一个consumer只用于处理所有streams中的一部分可以将1 stream->1 consumer拓展到n stream->n consuimer

Capped Streams

在许多应用中并不想在stream中永久存储data。有时需要限制stream中entries的最大数量。

XADD with MAXLEN

redis stream对上述特性提供了支持在使用XADD命令时,支持指定MAXLEN选项,示例如下所示:

> XADD race:italy MAXLEN 2 * rider Jones
"1692633189161-0"
> XADD race:italy MAXLEN 2 * rider Wood
"1692633198206-0"
> XADD race:italy MAXLEN 2 * rider Henshaw
"1692633208557-0"
> XLEN race:italy
(integer) 2
> XRANGE race:italy - +
1) 1) "1692633198206-0"
   2) 1) "rider"
      2) "Wood"
2) 1) "1692633208557-0"
   2) 1) "rider"
      2) "Henshaw"

当使用MAXLEN选项时如果达到了指定长度那么old entries将自动被淘汰从而确保stream处于恒定的大小。

trimming with MAXLEN的开销在部分场景下可能会变得很大:为了内存效率stream由radix-tree结构表示。radix-tree由macro nodes组成单个macro node中包含多个elements对单个macro node的修改并不高效。

故而,支持按照如下形式来使用XADD命令,并支持MAXLEN选项:

XADD race:italy MAXLEN ~ 1000 * ... entry fields here ...

在上述示例中,在MAXLENcount之间指定了~,代表并不需要将长度上限严格限制为1000。该长度可以是1000,可以是1010只保证该长度比1000大。在指定了~只有当允许移除整个节点时trimming操作才会被实际执行。指定~能够让MAXLEN操作更加高效。

XTRIM

同样的redis还支持XTRIM命令,其执行和MAXLEN类似:

> XTRIM race:italy MAXLEN 10
(integer) 0

> XTRIM mystream MAXLEN ~ 10
(integer) 0

除此之外,XTRIM命令还支持不同的trimming strategies

  • MINID:该trimming strategy支持对entries with IDs lower than the on specified进行淘汰
Trimming with consumer group Awareness

从redis 8.2开始,XADD with trimming optionsXTRIM命令都支持enhanced control over how trimming interacts with consumer groups,其支持KEEPREF, DELREF, ACKED三个选项:

XADD mystream KEEPREF MAXLEN 1000 * field value
XTRIM mystream ACKED MAXLEN 1000
  • KEEPREF(default): Trim entries according to the strategy but preserves references references in consumer groups' PELs
  • DELREF: Trims entries and removes all references from consumer groups' PELs
  • ACKED: Only Trims entries that have been acknowledged by all consumer groups

ACKED模式在多个consumer groups之间维护数据完整性十分有用其能够保证entries只有当被所有的consumer groups都处理完成之后才会被移除

Special IDs in streams API

在redis API中存在部分Special IDs

  • -, +: 这两个特殊ID分别代表the smallest ID possiblethe greatest ID possible
  • $: 该ID代表stream中已经存在的最大ID。在使用XREADGROUP命令时如果只希望读取new entries可以使用该special ID。同样的可以将consumer group的last delivered ID设置为$,从而just deliver new entries to consumers in the group
  • >: 该ID代表last delivered ID of a consumer group该ID的适用范围仅位于同一consumer group并且该ID仅在XREADGROUP命令中使用,代表we want only entries that were never delivered to other consumers so far
  • *: 该ID仅在XADD命令中被使用代表为new entry自动选中ID

Persistence, replication and message safety

stream和redis中的其他数据结构一样is asynchronously replicated to replicas,并且持久化到RDBAOF文件中。并且,consumer group的full state也会被传播到AOF, RDB, replcias中

故而,如果message is pending in the master, also the replica will have the same information。并且当重启后AOF也会恢复consumer group的状态。

redis streams和consumer groups将会被持久化并且replicated using the Redis default replication

  • 如果消息的持久化十分重要,那么AOF必须使用strong fsync policy
  • redis asynchronously replication并不能保证xadd/consumer group state changes能被同步到replica
    • 当发生failover故障转移指哨兵或集群模式下主节点发生故障replica节点被升级为主节点可能主节点的变化尚未被同步到replica此时failover将会产生data missing
  • WAIT命令可以强制使changes被传播给replicas,该命令仅会降低数据丢失的可能,但是并无法完全避免数据的丢失。
    • 在发生故障转移时redis仅会执行best effort check,从而转移到replica which is the most updated在某些失败场景下转移到的replica仍然可能缺失部分数据

removing single items from a stream

stream同样支持removing items from the middle of a stream尽管stream为append-only data structure, 但是该功能仍然在许多场景下十分有用。

XDEL

XDEL命令的使用示例如下所示:

> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
   2) 1) "rider"
      2) "Wood"
2) 1) "1692633208557-0"
   2) 1) "rider"
      2) "Henshaw"
> XDEL race:italy 1692633208557-0
(integer) 1
> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
   2) 1) "rider"
      2) "Wood"
Enhanced deletion with XDELEX

从redis 8.2开始,XDELEX命令对entry删除提供了增强的控制尤其是针对consumer groups。和其他enhanced commands一样其支持KEEPREF, DELREF, ACKED三种模式:

XDELEX mystream ACKED IDS 2 1692633198206-0 1692633208557-0
  • 在使用ACKED模式时仅在指定entries已经被所有consumer groups都acknowledged之后才能够对entries进行删除
  • 在使用DELREF模式时会移除指定entries并清除所有consumer groups' PEL中的指定entries
  • 在使用KEEPREF模式时会移除指定entries并对consumer groups' PEL中的entries引用进行保留

对于通过KEEPREF模式移除entries的场景如果entries从stream中被移除但是PEL中仍然存在entries references此时XCLAIM操作并无法对该类entries进行认领

zero length streams

redis stream数据类型和redis的其他数据结构存在差别

  • 当其他数据结构中不再包含任何元素时,作为calling commands that remove elements的副作用该key本身也会被自动移除
    • 例如,当通过ZREM命令移除zset中的最后一个元素时该zset本身也会被彻底移除
  • 而redis stream类型的数据则是允许stay at zero elements,即使通过MAXLEN option或是XDEL调用令stream类型数据的entries数量为0该stream也不会被自动移除

目前即使当stream没有对应的consumer groups当某command导致stream中的entries数量为0时stream也不会被删除

Total latency of consuming a message

对于非阻塞命令(例如XRANGE, XREAD, XREADGROUP without BLOCK optionredis是同步对其进行处理的其处理同其他的一般redis命令相同对于这类commands并不需要讨论其latency。

但是,对于delay of processing a message,其拥有如下定义:

  • in the context of blocking consumers in a consumer group, from the moment the message is produced via XADD to the moment the message is obtained by the consumer because XREADGROUP returned with the message

The model redis uses in order to route stream messages

redis通过如下方式来管理blocking operation waiting for data

  • 对于blocked client其会在hash table中被引用。hash table中的key其对应的blocking consumers至少存在一个而hash table中的value则是a list of consumers that are waiting for such key
    • 通过上述管理方式如果一个key接收到消息则可以解析所有等待该data的clients
  • 当发生写操作,例如XADD时,其会调用signalKeyAsReady方法。该方法会将key添加到a list of keys that need to be processed该list中的key对于blocked consumers存在新的数据被称为ready keys
    • ready keys后续会被处理故而在相同的event loop cycle中该key可能还会接收到其他的写操作(XADD)
  • 在返回event loop之前ready keys最终被处理对于每一个keythe list of clients waiting for data都会被扫描如果适用那么client将会接收到new data。

如上述描述所示在返回到event loop之前调用XADD的clientclients blocked to consume messages其reply都会出现在ouput buffers中。故而caller of XADD收到reply的时间将会和blocked clients接收到新消息的时间几乎相同

上述model是push-based的将消息添加到consumer buffers的操作将直接由XADD调用来执行。

Redis Geospatial

redis geospatial支持对坐标进行存储和查询该数据结构主要用于find nearby points within a given radius or bounding box

Bike Rental stations Example

假设当前需要支持基于当前位置查找最近的bike rental station功能如下是redis geospatial的示例。

> GEOADD bikes:rentable -122.27652 37.805186 station:1
(integer) 1
> GEOADD bikes:rentable -122.2674626 37.8062344 station:2
(integer) 1
> GEOADD bikes:rentable -122.2469854 37.8104049 station:3
(integer) 1

在上述示例中向geospatial index中添加了多个自行车租赁点的location。

> GEOSEARCH bikes:rentable FROMLONLAT -122.2612767 37.7936847 BYRADIUS 5 km WITHDIST
1) 1) "station:1"
   2) "1.8523"
2) 1) "station:2"
   2) "1.4979"
3) 1) "station:3"
   2) "2.2441"

而在上述命令中,则是find all locations within a 5 kilometer radius of a given location并对每个location都返回了对应的距离

Redis bitmaps

在redis中bitmaps并不是实际的数据结构而是针对String类型的一系列bit operationsbitmaps的对外表现为和bit vector类似。

对于bitmaps其最大大小为512MB,其最多可以包含2^32个不同的bits。

redis支持对一个或多个strings执行位运算

Example

假设存在1000个骑行运动员正在乡间竞速并且他们的自行车上装有编号为0~999的传感器并需要检查在当前小时传感器是否ping过服务器以此来确认运动员状态。

该场景可以适用bitmapbitmap的key代表current hour

  • Rider 123在2024-01-01 00:00:00时刻ping过server此时可以通过setbit命令将123代表的bit置位
  • 可以通过GETBIT命令来检查是否rdier 456在一小时内ping过server
> SETBIT pings:2024-01-01-00:00 123 1
(integer) 0
> GETBIT pings:2024-01-01-00:00 123
1
> GETBIT pings:2024-01-01-00:00 456
0

Bit Operations

Bit Operations分为两种类型

  • single bit operations: 例如将某个bit改为0或1或获取某个bit的值
  • operations on groups of bits: 例如counting the number of set bits in a given range of bits

bitmap的一个巨大优势是其能够在存储信息的同时极大的节省存储空间。对于存储single bit information其能够在512MB的空间限制下存储4 billion位的值。

single bit operations
  • SETBIT: SETBIT命令的第一个参数为bit number而第二个参数为0或1。如果addressed bit is outside the current string length那么该SETBIT操作将自动拓展string的长度
  • GETBIT: GETBIT命令会返回指定位置bit的值。out of range bits(addressing a bit that is outside then length of the string stored into the target key)其值将会被视为0
operations on group of bits

redis支持对group of bits进行如下操作:

  • BITOP: performs bit-wise operations between different strings。支持的operators有AND, OR, XOR, NOT, DIFF, DIFF1, ANDOR, ONE
  • BITCOUNT: 该命令用于统计置为1的bit个数
  • BITPOS: 该命令用于查找the first bit having specified value of 0 or 1

bitcount和bitpos都支持对bitmap的指定范围来进行操作该范围可以通过bit或byte来指定从0开始,默认情况下,范围指是基于BYTE的,如果要基于BIT指定范围,需要手动指定BIT option。

bitcount的使用示例如下

> BITCOUNT pings:2024-01-01-00:00
(integer) 1
longest streak of daily visits

可以通过bitmap来记录用户的最长连续访问时间。可以用0来表示the day you made your website public,并且set the bit every time the user visits the web sitebit index则是当前时间基于day 0已经经过的天数。

这样针对每个user都通过一个small string来存储了用户的访问记录。并且可以简单的计算用户的最长连续访问天数。

bitmaps可以被轻松的拆分为多个key通常来讲也应当避免操作过大的key。将大的bitmap拆分为多个key时通用策略是限制每个key存储M个bits并且通过{bit-index}/M来决定当前bit位于哪个key{bit-index} MOD M则用于当前bit位于key的哪个位置。

Probabilistic

Probabilistic data structure向使用者提供了统计数据的近似值,例如计数、频率、排名而并非精确值。使用Probabilistic data structure可以提高计算效率。

HyperLogLog

HyperLogLog数据结构用于估计set中的基数HyperLogLog并不能保证结果的精确性但是能够提高空间的使用效率。

Redis的HyperLogLog实现最多占用12KB空间并提供了0.81%的标准误差。

通常统计items中的唯一项个数需要花费的空间和items的个数成正比但是有一系列算法可以牺牲精确性来换取内存使用大小

  • 其能够返回唯一项个数的大致估计值并且估计值存在标准误差在redis HyperLogLog的实现中标准误差小于1%
  • 在使用该算法时并不需要使用和items个数成正比的内存空间而是使用常量大小的内存在最坏情况下使用空间大小为12KB当HyperLogLog中包含元素较少时其使用的空间也远小于12KB

在redis中HyperLogLog是编码成Redis strings的。故而对于HyperLogLog类型,可以通过GET来进行序列化,并通过SET来进行反序列化。

在使用HyperLogLog时其API如下

  • PFADD: 可以通过该命令将new item添加到HyperLogLog
  • PFCOUNT: 当想要获取唯一项个数的近似值时,可以调用PFCOUNT命令
  • PFMERGE: 如果想要对两个不同的HyperlogLog进行合并可以使用PFMERGE命令

HyperLogLog的使用示例如下所示

> PFADD bikes Hyperion Deimos Phoebe Quaoar
(integer) 1
> PFCOUNT bikes
(integer) 4
> PFADD commuter_bikes Salacia Mimas Quaoar
(integer) 1
> PFMERGE all_bikes bikes commuter_bikes
OK
> PFCOUNT all_bikes
(integer) 6

HyperLogLog数据结构通常有如下用例场景统计页面或网站的每天用户访问量

Bloom Filter

Bloom Filter也是一种Probabilistic data structure用于检测在set中item是否存在其使用少量的的固定大小存储空间。

Bloom Filter并不会将items都存储在set中相对的其仅存储item在hash后的结果故而在部分程度上会牺牲精确性。通过对精确性的牺牲Bloom Filter拥有十分高的内存效率并且其执行效率也很高

Bloom Filter仅能够保证某元素在set中不存在但是关于元素的存在其仅能给出一个估计值

  • 当Bloom Filter的返回结果表示某item不存在于set中时返回结果是精确的该item一定在set中不存在
  • 但是如果若Bloom Filter返回结果表示某item存在时每N个存在的返回结果就有一个是错误的

Bloom Filter通常用于negative answer will prevent more costly operations的场景,例如用户名称是否被占用,信用卡是否被丢失,用户是否看过广告等

Example

假设自行车生产商已经生产了百万种不同型号的自行车,目前需要在为新model指定名称时避免指定旧model已经使用过的名称

在该种用例场景下可以使用bloom filter来检测重复。在如下实例中创建的bloom filter可容纳100w entries并且错误率仅为0.1%。

> BF.RESERVE bikes:models 0.001 1000000
OK
> BF.ADD bikes:models "Smoky Mountain Striker"
(integer) 1
> BF.EXISTS bikes:models "Smoky Mountain Striker"
(integer) 1
> BF.MADD bikes:models "Rocky Mountain Racer" "Cloudy City Cruiser" "Windy City Wippet"
1) (integer) 1
2) (integer) 1
3) (integer) 1
> BF.MEXISTS bikes:models "Rocky Mountain Racer" "Cloudy City Cruiser" "Windy City Wippet"
1) (integer) 1
2) (integer) 1
3) (integer) 1

在上述示例中即使bloom filter中仅存在少量元素返回的存在结果也存在误报的可能,即元素其实根本不存在。

Reserving Bloom filters

在使用bloom filters时大部分sizing工作都会自动完成

BF.RESERVE {key} {error_rate} {capacity} [EXPANSION expansion] [NONSCALING]
  • error_rate: 该参数代表false positive rate该rate为0和1之间的decimal例如当希望的false positive rate为0.1%(1 in 1000)时需要将error_rate设置为0.001

  • expected capacity(capacity): 该参数代表bloom filter中期望包含的元素数量。需要确保该值的准确性:

    • 如果该值设置过大,其会浪费内存空间
    • 如果该值设置过小那么filter被填充满后a new one will have to be stacked on top of it (sub-filter stacking)
      • when a filter consists of multiple sub-filters stacked on top of each other其新增操作的延迟仍然会保持不变;但是存在性检查的延迟会增加
        • 存在性检查的原理如下首先会对top filter检查元素的存在性如果返回为false那么会继续检查以一个sub-filter该机制会导致存在性检查的延迟增加
  • scaling(EXPANSION): 在向bloom filter中添加数据时并不会因为数据结构的填充而失败。在向filter中添加元素时error rate也会随之增加。为了保证错误率和bloom filter创建时指定的error rate相近bloom filter需要自动扩容当capacity限制达到时需要自动创建额外的sub-filter

    • new sub filter的size大小等于last-sub-filter-size * EXPANSION
      • 如果filter中存储的items数量未知可以将EXPANSION设置为2 or more从而减少sub-filters的数量。
      • 否则,可以将EXPANSION设置为1,从而减少内存的消耗
    • 默认的EXPANSION为2

    在向filter中添加new sub-filter时相比于前一个filter会为new sub-filter分配更多的hash function

  • NONSCALING: 如果想要禁用scale可以指定NONSCALING。如果达到了initially assigned capacityerror rate将会开始增加。

total size of bloom filter

bloom filter实际使用的内存大小是根据a function of choosen error rate来决定的:

  • hash functions的最佳数量为ceil(-ln(error_rate) / ln(2))
    • 即要求的error_rate越小hash function的数量应该越多
  • 在给定期望error_rate和最有hash functions数量的前提下对每个items需要的bits个数为 -ln(error_rate) / ln(2)^2
  • 故而bloom filter需要的bits数量为capacity * -ln(error_rate) / ln(2)^2
    • 1% error rate的前提下需要7个hash functions每个item需要9.585bits
    • 0.1% error rate的前提下需要10个hash functions每个item需要14.378bits
    • 0.01% error rate的前提下需要14个hash fucntions每个item需要19.170bits

而相比于bloom filter使用redis set来membership testing时需要耗费的内存大小为

memory_with_sets = capacity*(192b + value)

对于ip地址每个item大概需要在40 bytes320bits而在使用error rate为0.01%的bloom filter时每个item仅需19.170bits

Performance

向bloom filter执行插入操作的时间复杂度为O(K),其中K为hash functions的数量。

对bloom filter执行存在性检查的时间复杂度为O(K)O(K*n)针对stacked filters场景n为stacked filters的数量

Cuckoo filter

Cuckoo filter和Bloom filter类似同样用于检测item在set中是否存在其也提供了a very fast and space efficient way。除此之外Cuckoo filter允许对item进行删除且在部分场景下相比Bloom filter而言Cuckoo filter的性能要更好。

Bloom filter和Cuckoo filter的实现逻辑如下

  • Bloom filter是一个bit arrayhash function决定的位置bit将会被置为1
  • 而Cuckoo filter则是一个bucket arraystoring the fingerprints of the values in one of the buckets at positions decided by the two hash function
    • 通过两个hash function能够计算出两个可能的bucket position而item的fingerprint则存储在两个bucket的其中一个
    • 对于item x的membership query会针对x的fingerprint查找possible bucket并且在查询到对应的fingerprint时返回true
    • 对于Cuckoo filterfingerprint size会决定其false positive rate
User Cases

在应用中Cuckoo filter拥有如下使用示例

  • Targeted ad campaigns: 在该场景下Cuckoo filter主要用于处理用户是否参与了指定活动。为每个活动都使用一个Cuckoo filter并且向Cuckoo filter中添加目标用户的id。每次用户访问时都进行如下校验:
    • 如果用户id包含在cuckoo filter中则代表用户没有参与过活动向用户展示广告
    • 如果用户点击广告并进行参与从cuckoo filter中移除user id
    • 如果用户id不包含在cuckoo filter中那么代表该用户已经参加过该活动尝试下一个ad/Cuckoo filter
  • Discount code: 该场景下Cuckoo filter主要用于处理折扣码/优惠券是否已经被使用。可以向Cuckoo Filter中注入所有的折扣码在每次尝试使用折扣码时都通过Cuckoo Filter校验
    • 如果cuckoo filter表示该折扣码不存在则校验失败折扣码已经被使用
    • 如果cuckoo filter表示该折扣码存在则继续通过maindatabase来进行校验适配false positive的场景如果maindatabase校验通过则将该折扣码从cuckoo filter中也移除
Example
> CF.RESERVE bikes:models 1000
OK
> CF.ADD bikes:models "Smoky Mountain Striker"
(integer) 1
> CF.EXISTS bikes:models "Smoky Mountain Striker"
(integer) 1
> CF.EXISTS bikes:models "Terrible Bike Name"
(integer) 0
> CF.DEL bikes:models "Smoky Mountain Striker"
(integer) 1

上述是Cuckoo Filter的使用示例其通过CF.RESERVE创建了初始容量为1000的cuckoo filter。

当key不存在时直接调用CF.ADD命令也能自动创建一个新的Cuckoo filter但是通过CF.RESERVE命令创建能够按需指定容量。

Cuckoo vs Bloom Filter

在插入items时Bloom Filter的性能和可拓展性通常要更好。但是Cuckoo filter的check operation执行更快并且允许删除操作

Sizing Cuckoo filters

在Cuckoo filters中一个bucket可以包含多个entries每个entry都可以存储一个fingerprint。如果cuckoo filter中所有的entries都存储了fingerprint那么将没有empty slot来存储新的元素此时cuckoo filter将被看作full

在使用cuckoo filter时应该保留一定比例的空闲空间。

当在创建一个新cuckoo filter时需要指定其capacity和bucket size

CF.RESERVE {key} {capacity} [BUCKETSIZE bucketSize] [MAXITERATIONS maxIterations]
[EXPANSION expansion]
  • capacity:
    • capacity可以通过如下公式来计算n * f / a
      • n代表numbe of items
      • f代表fingerprint length in bits如下为8
      • a代表`fill rate or load factor (0<=a<=1)
    • 基于Cuckoo filter的工作机制filter在capacity到达上限之前就会声明自身为full故而fill rate永远不会到达100%
  • bucksize:
    • bucksize代表每个buckets中可以存储的元素个数bucket size越大fill rate越高但是error rate也会越高并且会略微影响性能
    • error rate的计算公式为error_rate = (buckets * hash_functions)/2^fingerprint_size = (buckets*2)/256
    • 当bucket size为1时fill rate为55%false positive rate大概为2/256,即约等于0.78%这也是可以实现的最小false positive rate
    • buckets变大时error rate也会线性的增加filter的fill rate也会增加。当bucket size为3时false positive rate约为2.34%并且fill rate约为80%。当bucket size为4时false positive rate约为3.12%fill rate约为95%
  • EXPANSION:
    • when filter self-declare itself full其会自动拓展生成额外的sub-filter该操作会降低性能并增加error rate。新创建sub-filter的容量为prev_sub_filter_size * EXPANSION
    • 该默认值为1
  • MAXITERATIONS:
    • MAXITERATIONS代表the number of attempts to find a slot for incoming fingerprint。一旦filter为full后如果MAXITERATIONS越大,插入越慢。
    • 该默认值为20

t-digest

t-digest是一种probabilistic data structure,其允许在不对set中所有数据进行实际存储与排序的情况下获取数据的percentile point。故而,其可以针对如下场景:What's the average latency for 99% of my database operations

  • 在不使用t-digest时,如果要获取上述指标,需要对每位用户都存储平均延迟,并且对平均延迟进行排序,排除最后的百分之一数据,并计算剩余数据的平均值。该操作过程十分耗时
  • 而通过t-digest可以解决该方面的问题

t-digest可以用于其他百分位相关的问题例如trimmed means:

  • A trimmed mean is the mean value from the sketch, excluding observation values outside the low and high cutoff percentiles.例如,0.1 trimmed means代表排除最低的10%和最高的10%之后计算出的平均值
Use Cases
  • Hardware/Software monitoring
    • 当测量online server response latency可以需要观测如下指标
      • What are the 50th, 90th, and 99th percentiles of the measured latencies
      • Which fraction of the measured latencies are less than 25 milliseconds
      • What is the mean latency, ignoring outliers? or What is the mean latency between the 10th and the 90th percentile?
  • Online Gaming:
    • 当online gaming platform涉及数百万用户时可能需要观测如下指标
      • Your score is better than x percent of the game sessions played.
      • There were about y game sessions where people scored larger than you.
      • To have a better score than 90% of the games played, your score should be z
  • Network traffic:
    • 在对网络传输中的ip packets进行监测时如需探测ddos攻击可能需要观测如下指标
      • 过去1s的packets数量是否超过了过去所有packets数量的99%
      • 在正常网络环境下期望看到多少packets?
Examples

在如下示例中,将会创建一个compression为100的t-digest并且向其中添加item。在t-digest数据结构中compression参数用于在内存消耗和精确度之间做权衡。compression的默认值为100当compression值指定的更大时t-digest的精确度会更高。

> TDIGEST.CREATE bikes:sales COMPRESSION 100
OK
> TDIGEST.ADD bikes:sales 21
OK
> TDIGEST.ADD bikes:sales 150 95 75 34
OK
Estimating fractions or ranks by values

t-digest中一个有用的特性为CDF(definition of rank)其能给出小于或等于给定值的fraction。该特性能够解决What's the percentage of observations with a value lower or equal to X

更精确的说,TDIGEST.CDF will return the estimated fraction of observations in the sketch that are smaller than X plus half the number of observations that are equal to X.

也可以使用TDIGEST.RANK命令相比于返回fraction其会返回value的rank。

> TDIGEST.CREATE racer_ages
OK
> TDIGEST.ADD racer_ages 45.88 44.2 58.03 19.76 39.84 69.28 50.97 25.41 19.27 85.71 42.63
OK
> TDIGEST.CDF racer_ages 50
1) "0.63636363636363635"
> TDIGEST.RANK racer_ages 50
1) (integer) 7
> TDIGEST.RANK racer_ages 50 40
1) (integer) 7
2) (integer) 4

同样的TDIGEST也支持TDIGEST.REVRANK命令,其返回的结果是the number of (observations larger than a given value + half the observations equal to the given value)

Estimating values by fractions or ranks

TDIGEST.QUANTILE key fraction ...命令可以根据fraction来获取an estimation of the value (floating point) that is smaller than the given fraction of observations.

TDIGEST.BYRANK key rank ...命令可以根据rank来获取an estimation of the value (floating point) with that rank

使用示例如下所示:

> TDIGEST.QUANTILE racer_ages .5
1) "44.200000000000003"
> TDIGEST.BYRANK racer_ages 4
1) "42.630000000000003"

TDIGEST.BYREVRANK命令可以根据reverse rank来获取value。

trimmed mean

如果要计算TDIGEST结构的trimmed mean,可以使用TDIGEST.TRIMMED_MEAN {key} low_fraction high_fraction来获取。

TDIGEST.MERGE

可以通过TDIGEST.MERGE命令来对sketches进行merge操作。

假设为3台servers测量了latency此时需要合并多台servers的测量结果并且获取合并后结果中90%、95%、99%的latency此时可以使用TDIGEST.MERGE命令。

TDIGEST.MERGE命令的格式如下

TDIGEST.MERGE destKey numKeys sourceKey... [COMPRESSION compression] [OVERRIDE]

在使用上述命令时:

  • 如果destKey之前不存在将会自动创建destKey并且将合并结果设置到key的值
  • 如果destKey之前已经存在 那么destKey的old value将会和values of source keys一起合并。如果需要覆盖原destkey的内容,需要指定OVERRIDE选项
Retrieving sketch information

TDIGEST.MINTDIGEST.MAX命令可以用于获取sketch中的最小值和最大值使用示例如下

> TDIGEST.MIN racer_ages
"19.27"
> TDIGEST.MAX racer_ages
"85.709999999999994"

如果TDIGEST为空那么TDIGEST.MIN和TDIGEST.MAX命令都会返回nan

Resetting a sketch

通过TDIGEST.RESET命令能够对sketch进行重置示例如下

> TDIGEST.RESET racer_ages
OK

Redis Programmability

redis提供了programming interface允许在server执行自定义的脚本。在redis 7及以上可以使用Redis Function来管理和运行脚本而在redis 6.2或更低的版本,则使用lua scripting with EVAL command

Introduce

Backgroud

在redis中Programmability代表可以在server端执行任意用户定义的逻辑,我们将该逻辑片段称之为scripts。通过脚本能够在server端即数据被存储的地方处理数据。在server端处理用户逻辑能够降低网络延迟并且能提高整体性能。

在redis通过一个embedded, sandboxed scripting engine来执行用户脚本。目前redis仅支持单一的脚本引擎lua 5.1 interpreter

running scripts

redis提供了两种方式来运行脚本。从2.6.0版本开始redis支持通过EVAL命令来运行server-side scripts。在使用用户脚本时脚本逻辑中包含应用的业务逻辑。脚本的source code必须存储在应用端redis server仅会临时存储source code。当应用的逻辑发生变动时script将变得难以维护。

在redis 7.0版本中引入了Redis Function。function能够将脚本编写与应用逻辑解耦并且支持脚本的独立开发、测试和部署。如果要使用redis function需要先对其进行导入redis function导入后对所有的connections都是可用的。

当redis执行script或function时能够保证执行过程是原子的。在脚本执行的整个过程中redis server端的所有其他活动都会被阻塞。

在脚本执行中,应当避免执行slow script。如果脚本执行较慢在执行脚本过程中所有其他clients都会被阻塞执行不了任何命令。

Read-only scripts

read-only scipts代表在脚本执行时并不会对redis中的任何key造成修改。可以通过两种方式来执行read-only script:

  • 在脚本中添加no-write flag
  • 通过read-only script command来执行脚本
    • 常用read-only command如下
      • EVAL_RO
      • EVALSHA_RO
      • FCALL_RO

read-only script拥有如下特性:

  • read-only scripts可以在replicas上被执行
  • 其总是可以被SCRIPT_KILL命令被killed
  • 即使当redis超过内存限制其也不会fail with OOM error
  • 当发生write pause时,其也不会被阻塞
  • 在read-only script中不允许执行任何能对data set造成修改的命令
  • PUBLISH, SPUBLISH, PFCOUNT目前仍然被视为write command

Sandboxed script context

redis将执行用户脚本的engine放在了sandbox中。sandbox主要用于防止accidental misuse并且降低server环境的潜在威胁。

script中永远不应该访问redis server的底层宿主机系统例如file system、network或执行不受支持的系统调用。

script仅应该操作redis中存储的数据脚本执行时传入的作为参数的数据

Maximum execution time

脚本的执行时长受限于最大执行时长(默认情况下为5s)。这个默认超时很大默认情况下脚本的运行时长应当小于1ms。该超时用于处理脚本执行时非预期的无限循环。

可以通过修改redis.conf文件使用config set命令来修改该脚本执行时长上限。影响脚本最长执行时间上限的配置属性为busy-reply-threshold

当脚本达到该超时上限时其并不会自动被redis终止。中断脚本的执行将可能导致half-write的问题。

故而,将脚本执行时长超过限制时,将会发生如下事件:

  • redis会在日志中添加脚本执行时间过长
  • redis开始从其他clients接收commands但是会对所有发送normal commands的clients返回BUSY error。在该场景下,唯一被允许的命令为SCRIPT_KILL, FUNCTION_KILL, SHUTDOWN NOSAVE
    • 对于read-only script,可以使用SCRIPT_KILLFUNCTION_KILL命令,因为该脚本未对数据造成任何修改
    • 如果script在执行过程中哪怕执行了一个write operation那么只能使用SHUTDOWN NOSAVE命令其会停止server并且不会将当前的data set保存到磁盘中

Scripting with Lua

redis允许用户上传lua脚本到server并在server端执行故而在脚本执行时对数据的读取和写入操作十分高效并不需要网络开销。

redis保证脚本的执行是原子的。在执行脚本时server的其他活动都会被阻塞。

lua允许在脚本中集成部分属于应用的逻辑例如跨多个keys的条件更新等。

Getting Started

可以通过EVAL命令在redis中执行script示例如下

> EVAL "return 'Hello, scripting!'" 0
"Hello, scripting!"

在上述示例中,EVAL命令接收两个参数:

  • 第一个参数为lua script的source code内容该脚本内容将在redis engine's context下运行
  • 第二个参数为the number of arguments that follow the script's body

Script Parameterization

虽然不建议这样做但是仍可以在应用程序中动态的生成script source code示例如下

redis> EVAL "return 'Hello'" 0
"Hello"
redis> EVAL "return 'Scripting!'" 0
"Scripting!"

虽然在应用程序中动态生成script source code的操作并未被redis禁止但是其是不推荐的使用方式可能会在script cache方面造成问题。在该场景下不应该生成多个拥有微小差异的不同脚本而是应该将细微的差异提取为script param。

如下实例则展示了参数化后的标准实现方式:

redis> EVAL "return ARGV[1]" 0 Hello
"Hello"
redis> EVAL "return ARGV[1]" 0 Parameterization!
"Parameterization!"

对于lua script执行的参数其分为如下两种

  • input arguments that are names of keys
  • input arguments that are not names of keys

为了确保lua script的正确执行不管是在standalone还是在clustered的部署模式下script访问的所有key names都必须显式的作为input key arguments被指定

script只应该访问那些在input key argments中被显式指定的key name。在编写script时应该遵守该要求。在redis集群环境下应通过redis可能操作的键从而对脚本进行路由。

如果用户编写的脚本使用了未在input key arguments中指定的keyredis script engine虽然不会在执行时对此进行校验但是可能导致script路由到错误的节点错误的节点执行脚本时仍可能报错。

在调用redis脚本时作为非key name被传递给redis脚本的参数即是regular input argument

在上述示例中,HelloParameterization!都是作为常规参数被传递的上述示例中脚本并未用到任何redis key故而第二个参数被指定为0,代表input key arguments的个数为0。

在redis script context中可以通过KEYSARGV这两个global runtime variables来访问传递的参数。

  • KEYS针对的是input key arguments
  • ARGV针对regular argument

如下就示例展示了input arguments在KEYSARGV之间分配的情况:

redis> EVAL "return { KEYS[1], KEYS[2], ARGV[1], ARGV[2], ARGV[3] }" 2 key1 key2 arg1 arg2 arg3
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
5) "arg3"

Interact with redis from a script

在lua script中可以通过redis.callredis.pcall来调用redis commands。

callpcall大致是相同的都会执行redis command但是在针对runtime error的处理上这两个function之间有所不同

  • redis.call:对于redis.call方法抛出的error会被直接返回给客户端
  • redis.pcall 对于redis.pcall方法抛出的异常其会被返回给script's execution contextfor possible handling

lua script中和redis的交互示例如下所示

> EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 foo bar
OK

Script Cache

当调用EVAL在请求中也会包含source code内容如果对相同的script content进行重复调用不仅会浪费网络带宽redis也会产生额外开销。为了节省网络带宽和计算资源redis对script提供了缓存机制。

在redis中每个通过EVAL执行的script都会被存储在由server维护的专用缓存中。缓存的内容按照script的SHA1 digest sum来进行组织SHA1 digest sum在cache中能够唯一标识脚本。

就像上述中提到的在应用中动态生成脚本内容是不推荐的。如果不对脚本进行参数化那么动态生成脚本的source code并多次对脚本进行调用将会占用redis额外的内存资源来对脚本内容进行缓存。脚本source code应该尽量通用。

脚本可以通过SCRIPT LOAD命令加载到cache中server并不会实际对脚本进行执行而是只会对其编译并加载到cache中。一旦完成加载后可以通过脚本的SHA1 digest sum来执行脚本。

SCRIPT LOAD的使用示例如下所示:

redis> SCRIPT LOAD "return 'Immabe a cached script'"
"c664a3bf70bd1d45c4284ffebb65a6f2299bfc9f"
redis> EVALSHA c664a3bf70bd1d45c4284ffebb65a6f2299bfc9f 0
"Immabe a cached script"

在实际使用eval时,org.springframework.data.redis.core.script.DefaultScriptExecutor#execute方法中已经实现了evalSha first, and eval it if script not exists的逻辑故而无需手动eval。

Cache volatility

redis script cache是易失的。script cache并不会被看作是database的一部分且其不会被持久化。缓存的脚本随时都由可能丢失。

在如下场景下script cache会被清空

  • when server restarts
  • during fail-over
  • 显式调用SCRIPT FLUSH

在应用使用scripts时应该一直使用EVALSHA来执行脚本当sha1 digest不存在时redis cache会返回error例如

redis> EVALSHA ffffffffffffffffffffffffffffffffffffffff 0
(error) NOSCRIPT No matching script

在cache中不存在脚本的场景下应用需要通过SCRIPT LOAD先导入脚本内容,然后调用EVALSHA命令来根据sha1 digest运行cache中的脚本。对于大多数的api都提供了自动执行该过程的API。

evalsha in context of pipelining

在pipeline场景下执行EVALSHA命令时pipeline中的commands是按顺序执行的但是执行的间隙中可能穿插来自其他clients的commands`。

在pipeline执行时可能返回NOSCRIPT error,但是该错误无法被处理。

因此在pipeline环境下client library通常直接使用eval而不是evalsha -> script load -> evalsha

Script Cache Semantics

在执行常规操作时来自应用的脚本会被一直保存在cache中直到server被重启或缓存被清空。

清空缓存的唯一方式是通过显式调用SCRIPT FLUSH命令执行该命令会完全清空script cache移除目前执行过的所有脚本内容。通常来说该命令只在当前redis实例需要初始化并后续为其他应用服务时被调用。

Script Command

SCRIPT命令提供了一系列用于控制scripting subsystem的方法

  • SCRIPT FLUSH: 该命令是清空redis script cache的唯一方式
  • SCRIPT EXISTS: 可接收一个或多个sha1 digests作为参数该命令会返回值为0 or 1的数组。其中1代表脚本在cache中存在0代表脚本在cache中不存在。
  • SCRIPT LOAD: 该命令会向redis cache中注册一个特定的脚本。在执行EVALSHA命令之前执行该命令,能够确保EVALSHA命令不会失败
  • SCRIPT KILL: 该命令是打断long-running script的唯一方式。当script的执行时长超过限制默认为5s如果该脚本执行时未对数据进行任何修改则可以通过SCRIPT KILL命令来终止
  • SCRIPT DEBUG: 用于控制redis lua scripts debugger的使用

Script Replication

在standalone部署场景下单个master实例管理整个database而在集群部署模式下至少会存在3个masters共同管理database。对于master实例redis会使用replication来维护一个或多个replicas。

由于scripts可能会修改数据故而redis需要保证所有由script执行的写操作都要被同步给replicas从而replicas的数据一致性。

在script replication方面存在如下两种方法

  • verbatim replicationmaster会将script的source code发送给replicasreplicas再执行接收到的脚本。这种模式能够节省replication brandwith因为在循环场景下短脚本能够产生大量的命令。
    • 但是这种模式下replicas会对master的工作进行redo即使针对读指令这将会浪费资源并且要求脚本中的写操作具有确定性
  • effects replication: 只有脚本中对数据造成修改的命令才会被同步replicas针对被同步的写指令进行执行,并不会像前一个模式一样重新执行脚本。
    • 这这种模式下关于replication的brandwith会增加但是该模式本质上的确定的无需写操作具有确定性

在redis 3.2之前,verbatim script replication是唯一受支持的模式直到Redis 3.2加入effects replication

Replicating commands instead of Scripts

从redis 3.2开始,可以选择effects replication的同步方式,该方式会replicate commands generated by the script

从redis 5.0开始script effects replication为默认模式并不需要显式启用。

在该模式下当lua脚本被执行时redis会收集all the commands executed by Lua scripting engine that actually modify the dataset。当脚本执行完成之后commands sequence将会被封装到multi/exec并且被发送给replcias和AOF。

该种replication方式在如下用例场景下具有优势

  • script的计算速度很慢但是最终script的修改能够被统计为较少的write commands
  • 当启用script effects replication时non-determinstic function校验将会被移除。故而,可以在脚本中自由的使用TIMESRANDMEMBER这类非确定性的指令
  • The Lua PRNG in this mode is seeded randomly on every call

除非通过server配置或默认启用了effect replication否则若需令脚本同步按照effect replication的方式进行同步必须在脚本执行任何write command之前执行如下lua命令

redis.replicate_commands()

在调用redis.replicate_commands方法时:

  • 如果effect replication被启用那么返回值为true
  • 如果在脚本已经执行过write command之后再调用该方法那么该方法返回值为false并且会使用normal whole script replication

该方法在Redis 7.0中被标记为废弃如果仍然调用它那么其返回值一直为true

Scripts with deterministic writes

从redis 5.0开始script replication默认情况下为effect-based而不是verbatim而在redis 7.0verbatim script replication的方式被完全移除。故而如下内容只针对版本低于7.0并且没有使用effect-based的script replication。

在使用verbatim script replication的情况下主要注意only change the database in a deterministic way。在redis实例执行script时一直到redis 5.0,默认都是通过sending the script itself方式来传播脚本的执行到replicas和AOF中的。在replica上被传递的脚本会被重新执行脚本对database的修改必须可重现。

通常,发送脚本本身占用的带宽要比发送脚本产生的命令占用的带宽要小cpu消耗也更小。

但是,sending the script itself这种replication方式并非对所有的场景都是可行的。

verbatim scripts replication要求脚本拥有如下属性:

  • arguments相同input data set相同的场景下脚本必须产生相同的redis write commands
  • 脚本执行的操作不能依赖于任何hidden(non-explicit) informationstate that may change as the script execution proceeds or between different executions of the script
  • 脚本的执行也不能依赖于任何io设备的外部输入

例如,使用系统时间、调用返回随机值的redis命令、使用redis的随机数生成器都会导致scripts that will not evaluate consistently

为了保证脚本的确定性行为redis做了如下处理

  • lua不会export任何访问系统时间或外部状态的命令
  • 如果脚本在调用random command(例如RANDOMKEY, SRANDMEMBER, TIME)之后,又调用了Redis command able to alter the data set那么redis将会block the script with error
    • 上述即代表read-only scripts that don't modify dataset can call those commands
    • random command并不代表使用随机数的command而是代表non-deterministic command例如TIME
  • 在redis 4.0中,例如SMEMBERS这类以随机顺序返回元素的命令,在被lua脚本调用时表现出的行为不同在将数据返回给lua脚本时会根据字典序进行排序。
    • 故而在redis 4.0环境下lua脚本中调用redis.call("SMEMBERS", KEYS[1])总是会按相同的顺序来返回Set中的元素。
    • 但是从redis 5.0开始,又不会执行该排序,因为可以effect replication被设置为默认
  • lua的伪随机数生成function math.random已经被redis修改故而在每次执行时都会使用相同的seed。
    • 故而每次script执行时调用match.random总是会生成相同序列的数字

由上述描述可知redis修改了lua的伪随机数生成器(只在verbatim replication下成立)故而每次运行lua脚本时随机数生成器返回的数值序列都相同。

但是,仍然可以通过一定的技巧来生成随机数,示例如下所示:

require 'rubygems'
require 'redis'

r = Redis.new

RandomPushScript = <<EOF
    local i = tonumber(ARGV[1])
    local res
    while (i > 0) do
        res = redis.call('LPUSH',KEYS[1],math.random())
        i = i-1
    end
    return res
EOF

r.del(:mylist)
puts r.eval(RandomPushScript,[:mylist],[10,rand(2**32)])

每次上述程序运行resulting list都会拥有相同的元素

redis> LRANGE mylist 0 -1
 1) "0.74509509873814"
 2) "0.87390407681181"
 3) "0.36876626981831"
 4) "0.6921941534114"
 5) "0.7857992587545"
 6) "0.57730350670279"
 7) "0.87046522734243"
 8) "0.09637165539729"
 9) "0.74990198051087"
10) "0.17082803611217"

为了让脚本确定并且让其产生不同的random elements可以像脚本中添加额外参数用于对lua的伪随机数生成器进行seed。脚本示例如下所示

RandomPushScript = <<EOF
    local i = tonumber(ARGV[1])
    local res
    math.randomseed(tonumber(ARGV[2]))
    while (i > 0) do
        res = redis.call('LPUSH',KEYS[1],math.random())
        i = i-1
    end
    return res
EOF

r.del(:mylist)
puts r.eval(RandomPushScript,1,:mylist,10,rand(2**32))

上述示例中,通过math.randomseed方法来将ruby生成的随机数作为了lua随机数生成器的种子从而产生了不同的数值序列。

当然,上述脚本的内容仍然是确定的,当传递的ARGV[2]相同时lua随机数生成器生成的数值序列仍然是固定的。

该seed是由client生成的作为参数被传递给脚本并且会作为参数被传播给replicas和AOF。这样能够确保同步给AOF和replicas的changes相同。

Debugging Eval scripts

从Redis 3.2开始Redis支持natvie Lua debuggingredis lua debugger是远程的由server和client组成。

Execution under low memory conditions

当redis的内存使用超过最大限制后the first write command encountered in the script that uses additional memory will cause the script to abort

但是当script中的第一条write command没有使用额外内存时存在例外例如DEL, LREM命令。在该场景下redis会允许该脚本中所有的命令运行从而保证lua脚本执行的原子性。如果lua脚本中接下来的命令耗费了额外的内存那么redis内存使用可以超过最大值限制。

Eval flags

通常当运行Eval script时server并不知道该脚本如何访问database。默认情况下redis假设所有脚本都会对数据进行读取和写入

但是从Redis 7.0开始,支持在创建脚本时声明flags用于告知redis将如何访问数据。

在如下示例中将在脚本的第一行声明flags:

#!lua flags=no-writes,allow-stale
local x = redis.call('get','x')
return x

当redis看到#!的注释时,其将会将脚本看作声明了flags即使没有flags被实际定义其相比于没有#!的脚本仍然存在一系列不同的默认值。

另一个不同的区别是,scripts without #!可以运行命令来访问keys belonging to different cluster hash slots,但是拥有#!的将继承默认的flags故而其不能对keys belonging to different cluster hash slots进行访问。

Redis Pub/sub

在redis中SUBSCRIBE,UNSUBSCRIBE,PUBLISH实现了Publish/Subscribe消息范式,其规范如下:

  • 消息的发送者并未直接将消息发送给特定的接受者
  • 消息发送者将消息发送给特定的channel并不知道消息订阅者的情况甚至不知道消息是否存在订阅者
  • 消息的订阅者对一个或者多个channel订阅并且只会接收订阅的消息
  • 消息的订阅者并无法感知消息的发布者

上述规范将消息的发布者和订阅者进行了解耦,增强了拓展性,并且允许更加动态且灵活的网络拓扑。

例如,如果想要对channel11ch:00channel进行订阅client可以发送SUBSCRIBE命令:

SUBSCRIBE channel11 ch:00

其他clients发送到这些channels的消息将会被redis推送给所有订阅这些channels的消息。消息的订阅者将会按照消息被推送的顺序来接收消息。

如果一个client订阅了一个或多个channels那么其不应该发送任何commands但其可以对channel进行SUBSCRIBEUNSUBSCRIBE

对于subscriptionunsubscription操作的回复以消息的形式被返回client只需要读取连续的消息流即可消息流中的第一个元素代表消息的类型。

在一个已经订阅的RESP2client的上下文中允许执行的命令为

  • PING
  • PSUBSCRIBE
  • PUNSUBSCRIBE
  • QUIT
  • RESET
  • SSUBSCRIBE
  • SUBSCRIBE
  • SUNSUBSCRIBE
  • UNSUBSCRIBE

但是在使用RESP3时client在subscribed状态下可以发送任何commands。

当使用redis-cli时,如果处于subscribed模式下,那么并无法调用UNSUBSCRIBEPUNSUBSCRIBE命令,此时redis-cli将无法接收任何命令,并且只能通过Ctrl + C来退出。

Delivery semantics

Redis的Pub/Sub机制表现出了at-most-once的消息传递语义其代表消息最多只会被发送一次。一旦消息被redis server发送不会再重新发送。并且如果订阅者无法处理该消息例如网络故障或处理异常那么该消息将会被永远丢失。

如果应用需要更强的消息传递保证,需要使用Redis Stream。在stream中的消息将会被持久化并且支持at-most-onceat-least-once两种传递语义。

format of pushed messages

消息是array-reply with three elements,其中第一个元素为消息的种类:

  • subscribe: 代表我们成功订阅了channelchannel的名称在第二个element中给出。第三个元素代表目前订阅的channels数量
  • unsubscribe: 代表我们成功取消了对channel的订阅channel的名称在第二个元素中给出。第三个元素代表目前订阅的channels数量
    • 当最后一个元素为0时代表不再订阅任何channel此时client可以发送任何类型的redis commandsclient不再处于Pub/Sub状态
  • message: 代表当前message是被其他client通过PUBLISH命令发布的消息。第二个元素为name of the originating channel,第三个元素则是实际消息的payload

Database & Scoping

Pub/Sub机制和key space没有关联。Pub/Sub机制并不会被任何层面干扰包括database numbers。

在db 10中发布的消息仍然可以被db1上的subscriber接收。

如果需要对channel进行作用域限制可以为channel name前缀环境名称test, staging, production...)。

Example

对channel进行监听可以使用SUBSCRIBE命令

127.0.0.1:6379> subscribe first second
1) "subscribe"
2) "first"
3) (integer) 1
1) "subscribe"
2) "second"
3) (integer) 2
1) "message"
2) "first"
3) "fuckyou"
1) "message"
2) "second"
3) "shit"

向channel发送消息可以使用PUBLISH命令

127.0.0.1:6379> publish first fuckyou
(integer) 1
127.0.0.1:6379> publish second shit
(integer) 1

Pattern-matching subscriptions

redis pub/sub实现支持pattern matching。clients支持对glob-style pattern进行订阅并接收所有发送到匹配channel的消息。

例如

PSUBSCRIBE news.*

上述指令会订阅所有发送到news.art.figurative, news.music.jazz等channel的消息。所有global-style patterns都有效也支持多个wildcards。

PUNSUBSCRIBE news.*

上述命令将会根据pattern对channels进行取消订阅。

Messages received as a result of pattern matching其发送格式将会有所不同其将包含4个elements

  • 消息的type将会是pmessage: 这代表该消息通过pattern-matching subscription匹配到的
  • 第二个元素为original pattern matched
  • 第三个元素为name of the originating channel
  • 第四个元素为message payload

通过psubscribe接收到的消息结构如下:

127.0.0.1:6379> psubscribe f*
1) "psubscribe"
2) "f*"
3) (integer) 1
1) "pmessage"
2) "f*"
3) "first"
4) "suki"

Messages matching both a pattern and a channel subscription

如果client订阅了多个patterns那么可能多次接收到相同的消息示例如下所示

SUBSCRIBE foo
PSUBSCRIBE f*

在上述示例中,如果消息被发送到foo channel那么client将会接收到两条消息type为message的消息和type为pmessage的消息。

the meaning of the subscription count with pattern matching

subscribe, unsubscribe, psubscribe, punsubscribe消息类型中,消息中最后的元素代表仍然处于活跃状态的订阅数量。该数量代表total number of channels and patterns the client is still subscribed to。故而如果该数量变为0代表client会退出Pub/Sub状态client取消了对所有channels和patterns的订阅。

Sharded Pub/Sub

从reids 7开始引入了shard channels其将被分配给slots且分配的算法和assign keys to slots的算法相同。并且,a shard message must be sent to a node that owns the slot the shard channel is hashed to。集群将会确保published shard messages将会被转发给shard中所有的节点故而client在订阅shard channel时可以连接shard中的任意一个节点不管是master responseible for the slot还是any of master's replicas

Sharded Pub/Sub能够帮助Pub/Sub在集群模式下的拓展。其将消息的传播范围限制在shard of a cluster之内。故而,相比于global Pub/Sub模式下每个消息都会被传播到cluster中的所有节点Sharded Pub/Sub能够减少cluster bus传播的数据量。

SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH命令能够用于sharded pub/sub场景。

在redis的定义中a shard is defined as a collection of nodes that serve the same set of slots and that replicate from each other

在未引入Shard Pub/Sub机制之前,Pub/Sub的channel在集群中并不会被hash到slot。此时cluster中的每个node独立的维护订阅关系不同节点之间的订阅并不共享。并且发送给某一节点的消息将会广播到整个cluster中所有的nodes。

Redis Keyspace notifaction

Keyspace notification允许客户端针对Pub/Sub channels进行订阅,从而接收events affecting the Redis data set

接收事件的示例如下:

  • all the commands affecting a given key
  • all the keys receiving an LPUSH operation
  • all the keys expiring in the database 0

在使用Pub/Sub如果client失去了对redis的连接并重新连接后在client丢失连接的时间范围内所有的事件都会丢失

Type of events

keyspace notifications are implemented by sending two distinct types of events for every operation affecting the redis data space.

例如对database 0中的meykey执行的DEL操作将会传递两个消息等价于如下publish语句

PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey

其中__keyspace@0__:mykey代表所有针对mykey的事件;__keyevent@0__:del只针对mykeydel operation。

第一种类型的事件其channel名称包含keyspace prefix被称为key-space notification

第二种类型的事件channel名称中包含keyevent prfix, 被称为Key-event notification

在前面示例中对mykey的del event生成了两条消息

  • The key-space channel receives as message the name of the event
  • The key-event channel receives as message the name of the key
Configuration

默认情况下keyspace event notification处于disabled状态因为其并非必须特性且会消耗部分CPU资源。可以通过redis.conf中的notify-keyspace-events配置或通过CONFIG SET来对其进行启用。

将该参数设置为空字符串会禁用notifications为了启用该特性需要将指定为非空字符串由如下字符组成

  • 字符串中至少应该存在K或E,否则事件将不会被传递
  • 如果要对lists启用key-space events,那么可以配置为Kl
  • 可以将参数设置为KEA为大部分data types启用events

字符的含义如下表所示:

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
t     Stream commands
d     Module key type events
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
m     Key miss events generated when a key that doesn't exist is accessed (Note: not included in the 'A' class)
n     New key events generated whenever a new key is created (Note: not included in the 'A' class)
o     Overwritten events generated every time a key is overwritten (Note: not included in the 'A' class)
c     Type-changed events generated every time a key's type changes (Note: not included in the 'A' class)
A     Alias for "g$lshztdxe", so that the "AKE" string means all the events except "m", "n", "o" and "c".
Timing of expired events

如果key存在ttl那么redis将会在如下两种时机下让key过期

  • 当key被command访问并发现key过期
  • 当backgroud system查找到expired keys

当key通过上述两种机制之一被发现过期时会生成expired events。redis并不保证在key的ttl到期后立马会产生expired event。

如果该key未持续被command访问且同时存在多个key关联ttl那么the key time to live drops to zero的时间和the exipred event生成的时间可能存在较大的延迟。

Expired events将会在redis server删除key时被生成并非ttl减为0的时间。

Events in cluster

redis cluster中的每个node都会生成其自己的keyspace相关事件。但是和集群环境下Pub/Sub机制不同的是,events notifications并不会被广播到cluster中的其他节点。keyspace events是node-specific从cluster接收到所有的keyspace events时clients需要为每个node都进行订阅。

Redis Cluster Speification

Main properties and rationales of the design

Redis Cluster Goals

Redis Cluster为redis的分布式实现其目标按照重要性排序如下

  • 高性能并且支持线性拓展到1000 nodes并不会使用代理并且不会对values执行merge operations
  • Acceptable degree of write safty: 系统会尝试保留所有来源clients的写操作
  • Availability: redis cluster在绝大多数master nodes可达并且对每个master node都至少有一个replica可达的场景下维持生存。此外,在使用replicas migration如果master不再拥有任何replica那么其会从其他拥有多个replicas的master处接收一个replica。

Implemented subset

redis cluster实现了所有非分布式版本redis支持的single key commands。对于performing complex multi-key operations的命令,例如set unions and intersections,仅针对所有keys都被hash到相同slot的场景提供了实现。

redis cluster实现了hash tags的概念用于强制让部分keys被存储到相同的slot。但是在manual resharding的期间multi-ey operations可能并不可用而single-key operations则是永远可用的。

Redis Cluster并不像standalone版本的redis一样支持multiple databases其只支持database 0。并且SELECT命令在集群环境下不允许被执行。

Client and Server roles in the Redis cluster protocol

在redis集群中nodes负责对数据进行存储并维护集群的状态以及将keys映射到对应的nodes。cluster nodes可以自动发现其他的ndoes并探知non-working nodes在错误发生时将replica nodes提升为master。

cluster中所有的nodes都会通过TCP bus和名为Redis Cluster Bus的二进制协议来连结。其中,TCP bus是一种逻辑上的结构代表cluster nodes中任两个nodes之间都会相互连接。

nodes使用gossip protocol来传播关于cluster的信息可用于发现新节点、发送ping packet确保所有的其他节点状态正常、指定条件下发送的消息在集群间传播Pub/Sub消息、在用户请求时手动failover也会使用到cluster bus。

因为cluster nodes不能代理请求故client可通过redirection errors(MOVED, ASK)被重定向到其他节点。client理论上可以对集群中的任何节点发送请求并在需要时重定向到其他节点故client无需持有集群的状态。但是client可以缓存keys和nodes的关系从而提升性能。

write safety

redis cluster在nodes之间使用了asynchronous replication并使用了last failover winsimplicit merge function

last failover wins implicit merge function代表在集群中如果某节点被选举为新的主节点那么该选举出节点的数据就称为了该分片的权威数据分片中所有其他replicas都会从新选举的节点中复制数据并覆盖自己原先数据从而保证failover后所有replicas和master node数据的一致性。

在发生network partition总会存在一个window of time在该时间范围内部分写操作可能会丢失。但是在不同场景下window可能会有所不同这取决于

  • client is connected to the majority of masters
  • client is connected to the minority of masters
connected to the majority of masters

Redis Cluster tries harder to retain writes that are performed by clients connected to the majority of masters, compared to writes performed in the minority side.

在上述描述中,tries harder是由redis cluster的failover选举机制决定的相比于minority of master的写操作会直接丢失,针对majority of masters的写操作绝大部分会被保留,基于last failover wins的策略。

如下示例展示了loss of acknowledged writes received in the majority partitions during failures的场景:

  • 写操作可能已经发送给master且mater响应了该写请求但是该写操作尚未通过asynchronous replication被同步到replica nodes。如果在该时刻master发生故障那么该写操作将永远丢失。
  • 另一种理论上可能的write loss模式如下:
    • master因为network partition而不可被访问
    • master的其中一个replica发生failover
    • 过了一段时间后master又重新可访问
    • A client with an out-of-date routing table可能会向old master发送写请求如果此时old master尚未被转化为cluster的replica,那么该写操作又可能会被丢失

上述描述中,第二种场景不太可能发生,因为在master nodes unable to communicate with the majority of the other masters for enough time to be failed over(for enough time to be failed over代表无法通信的时间已经达到触发failover的时间)的前提下master ndoes将无法接收写请求并且即使在network partition已经被修复的场景下在一段时间内写请求仍然会被拒绝用以其他节点通知该master节点集群状态的变化。除此之外该场景还需要client的routing table尚未被更新。

connected to the minority of masters

针对minority side of a partition的写请求将会又有更大的丢失写操作窗口。例如,如果minority of masters拥有一个或多个clientsredis cluster将会在network partition期间丢失相当多数量的写操作,因为在the masters are failed over in the majority side时,所有发送到minority of masters的写请求都会丢失。

具体而言如果master要发生fail over那么其必须至少在NODE_TIMEOUT时间范围内无法被majority of masters访问

  • 如果network partition在该时间限制前恢复并不会丢失任何写请求
  • 如果network partition的持续时间超过NODE_TIMEOUT,那么所有针对minority side的写操作直到达到NODE_TIMEOUT都会被丢失
    • 因为minority side在NODE_TIMEOUT达到后如果仍然无法连接majority将拒绝接收写请求故而minority不可访问后window存在上限

Availability

redis cluster在minority side of partition方将不可用。在majority side of the partition,假设其拥有majority of masters并对每个不可达的master都拥有一个replica那么在时间超过NODE_TIMEOUT之后一个replica将会被重新选举变为新的master此时redis cluster将重新可访问。

上述描述具体含义如下:

  • 当network partition发生后majority partition部分可用而minority partition不可用
  • 当NODE_TIMEOUT达到后majority partition会重新选举master然后majority partition完全可用
  • 当network partition恢复后minority partition将作为replicas加入到集群作为replcias提供服务

故而redis cluster允许在少数节点发生故障时进行恢复需要majority of masters但是对于large net splits场景redis cluster并不适用。

如果cluster由N个master nodes组成每个master node都拥有一个replica那么cluster的majority side在一个节点partitioned之后仍然可访问但是当两个节点partitioned之后仍然能正常访问的概率为1-(1/(N*2-1))

redis cluster目前存在replicas migration的特性replicas将会迁移到orphaned mastersmasters no longer having replicas这将在很多场景下提高redis cluster的可访问性。故而每次正常处理failure event后cluster都会重新调整replicas layout以更高的抵御下一次failure。