阅读kafka关于@KafkaListener注解的文档
This commit is contained in:
@@ -1082,6 +1082,134 @@ public void listen(String str, ConsumerRecordMetadata meta) {
|
||||
```
|
||||
上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据,除了record中的key和value。
|
||||
|
||||
#### Batch Listener
|
||||
从版本1.1开始,可以通过@KafkaListener方法来接收record batch。
|
||||
|
||||
为了配置listener container factory支持batch listener,需要设置container factory的`batchListener`属性。如下展示了配置示例:
|
||||
```java
|
||||
@Bean
|
||||
public KafkaListenerContainerFactory<?> batchFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
|
||||
return factory;
|
||||
}
|
||||
```
|
||||
|
||||
> 从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的`batchListener`属性,只需要为@KafkaListener注解指定`batch`属性。
|
||||
|
||||
> 从2.9.6版本开始,container factory对于`recordMessageConverter`和`batchMessageConverter`有不同的setter。在2.9.6之前的版本中,只有`messageConverter`的setter,该converter被同时用于批量和单条场景。
|
||||
|
||||
如下示例展示了如何接收list类型的payload:
|
||||
```java
|
||||
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen(List<String> list,
|
||||
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
|
||||
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
|
||||
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
|
||||
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
|
||||
...
|
||||
}
|
||||
```
|
||||
record batch的topic、分区、offset等信息也可以批量获取,如下展示了如何批量获取header:
|
||||
```java
|
||||
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen(List<String> list,
|
||||
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
|
||||
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
|
||||
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
|
||||
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
|
||||
...
|
||||
}
|
||||
```
|
||||
除了上述方法外,还可以通过`List of Message<?>`来批量接收消息,但是,除了`Acknowledgment`或`Consumer<?, ?>`外,`List of Message<?>`应该是方法中唯一的参数。如下示例展示了如何使用:
|
||||
```java
|
||||
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen1(List<Message<?>> list) {
|
||||
...
|
||||
}
|
||||
|
||||
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen2(List<Message<?>> list, Acknowledgment ack) {
|
||||
...
|
||||
}
|
||||
|
||||
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
|
||||
...
|
||||
}
|
||||
```
|
||||
在上述示例中,并不会对payload执行任何转换。
|
||||
|
||||
如果当前存在`BatchMessagingMessageConverter`(`BatchMessagingMessageConverter`通过`RecordMessageConverter`配置),则可以为`Message<?>`添加一个泛型类型,payload将会被转化为该类型。
|
||||
|
||||
除了上述两种方式外,还可以接收`List<ConsumerRecord<?, ?>>`类型的参数,但是其必须为listener method的唯一参数(`Acknowledgment`类型和`Consumer<?, ?>`类型参数除外)。如下实例展示了如何使用:
|
||||
```java
|
||||
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen(List<ConsumerRecord<Integer, String>> list) {
|
||||
...
|
||||
}
|
||||
|
||||
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
|
||||
...
|
||||
}
|
||||
```
|
||||
从2.2版本开始,listener可以接收由poll方法返回的完整的`ConsumerRecords<?, ?>`对象(该record中封装了一个record的list)。从而允许listener访问额外的方法,例如`partitions()`方法,返回TopicPartition对象的集合。方法的使用示例如下:
|
||||
```java
|
||||
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
|
||||
public void pollResults(ConsumerRecords<?, ?> records) {
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
#### 注解属性
|
||||
从2.0版本开始,`id`属性被用作kafka consumer group的`group.id`属性,其会覆盖consumer factory中配置的属性。如果不想使用该行为,可以显式的另外设置`groupId`属性;或是将`idIsGroup`属性设置为false,此时`group.id`仍然会使用consumer factory中配置的属性。
|
||||
|
||||
对于@KafkaListener注解,可以在绝大多数属性中使用spel表达式和placeholder:
|
||||
```java
|
||||
@KafkaListener(topics = "${some.property}")
|
||||
|
||||
@KafkaListener(topics = "#{someBean.someProperty}",
|
||||
groupId = "#{someBean.someProperty}.group")
|
||||
```
|
||||
|
||||
从2.1.2版本开始,spel表达式支持`__listener`。该伪bean name代表当前该注解所位于的bean。
|
||||
|
||||
例如如下示例:
|
||||
```java
|
||||
@Bean
|
||||
public Listener listener1() {
|
||||
return new Listener("topic1");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Listener listener2() {
|
||||
return new Listener("topic2");
|
||||
}
|
||||
|
||||
public class Listener {
|
||||
|
||||
private final String topic;
|
||||
|
||||
public Listener(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "#{__listener.topic}",
|
||||
groupId = "#{__listener.topic}.group")
|
||||
public void listen(...) {
|
||||
...
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return this.topic;
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user