阅读kafka error handler文档

This commit is contained in:
asahi
2024-03-07 20:06:39 +08:00
parent fc8cdbf2d9
commit 8ef6aaa8c5

View File

@@ -61,6 +61,11 @@
- [KafkaTemplate本地事务](#kafkatemplate本地事务) - [KafkaTemplate本地事务](#kafkatemplate本地事务)
- [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布) - [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布)
- [事务结合BatchListener使用](#事务结合batchlistener使用) - [事务结合BatchListener使用](#事务结合batchlistener使用)
- [Exactly Once语义](#exactly-once语义)
- [Pausing and Resuming Listener Containers](#pausing-and-resuming-listener-containers)
- [异常处理](#异常处理)
- [Listener Error Handler](#listener-error-handler)
- [Container Handler](#container-handler)
# Spring Kafka # Spring Kafka
@@ -2239,6 +2244,142 @@ boolean result = template.executeInTransaction(t -> {
在使用batch listener时如果处理失败那么整个batch中的消息都会被重新传递因为framwork不知道在batch中具体那条record处理失败。 在使用batch listener时如果处理失败那么整个batch中的消息都会被重新传递因为framwork不知道在batch中具体那条record处理失败。
## Exactly Once语义
可以为ListenerContainer提供KafkaAwareTransactionManager实例当这样配置时container会在调用listener之前开启一个事务。
任何由listener执行的KafkaTemplate操作将会加入该事务。如果listener成功的处理了该条record或通过BatchMessageListener处理了多条请求container将会在transaction manager提交事务前调用`producer.sendOffsetsToTransaction()`将offset发送给事务。
如果listener抛出一个异常那么kafka trasaction将会回滚并且consumer会将position进行回退被回滚的records将会在下次poll操作时被重新传递。
当使用事务时会开启exactly-once语义EOSexactly-once-semantics。其代表在一个`read -> process -> write`序列中,该序列只会被完成一次。
> eos代表`read->process->write`序列只会被执行完成一次,但是`read->process`流程可能会执行多次拥有at-least-once的语义。
## Pausing and Resuming Listener Containers
2.1.3版本对listener container添加了`pause``resume`方法在2.1.3之前的版本中可以在ConsumerAwareMessageListener中对container执行pause操作在ListenerContainerIdleEvent中对container执行resume操作`ConsumerAwareMessageListener``ListenerContainerIdleEvent`中都提供了对Consumer的访问。
调用container中的pause和resume方法是线程安全的调用pause方法之后其在下次调用poll前生效调用resume方法后其会在本次poll调用返回后生效。当在container上调用pause后其会持续调用poll方法从而避免在使用group management后发生rebalance但是在处于paused状态时其不会获取任何records。
从2.1.5版本开始,可以通过调用`isPauseRequested`来得知是否`pause`方法被调用。但是调用pause后其会在下次调用poll前才生效当前可能并未生效可以通过`isConsumerPaused()`方法来获取是否当前consumer实际处于paused状态。
如下展示了如何通过registry来对@KafkaListener对应的container执行pause和resume操作
```java
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
```
## 异常处理
### Listener Error Handler
从版本2.0开始,`@KafkaListener`注解含有一个新的属性,`errorHandler`.
可以为errorHandler属性提供一个`KafkaListenerErrorHandler`实现的bean name该接口为函数式接口仅含有一个方法接口定义如下所示
```java
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
```
通过该接口可以访问由message converter生成的spring message对象和由listener抛出的异常该异常被包装在ListenerExecutionFailedException中。error handler可以抛出原来的异常或抛出一个新异常抛出的异常将会被抛给container。由该listener方法返回的任何内容都会被忽略。
从2.7版本开始可以为MessagingMessageConverter和BatchMessagingMessageConverter设置rawRecordHeader属性其会导致原始的`ConsumerRecord`对象将会被设置到converted message中的`KafkaHeaders.RAW_DATA` header上。使用示例如下所示
```java
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
```
其还含有一个子接口`ConsumerAwareListenerErrorHandler`该接口可以访问consumer对象。
```java
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
```
另一个子接口`ManualAckListenerErrorHandler`则是可以在手动ack时使用
```java
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
```
### Container Handler
从2.8版本开始,`ErrorHandler`接口和`BatchErrorHandler`都由`CommonErrorHandler`取代。该接口可以为单条和批量record处理异常。
当使用kafka事务时默认情况下不会配置任何error handler故而异常将会导致事务的回滚。transactional container抛出的异常将会由`AfterRollbackProcessor`进行处理。当在使用事务时如果提供了自定义的error handler在想要事务回滚时必须让异常被抛出。
CommonErrorHandler拥有一个default method `isAckAfterHandle`该方法在container调用error handler完成后error handler没有抛出异常被调用用于判断是否应该提交offset默认情况下其会返回true。
通常由framework提供的error handler会抛出接收的异常。默认情况下异常将会由container在`error`级别日志打出。
可以通过listener container factory指定全局的common listener示例如下所示
```java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
```
在调用error handler之前container将会提交所有pending offset。
在使用spring boot时只需要将error handler声明为beanspring boot会将其添加到自动装配的container factory中。用于factory生成的所有container