From a8c6f344deae8ed4a7ed1a237cf2d69cfb48d2ba Mon Sep 17 00:00:00 2001 From: asahi Date: Thu, 25 Jan 2024 21:13:55 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=E5=85=B3=E4=BA=8E@Kafka?= =?UTF-8?q?Listener=E6=B3=A8=E8=A7=A3=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 106 +++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index a46c74b..a62dc4c 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1252,9 +1252,109 @@ public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) Str 如果没有为container提供consumer executor,那么会为每个container提供`SimpleAsyncTaskExecutor`。 > SimpleAsyncTaskExecutor会为所有的task都开启一个新线程,故而该类型的executor并不会对线程进行重用。 - - - +> +> SimpleAsyncTaskExecutor在创建线程时,executor创建线程的名称按照`-C-`的格式。对于`ConcurrentMessageListenerContainer`,``部分将会被换成`-m`,m代表消费者实例。当每次container启动时,``都会增加。 +> +> 故而,对于name为`container`的bean对象,其executor中的线程名称在container第一次启动后为`container-0-C-1`、`container-1-C-1`(n均为1,m为0和1).当containerr再次启动时,线程名称则是会变为`container-0-C-2`、`container-1-C-2 etc`。 + +从3.0.1版本开始,可以修改线程的名称。如果将`AbstractMessageListenerContainer.changeConsumerThreadName`属性设置为true,将会调用`AbstractMessageListenerContainer.threadNameSupplier`来获取线程名称。 + +### @KafkaListener用作元注解 +从2.2版本开始,可以将@KafkaListener用作元注解,如下展示了使用示例: +```java +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@KafkaListener +public @interface MyThreeConsumersListener { + + @AliasFor(annotation = KafkaListener.class, attribute = "id") + String id(); + + @AliasFor(annotation = KafkaListener.class, attribute = "topics") + String[] topics(); + + @AliasFor(annotation = KafkaListener.class, attribute = "concurrency") + String concurrency() default "3"; + +} +``` + +在将@KafkaListener用作元注解时,必须在自定义注解中针对`topics`、`topicPattern`、`topicPartitions`中的至少一个进行alias操作。 + +如下展示了如何使用自定义注解: +```java +@MyThreeConsumersListener(id = "my.group", topics = "my.topic") +public void listen1(String in) { + ... +} +``` + +### 在类级别使用@KafkaListener +当在类级别使用@KafkaListener时,必须要在方法级别指定@KafkaHandler。当消息被传递到时,消息payload转化为的类型将会用于决定调用哪个handler方法。示例如下所示: +```java +@KafkaListener(id = "multi", topics = "myTopic") +static class MultiListenerBean { + + @KafkaHandler + public void listen(String foo) { + ... + } + + @KafkaHandler + public void listen(Integer bar) { + ... + } + + @KafkaHandler(isDefault = true) + public void listenDefault(Object object) { + ... + } + +} +``` +从2.1.3版本开始,可以指定一个@KafkaHandler方法作为默认方法,当类型匹配不到其他handler时,默认handler方法将会被调用。并且,最多只有有一个方法被指定为默认handler方法。 + +当使用@KafkaHandler时,必须要求payload已经被转化为目标对象类型,如此匹配才能进行。 + +基于spring解析方法参数的限制,default kafkahandler无法接收header中的单条内容,其只能通过`ConsumerRecordMetadata`类型来获取header中的内容。 + +例如,如下方式在Object类型为String时不起作用: +```java +@KafkaHandler(isDefault = true) +public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + ... +} +``` +如果object类型为String,那么topic也会指向object,而不会获取到header中的topic内容。 + +如果要在default handler method中访问元数据,需要按如下方式: +```java +@KafkaHandler(isDefault = true) +void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) { + String topic = meta.topic(); + ... +} +``` + +### @KafkaListener属性修改 +从2.7.2版本开始,可以在container创建之前,通过编码的方式来修改注解属性。为了实现改功能,可以在spring context中添加一个或者多个`KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer`。`AnnotationEnhancer`是个`BiFunction, AnnotatedElement, Map`类型的二元函数,返回一个Map类型的值。属性值可能会包含spel表达式或者properties placeholder,而enhancer则是在spel表达式和属性占位符被解析之前调用。如果当前spring容器存在多个enhancer,且enhancer实现了Ordered接口,那么enhancer将会按照顺序被调用: +```java +@Bean +public static AnnotationEnhancer groupIdEnhancer() { + return (attrs, element) -> { + attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class + ? ((Class) element).getSimpleName() + : ((Method) element).getDeclaringClass().getSimpleName() + + "." + ((Method) element).getName())); + return attrs; + }; +} +``` + +`AnnotationEnhancer`bean定义必须要被声明为static,该bean在spring context生命周期中非常早期的时间点被需要。 + +### @KafkaListener生命周期管理 +为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中,该bean对象会自动被spring framework创建,并且管理listener container的生命周期。