阅读kafka关于@KafkaListener注解的文档

This commit is contained in:
asahi
2024-01-25 21:13:55 +08:00
parent 6bcbd361e7
commit a8c6f344de

View File

@@ -1252,9 +1252,109 @@ public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) Str
如果没有为container提供consumer executor那么会为每个container提供`SimpleAsyncTaskExecutor`
> SimpleAsyncTaskExecutor会为所有的task都开启一个新线程故而该类型的executor并不会对线程进行重用。
>
> SimpleAsyncTaskExecutor在创建线程时executor创建线程的名称按照`<beanname>-C-<n>`的格式。对于`ConcurrentMessageListenerContainer``<beanname>`部分将会被换成`<beanname>-m`m代表消费者实例。当每次container启动时`<n>`都会增加。
>
> 故而对于name为`container`的bean对象其executor中的线程名称在container第一次启动后为`container-0-C-1`、`container-1-C-1`n均为1m为0和1.当containerr再次启动时线程名称则是会变为`container-0-C-2`、`container-1-C-2 etc`。
从3.0.1版本开始,可以修改线程的名称。如果将`AbstractMessageListenerContainer.changeConsumerThreadName`属性设置为true将会调用`AbstractMessageListenerContainer.threadNameSupplier`来获取线程名称。
### @KafkaListener用作元注解
从2.2版本开始,可以将@KafkaListener用作元注解,如下展示了使用示例:
```java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
```
在将@KafkaListener用作元注解时,必须在自定义注解中针对`topics``topicPattern``topicPartitions`中的至少一个进行alias操作。
如下展示了如何使用自定义注解:
```java
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
```
### 在类级别使用@KafkaListener
当在类级别使用@KafkaListener时,必须要在方法级别指定@KafkaHandler。当消息被传递到时消息payload转化为的类型将会用于决定调用哪个handler方法。示例如下所示
```java
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
```
从2.1.3版本开始,可以指定一个@KafkaHandler方法作为默认方法当类型匹配不到其他handler时默认handler方法将会被调用。并且最多只有有一个方法被指定为默认handler方法。
当使用@KafkaHandler时必须要求payload已经被转化为目标对象类型如此匹配才能进行。
基于spring解析方法参数的限制default kafkahandler无法接收header中的单条内容其只能通过`ConsumerRecordMetadata`类型来获取header中的内容。
例如如下方式在Object类型为String时不起作用
```java
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
```
如果object类型为String那么topic也会指向object而不会获取到header中的topic内容。
如果要在default handler method中访问元数据需要按如下方式
```java
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
```
### @KafkaListener属性修改
从2.7.2版本开始可以在container创建之前通过编码的方式来修改注解属性。为了实现改功能可以在spring context中添加一个或者多个`KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer``AnnotationEnhancer`是个`BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>`类型的二元函数返回一个Map<String,Object>类型的值。属性值可能会包含spel表达式或者properties placeholder而enhancer则是在spel表达式和属性占位符被解析之前调用。如果当前spring容器存在多个enhancer且enhancer实现了Ordered接口那么enhancer将会按照顺序被调用
```java
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
```
`AnnotationEnhancer`bean定义必须要被声明为static该bean在spring context生命周期中非常早期的时间点被需要。
### @KafkaListener生命周期管理
为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中该bean对象会自动被spring framework创建并且管理listener container的生命周期。