阅读kafka关于@KafkaListener注解的文档
This commit is contained in:
@@ -1354,7 +1354,36 @@ public static AnnotationEnhancer groupIdEnhancer() {
|
||||
`AnnotationEnhancer`bean定义必须要被声明为static,该bean在spring context生命周期中非常早期的时间点被需要。
|
||||
|
||||
### @KafkaListener生命周期管理
|
||||
为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中,该bean对象会自动被spring framework创建,并且管理listener container的生命周期。
|
||||
为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中,该bean对象会自动被spring framework创建,并且管理listener container的生命周期。kafkaListenerEndpointRegistry对象会启动所有`autoStartup`属性设置为true的container。所有container factory都在同一阶段创建所有container。通过registry,可以通过编程的方式来管理container的生命周期。对registry执行start或stop操作将会对registry中所有的container都执行start或stop操作。同时,也可以通过container的id属性来获取单独的container对象。
|
||||
|
||||
可以通过@KafkaListener注解来设置container的autoStartup属性,通过注解指定的autoStartup值将会覆盖在container factory中指定的autoStartup属性。
|
||||
|
||||
如果要通过registry对象来管理注册的container,可以通过bean注入的方式来获取registry:
|
||||
|
||||
```java
|
||||
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
|
||||
public void listen(...) { ... }
|
||||
|
||||
/**
|
||||
* 通过bean注入来获取registry
|
||||
*
|
||||
**/
|
||||
@Autowired
|
||||
private KafkaListenerEndpointRegistry registry;
|
||||
|
||||
...
|
||||
|
||||
this.registry.getListenerContainer("myContainer").start();
|
||||
|
||||
...
|
||||
```
|
||||
regisry只维护其管理container的生命周期;如果以bean形式声明的container,其并不由registry进行管理,而是可以从spring容器中获取。可以调用registry对象的`getListenerContainers`方法来获取其管理的container集合。
|
||||
|
||||
从2.2.5版本开始,registry新增了一个新的方法`getAllListenerContainers()`,通过该方法可以获取所有的container集合,集合中包括由registry管理的container和以bean对象形式生命的container。该返回集合中将会包含任何prototype的已初始化bean对象,但是集合中不会对懒加载的bean对象进行加载操作。
|
||||
|
||||
endpoint将会在spring容器被refreshed之后被注册到registry中,并且,endpoint将会立马被启动,不论其autoStartup属性值是什么。
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user