doc: 阅读redis文档

This commit is contained in:
asahi
2025-09-16 11:35:23 +08:00
parent 110b653c2d
commit 11ad9a1420

View File

@@ -101,6 +101,10 @@
- [XREAD with BLOCK argument](#xread-with-block-argument) - [XREAD with BLOCK argument](#xread-with-block-argument)
- [Consumer groups](#consumer-groups) - [Consumer groups](#consumer-groups)
- [Creating a consumer group](#creating-a-consumer-group) - [Creating a consumer group](#creating-a-consumer-group)
- [Create the stream automatically](#create-the-stream-automatically)
- [XREADGROUP](#xreadgroup)
- [Recovering from permanent failures](#recovering-from-permanent-failures)
- [XPENDING](#xpending)
# redis # redis
@@ -1646,7 +1650,7 @@ XREAD的blocking形式也支持监听多个streams也可以指定多个key na
| ... (and so forth) | | ... (and so forth) |
+----------------------------------------+ +----------------------------------------+
``` ```
consumer group能够为consumer instance提供`history of pending messages`情切当consumer请求new messages时只会向其传递`ID大于last_delivered_id的message`。 consumer group能够为consumer instance提供`history of pending messages`并且当consumer请求new messages时只会向其传递`ID大于last_delivered_id的message`。
对于redis stream而言其可以包含多个consumer groups。并且对于同一redis stream可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。 对于redis stream而言其可以包含多个consumer groups。并且对于同一redis stream可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。
@@ -1667,3 +1671,184 @@ OK
- 如果在此处指定了`0`那么consumer group将会消费`all the messages in the stream history` - 如果在此处指定了`0`那么consumer group将会消费`all the messages in the stream history`
- 此处,也可以指定其他的`valid ID``consumer group will start delivering messages that are greater than the ID you specify` - 此处,也可以指定其他的`valid ID``consumer group will start delivering messages that are greater than the ID you specify`
##### Create the stream automatically
`XGROUP CREATE`命令会期望target stream存在并且当target stream不存在时会返回异常。如果target stream不存在可以通过在末尾指定`MKSTREAM`选项来自动创建stream。
`XGROUP CREATE`同样支持自动创建stream可通过`MKSTREAM`的subcommand作为最后一个参数
```redis-cli
XGROUP CREATE race:italy italy_riders $ MKSTREAM
```
在调用该命令后consumer group会被创建之后可以使用`XREADGROUP`命令来通过consumer group读取消息。
##### XREADGROUP
XREADGROUP命令和XREAD命令类似并同样提供`BLOCK`选项:
- `GROUP`: 该option是一个mandatory option在使用`XREADGROUP`时必须指定该选项。
- `GROUP`包含两个arguments
- the name of consumer group
- the name of consumer that is attemping to read
- `COUNT`: XREADGROUP同样支持该option和XREAD中的`COUNT` option等价
在如下示例中,向`race:italy` stream中添加riders并且并且尝试通过consumer group进行读取
```redis-cli
> XADD race:italy * rider Castilla
"1692632639151-0"
> XADD race:italy * rider Royce
"1692632647899-0"
> XADD race:italy * rider Sam-Bodden
"1692632662819-0"
> XADD race:italy * rider Prickett
"1692632670501-0"
> XADD race:italy * rider Norem
"1692632678249-0"
> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
```
XGROUPREAD的返回结构和XREAD类似。
在上述命令中,`STREAMS` option之后指定了`>`作为special ID
- special ID `>`只在consumer group上下文中有效其代表`messages never delivered to other consumers so far`
除了指定`>`作为id外还支持将其指定为`0`或其他有效ID。在指定id为`>`外的其他id时`XREADGROUP command will just provide us with history of pending messages, in that case we will never see new messages in the group`。
XREADGROUP命令基于指定id的不同行为拥有如下区别
- 如果指定的id为`>` 那么`XREADGROUP`命令将只会返回`new messages never delivered to other consumers so far`并且作为副作用会更新consumer group的last ID
- 如果id为其他有效的`numerical ID``XREADGROUP command will let us access our history of pending messages`,即`set of messages that are delivered to this specified consumer and never acknowledged so far`
可以指定ID为`0`来测试该行为,并且结果如下:
- we'll just see the only pending message
```redis-cli
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
```
但是如果通过ack将该message标记为processed那么其将不再作为`pending messages history`的一部分,故而,即使继续调用`XREADGROUP`也不再包含该message
```redis-cli
> XACK race:italy italy_riders 1692632639151-0
(integer) 1
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) (empty array)
```
> 在上述示例中用于指定id为非`>`后只会返回pending messages在对pending messages进行ack后那么被ack的消息在下次XREADGROUP调用时将不会被返回
`上述操作都通过名为Alice的consumer进行的操作如下示例中将通过名为Bob的consumer进行操作`。
```redis-cli
> XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
2) 1) "1692632662819-0"
2) 1) "rider"
2) "Sam-Bodden"
```
在上述示例中,`group相同但是consumer为Bob而不是Alice`。并且redis仅会返回new messages前面示例中的`Castilla`消息由于已经被传递给Alice并不会被传递给Bob。
通过上述方式Alice, Bob和group中的其他consumer可以从相同的stream中读取不同的消息。
XREADGROUP有如下方面需要关注
- consumer并不需要显式创建`consumers are auto-created the first time they are mentioned`
- 在使用XREADGROUP时可以同时读取多个keys。但是想要令该操作可行`必须create a consumer group with the same name in every stream`。该方式通常不会被使用,但是在技术上是可行的
- `XREADGROUP`为一个`write command`其只能在master redis isntance`上被调用。`因为该命令调用存在side effect该side effect会对consumer group造成修改
如下示例为一个Ruby实现的consumer
```ruby
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, in case we crashed and are recovering.
# Once we consumed our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
```
在上述consumer实现中`consumer启动时会对history进行消费`,即`list of pending messages`。因为consumer可能在之前发生过崩溃故而在重启时可能希望对`messages delivered to us without getting acked`进行重新读取)。在该场景下,`we might process a message multiple times or one time`。
并且在consumer实现中一旦history被消费完成`XREADGROUP`命令将会返回empty list此时便可以切换到`special ID >`从而对new messages进行消费。
##### Recovering from permanent failures
在上述ruby实现的consumers中多个consumers将会加入相同的consumer group每个consumer都会消费和处理`a subset of messages`。当consumer从failure中进行恢复时会重新读取`pending messages that were delivered just to them`。
但是,在现实场景中,可能会出现`consumer may permanently fail and never recover`的场景。
redis consumer group对`permanently fail`的场景提供了专门的特性,支持对`pending messages of a given consumer`进行认领,故而`such pending messages will change ownership and will be re-assigned to a different consumer`。
在使用上述特性时consumer必须检查`list of pending messages`,并`claim specific messages using special command`。否则对于permanently fail的consumer其pending messages将会被pending forever即pending messages一直被分配给`old consumer that fail permanently`。
###### XPENDING
通过`XPENDING`命令可以查看pending entries in the consumer group。该命令是`read-only`的,调用该命令并不会导致任何消息的`ownership`被变更。
如下为XPENDING命令调用的示例
```redis-cli
> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"
2) "2"
```
当调用该command时command的输出将会包含如下内容
- `total number of pending messages in the consumer group`
- `lower and higher message ID among the pending messages`
- `a list of consumers and the number of pending messages they have`
在上述示例中,仅名为`Bob`的consumer存在2条`pending messages`。
`XPENDING命令支持传递更多的参数来获取更多信息``full command signature`如下:
```redis-cli
XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]
```