diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 14e0df0..fe5970a 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -354,4 +354,104 @@ void updateConfigs(Map updates); void removeConfig(String configKey); ``` +### ReplyingKafkaTemplate +从2.1.3版本开始,kafka引入了ReplyingKafkaTemplate,其是KafkaTemplate的一个子类,用于提供request/reply语义。该类相比父类含有两个额外的方法: +```java +RequestReplyFuture sendAndReceive(ProducerRecord record); + +RequestReplyFuture sendAndReceive(ProducerRecord record, + Duration replyTimeout); +``` +该方法的返回类型RequestReplyFuture继承了CompletableFuture,RequestReplyFuture会异步的注入该future的结果(可能正常返回,也可能是一个exception或者timeout)。 + +RequestReplyFuture含有一个sendFuture属性,该属性是调用kafkaTemplate的send方法发送消息的结果,类型为`CompletableFuture>`,可以通过该属性future来判断发送消息操作的结果。 + +如果在调用sendAndReceive方法时没有传递replyTimeout参数,或是指定replyTimeout参数为null,那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下,该超时属性为5s。 + +从2.8.8版本开始,该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用,避免当reply container尚未初始化完成时,发送消息对应的reply已经返回了。 + +如下展示了如何使用ReplyingKafkaTemplate: +```java +@SpringBootApplication +public class KRequestingApplication { + + public static void main(String[] args) { + SpringApplication.run(KRequestingApplication.class, args).close(); + } + + @Bean + public ApplicationRunner runner(ReplyingKafkaTemplate template) { + return args -> { + if (!template.waitForAssignment(Duration.ofSeconds(10))) { + throw new IllegalStateException("Reply container did not initialize"); + } + ProducerRecord record = new ProducerRecord<>("kRequests", "foo"); + RequestReplyFuture replyFuture = template.sendAndReceive(record); + SendResult sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS); + System.out.println("Sent ok: " + sendResult.getRecordMetadata()); + ConsumerRecord consumerRecord = replyFuture.get(10, TimeUnit.SECONDS); + System.out.println("Return value: " + consumerRecord.value()); + }; + } + + @Bean + public ReplyingKafkaTemplate replyingTemplate( + ProducerFactory pf, + ConcurrentMessageListenerContainer repliesContainer) { + + return new ReplyingKafkaTemplate<>(pf, repliesContainer); + } + + @Bean + public ConcurrentMessageListenerContainer repliesContainer( + ConcurrentKafkaListenerContainerFactory containerFactory) { + + ConcurrentMessageListenerContainer repliesContainer = + containerFactory.createContainer("kReplies"); + repliesContainer.getContainerProperties().setGroupId("repliesGroup"); + repliesContainer.setAutoStartup(false); + return repliesContainer; + } + + @Bean + public NewTopic kRequests() { + return TopicBuilder.name("kRequests") + .partitions(10) + .replicas(2) + .build(); + } + + @Bean + public NewTopic kReplies() { + return TopicBuilder.name("kReplies") + .partitions(10) + .replicas(2) + .build(); + } + +} +``` +在上述示例中采用了spring自动注入的containerFactory来创建reply container。 + +> #### ErrorHandlingDeserializer +> 可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。 + +### kafka poison pill & ErrorHandlingDeserializer +poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败,不管重试过多少次之后仍然无法成功被消费。 + +poison pill可能在如下场景下产生: +- 该记录被损坏 +- 该记录发序列化失败 + +在生产场景中,consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容,将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。 + +在现实场景中,可能因为如下缘故而遭遇poison pill: +- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息,这将会导致反序列化问题 +- consumer的key或value deserializer配置错误 +- 不同的生产者实例,使用不同的key或value serializer向topic中发送消息 + +在发生poison后,consumer在调用poll拉取数据时将无法反序列化record,调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理,针对该topic分区的消费会被阻塞(因为consumer offset一直无法向前移动)。并且,在consumer不停重试针对该消息的反序列化时,大量的反序列化失败日志将会被追加到日志文件中,磁盘占用量将会急剧增大。 + +#### ErrorHandlingDeserializer +为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。