diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 6bbe1e7..f5f6c78 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -583,3 +583,96 @@ poison pill可能在如下场景下产生: #### ErrorHandlingDeserializer 为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。 +#### replyErrorChecker +从版本2.6.7开始,可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker,当提供了checker方法时,template会自动调用该方法,如果该方法抛出异常,那么该reply message对应的future也会以失败状态完成。 + +replychecker使用示例如下: +```java +template.setReplyErrorChecker(record -> { + Header error = record.headers().lastHeader("serverSentAnError"); + if (error != null) { + return new MyException(new String(error.value())); + } + else { + return null; + } +}); + +... + +RequestReplyFuture future = template.sendAndReceive(record); +try { + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + ConsumerRecord consumerRecord = future.get(10, TimeUnit.SECONDS); + ... +} +catch (InterruptedException e) { + ... +} +catch (ExecutionException e) { + if (e.getCause instanceof MyException) { + ... + } +} +catch (TimeoutException e) { + ... +} +``` + +### AggregatingReplyingKafkaTemplate +ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景,但如果对于发送一条消息,存在多个接收方,会返回多条消息的场景,则需要使用AggregatingReplyingKafkaTemplate。 + +像ReplyingKafkaTemplate一样,AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener container,listener container用于接收返回的消息。除此之外,AggregatingReplyingKafkaTemplate还会接收第三个参数`BiPredicate>, Boolean>`,该断言每次在接收到新消息时都会被计算。如果断言返回true,该`AggregatingReplyingKafkaTemplate.sendAndReceive`方法返回的Future对象将会被完成,并且Future中的值为断言中ConsumerRecord的集合。 + +从版本2.3.5开始,第三个参数为`BiPredicate>, Boolean>`类型,其会在每次接收到消息或是超时(replyTimeout超时)的情况下被调用,第二个参数传入的boolean即是断言的这次调用是否因为超时。**该断言可以针对ConsumerRecord进行修改**。 + +> #### returnPartialOnTimeout +> AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout,该属性值默认为false,如果该值被设置为true,那么当请求发生超时时,会返回已经接收到的部分ConsumerRecord集合。 +> +> BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息,要想成功在超时场景下返回接收到的部分消息,不仅需要returnPartialOnTimeout设置为true,还需要BiPredicate断言在发生timeout的情况下返回值为true。 + +AggregatingReplyingKafkaTemplate使用示例如下: +```java +AggregatingReplyingKafkaTemplate template = + new AggregatingReplyingKafkaTemplate<>(producerFactory, container, + (coll, timeout) -> timeout || coll.size() == releaseSize); +... +RequestReplyFuture>> future = + template.sendAndReceive(record); +future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok +ConsumerRecord>> consumerRecord = + future.get(30, TimeUnit.SECONDS); +``` + +注意sendAndReceive方法返回的future,其值类型为`ConsumerRecord>>`类型,外层的ConsumerRecord并不是真正的返回消息,而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的,外层ConsumerRecord用于存储实际接收到的消息集合。 + +> #### ConsumerRecord>> +> `ConsumerRecord>>`作为聚合返回消息集合的伪ConsumerRecord,其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal release(releaseStrategy返回为true)时,伪ConsumerRecord的topic name为`aggregatedResults`. +> +> 当获取该伪ConsumerRecord的原因是timeout(returnPartialOnTimeout被设置为true,并且发生timeout,并且至少获取到一条返回消息),那么伪ConsumerRecord的topic name将会被设置为`partialResultsAfterTimeout`. + +template为上述伪topic name提供了静态变量: +```java +/** + * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated + * results in its value after a normal release by the release strategy. + */ +public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults"; + +/** + * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated + * results in its value after a timeout. + */ +public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout"; +``` + +伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。 + +#### AggregatingReplyingKafkaTemplate配置要求 +listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMMEDIATE`;consumer属性`enable.auto.commit`必须被设置伪false。为了避免任何丢失消息的可能,template只会在没有待处理请求的前提下提交offset,即最后一个未处理请求被releaseStrategy释放。 + +当consumer发生rebalance时,可能会造成返回消息被重复传递(由于template只会在没有待处理请求的情况下提交offset,如果在listener container接收到消息后,尚未提交offset,此时发生rebalance,那么未提交offset的消息将会被重复接收)。对于在途的请求,消息的重复传递将会被忽略(在途的消息还会存在多条消息聚合的过程);针对已经被releaseStrategy释放的返回消息,如果接收到多条重复的返回消息,那么会在log中看到error日志。 + +另外,如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`,那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null,并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。 + +对于AggregatingReplyingKafkaTemplate,`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。