diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index f3b268e..e257276 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -920,5 +920,85 @@ nack只能在调用listener的consumer线程中进行处理。 并且,当`asyncAcks`启用时,无法使用`nack()`方法来进行nack操作。 +### @KafkaListener +`@KafkaListener`会将一个bean方法标记为listener container的listener方法。该bean被封装在一个`MessagingMessageListenerAdapter`对象中,该Adapter对象的构造方法如下所示,封装了bean对象和bean对象方法: +```java +public MessagingMessageListenerAdapter(Object bean, Method method) { + this.bean = bean; + this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final + } +``` + +在配置@KafkaListener注解的绝大部分属性时,可以使用`#{}`形式的spel表达式或`${}`形式的属性占位符。 + +#### Record Listener +@KafkaListener注解为简单pojo的listenr提供了机制,如下展示了使用示例: +```java +public class Listener { + @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") + public void listen(String data) { + ... + } + +} +``` +该机制需要在@Configuration类上标注`@EnableKafka`注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的`ConcurrentMessageListenerContainer`。默认情况下,需要一个bean name为`kafkaListenerContainerFactory`的bean对象。如下示例展示了如何使用`ConcurrentMessageListenerContainer`: +```java +@Configuration +@EnableKafka +public class KafkaConfig { + + @Bean + KafkaListenerContainerFactory> + kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(3); + factory.getContainerProperties().setPollTimeout(3000); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); + ... + return props; + } +} +``` +在上述示例中,如果要设置container properties,必须要在container factory对象上调用getContainerProperties(),然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。 + +从版本2.1.1开始,对由该注解创建的消费者实例,可以设置`client.id`属性。`client.id`为`clientIdPrefix-n`格式,其中n为整数,代表在使用concurrency时的container number。 + +从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的`concurrency`和`autoStartup`属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示: +```java +@KafkaListener(id = "myListener", topics = "myTopic", + autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") +public void listen(String data) { + ... +} +``` + +#### 显式分区分配 +可以通过@KafkaListener注解为pojo listener配置显式的topic和分区(也可以为其指定一个初始offset)。如下示例展示了如何为@KafkaListener指定topic和offset: +```java +@KafkaListener(id = "thing2", topicPartitions = + { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), + @TopicPartition(topic = "topic2", partitions = "0", + partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) + }) +public void listen(ConsumerRecord record) { + ... +} +``` +如上图所示,可以在topicPartitions属性的`@TopicPartition的partitions属性`中或是`@TopicPartitionpation的partitionOffsets属性`中指定分区,但是无法同时在上述两处都指定同一个分区。 +