阅读关于kakfka接收消息的文档

This commit is contained in:
asahi
2024-02-22 18:46:58 +08:00
parent 0d653ba29f
commit 24ce571d45

View File

@@ -1624,7 +1624,35 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() {
}; };
} }
``` ```
### 过滤消息
在某些特定场景下例如发生rebalance已经被处理过的消息将会被重复传递。对此spring kafka提供了类`FilteringMessageListenerAdapter`,其可以对`MessageListener`进行包装。FilteringMessageListenerAdapter类接收一个`RecordFilterStrategy`的实现,`RecordFilterStrategy.filter`方法为一个predicate接收一条消息并返回boolean。若predicate返回为true则代表该消息是重复传递的应当被丢弃。
`FilteringMessageListenerAdapter`类存在一个名为`ackDiscarded`的属性若该属性被设置为true代表adapter也会针对被丢弃的消息执行ack操作。
当使用`@KafkaListener`时,若针对`KafkaListenerContainerFactory`设置`RecordFilterStrategy``ackDiscarded`属性listener将会被包装在特定的adapter中。
同样的spring kafka也提供了`FilteringBatchMessageListenerAdapter`类,用于批量消息的过滤。
从2.8.4开始,可以通过`@KafkaListener`注解的filter属性来覆盖container factory的`RecordFilterStrategy`属性。
```java
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
```
### 通过KafkaTemplate来接收消息
从2.8版本开始kafka template有4个方法用于接收消息
```java
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
```