继续关于Kafka文档的阅读
This commit is contained in:
@@ -619,12 +619,17 @@ public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends Ba
|
|||||||
- ConcurrentMessageListenerContainer
|
- ConcurrentMessageListenerContainer
|
||||||
|
|
||||||
KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例,从而提供多线程的消费。
|
KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例,从而提供多线程的消费。
|
||||||
|
|
||||||
|
**RecordInterceptor**
|
||||||
|
|
||||||
可以添加一个RecordInterceptor到ListenerContainer,在调用listener之前,interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null,那么listener则不会被调用。而且,其还有一个额外的方法,可以在listener退出之后被调用(退出之后,指正常退出或以抛出异常的形式退出)。
|
可以添加一个RecordInterceptor到ListenerContainer,在调用listener之前,interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null,那么listener则不会被调用。而且,其还有一个额外的方法,可以在listener退出之后被调用(退出之后,指正常退出或以抛出异常的形式退出)。
|
||||||
还有一个BatchInterceptor,提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
|
还有一个BatchInterceptor,提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
|
||||||
如果想要调用多个Interceptor,则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
|
如果想要调用多个Interceptor,则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
|
||||||
默认情况下,当使用事务时,interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
|
默认情况下,当使用事务时,interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
|
||||||
ConcurrentMessageListenerContainer支持“静态成员”(即固定消费者,即使消费者实例重启,如此可以降低事件在消费者之间重新负载均衡的开销),当并发量大于1时。`group.instance.id`的后缀为`-n`,其中n从1开始。
|
ConcurrentMessageListenerContainer支持“静态成员”(即固定消费者,即使消费者实例重启,如此可以降低事件在消费者之间重新负载均衡的开销),当并发量大于1时。`group.instance.id`的后缀为`-n`,其中n从1开始。
|
||||||
|
|
||||||
|
**KafkaMessageListenerContainer**
|
||||||
|
|
||||||
可以通过如下构造函数来使用MessageListenerContainer:
|
可以通过如下构造函数来使用MessageListenerContainer:
|
||||||
```java
|
```java
|
||||||
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
|
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
|
||||||
@@ -638,3 +643,35 @@ public ContainerProperties(String... topics)
|
|||||||
public ContainerProperties(Pattern topicPattern)
|
public ContainerProperties(Pattern topicPattern)
|
||||||
```
|
```
|
||||||
- 第一个构造方法接收一个TopicPartitionOffset的数组作为参数,来显式指定container使用哪些分区(通过consumer的assign方法),并可以附带一个可选的初始offset。默认情况下,如果offset为正值,代表其是绝对的offset。若offset为负值,则offset代表在默认分区中相对于current last offset的相对位置。并且,对于TopicPartitionOffset类,其提供了一个接收额外boolean参数的构造方法,如果该值设置为true,无论init offset为正值或者负值,都是相对于consumer当前位置的相对值。当容器启动时,offset将会被使用。
|
- 第一个构造方法接收一个TopicPartitionOffset的数组作为参数,来显式指定container使用哪些分区(通过consumer的assign方法),并可以附带一个可选的初始offset。默认情况下,如果offset为正值,代表其是绝对的offset。若offset为负值,则offset代表在默认分区中相对于current last offset的相对位置。并且,对于TopicPartitionOffset类,其提供了一个接收额外boolean参数的构造方法,如果该值设置为true,无论init offset为正值或者负值,都是相对于consumer当前位置的相对值。当容器启动时,offset将会被使用。
|
||||||
|
- 第二个构造方法接收一个topic数组,并且kafka根据`group.id`属性来分配分区(在group中对分区进行分配)。
|
||||||
|
- 第三个构造方法接收一个正则表达式,根据正则表达式来选中topic
|
||||||
|
|
||||||
|
想要为容器指定一个MessageListener,可以在创建容器时使用ContainerProps.setMessageListener方法。如下是创建容器时设置MessageListener的示例:
|
||||||
|
```java
|
||||||
|
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
|
||||||
|
containerProps.setMessageListener(new MessageListener<Integer, String>() {
|
||||||
|
...
|
||||||
|
});
|
||||||
|
DefaultKafkaConsumerFactory<Integer, String> cf =
|
||||||
|
new DefaultKafkaConsumerFactory<>(consumerProps());
|
||||||
|
KafkaMessageListenerContainer<Integer, String> container =
|
||||||
|
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||||
|
return container;
|
||||||
|
```
|
||||||
|
`missingTopicsFatal`可以控制topic不存在时container是否启动(`missingTopicsFatal`的值默认为false)。如果任一topic在broker中不存在,那么container的启动会被终止。
|
||||||
|
|
||||||
|
**ConcurrentMessageListenerContainer**
|
||||||
|
|
||||||
|
`ConcurrentMessageListenerContainer`唯一的构造方法和`KafkaListenerContainer`类似,如下显示了构造方法的签名:
|
||||||
|
```java
|
||||||
|
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
|
||||||
|
```
|
||||||
|
ConcurrentMessageListenerContainer还有一个concurrency属性,例如,`container.setConcurrency(3)`会创建3个`KafkaMessageListenerContainer`实例。
|
||||||
|
kafka会根据其组管理功能再消费者之间分配分区。
|
||||||
|
> **kafka分配策略**
|
||||||
|
> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer,其余的10个consumer都会空闲。
|
||||||
|
> **RangeAssignor**
|
||||||
|
> 默认情况下,kafka的分区分配策略是通过RangeAssignor来进行分配的,其会将每个topic的分区在消费者group中所有消费者实例之间进行分配,如果消费者实例数大于分区数,则是会将消费者按字典顺序排序且分配分区给排序靠前的人。
|
||||||
|
> 故而,在RangeAssignor策略下,只有字典排序靠前的消费者实例才能在每个topic中分配到一个分区,前5个消费者每个实例3个分区,后面的10个消费者完全空闲。
|
||||||
|
> **RoundRobinAssignor**
|
||||||
|
> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区
|
||||||
Reference in New Issue
Block a user