阅读spring kafka接收消息的文档

This commit is contained in:
asahi
2024-01-06 22:51:21 +08:00
parent c65d93d529
commit 4d28c2f1a6

View File

@@ -676,3 +676,178 @@ listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMM
另外如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。 另外如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。
对于AggregatingReplyingKafkaTemplate`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。 对于AggregatingReplyingKafkaTemplate`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。
## 接收消息
在使用spring kafka时可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。
### Message Listener
当使用Message Listener container时必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口
```java
// interface-1
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
// interface-2
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// interface-3
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// interface-4
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// interface-5
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
// interface-6
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// interface-7
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// interface-8
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
1. interface-1当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理
2. interface-2当使用由容器管理的提交方法时可以使用该接口该接口针对单条消息进行处理
3. interface-3当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理该接口提供对consumer实例的访问
4. interface-4当使用由容器管理的提交方法时可以使用该接口该接口针对单挑消息进行处理该接口提供对consumer实例的访问
5. interface-5当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对`consumer.poll`方法返回的所有消息进行处理;当使用该接口时,不支持`AckMode.RECORD`模式因为所有的poll方法接收到的message batch都给了listener
6. interface-6该当使用容器管理的提交方法时可以使用该接口该接口会处理由poll方法接收到的所有消息
7. interface-7当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对poll方法返回的所有消息进行处理使用该接口时不支持`AckMode.RECORD`使用该接口可以针对consumer实例进行处理
8. interface-8当使用由容器管理的commit method时可以使用该接口该接口针对由poll方法返回的全部消息进行处理该接口提供对consumer实例的访问
> listener接口提供的consumer对象并不是线程安全的故而必须在调用该listener的线程内访问该consumer中的方法
> 在对consumer中的方法进行访问时不应该在listener中执行任何会修改consumer position或commit offset的方法position或offset信息由container来进行管理。
### MessageListenerContainer
spring kafka提供了两种MessageListenerContainer的实现
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
上述两种实现的区别如下:
- KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
- ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。
#### Interceptors
从2.2.7版本开始可以为listener container添加`RecordInterceptor`拦截器会在container调用listener之前被调用可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空那么后续listener将不会被调用。
从2.7版本开始RecordInterceptor还增加了方法`afterRecord`该方法在listener退出之后调用正常退出或是抛异常退出。并且从2.7版本开始,还新增了`BatchInterceptor`为Batch Listener提供了类似的拦截器功能ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。
> 如果拦截器对record进行了修改同构创建一个新的record那么topic、partition、offset必须都保持不变从而避免类似消息丢失的副作用
为了调用多个interceptors可以使用`CompositeRecordInterceptor``CompositeBatchInterceptor`
默认情况下从2.8版本开始在使用事务时拦截器会在事务开启之前被调用。可以将listener container的`interceptBeforeTx`属性修改为`false`从而在事务开启之后调用拦截器方法。从2.9版本开始上述会被应用于任何transaction manager而不单单只用于`KafkaAwareTransactionManager`这允许拦截器加入到由container开启的jdbc事务。
从2.3.8、2.4.6版本开始,`ConcurrentMessageListenerContainer`支持Static Membership当concurrency大于1时。而`group.instance.id`的后缀则是`-n``n`从1开始。与此同时增加`session.timeout.ms`可以减少rebalance event的数量例如当应用实例重启时。
#### 使用KafkaMessageListenerContainer
在创建KafkaMessageListenerContainer时可以使用如下构造方法
```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
上述构造函数接收一个consumerFactory并且通过containerProperties接收topic和partition的信息此外containerProperties中还包含其他配置信息。
containerProperties拥有如下构造方法
```java
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
第一个构造方法中接收一个`TopicPartitionOffset`数组通过该数组可以显式的告知container应该使用哪些分区通过调用consumer的assign方法。并且可以选择为TopicPartitionOffset指定初始的offset。当`relativeToCurrent`参数为false时默认为false如果指定offset为正数默认代表绝对offset如果offset为负数默认代表相对于该分区最后位置的偏移量。如果`relativeToCurrent`为true初始offset将会被设置为相对当前的consumer position。当容器启动时设置的offset将会被应用。
第二个构造方法接收了一个topic数组kafka则会根据`group.id`属性来分配分区将分区在组内的订阅了该topic的消费者实例之间进行分配。
第三个构造方法则是接收一个topicPattern通过该pattern来匹配分区。
在创建container时为了将MessageListener分配给container可以使用`ContainerProperties.setMessageListener`。如下展示了添加listener的示例
```java
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
在上述示例中创建DefaultKafkaConsumerFactory实例时其构造方法只接受了一个ConsumerProperties属性代表其会在属性中获取key和value的`Deserializer.class`并创建实例。同时也可以通过调用DefaultKafkaConsumerFactory的其他构造方法接收key和value的Deserializer实例在这种情况下所有consumer都会公用key和value的反序列化实例。另一种选择时提供`Supplier<Deserializer>`这种情况下每个consumer都会使用不同的反序列化器实例。
```java
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
从2.2.1版本开始,提供了一个叫`logContainerConfig`的配置属性当该属性设置为true并且info日志级别开启时每个listener container都会写日志记录其配置。
默认情况下会在debug的日志级别记录topic offset commit。从2.1.2开始,通过`ContainerProperties``commitLogLevel`的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info可以调用`containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)`方法。
从2.2版本开始,添加了`missingTopicsFatal`属性该属性默认值为false。如果在container启动时其配置的任一topic在broker中并不存在那么其将会阻止container启动。如果container被配置为监听pattern那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时container线程将会循环调用`consumer.poll`方法等待topic出现同时日志输出消息除了日志之外没有任何迹象显示出现了问题。
在2.8版本中,引入了`authExceptionRetryInterval`属性。该属性会导致container在kafkaConsumer得到`AuthenticationException``AuthorizationException`异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时在等待`authExceptionRetryInterval`时间间隔后,可以重试获取消息,此时权限可能已经被授予。
> 在默认情况下,没有配置`authExceptionRetryInterval`AuthenticationException和AuthorizationException将被认为是致命的这将会导致messageListenerContainer停止。
#### 使用ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer
```java
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
除此之外ConcurrentMessageListenerContainer还有一个concurrency属性该属性代表并行度。如果调用`container.setConcurrency(3)`方法其会创建3个`KafkaMessageListenerContainer`.
对于该构造方法kafka使用组管理功能在消费者实例之间分配分区。
如果container通过TopicPartitonOffset配置时ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上每个委托实例负责监听不同的TopicPartitionOffset。
如果6个TopicPartitionOffset被提供并且concurrency被设置为3每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供并且concurrency被设置为3那么两个委托容器实例将会被分配到2个分区第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量那么concurrency将会降低到和TopicPartitionOffset数量相同每个委托容器实例被分配到一个分区。
从1.3版本开始,`MessageListenerContainer`提供了对底层`KafkaConsumer`的metrics信息访问。对于`ConcurrentMessageListenerContainer`其metircs方法将会返回一个map该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为`Map<String, Map<MetricName, ? extends Metric>>`,其中key为底层KafkaConsumer的`client-id`.
从2.3版本开始ContainerProperties提供了`idleBetweenPolls`属性允许listener container在调用`consumer.poll()`方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:`idleBetweenPolls`属性实际配置的值;`max.poll.interval.ms`值减去当前消息批处理时间的差值。
> `max.poll.interval.ms`
>
> 该值代表在使用消费者组管理功能时,两次`poll()`调用之间最长时间间隔。该值代表在调用完poll方法后在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后poll方法仍然未被调用那么该消费者示例将会被认为失败并且消费者组会出发rebalance操作将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的`group.instance.id`不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过`session.timeout.ms`时间后出发rebalance。`group.instance.id`不为空时,该消费者实例将会被认为是该消费者组的静态成员。
>
> 默认情况下,`max.poll.interval.ms`的默认值为5min。