继续关于Kafka文档的阅读
This commit is contained in:
@@ -669,9 +669,33 @@ public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
|
||||
ConcurrentMessageListenerContainer还有一个concurrency属性,例如,`container.setConcurrency(3)`会创建3个`KafkaMessageListenerContainer`实例。
|
||||
kafka会根据其组管理功能再消费者之间分配分区。
|
||||
> **kafka分配策略**
|
||||
> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer,其余的10个consumer都会空闲。
|
||||
> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer
|
||||
> **RangeAssignor**
|
||||
> 默认情况下,kafka的分区分配策略是通过RangeAssignor来进行分配的,其会将每个topic的分区在消费者group中所有消费者实例之间进行分配,如果消费者实例数大于分区数,则是会将消费者按字典顺序排序且分配分区给排序靠前的人。
|
||||
> 故而,在RangeAssignor策略下,只有字典排序靠前的消费者实例才能在每个topic中分配到一个分区,前5个消费者每个实例3个分区,后面的10个消费者完全空闲。
|
||||
> **RoundRobinAssignor**
|
||||
> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区
|
||||
> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区。
|
||||
>
|
||||
> 想要改变`PartitionAssignor`,可以再提供给DefaultKafkaConsumerFactory的参数中设置`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`属性。
|
||||
> 如果使用Spring Boot,可以按如下方式来配置:
|
||||
> ```properties
|
||||
> spring.kafka.consumer.properties.partition.assignment.strategy=
|
||||
> org.apache.kafka.clients.consumer.RoundRobinAssignor
|
||||
> ```
|
||||
|
||||
如果container属性通过`TopicPartitionOffset`配置,`ConcurrentMessageListenerContainer`将会把TopicPartitionOffset分发给所有其委托的KafkaMessageListenerContainer实例。
|
||||
> 如果提供了六个TopicPartitionOffset实例,并且concurrency被设置为3,那么每个实例都会获取到2个分区。
|
||||
> 如果5个TopicPartitionOffset实例被提供,且concurrency被设置为2,那么两个实例会获取到2个分区,一个实例会获取到一个分区。
|
||||
> 如果concurrency比TopicPartitionOffset的数量更大,那么concurrency会向下进行调整,让每个实例都获取一个分区。
|
||||
|
||||
**Committing Offsets**
|
||||
如果消费者属性`enable.auto.commit`被设置为true,Kafka会对offset进行自动提交。如果该选项被设置为false,container支持一些`AckMode`设置。默认的`AckMode`值为`Batch`。`enable.auto.commit`被默认设置为false,如果想要开启自动提交,可以手动将该属性设置为true。
|
||||
消费者的poll方法会返回一个或者多个ConsumerRecord。MessageListener对于每条record都会被调用。
|
||||
根据`AckMode`的值,container会执行如下操作:
|
||||
- RECORD:当listener处理完record返回之后,会提交Offset
|
||||
- BATCH:当poll方法返回的所有record都被处理之后,提交Offset
|
||||
- TIME:当poll方法返回的所有record都被处理之后,提交Offset,或者自上次提交之后已经超过了ackTime,也会提交Offset
|
||||
- COUNT:当poll方法返回的所有record都被处理之后,提交Offset,或者自从上次提交之后已经接收了超过ackCount条record,也会被提交
|
||||
- COUNT_TIME:和TIME和COUNT类似,但是只要TIME和COUNT任一满足,也会提交Offset
|
||||
- MANNUAL:message listener负责调用`Acknowledgment.acknowledge()`方法。除此之外,和BATCH语义相同
|
||||
- MANNUAL_IMMEDIATE:当listener调用`Acknowledgment.acknowledge()`之后,对Offset进行提交
|
||||
Reference in New Issue
Block a user