From 6f7f99f5828093cd93e1be1b71e4cf0c33d04192 Mon Sep 17 00:00:00 2001 From: rikako <496063163@qq.com> Date: Tue, 7 Nov 2023 00:36:52 +0800 Subject: [PATCH] =?UTF-8?q?kafka=20consumer=E7=9B=B8=E5=85=B3=E6=96=87?= =?UTF-8?q?=E6=A1=A3=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/kafka-尚硅谷.md | 51 +++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/mq/kafka/kafka-尚硅谷.md b/mq/kafka/kafka-尚硅谷.md index 040dc5e..0fb95e5 100644 --- a/mq/kafka/kafka-尚硅谷.md +++ b/mq/kafka/kafka-尚硅谷.md @@ -245,5 +245,56 @@ kafka中的日志清除策略由如下两种: > > 压缩后offset可能并不连续,此时若想要消费的offset不存在,那么会拿到比预期offset大的offset的消息 +#### 消费者 +通过消息队列的消费方式分为两种: +- pull: 拉取 +- push: 推送 + +kafka采用的是拉取模式来进行消费,因为拉取模式可以很好的兼容不同消费者实例的消费速率。各个消费者可以根据自己消费消息的速度来选择拉取消息的速度,如此能避免消息在多个消费者实例间的不合适分配,造成有的消费者实例空闲,而有的消费者实例消息堆积的情况。 + +##### kafka消费者工作流程 +在kafka架构中,一个分区只能够被同一消费者组中的一个消费者实例进行消费,但是消费者组中的一个消费者实例能够对多个分区进行消费。 + +并且,任一分区中的消息都会被广播到所有订阅该topic的消费者组中,但是一条消息只能够被消费者组中的一个消费者实例进行消费。 + +> #### offset +> kafka中一个一个分区能够被多个实例进行消费(被多个位于不同消费者组中的消费者进行消费),kafka通过`offset`来记录每个消费者消费到分区的偏移量。 +> +> offset被存储在kafka的一个主题中(_consumer_offsets),用于持久化offset数据。 + +##### 消费者组 +消费者组是由若干个消费者实例组成的集合,位于同一消费者组中的消费者实例,其groupid都相同。 + +在同一消费者组中,一条消息只能由一个消费者实例进行消费。消费者组中的消费者实例负责消费同一topic中的不同分区,各实例负责的分区没有重叠部分。 + +> 当消费者组中的消费者实例大于分区数时,消费者组中将存在闲置的消费者实例,同一分区无法由同一消费者组中的多个消费者实例共同消费。 +> + +> ##### 消费者组初始化流程 +> 在消费者组中,通过coordinator来协助消费者组实现初始化和分区分配操作,kafka集群中的每个broker实例都有一个对应的coordinator。 +> +> 消费者组在选择通过哪个coordinator实例来协助进行初始化时,会将groupid根据_consumer_offset主题的分区数量进行取模(默认_consumer_offset主题默认有50个分区,故而默认为groupid%50),根据取模后的结果查找_consumer_offsets的第N号分区位于哪台broker上,该broker的coordinator即会协助消费者组进行初始化。(例如`groupid%50=4`,即会选取_consumer_offsets第四号分区所在的broker来作为协助初始化的broker,该broker上的coordinator会协助消费者组进行初始化。 +> +> 在消费者组初始化时,组内每个消费者实例都会向选中的coordinator发送消息,请求加入消费者组,而coordinator则会选中其中一个消费者实例作为该消费者组的leader。 +> +> 选中leader分区之后,coordinator会把broker集群中待消费的topic信息全部发送给leader消费者,leader消费者则是会指定分区分配计划,将topic分区在组内的多个消费者实例之间进行分配。 +> +> 在leader指定完分区分配计划之后,会将分配计划发送给coordinator,coordinator收到分配方案后则会将分配方案分发给消费者组内的各个消费者实例。 + +> ##### 消费者组的心跳机制 +> 消费者组中的每个消费者都会和coordinator维持长连接,消费者会向coordinator发送心跳包(心跳包默认时间为3s)。 +> +> 一旦超过指定时间(`session.timeout.ms`,默认为45s)消费者没有向coordinator发送心跳包,那么该消费者将会被从消费者组中移除,并且会触发分区的再平衡,该宕机消费者实例负责的分区将会被分配给同组中别的消费者实例。 +> +> 如果消费者实例处理消息的时间过长(`max.poll.interval.ms`,默认为5min),自从上一次拉取分区消息时起超过5min还未再次拉取数据,也会触发再平衡。 + +##### 消费者实例拉取数据流程 +消费者实例在拉取消息时,首先会向broker发送一个fetch请求,从broker处批量拉取数据。其中,fetch过程能够通过如下参数自定义: +- `fetch.min.bytes`:单个fetch请求中,broker server返回的最小消息大小。如果消费者向broker发送fetch请求时,可获取的消息没有达到该参数限制的大小,那么broker会等待消息累积,直到累积消息达到`fetch.min.bytes`指定的大小,再将消息批量返回给消费者。`默认情况下,该参数的值为1`,代表fetch请求时消息会被立刻返回。增大该值会批量累积消息提升效率,但是也会增加fetch请求的延迟。 +- `fetch.max.bytes`:单个fetch请求中,broker server返回消息最大数据量大小。fetch操作请求数据时,数据将会被broker批量返回。如果如果分区中待返回第一条record的大小大于`fetch.max.bytes`指定的大小,那么该消息大小也会被返回,即使消息大小大于限制。`默认情况下,该参数的默认值为50m`。 +- `fetch.max.wait.ms`:在单个请求中,如果broker没有累积到`fetch.max.bytes`指定规模的消息数量,当broker阻塞时间超过`fetch.max.wait.ms`指定的超时时间后,也会将消息返回给消费者。`默认情况下,该参数默认值为500ms`。 + +在消费者通过fetch请求拉取到数据后,会将拉取的records缓存起来,然后在调用poll方法时返回缓存的record数据。 +- `max.poll.records`:该参数用于限制poll()调用返回的最大消息条数。`该参数默认值为500`,调用poll方法时,最多返回500条缓存数据。`max.poll.records`参数用于指定poll行为,并不会对fetch行为造成影响。