From e608c50af14e3aaf52af478919a18ebe9b3ee590 Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Thu, 2 Mar 2023 16:21:35 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=85=B3=E4=BA=8EKafka?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E7=9A=84=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/Spring for Apache Kafka.md | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/mq/kafka/Spring for Apache Kafka.md b/mq/kafka/Spring for Apache Kafka.md index 0aae368..2c994a1 100644 --- a/mq/kafka/Spring for Apache Kafka.md +++ b/mq/kafka/Spring for Apache Kafka.md @@ -669,9 +669,33 @@ public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory, ConcurrentMessageListenerContainer还有一个concurrency属性,例如,`container.setConcurrency(3)`会创建3个`KafkaMessageListenerContainer`实例。 kafka会根据其组管理功能再消费者之间分配分区。 > **kafka分配策略** -> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer,其余的10个consumer都会空闲。 +> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer > **RangeAssignor** > 默认情况下,kafka的分区分配策略是通过RangeAssignor来进行分配的,其会将每个topic的分区在消费者group中所有消费者实例之间进行分配,如果消费者实例数大于分区数,则是会将消费者按字典顺序排序且分配分区给排序靠前的人。 > 故而,在RangeAssignor策略下,只有字典排序靠前的消费者实例才能在每个topic中分配到一个分区,前5个消费者每个实例3个分区,后面的10个消费者完全空闲。 > **RoundRobinAssignor** -> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区 \ No newline at end of file +> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区。 +> +> 想要改变`PartitionAssignor`,可以再提供给DefaultKafkaConsumerFactory的参数中设置`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`属性。 +> 如果使用Spring Boot,可以按如下方式来配置: +> ```properties +> spring.kafka.consumer.properties.partition.assignment.strategy= +> org.apache.kafka.clients.consumer.RoundRobinAssignor +> ``` + +如果container属性通过`TopicPartitionOffset`配置,`ConcurrentMessageListenerContainer`将会把TopicPartitionOffset分发给所有其委托的KafkaMessageListenerContainer实例。 +> 如果提供了六个TopicPartitionOffset实例,并且concurrency被设置为3,那么每个实例都会获取到2个分区。 +> 如果5个TopicPartitionOffset实例被提供,且concurrency被设置为2,那么两个实例会获取到2个分区,一个实例会获取到一个分区。 +> 如果concurrency比TopicPartitionOffset的数量更大,那么concurrency会向下进行调整,让每个实例都获取一个分区。 + +**Committing Offsets** +如果消费者属性`enable.auto.commit`被设置为true,Kafka会对offset进行自动提交。如果该选项被设置为false,container支持一些`AckMode`设置。默认的`AckMode`值为`Batch`。`enable.auto.commit`被默认设置为false,如果想要开启自动提交,可以手动将该属性设置为true。 +消费者的poll方法会返回一个或者多个ConsumerRecord。MessageListener对于每条record都会被调用。 +根据`AckMode`的值,container会执行如下操作: +- RECORD:当listener处理完record返回之后,会提交Offset +- BATCH:当poll方法返回的所有record都被处理之后,提交Offset +- TIME:当poll方法返回的所有record都被处理之后,提交Offset,或者自上次提交之后已经超过了ackTime,也会提交Offset +- COUNT:当poll方法返回的所有record都被处理之后,提交Offset,或者自从上次提交之后已经接收了超过ackCount条record,也会被提交 +- COUNT_TIME:和TIME和COUNT类似,但是只要TIME和COUNT任一满足,也会提交Offset +- MANNUAL:message listener负责调用`Acknowledgment.acknowledge()`方法。除此之外,和BATCH语义相同 +- MANNUAL_IMMEDIATE:当listener调用`Acknowledgment.acknowledge()`之后,对Offset进行提交 \ No newline at end of file