From 40ffdb90b8376f21b5f52825a719bf176d417522 Mon Sep 17 00:00:00 2001 From: asahi Date: Thu, 28 Dec 2023 23:55:21 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=20=E7=94=9F=E4=BA=A7?= =?UTF-8?q?=E8=80=85=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 128 ++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index fe5970a..6bbe1e7 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -433,6 +433,134 @@ public class KRequestingApplication { ``` 在上述示例中采用了spring自动注入的containerFactory来创建reply container。 +#### ReplyingKafkaTemplate header +在使用ReplyingKafkaHeader时,template会为消息设置一个header(KafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。 + +server端返回correlation id的示例如下: +```java +@SpringBootApplication +public class KReplyingApplication { + + public static void main(String[] args) { + SpringApplication.run(KReplyingApplication.class, args); + } + + @KafkaListener(id="server", topics = "kRequests") + @SendTo // use default replyTo expression + public String listen(String in) { + System.out.println("Server received: " + in); + return in.toUpperCase(); + } + + @Bean + public NewTopic kRequests() { + return TopicBuilder.name("kRequests") + .partitions(10) + .replicas(2) + .build(); + } + + @Bean // not required if Jackson is on the classpath + public MessagingMessageConverter simpleMapperConverter() { + MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter(); + messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper()); + return messagingMessageConverter; + } + +} +``` +上述@KafkaListener结构会返回correlation id,并且决定返回消息发送的topic。 + +并且,ReplyingKafkaTemplate会使用header(KafkaHeaders.REPLY_TOPIC)来代表返回消息发送到的topic。 + +从版本2.2开始,该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset,该topic或topicpartitionoffset将会被用于设置reply header;但是如果reply container被配置监听多个topic或topicpartitionoffset,用户必须自己手动设置header。 + +为消息设置header的示例如下: +```java +record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes())); +``` +若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时,可以多个template共享一个reply topic,只要每个template监听的分区不同。 + +若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时,那么每个template实例都必须拥有不同的group id,在这种情况下,所有的template实例都会接受到所有消息,但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便,但是所有的template都接收所有消息将会带来额外的网络通信开销,并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时,推荐将template中`sharedReplyTopic`属性设置为true,此时,将会把收到不想要消息的日志级别从ERROR降低为DEBUG。 + +如下是使用共享topic配置template的示例: +```java +@Bean +public ConcurrentMessageListenerContainer replyContainer( + ConcurrentKafkaListenerContainerFactory containerFactory) { + + ConcurrentMessageListenerContainer container = containerFactory.createContainer("topic2"); + container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique + Properties props = new Properties(); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies + container.getContainerProperties().setKafkaConsumerProperties(props); + return container; +} +``` + +默认情况下,ReplyingKafkaTemplate将会使用如下三种header: +- KafkaHeaders.CORRELATION_ID:用于关联发送消息和回复消息的关联id +- KafkaHeaders.REPLY_TOPIC:用于告诉接收消息的server将消息发送到哪个topic +- KafkaHeaders.REPLY_PARTITION:该header是可选的,可以通过该header告知server将消息发送到指定的分区 + +上述header将会被@KafkaListener结构用作返回消息的路由。 + +#### 通过Message来发送请求和返回请求 +在2.7版本中,对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message`: +```java +RequestReplyMessageFuture sendAndReceive(Message message); + +

RequestReplyTypedMessageFuture sendAndReceive(Message message, + ParameterizedTypeReference

returnType); +``` +上述接口会使用template默认的replyTimeout,也存在接收timeout参数的重载版本。 + +如下示例展示了如何构建Message类型消息的发送和接收: +```java +// template 配置 +@Bean +ReplyingKafkaTemplate template( + ProducerFactory pf, + ConcurrentKafkaListenerContainerFactory factory) { + + ConcurrentMessageListenerContainer replyContainer = + factory.createContainer("replies"); + replyContainer.getContainerProperties().setGroupId("request.replies"); + ReplyingKafkaTemplate template = + new ReplyingKafkaTemplate<>(pf, replyContainer); + template.setMessageConverter(new ByteArrayJsonMessageConverter()); + template.setDefaultTopic("requests"); + return template; +} + +// 通过template发送和接收消息 +RequestReplyTypedMessageFuture future1 = + template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(), + new ParameterizedTypeReference() { }); +log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString()); +Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload(); +log.info(thing.toString()); + +RequestReplyTypedMessageFuture> future2 = + template.sendAndReceive(MessageBuilder.withPayload("getThings").build(), + new ParameterizedTypeReference>() { }); +log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString()); +List things = future2.get(10, TimeUnit.SECONDS).getPayload(); +things.forEach(thing1 -> log.info(thing1.toString())); + +// server端接收消息并且回复消息 +@KafkaListener(id = "requestor", topics = "request") +@SendTo +public Message messageReturn(String in) { + return MessageBuilder.withPayload(in.toUpperCase()) + .setHeader(KafkaHeaders.TOPIC, replyTo) + .setHeader(KafkaHeaders.KEY, 42) + .setHeader(KafkaHeaders.CORRELATION_ID, correlation) + .build(); +} +``` +从2.5版本开始,若是返回的Message中没有指定header,framework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC(如果存在的话)。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS,也会将其填充到返回消息的header中。 + > #### ErrorHandlingDeserializer > 可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。