diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 165b5b7..a46c74b 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1209,6 +1209,51 @@ public class Listener { } ``` +如果在项目中,已经存在了名为`__listener`的bean实例,那么可以通过`beanRef`属性来对属性表达式进行修改。如下示例展示了如何使用`beanRef`: +```java +@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group") +``` +从版本2.2.4开始,可以直接通过@KafkaListener注解指定consumer属性,通过注解指定的consumer属性会覆盖在consumer factory中指定的属性。 + +> 但是,无法通过`@KafkaListener`注解的上述方法来指定`group.id`和`client.id`属性,通过上述方法指定这两个属性时,指定会被忽略;需要使用`groupId`和`clientIdPrefix`注解属性来指定consumer属性中的`group.id`和`client.id`属性 + +在通过@KafkaListener来指定consumer属性时,通过独立的字符串按照`foo:bar`、`foo bar`、`foo=bar`的格式进行指定,示例如下所示: +```java +@KafkaListener(topics = "myTopic", groupId = "group", properties = { + "max.poll.interval.ms:60000", + ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100" +}) + +@KafkaListener(id = "one", topics = "one") +public void listen1(String in) { + System.out.println("1: " + in); +} + +@KafkaListener(id = "two", topics = "two", + properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer") +public void listen2(byte[] in) { + System.out.println("2: " + new String(in)); +} +``` +### 获取consumer的`group.id` +在不同container中运行相同listener代码时,需要识别该消息来源于哪个container(通过`group.id`标识)。 + +在需要获取container的groupId时,可以在listener线程中调用`KafkaUtils.getConsumerGroupId()`方法;也可以在方法参数中访问`group.id`属性: + +```java +@KafkaListener(id = "id", topicPattern = "someTopic") +public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) { + ... +} +``` +### container线程命名 +框架通过一个TaskExecutor来调用consumer和listener。可以在container properties中通过设置`consumerExecutor`属性来设置一个自定义的executor。当使用pooled executor时,需要确保线程池中含有足够的线程来处理来自所有container的并发。当使用`ConcurrentMessageListenerContainer`时,executor中的线程会被所有consumer(concurrency)使用。 + +如果没有为container提供consumer executor,那么会为每个container提供`SimpleAsyncTaskExecutor`。 + +> SimpleAsyncTaskExecutor会为所有的task都开启一个新线程,故而该类型的executor并不会对线程进行重用。 + +