阅读kafka线程安全文档
This commit is contained in:
@@ -48,6 +48,8 @@
|
|||||||
- [Prototype Beans](#prototype-beans)
|
- [Prototype Beans](#prototype-beans)
|
||||||
- [Topic/Partition初始offset](#topicpartition初始offset)
|
- [Topic/Partition初始offset](#topicpartition初始offset)
|
||||||
- [seek到指定offset](#seek到指定offset)
|
- [seek到指定offset](#seek到指定offset)
|
||||||
|
- [Container Factory](#container-factory)
|
||||||
|
- [线程安全](#线程安全)
|
||||||
|
|
||||||
|
|
||||||
# Spring Kafka
|
# Spring Kafka
|
||||||
@@ -1961,8 +1963,45 @@ public class SomeOtherBean {
|
|||||||
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
## Container Factory
|
||||||
|
和之前在`@KafkaListener`中提到的一样,对于被`@KafkaListener`注解的方法,`ConcurrentKafkaListenerContainerFactory`将会被用于创建@KafkaListener方法对应的container。
|
||||||
|
|
||||||
|
从2.2版本开始,可以使用相同的factory来创建任何ConcurrentMessageListenerContainer。一旦container创建后,可以进一步改变container的属性,大多数属性都可以通过`container.getContainerProperties()`来进行修改。如下示例配置了`ConcurrentMessageListenerContainer`:
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public ConcurrentMessageListenerContainer<String, String>(
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
|
||||||
|
|
||||||
|
ConcurrentMessageListenerContainer<String, String> container =
|
||||||
|
factory.createContainer("topic1", "topic2");
|
||||||
|
container.setMessageListener(m -> { ... } );
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
> 通过上述@Bean方式创建的container并不会被加入到endpoint registry中,而是会作为bean对象被注入到spring容器中。
|
||||||
|
>
|
||||||
|
> 而通过@KafkaListener创建的container,则是并不会被注册为bean对象,而是统一通过endpoint registry进行管理。
|
||||||
|
|
||||||
|
从2.3.4版本开始,可以添加为factory添加`ContainerCustomizer`,在factory已经被配置后进一步的对container进行自定义设置。
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||||||
|
new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
...
|
||||||
|
factory.setContainerCustomizer(container -> { /* customize the container */ });
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 线程安全
|
||||||
|
当使用concurrent message listener container时,单个listener将会在所有consumer thread中被调用。故而listener需要是线程安全的,为了保证listener的线程安全,更倾向于使用无状态的listener(stateless)。如果无法将listener变为线程安全的,或是通过synchronization进行同步会大幅降性能,可以使用如下的技术之一:
|
||||||
|
- 使用n个container,每个container的concurrency为1,并且container中的MessageListener其scope为prototype,故而每个container都独占一个listener(在使用@KafkaListener时,该方案无法实现)
|
||||||
|
- 将状态保存在ThreadLocal中
|
||||||
|
|
||||||
|
为了促进线程状态的清理,从2.2版本开始,每个线程退出时,listener container将会发布`ConsumerStoppedEvent`事件。可以通过`ApplicationListener`或是`@EventListener`来消费该事件,从而对`ThreadLocal`对象调用remove方法来清除线程状态。
|
||||||
|
|
||||||
|
> 默认情况下,spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor,那么清理操作将没有作用。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user