diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index f9bf4ae..d8d24c8 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -61,6 +61,11 @@ - [KafkaTemplate本地事务](#kafkatemplate本地事务) - [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布) - [事务结合BatchListener使用](#事务结合batchlistener使用) + - [Exactly Once语义](#exactly-once语义) + - [Pausing and Resuming Listener Containers](#pausing-and-resuming-listener-containers) + - [异常处理](#异常处理) + - [Listener Error Handler](#listener-error-handler) + - [Container Handler](#container-handler) # Spring Kafka @@ -2239,6 +2244,142 @@ boolean result = template.executeInTransaction(t -> { 在使用batch listener时,如果处理失败,那么整个batch中的消息都会被重新传递,因为framwork不知道在batch中具体那条record处理失败。 +## Exactly Once语义 +可以为ListenerContainer提供KafkaAwareTransactionManager实例,当这样配置时,container会在调用listener之前开启一个事务。 + +任何由listener执行的KafkaTemplate操作将会加入该事务。如果listener成功的处理了该条record(或通过BatchMessageListener处理了多条请求),container将会在transaction manager提交事务前调用`producer.sendOffsetsToTransaction()`将offset发送给事务。 + +如果listener抛出一个异常,那么kafka trasaction将会回滚,并且consumer会将position进行回退,被回滚的records将会在下次poll操作时被重新传递。 + +当使用事务时,会开启exactly-once语义(EOS:exactly-once-semantics)。其代表在一个`read -> process -> write`序列中,该序列只会被完成一次。 + +> eos代表`read->process->write`序列只会被执行完成一次,但是`read->process`流程可能会执行多次,拥有at-least-once的语义。 + +## Pausing and Resuming Listener Containers +2.1.3版本对listener container添加了`pause`和`resume`方法,在2.1.3之前的版本中,可以在ConsumerAwareMessageListener中对container执行pause操作,在ListenerContainerIdleEvent中对container执行resume操作,`ConsumerAwareMessageListener`和`ListenerContainerIdleEvent`中都提供了对Consumer的访问。 + +调用container中的pause和resume方法是线程安全的,调用pause方法之后,其在下次调用poll前生效;调用resume方法后,其会在本次poll调用返回后生效。当在container上调用pause后,其会持续调用poll方法,从而避免在使用group management后发生rebalance,但是在处于paused状态时其不会获取任何records。 + +从2.1.5版本开始,可以通过调用`isPauseRequested`来得知是否`pause`方法被调用。但是,调用pause后,其会在下次调用poll前才生效,当前可能并未生效,可以通过`isConsumerPaused()`方法来获取是否当前consumer实际处于paused状态。 + +如下展示了如何通过registry来对@KafkaListener对应的container执行pause和resume操作: +```java +@SpringBootApplication +public class Application implements ApplicationListener { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args).close(); + } + + @Override + public void onApplicationEvent(KafkaEvent event) { + System.out.println(event); + } + + @Bean + public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, + KafkaTemplate template) { + return args -> { + template.send("pause.resume.topic", "thing1"); + Thread.sleep(10_000); + System.out.println("pausing"); + registry.getListenerContainer("pause.resume").pause(); + Thread.sleep(10_000); + template.send("pause.resume.topic", "thing2"); + Thread.sleep(10_000); + System.out.println("resuming"); + registry.getListenerContainer("pause.resume").resume(); + Thread.sleep(10_000); + }; + } + + @KafkaListener(id = "pause.resume", topics = "pause.resume.topic") + public void listen(String in) { + System.out.println(in); + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name("pause.resume.topic") + .partitions(2) + .replicas(1) + .build(); + } + +} +``` +## 异常处理 +### Listener Error Handler +从版本2.0开始,`@KafkaListener`注解含有一个新的属性,`errorHandler`. + +可以为errorHandler属性提供一个`KafkaListenerErrorHandler`实现的bean name,该接口为函数式接口,仅含有一个方法,接口定义如下所示: +```java +@FunctionalInterface +public interface KafkaListenerErrorHandler { + + Object handleError(Message message, ListenerExecutionFailedException exception) throws Exception; + +} +``` +通过该接口,可以访问由message converter生成的spring message对象和由listener抛出的异常,该异常被包装在ListenerExecutionFailedException中。error handler可以抛出原来的异常或抛出一个新异常,抛出的异常将会被抛给container。由该listener方法返回的任何内容都会被忽略。 + +从2.7版本开始,可以为MessagingMessageConverter和BatchMessagingMessageConverter设置rawRecordHeader属性,其会导致原始的`ConsumerRecord`对象将会被设置到converted message中的`KafkaHeaders.RAW_DATA` header上。使用示例如下所示: +```java +@Bean +public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) { + return (msg, ex) -> { + if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) { + recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex); + return "FAILED"; + } + throw ex; + }; +} +``` +其还含有一个子接口`ConsumerAwareListenerErrorHandler`,该接口可以访问consumer对象。 +```java +Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer); +``` + +另一个子接口`ManualAckListenerErrorHandler`则是可以在手动ack时使用: +```java +Object handleError(Message message, ListenerExecutionFailedException exception, + Consumer consumer, @Nullable Acknowledgment ack); +``` +### Container Handler +从2.8版本开始,`ErrorHandler`接口和`BatchErrorHandler`都由`CommonErrorHandler`取代。该接口可以为单条和批量record处理异常。 + +当使用kafka事务时,默认情况下不会配置任何error handler,故而异常将会导致事务的回滚。transactional container抛出的异常将会由`AfterRollbackProcessor`进行处理。当在使用事务时,如果提供了自定义的error handler,在想要事务回滚时必须让异常被抛出。 + +CommonErrorHandler拥有一个default method `isAckAfterHandle`,该方法在container调用error handler完成后(error handler没有抛出异常)被调用,用于判断是否应该提交offset,默认情况下,其会返回true。 + +通常,由framework提供的error handler会抛出接收的异常。默认情况下,异常将会由container在`error`级别日志打出。 + +可以通过listener container factory指定全局的common listener,示例如下所示: +```java +@Bean +public KafkaListenerContainerFactory> + kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + ... + factory.setCommonErrorHandler(myErrorHandler); + ... + return factory; +} +``` +在调用error handler之前,container将会提交所有pending offset。 + +在使用spring boot时,只需要将error handler声明为bean,spring boot会将其添加到自动装配的container factory中。(用于factory生成的所有container)。 + + + + + + + + +