diff --git a/中间件/redis/redis.md b/中间件/redis/redis.md index 266b49d..c187b29 100644 --- a/中间件/redis/redis.md +++ b/中间件/redis/redis.md @@ -101,6 +101,10 @@ - [XREAD with BLOCK argument](#xread-with-block-argument) - [Consumer groups](#consumer-groups) - [Creating a consumer group](#creating-a-consumer-group) + - [Create the stream automatically](#create-the-stream-automatically) + - [XREADGROUP](#xreadgroup) + - [Recovering from permanent failures](#recovering-from-permanent-failures) + - [XPENDING](#xpending) # redis @@ -1646,7 +1650,7 @@ XREAD的blocking形式也支持监听多个streams,也可以指定多个key na | ... (and so forth) | +----------------------------------------+ ``` -consumer group能够为consumer instance提供`history of pending messages`,情切当consumer请求new messages时,只会向其传递`ID大于last_delivered_id的message`。 +consumer group能够为consumer instance提供`history of pending messages`,并且当consumer请求new messages时,只会向其传递`ID大于last_delivered_id的message`。 对于redis stream而言,其可以包含多个consumer groups。并且,对于同一redis stream,可以同时包含`consumer reading without consumer groups via XREAD`和`consumer reading via XREADGROUP in consumer group`两种类型的consumer。 @@ -1667,3 +1671,184 @@ OK - 如果在此处指定了`0`,那么consumer group将会消费`all the messages in the stream history` - 此处,也可以指定其他的`valid ID`,`consumer group will start delivering messages that are greater than the ID you specify` +##### Create the stream automatically +`XGROUP CREATE`命令会期望target stream存在,并且当target stream不存在时会返回异常。如果target stream不存在,可以通过在末尾指定`MKSTREAM`选项来自动创建stream。 + +`XGROUP CREATE`同样支持自动创建stream,可通过`MKSTREAM`的subcommand作为最后一个参数: +```redis-cli +XGROUP CREATE race:italy italy_riders $ MKSTREAM +``` +在调用该命令后,consumer group会被创建,之后可以使用`XREADGROUP`命令来通过consumer group读取消息。 + +##### XREADGROUP +XREADGROUP命令和XREAD命令类似,并同样提供`BLOCK`选项: +- `GROUP`: 该option是一个mandatory option,在使用`XREADGROUP`时必须指定该选项。 + - `GROUP`包含两个arguments: + - the name of consumer group + - the name of consumer that is attemping to read + - `COUNT`: XREADGROUP同样支持该option,和XREAD中的`COUNT` option等价 + +在如下示例中,向`race:italy` stream中添加riders并且并且尝试通过consumer group进行读取: +```redis-cli +> XADD race:italy * rider Castilla +"1692632639151-0" +> XADD race:italy * rider Royce +"1692632647899-0" +> XADD race:italy * rider Sam-Bodden +"1692632662819-0" +> XADD race:italy * rider Prickett +"1692632670501-0" +> XADD race:italy * rider Norem +"1692632678249-0" +> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy > +1) 1) "race:italy" + 2) 1) 1) "1692632639151-0" + 2) 1) "rider" + 2) "Castilla" +``` + +XGROUPREAD的返回结构和XREAD类似。 + +在上述命令中,`STREAMS` option之后指定了`>`作为special ID: +- special ID `>`只在consumer group上下文中有效,其代表`messages never delivered to other consumers so far` + +除了指定`>`作为id外,还支持将其指定为`0`或其他有效ID。在指定id为`>`外的其他id时,`XREADGROUP command will just provide us with history of pending messages, in that case we will never see new messages in the group`。 + +XREADGROUP命令基于指定id的不同,行为拥有如下区别: +- 如果指定的id为`>`, 那么`XREADGROUP`命令将只会返回`new messages never delivered to other consumers so far`,并且,作为副作用,会更新consumer group的last ID +- 如果id为其他有效的`numerical ID`,`XREADGROUP command will let us access our history of pending messages`,即`set of messages that are delivered to this specified consumer and never acknowledged so far` + +可以指定ID为`0`来测试该行为,并且结果如下: +- we'll just see the only pending message + +```redis-cli +> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0 +1) 1) "race:italy" + 2) 1) 1) "1692632639151-0" + 2) 1) "rider" + 2) "Castilla" +``` + +但是,如果通过ack将该message标记为processed,那么其将不再作为`pending messages history`的一部分,故而,即使继续调用`XREADGROUP`也不再包含该message: +```redis-cli +> XACK race:italy italy_riders 1692632639151-0 +(integer) 1 +> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0 +1) 1) "race:italy" + 2) (empty array) +``` +> 在上述示例中,用于指定id为非`>`后,只会返回pending messages,在对pending messages进行ack后,那么被ack的消息在下次XREADGROUP调用时将不会被返回 + +`上述操作都通过名为Alice的consumer进行的操作,如下示例中将通过名为Bob的consumer进行操作`。 + +```redis-cli +> XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy > +1) 1) "race:italy" + 2) 1) 1) "1692632647899-0" + 2) 1) "rider" + 2) "Royce" + 2) 1) "1692632662819-0" + 2) 1) "rider" + 2) "Sam-Bodden" +``` + +在上述示例中,`group相同但是consumer为Bob而不是Alice`。并且redis仅会返回new messages,前面示例中的`Castilla`消息由于已经被传递给Alice并不会被传递给Bob。 + +通过上述方式,Alice, Bob和group中的其他consumer可以从相同的stream中读取不同的消息。 + +XREADGROUP有如下方面需要关注: +- consumer并不需要显式创建,`consumers are auto-created the first time they are mentioned` +- 在使用XREADGROUP时,可以同时读取多个keys。但是,想要令该操作可行,`必须create a consumer group with the same name in every stream`。该方式通常不会被使用,但是在技术上是可行的 +- `XREADGROUP`为一个`write command`,其只能在master redis isntance`上被调用。`因为该命令调用存在side effect,该side effect会对consumer group造成修改 + +如下示例为一个Ruby实现的consumer, +```ruby +require 'redis' + +if ARGV.length == 0 + puts "Please specify a consumer name" + exit 1 +end + +ConsumerName = ARGV[0] +GroupName = "mygroup" +r = Redis.new + +def process_message(id,msg) + puts "[#{ConsumerName}] #{id} = #{msg.inspect}" +end + +$lastid = '0-0' + +puts "Consumer #{ConsumerName} starting..." +check_backlog = true +while true + # Pick the ID based on the iteration: the first time we want to + # read our pending messages, in case we crashed and are recovering. + # Once we consumed our history, we can start getting new messages. + if check_backlog + myid = $lastid + else + myid = '>' + end + + items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid) + + if items == nil + puts "Timeout!" + next + end + + # If we receive an empty reply, it means we were consuming our history + # and that the history is now empty. Let's start to consume new messages. + check_backlog = false if items[0][1].length == 0 + + items[0][1].each{|i| + id,fields = i + + # Process the message + process_message(id,fields) + + # Acknowledge the message as processed + r.xack(:my_stream_key,GroupName,id) + + $lastid = id + } +end +``` +在上述consumer实现中,`consumer启动时会对history进行消费`,即`list of pending messages`。(因为consumer可能在之前发生过崩溃,故而在重启时可能希望对`messages delivered to us without getting acked`进行重新读取)。在该场景下,`we might process a message multiple times or one time`。 + +并且,在consumer实现中,一旦history被消费完成,`XREADGROUP`命令将会返回empty list,此时便可以切换到`special ID >`,从而对new messages进行消费。 + +##### Recovering from permanent failures +在上述ruby实现的consumers中,多个consumers将会加入相同的consumer group,每个consumer都会消费和处理`a subset of messages`。当consumer从failure中进行恢复时,会重新读取`pending messages that were delivered just to them`。 + +但是,在现实场景中,可能会出现`consumer may permanently fail and never recover`的场景。 + +redis consumer group对`permanently fail`的场景提供了专门的特性,支持对`pending messages of a given consumer`进行认领,故而`such pending messages will change ownership and will be re-assigned to a different consumer`。 + +在使用上述特性时,consumer必须检查`list of pending messages`,并`claim specific messages using special command`。否则,对于permanently fail的consumer,其pending messages将会被pending forever,即pending messages一直被分配给`old consumer that fail permanently`。 + +###### XPENDING +通过`XPENDING`命令,可以查看pending entries in the consumer group。该命令是`read-only`的,调用该命令并不会导致任何消息的`ownership`被变更。 + +如下为XPENDING命令调用的示例: +```redis-cli +> XPENDING race:italy italy_riders +1) (integer) 2 +2) "1692632647899-0" +3) "1692632662819-0" +4) 1) 1) "Bob" + 2) "2" +``` +当调用该command时,command的输出将会包含如下内容: +- `total number of pending messages in the consumer group` +- `lower and higher message ID among the pending messages` +- `a list of consumers and the number of pending messages they have` + +在上述示例中,仅名为`Bob`的consumer存在2条`pending messages`。 + +`XPENDING命令支持传递更多的参数来获取更多信息`,`full command signature`如下: +```redis-cli +XPENDING [[IDLE ] []] +```