From 24ce571d45dcb22afdbc74b8f0925602a30f3a7c Mon Sep 17 00:00:00 2001 From: asahi Date: Thu, 22 Feb 2024 18:46:58 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BB=E5=85=B3=E4=BA=8Ekakfka?= =?UTF-8?q?=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 30 ++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 90fc660..25de6ec 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1624,7 +1624,35 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() { }; } ``` - +### 过滤消息 +在某些特定场景下,例如发生rebalance,已经被处理过的消息将会被重复传递。对此,spring kafka提供了类`FilteringMessageListenerAdapter`,其可以对`MessageListener`进行包装。FilteringMessageListenerAdapter类接收一个`RecordFilterStrategy`的实现,`RecordFilterStrategy.filter`方法为一个predicate,接收一条消息并返回boolean。若predicate返回为true,则代表该消息是重复传递的,应当被丢弃。 + +`FilteringMessageListenerAdapter`类存在一个名为`ackDiscarded`的属性,若该属性被设置为true,代表adapter也会针对被丢弃的消息执行ack操作。 + +当使用`@KafkaListener`时,若针对`KafkaListenerContainerFactory`设置`RecordFilterStrategy`或`ackDiscarded`属性,listener将会被包装在特定的adapter中。 + +同样的,spring kafka也提供了`FilteringBatchMessageListenerAdapter`类,用于批量消息的过滤。 + +从2.8.4开始,可以通过`@KafkaListener`注解的filter属性来覆盖container factory的`RecordFilterStrategy`属性。 + +```java +@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter") +public void listen(Thing thing) { + ... +} +``` + +### 通过KafkaTemplate来接收消息 +从2.8版本开始,kafka template有4个方法用于接收消息: +```java +ConsumerRecord receive(String topic, int partition, long offset); + +ConsumerRecord receive(String topic, int partition, long offset, Duration pollTimeout); + +ConsumerRecords receive(Collection requested); + +ConsumerRecords receive(Collection requested, Duration pollTimeout); +```