阅读kafka 生产者发送消息文档

This commit is contained in:
asahi
2023-12-28 23:55:21 +08:00
parent 308e53cce7
commit 40ffdb90b8

View File

@@ -433,6 +433,134 @@ public class KRequestingApplication {
```
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
#### ReplyingKafkaTemplate header
在使用ReplyingKafkaHeader时template会为消息设置一个headerKafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
server端返回correlation id的示例如下
```java
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
```
上述@KafkaListener结构会返回correlation id并且决定返回消息发送的topic。
并且ReplyingKafkaTemplate会使用headerKafkaHeaders.REPLY_TOPIC来代表返回消息发送到的topic。
从版本2.2开始该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset该topic或topicpartitionoffset将会被用于设置reply header但是如果reply container被配置监听多个topic或topicpartitionoffset用户必须自己手动设置header。
为消息设置header的示例如下
```java
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
```
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时可以多个template共享一个reply topic只要每个template监听的分区不同。
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时那么每个template实例都必须拥有不同的group id在这种情况下所有的template实例都会接受到所有消息但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便但是所有的template都接收所有消息将会带来额外的网络通信开销并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时推荐将template中`sharedReplyTopic`属性设置为true此时将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
如下是使用共享topic配置template的示例
```java
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
```
默认情况下ReplyingKafkaTemplate将会使用如下三种header
- KafkaHeaders.CORRELATION_ID用于关联发送消息和回复消息的关联id
- KafkaHeaders.REPLY_TOPIC用于告诉接收消息的server将消息发送到哪个topic
- KafkaHeaders.REPLY_PARTITION该header是可选的可以通过该header告知server将消息发送到指定的分区
上述header将会被@KafkaListener结构用作返回消息的路由
#### 通过Message<?>来发送请求和返回请求
在2.7版本中对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message<?>`
```java
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
```
上述接口会使用template默认的replyTimeout也存在接收timeout参数的重载版本。
如下示例展示了如何构建Message类型消息的发送和接收
```java
// template 配置
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
// 通过template发送和接收消息
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
// server端接收消息并且回复消息
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
```
从2.5版本开始若是返回的Message中没有指定headerframework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC如果存在的话。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS也会将其填充到返回消息的header中。
> #### ErrorHandlingDeserializer
> 可以考虑在reply container中使用ErrorHandlingDeserializer如果反序列化失败RequestReplyFuture将会以异常状态完成可以访问获取到的ExecutionException其cause属性中包含DeserializationException。