diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 40babb5..45548ee 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -48,6 +48,8 @@ - [Prototype Beans](#prototype-beans) - [Topic/Partition初始offset](#topicpartition初始offset) - [seek到指定offset](#seek到指定offset) + - [Container Factory](#container-factory) + - [线程安全](#线程安全) # Spring Kafka @@ -1961,8 +1963,45 @@ public class SomeOtherBean { } ``` - - +## Container Factory +和之前在`@KafkaListener`中提到的一样,对于被`@KafkaListener`注解的方法,`ConcurrentKafkaListenerContainerFactory`将会被用于创建@KafkaListener方法对应的container。 + +从2.2版本开始,可以使用相同的factory来创建任何ConcurrentMessageListenerContainer。一旦container创建后,可以进一步改变container的属性,大多数属性都可以通过`container.getContainerProperties()`来进行修改。如下示例配置了`ConcurrentMessageListenerContainer`: +```java +@Bean +public ConcurrentMessageListenerContainer( + ConcurrentKafkaListenerContainerFactory factory) { + + ConcurrentMessageListenerContainer container = + factory.createContainer("topic1", "topic2"); + container.setMessageListener(m -> { ... } ); + return container; +} +``` +> 通过上述@Bean方式创建的container并不会被加入到endpoint registry中,而是会作为bean对象被注入到spring容器中。 +> +> 而通过@KafkaListener创建的container,则是并不会被注册为bean对象,而是统一通过endpoint registry进行管理。 + +从2.3.4版本开始,可以添加为factory添加`ContainerCustomizer`,在factory已经被配置后进一步的对container进行自定义设置。 +```java +@Bean +public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + ... + factory.setContainerCustomizer(container -> { /* customize the container */ }); + return factory; +} +``` + +## 线程安全 +当使用concurrent message listener container时,单个listener将会在所有consumer thread中被调用。故而listener需要是线程安全的,为了保证listener的线程安全,更倾向于使用无状态的listener(stateless)。如果无法将listener变为线程安全的,或是通过synchronization进行同步会大幅降性能,可以使用如下的技术之一: +- 使用n个container,每个container的concurrency为1,并且container中的MessageListener其scope为prototype,故而每个container都独占一个listener(在使用@KafkaListener时,该方案无法实现) +- 将状态保存在ThreadLocal中 + +为了促进线程状态的清理,从2.2版本开始,每个线程退出时,listener container将会发布`ConsumerStoppedEvent`事件。可以通过`ApplicationListener`或是`@EventListener`来消费该事件,从而对`ThreadLocal`对象调用remove方法来清除线程状态。 + +> 默认情况下,spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor,那么清理操作将没有作用。