From 0d653ba29fc664aae305f4f07a3a3f917c980e49 Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 21 Feb 2024 19:47:40 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=E5=85=B3=E4=BA=8E@Kafka?= =?UTF-8?q?Listener=E6=B3=A8=E8=A7=A3=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 174 +++++++++++++++++++++++++++- 1 file changed, 173 insertions(+), 1 deletion(-) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index a3019b7..90fc660 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1452,7 +1452,179 @@ public KafkaListenerErrorHandler validationErrorHandler() { 从3.1版本开始,可以在`ErrorHandlingDeserializer`上执行validation操作。 ### Rebalancing Listener -`ContainerProperties`中存在一个`consumerRebalanceListener`属性, +`ContainerProperties`中存在一个`consumerRebalanceListener`属性,该属性会接收一个`ConsumerRebalanceListener`接口的实现。如果该属性没有被提供,那么container将会自动装配一个logging listener,该listener将会将rebalance event打印到日志中,日志级别为`info`。spring kafka框架还提供了了一个`ConsumerAwareRebalanceListener`接口,如下展示了接口的定义: +```java +public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { + + void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions); + + void onPartitionsRevokedAfterCommit(Consumer consumer, Collection partitions); + + void onPartitionsAssigned(Consumer consumer, Collection partitions); + + void onPartitionsLost(Consumer consumer, Collection partitions); + +} +``` +与`ConsumerRebalanceListener`接口不同的是,`ConsumerAwareRebalanceListener`针对revoke存在两个回调方法,`beforeCommit`和`afterCommit`。第一次将会马上被调用,第二次则是会在所有阻塞的offset都提交后再调用。如果想要再外部系统中维护offset,这将非常有用: +```java +containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() { + + @Override + public void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions) { + // acknowledge any pending Acknowledgments (if using manual acks) + } + + @Override + public void onPartitionsRevokedAfterCommit(Consumer consumer, Collection partitions) { + // ... + store(consumer.position(partition)); + // ... + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + // ... + consumer.seek(partition, offsetTracker.getOffset() + 1); + // ... + } +}); +``` +### 强制触发consumer rebalance +kafka client目前支持触发enforced rebalance。从3.1.2版本开始,spring kafka支持通过message listener container来调用kafka consumer的enforce rebalance api。当调用该api时,其会提醒kafka consumer触发一个enforced rebalance,实际rebalance将会作为下一次poll操作的一部分。当正在发生rebalance时,调用该api将不会触发任何操作。调用方必须等待当前rebalance操作完成之后,再调用触发另一个rebalance。 + +如下示例展示了如何触发一个enforced rebalance: +```java +@KafkaListener(id = "my.id", topics = "my-topic") +void listen(ConsumerRecord in) { + System.out.println("From KafkaListener: " + in); +} + +@Bean +public ApplicationRunner runner(KafkaTemplate template, KafkaListenerEndpointRegistry registry) { + return args -> { + final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id"); + System.out.println("Enforcing a rebalance"); + Thread.sleep(5_000); + listenerContainer.enforceRebalance(); + Thread.sleep(5_000); + }; +} +``` +再上述代码中,应用通过使用`KafkaListenerEndpointRegistry`来访问message listener container,并调用MessageListenerContainer的enforceRebalance方法。当调用container的enforceRebalance方法时,其会委托调用底层consumer的enforceRebalance方法。而consumer则是会在下次poll操作时触发rebalance。 + +### 通过@SendTo注解发送listener结果 +从2.0版本开始,如果将@SendTo和@KafkaListener一起使用,并且被注解的方法存在返回值,那么方法的返回值将会被发送给@SendTo注解中指定的topic。 + +`@SendTo`值可以存在如下几种格式: +- `@SendTo("someTopic")`:返回值将会被发送给`someTopic`主题 +- `@SendTo("#{someExpression}")`:返回值将会被发送到表达式计算出的topic,`表达式将会在应用上下文初始化时被计算` +- `@SendTo("!{someExpression}")`:返回值将会被发送到表达式计算出的topic,`表达式将会在运行时被计算,并且表达式的`#root`对象有3个属性: + - `request`:该方法接收到的ConsumerRecord(在batch listener时request代表ConsumerRecords) + - `source`:request转化为的`org.springframework.messaging.Message` + - `result`:该方法的返回值 +- `@SendTo`:当没有为@SendTo注解指定value时,value默认会被当做`!{source.headers['kafka_replyTopic']}` + +从2.1.11和2.2.1开始,属性占位符(property placeholder)也会被@SendTo解析。 + +为@SendTo注解指定的表达式,其返回值必须为一个String,该String代表topic name。如下展示了使用@SendTo的不同方式: +```java +@KafkaListener(topics = "annotated21") +@SendTo("!{request.value()}") // runtime SpEL +public String replyingListener(String in) { + ... +} + +@KafkaListener(topics = "${some.property:annotated22}") +@SendTo("#{myBean.replyTopic}") // config time SpEL +public Collection replyingBatchListener(List in) { + ... +} + +@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") +@SendTo("annotated23reply") // static reply topic definition +public String replyingListenerWithErrorHandler(String in) { + ... +} +... +@KafkaListener(topics = "annotated25") +@SendTo("annotated25reply1") +public class MultiListenerSendTo { + + @KafkaHandler + public String foo(String in) { + ... + } + + @KafkaHandler + @SendTo("!{'annotated25reply2'}") + public String bar(@Payload(required = false) KafkaNull nul, + @Header(KafkaHeaders.RECEIVED_KEY) int key) { + ... + } + +} +``` + +> 为了支持使用@SendTo,listener container factory必须被提供KafkaTemplate(通过`replyTemplate`属性指定),该template将会被用于发送消息。当使用spring boot时,会自动将template注入到container factory中。 + +从2.2版本开始,可以向listener container factory中添加`ReplyHeadersConfigurer`,通过其可以设置接收消息中的哪些header可以被拷贝到返回消息的header中,使用示例如下所示: +```java +@Bean +public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf()); + factory.setReplyTemplate(template()); + factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat")); + return factory; +} +``` +如果在拷贝header外,还想为reply message指定额外的header,可以通过如下方式来实现: +```java +@Bean +public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf()); + factory.setReplyTemplate(template()); + factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() { + + @Override + public boolean shouldCopy(String headerName, Object headerValue) { + return false; + } + + @Override + public Map additionalHeaders() { + return Collections.singletonMap("qux", "fiz"); + } + + }); + return factory; +} +``` +在additionHeaders方法返回的map中,如果有key和已经存在的header key重复,那么map中的key-value将会覆盖现存header + +在使用@SendTo时,必须为`ConcurrentKafkaListenerContainerFactory`配置一个`KafkaTemplate`,需要配置的属性为`replyTemplate`。spring boot将会将自动装配的template注入到replyTemplate属性中。 + +可以将@SendTo添加到无返回值的方法上,通过指定errorHandler属性,当且仅当@SendTo注解的方法发生异常时,将异常消息作为messaage发送给特定topic,使用示例如下: +```java +@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic", + errorHandler = "voidSendToErrorHandler") +@SendTo("failures") +public void voidListenerWithReplyingErrorHandler(String in) { + throw new RuntimeException("fail"); +} + +@Bean +public KafkaListenerErrorHandler voidSendToErrorHandler() { + return (m, e) -> { + return ... // some information about the failure and input data + }; +} +``` +