diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 5f51f47..165b5b7 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1082,6 +1082,134 @@ public void listen(String str, ConsumerRecordMetadata meta) { ``` 上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据,除了record中的key和value。 +#### Batch Listener +从版本1.1开始,可以通过@KafkaListener方法来接收record batch。 + +为了配置listener container factory支持batch listener,需要设置container factory的`batchListener`属性。如下展示了配置示例: +```java +@Bean +public KafkaListenerContainerFactory batchFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<< + return factory; +} +``` + +> 从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的`batchListener`属性,只需要为@KafkaListener注解指定`batch`属性。 + +> 从2.9.6版本开始,container factory对于`recordMessageConverter`和`batchMessageConverter`有不同的setter。在2.9.6之前的版本中,只有`messageConverter`的setter,该converter被同时用于批量和单条场景。 + +如下示例展示了如何接收list类型的payload: +```java +@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") +public void listen(List list, + @Header(KafkaHeaders.RECEIVED_KEY) List keys, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions, + @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, + @Header(KafkaHeaders.OFFSET) List offsets) { + ... +} +``` +record batch的topic、分区、offset等信息也可以批量获取,如下展示了如何批量获取header: +```java +@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") +public void listen(List list, + @Header(KafkaHeaders.RECEIVED_KEY) List keys, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions, + @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, + @Header(KafkaHeaders.OFFSET) List offsets) { + ... +} +``` +除了上述方法外,还可以通过`List of Message`来批量接收消息,但是,除了`Acknowledgment`或`Consumer`外,`List of Message`应该是方法中唯一的参数。如下示例展示了如何使用: +```java +@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") +public void listen1(List> list) { + ... +} + +@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") +public void listen2(List> list, Acknowledgment ack) { + ... +} + +@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") +public void listen3(List> list, Acknowledgment ack, Consumer consumer) { + ... +} +``` +在上述示例中,并不会对payload执行任何转换。 + +如果当前存在`BatchMessagingMessageConverter`(`BatchMessagingMessageConverter`通过`RecordMessageConverter`配置),则可以为`Message`添加一个泛型类型,payload将会被转化为该类型。 + +除了上述两种方式外,还可以接收`List>`类型的参数,但是其必须为listener method的唯一参数(`Acknowledgment`类型和`Consumer`类型参数除外)。如下实例展示了如何使用: +```java +@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") +public void listen(List> list) { + ... +} + +@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") +public void listen(List> list, Acknowledgment ack) { + ... +} +``` +从2.2版本开始,listener可以接收由poll方法返回的完整的`ConsumerRecords`对象(该record中封装了一个record的list)。从而允许listener访问额外的方法,例如`partitions()`方法,返回TopicPartition对象的集合。方法的使用示例如下: +```java +@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") +public void pollResults(ConsumerRecords records) { + ... +} +``` + +#### 注解属性 +从2.0版本开始,`id`属性被用作kafka consumer group的`group.id`属性,其会覆盖consumer factory中配置的属性。如果不想使用该行为,可以显式的另外设置`groupId`属性;或是将`idIsGroup`属性设置为false,此时`group.id`仍然会使用consumer factory中配置的属性。 + +对于@KafkaListener注解,可以在绝大多数属性中使用spel表达式和placeholder: +```java +@KafkaListener(topics = "${some.property}") + +@KafkaListener(topics = "#{someBean.someProperty}", + groupId = "#{someBean.someProperty}.group") +``` + +从2.1.2版本开始,spel表达式支持`__listener`。该伪bean name代表当前该注解所位于的bean。 + +例如如下示例: +```java +@Bean +public Listener listener1() { + return new Listener("topic1"); +} + +@Bean +public Listener listener2() { + return new Listener("topic2"); +} + +public class Listener { + + private final String topic; + + public Listener(String topic) { + this.topic = topic; + } + + @KafkaListener(topics = "#{__listener.topic}", + groupId = "#{__listener.topic}.group") + public void listen(...) { + ... + } + + public String getTopic() { + return this.topic; + } + +} +``` +