Notes from reading the Spring Kafka documentation on receiving messages
nack can only be called on the consumer thread that invokes the listener.
Moreover, when `asyncAcks` is enabled, the `nack()` method cannot be used.
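The following is a minimal sketch of those constraints, assuming the container uses a manual AckMode and a Spring Kafka version that provides `Acknowledgment.nack(Duration)`; the listener id, topic, and `process()` helper are hypothetical:

```java
import java.time.Duration;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class NackingListener {

    // nack() may only be called here, on the consumer thread that invoked the listener
    @KafkaListener(id = "nackDemo", topics = "myTopic")
    public void listen(String data, Acknowledgment ack) {
        try {
            process(data);     // hypothetical business logic
            ack.acknowledge(); // acknowledge the record
        }
        catch (RuntimeException ex) {
            // do not acknowledge; sleep one second, then redeliver this record
            // and the remaining records from the last poll
            ack.nack(Duration.ofSeconds(1));
        }
    }

    private void process(String data) {
        // ...
    }

}
```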
### @KafkaListener
`@KafkaListener` marks a bean method as the listener method of a listener container. The bean is wrapped in a `MessagingMessageListenerAdapter`, whose constructor, shown below, captures the bean instance and the annotated method:
```java
public MessagingMessageListenerAdapter(Object bean, Method method) {
    this.bean = bean;
    this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
}
```
When configuring most attributes of the @KafkaListener annotation, you can use SpEL expressions in the `#{}` form or property placeholders in the `${}` form.
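As a sketch of both forms (the attribute names are real; the property key `listen.topics` and the SpEL expression are made up for illustration):

```java
// topics is resolved from a property placeholder (with a default value);
// groupId is resolved from a SpEL expression evaluated at startup
@KafkaListener(topics = "${listen.topics:myTopic}",
        groupId = "#{T(java.util.UUID).randomUUID().toString()}")
public void listen(String data) {
    System.out.println("received: " + data);
}
```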
#### Record Listener
The @KafkaListener annotation provides a mechanism for simple POJO listeners, as the following example shows:
```java
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}
```
This mechanism requires an `@EnableKafka` annotation on one of your @Configuration classes and a listener container factory bean, which is used to configure the underlying `ConcurrentMessageListenerContainer`. By default, a bean named `kafkaListenerContainerFactory` is expected. The following example shows how to use `ConcurrentMessageListenerContainer`:
```java
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

}
```
In the example above, to set container properties you must call `getContainerProperties()` on the container factory and configure the returned object. The properties held by the factory are used as a template for the properties actually injected into the containers it creates.
Starting with version 2.1.1, you can set the `client.id` property for consumers created by this annotation. The `client.id` takes the form `clientIdPrefix-n`, where n is an integer representing the container number when concurrency is used.
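For instance (a sketch; the listener id, topic, and prefix are arbitrary), with the factory concurrency of 3 configured above and a `clientIdPrefix` of `myClientId`, the three consumers would be expected to get `client.id` values `myClientId-0`, `myClientId-1`, and `myClientId-2`:

```java
// the container number "-n" is appended to the configured prefix for each consumer
@KafkaListener(id = "multiConsumer", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
    System.out.println("received: " + data);
}
```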
Starting with version 2.2, the `concurrency` and `autoStartup` properties of the container factory can be overridden with attributes of the @KafkaListener annotation itself. The attribute values can be simple values, SpEL expressions, or property placeholders, as the following example shows:
```java
@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
```
#### Explicit Partition Assignment
You can configure a POJO listener with explicit topics and partitions (and, optionally, their initial offsets) through the @KafkaListener annotation. The following example shows how to specify the topics and offsets for @KafkaListener:
```java
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
```
As the example above shows, partitions can be specified in the `partitions` attribute of `@TopicPartition` or in its `partitionOffsets` attribute, but the same partition cannot be specified in both.