阅读kafka关于@KafkaListener注解的文档

This commit is contained in:
asahi
2024-02-02 19:43:49 +08:00
parent ca7facff06
commit c88a6e4a2a

View File

@@ -1383,6 +1383,78 @@ regisry只维护其管理container的生命周期如果以bean形式声明的
endpoint将会在spring容器被refreshed之后被注册到registry中并且endpoint将会立马被启动不论其autoStartup属性值是什么。 endpoint将会在spring容器被refreshed之后被注册到registry中并且endpoint将会立马被启动不论其autoStartup属性值是什么。
### @KafkaListener @Payload校验
从2.2版本开始,可以更加方便的为@KafkaListener @Payload参数添加validator。在之前的版本中必须要自定义配置一个`DefaultMessageHandlerMethodFactory`并且将其添加到registrar中。现在可以为registrar中添加validator。如下代码展示了使用示例
```java
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
```
当使用spring boot并且存在有validation的starter时`LocalValidatorFactoryBean`将会被自动装配,装配逻辑和如下代码类似:
```java
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
```
如下代码示例展示了如果通过validator校验payload
```java
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
```
```java
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
```
从3.1版本开始,可以在`ErrorHandlingDeserializer`上执行validation操作。
### Rebalancing Listener
`ContainerProperties`中存在一个`consumerRebalanceListener`属性,