From 08b7bc1a7866c07c9b88fb1b88e78063ac1f1839 Mon Sep 17 00:00:00 2001 From: asahi Date: Tue, 27 Feb 2024 20:59:16 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 43 +++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 40babb5..45548ee 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -48,6 +48,8 @@ - [Prototype Beans](#prototype-beans) - [Topic/Partition初始offset](#topicpartition初始offset) - [seek到指定offset](#seek到指定offset) + - [Container Factory](#container-factory) + - [线程安全](#线程安全) # Spring Kafka @@ -1961,8 +1963,45 @@ public class SomeOtherBean { } ``` - - +## Container Factory +和之前在`@KafkaListener`中提到的一样,对于被`@KafkaListener`注解的方法,`ConcurrentKafkaListenerContainerFactory`将会被用于创建@KafkaListener方法对应的container。 + +从2.2版本开始,可以使用相同的factory来创建任何ConcurrentMessageListenerContainer。一旦container创建后,可以进一步改变container的属性,大多数属性都可以通过`container.getContainerProperties()`来进行修改。如下示例配置了`ConcurrentMessageListenerContainer`: +```java +@Bean +public ConcurrentMessageListenerContainer( + ConcurrentKafkaListenerContainerFactory factory) { + + ConcurrentMessageListenerContainer container = + factory.createContainer("topic1", "topic2"); + container.setMessageListener(m -> { ... } ); + return container; +} +``` +> 通过上述@Bean方式创建的container并不会被加入到endpoint registry中,而是会作为bean对象被注入到spring容器中。 +> +> 而通过@KafkaListener创建的container,则是并不会被注册为bean对象,而是统一通过endpoint registry进行管理。 + +从2.3.4版本开始,可以添加为factory添加`ContainerCustomizer`,在factory已经被配置后进一步的对container进行自定义设置。 +```java +@Bean +public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + ... + factory.setContainerCustomizer(container -> { /* customize the container */ }); + return factory; +} +``` + +## 线程安全 +当使用concurrent message listener container时,单个listener将会在所有consumer thread中被调用。故而listener需要是线程安全的,为了保证listener的线程安全,更倾向于使用无状态的listener(stateless)。如果无法将listener变为线程安全的,或是通过synchronization进行同步会大幅降性能,可以使用如下的技术之一: +- 使用n个container,每个container的concurrency为1,并且container中的MessageListener其scope为prototype,故而每个container都独占一个listener(在使用@KafkaListener时,该方案无法实现) +- 将状态保存在ThreadLocal中 + +为了促进线程状态的清理,从2.2版本开始,每个线程退出时,listener container将会发布`ConsumerStoppedEvent`事件。可以通过`ApplicationListener`或是`@EventListener`来消费该事件,从而对`ThreadLocal`对象调用remove方法来清除线程状态。 + +> 默认情况下,spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor,那么清理操作将没有作用。