From c88a6e4a2a436e062f9355dbaa0a36d89cbb2673 Mon Sep 17 00:00:00 2001 From: asahi Date: Fri, 2 Feb 2024 19:43:49 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=E5=85=B3=E4=BA=8E@Kafka?= =?UTF-8?q?Listener=E6=B3=A8=E8=A7=A3=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 72 +++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index fb5bddc..a3019b7 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1383,6 +1383,78 @@ regisry只维护其管理container的生命周期;如果以bean形式声明的 endpoint将会在spring容器被refreshed之后被注册到registry中,并且,endpoint将会立马被启动,不论其autoStartup属性值是什么。 +### @KafkaListener @Payload校验 +从2.2版本开始,可以更加方便的为@KafkaListener @Payload参数添加validator。在之前的版本中必须要自定义配置一个`DefaultMessageHandlerMethodFactory`并且将其添加到registrar中。现在,可以为registrar中添加validator。如下代码展示了使用示例: +```java +@Configuration +@EnableKafka +public class Config implements KafkaListenerConfigurer { + + ... + + @Override + public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { + registrar.setValidator(new MyValidator()); + } + +} +``` + +当使用spring boot并且存在有validation的starter时,`LocalValidatorFactoryBean`将会被自动装配,装配逻辑和如下代码类似: +```java +@Configuration +@EnableKafka +public class Config implements KafkaListenerConfigurer { + + @Autowired + private LocalValidatorFactoryBean validator; + ... + + @Override + public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { + registrar.setValidator(this.validator); + } +} +``` +如下代码示例展示了如果通过validator校验payload: +```java +public static class ValidatedClass { + + @Max(10) + private int bar; + + public int getBar() { + return this.bar; + } + + public void setBar(int bar) { + this.bar = bar; + } + +} +``` + +```java +@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", + containerFactory = "kafkaJsonListenerContainerFactory") +public void validatedListener(@Payload @Valid ValidatedClass val) { + ... +} + +@Bean +public KafkaListenerErrorHandler validationErrorHandler() { + return (m, e) -> { + ... + }; +} +``` + +从3.1版本开始,可以在`ErrorHandlingDeserializer`上执行validation操作。 + +### Rebalancing Listener +`ContainerProperties`中存在一个`consumerRebalanceListener`属性, + +