679 lines
36 KiB
Markdown
679 lines
36 KiB
Markdown
# Spring Kafka
|
||
## 连接到kafka
|
||
### 运行时切换bootstrap servers
|
||
从2.5版本开始,KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)`方法,可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
|
||
|
||
> #### 切换bootstrap后关闭旧consumer和producer
|
||
> kafka consumer和producer通常都是基于长连接的,在调用setBootstrapServersSupplier在运行时切换bootstrap servers后,如果想要关闭现存的producer,可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer,可以调用`KafkaListenerEndpointRegistry`的`close`方法(调用close后再调用start),或是调用其他listener container的close和start方法。
|
||
|
||
#### ABSwitchCluster
|
||
为了方便起见,framework提供了`ABSwitchCluster`类,该类支持两套bootstrap servers集合,在任一时刻,只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\<String\>接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后,如果想要切换bootstrap servers,可以调用ABSwitchCluster类的`primary`和`secondary`方法,并关闭生产者和消费者的旧实例(关闭生产者旧实例,在producer factory上调用reset方法,用于创建到新bootstrap servers的连接;对于消费者实例,可以对所有listener container先调用close方法再调用start方法,当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。
|
||
|
||
### Factory Listener
|
||
从2.5版本开始,`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`都可以配置Listener,通过配置Listener可以监听生产者或消费者实例的创建和关闭。
|
||
|
||
```java
|
||
// producer listener
|
||
interface Listener<K, V> {
|
||
|
||
default void producerAdded(String id, Producer<K, V> producer) {
|
||
}
|
||
|
||
default void producerRemoved(String id, Producer<K, V> producer) {
|
||
}
|
||
|
||
}
|
||
```
|
||
```java
|
||
// consumer listener
|
||
interface Listener<K, V> {
|
||
|
||
default void consumerAdded(String id, Consumer<K, V> consumer) {
|
||
}
|
||
|
||
default void consumerRemoved(String id, Consumer<K, V> consumer) {
|
||
}
|
||
|
||
}
|
||
```
|
||
再上述接口中,id代表再factory bean对象名称后追加client-id属性,二者通过`.`分隔。
|
||
|
||
## 配置Topic
|
||
如果在当前应用上下文中定义了KafkaAdmin bean对象,kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加,可以定义一个`NewTopic`类型的bean对象,kafkaAdmin会自动将该topic添加到broker中。
|
||
|
||
为了方便topic的创建,2.3版本中引入了TopicBuilder类。
|
||
```java
|
||
@Bean
|
||
public KafkaAdmin admin() {
|
||
Map<String, Object> configs = new HashMap<>();
|
||
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||
return new KafkaAdmin(configs);
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic1() {
|
||
return TopicBuilder.name("thing1")
|
||
.partitions(10)
|
||
.replicas(3)
|
||
.compact()
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic2() {
|
||
return TopicBuilder.name("thing2")
|
||
.partitions(10)
|
||
.replicas(3)
|
||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic3() {
|
||
return TopicBuilder.name("thing3")
|
||
.assignReplicas(0, List.of(0, 1))
|
||
.assignReplicas(1, List.of(1, 2))
|
||
.assignReplicas(2, List.of(2, 0))
|
||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||
.build();
|
||
}
|
||
```
|
||
|
||
从2.6版本开始,创建NewTopic时可以省略partitions()和replicas()方法的调用,此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
|
||
|
||
```java
|
||
@Bean
|
||
public NewTopic topic4() {
|
||
return TopicBuilder.name("defaultBoth")
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic5() {
|
||
return TopicBuilder.name("defaultPart")
|
||
.replicas(1)
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic6() {
|
||
return TopicBuilder.name("defaultRepl")
|
||
.partitions(3)
|
||
.build();
|
||
}
|
||
```
|
||
从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象:
|
||
```java
|
||
@Bean
|
||
public KafkaAdmin.NewTopics topics456() {
|
||
return new NewTopics(
|
||
TopicBuilder.name("defaultBoth")
|
||
.build(),
|
||
TopicBuilder.name("defaultPart")
|
||
.replicas(1)
|
||
.build(),
|
||
TopicBuilder.name("defaultRepl")
|
||
.partitions(3)
|
||
.build());
|
||
}
|
||
```
|
||
> 当使用spring boot时,KafkaAdmin对象将会被自动注册,故而只需要定义NewTopic bean对象即可。
|
||
|
||
默认情况下,如果kafka broker不可用,会输出日志进行记录,但是此时context的载入还会继续,后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时,停止context的载入,可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true,此时context会初始化失败。
|
||
|
||
从版本2.7开始,KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic:
|
||
- `createOrModifyTopics`
|
||
- `describeTopics`
|
||
|
||
从版本2.9.10、3.0.9开始,KafkaAdmin提供了`setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)`接口,该接口接收一个Predicate\<NewTopic\>参数,通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象,每个kafkaAdmin对应不同的broker集群,在上下文中含有多个NewTopic对象时,可以通过predicate判断每个topic应该属性哪个amdin。
|
||
|
||
## 发送消息
|
||
KafkaTemplate类对KafkaProducer进行了包装,提供了如下接口用于向kafka topic发送消息。
|
||
```java
|
||
CompletableFuture<SendResult<K, V>> sendDefault(V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(Message<?> message);
|
||
|
||
Map<MetricName, ? extends Metric> metrics();
|
||
|
||
List<PartitionInfo> partitionsFor(String topic);
|
||
|
||
<T> T execute(ProducerCallback<K, V, T> callback);
|
||
|
||
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
|
||
|
||
// Flush the producer.
|
||
void flush();
|
||
|
||
interface ProducerCallback<K, V, T> {
|
||
|
||
T doInKafka(Producer<K, V> producer);
|
||
|
||
}
|
||
|
||
interface OperationsCallback<K, V, T> {
|
||
|
||
T doInOperations(KafkaOperations<K, V> operations);
|
||
|
||
}
|
||
```
|
||
其中,sendDefault接口需要向KafkaTemplate提供一个默认的topic。
|
||
|
||
kafkaTemplate中部分api接收timestamp作为参数,并且将timestamp存储到record中。接口中指定的timestamp参数如何存储,取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`,那么用户指定的timestamp参数将会被使用(如果用户没有指定timestamp,那么会自动创建timestamp,producer会在发送时将timestamp指定为System.currentTimeMillis())。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`,那么用户指定的timestamp将会被丢弃,而broker则会负责为timestamp赋值。
|
||
|
||
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法,execute接口则是提供了对底层KafkaProducer的直接访问。
|
||
|
||
要使用KafkaTemplate,可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate:
|
||
```java
|
||
@Bean
|
||
public ProducerFactory<Integer, String> producerFactory() {
|
||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||
}
|
||
|
||
@Bean
|
||
public Map<String, Object> producerConfigs() {
|
||
Map<String, Object> props = new HashMap<>();
|
||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
|
||
return props;
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<Integer, String> kafkaTemplate() {
|
||
return new KafkaTemplate<Integer, String>(producerFactory());
|
||
}
|
||
```
|
||
从2.5开始,创建KafkaTemplate时可以基于factory进行创建,但是覆盖factory中的配置属性,具体示例如下:
|
||
```java
|
||
@Bean
|
||
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
|
||
return new KafkaTemplate<>(pf);
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
|
||
return new KafkaTemplate<>(pf,
|
||
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
|
||
}
|
||
```
|
||
|
||
当使用KafkaTemplate接收`Message\<?\>`类型的参数时,可以将topic、partition、key和timestamp参数指定在Message的header中,header中包含如下条目:
|
||
- KafkaHeaders.TOPIC
|
||
- KafkaHeaders.PARTITION
|
||
- KafkaHeaders.KEY
|
||
- KafkaHeaders.TIMESTAMP
|
||
|
||
除了调用发送方法获取CompletableFuture外,还可以为KafkaTemplate配置一个ProducerListener,从而在消息发送完成(成功或失败)后执行一个异步的回调。如下是ProducerListener接口的定义:
|
||
```java
|
||
public interface ProducerListener<K, V> {
|
||
|
||
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
|
||
|
||
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
|
||
Exception exception);
|
||
|
||
}
|
||
```
|
||
默认情况下,KafkaTemplate配置了一个LoggingProducerListener,会在发送失败时打印失败日志,在发送成功时并不做任何事。并且为了方便起见,方法的默认实现已经被提供,可以只覆盖其中一个方法。
|
||
|
||
send方法默认返回的是CompletableFuture类型,可以在发送完成之后为future注册一个回调:
|
||
```java
|
||
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
|
||
future.whenComplete((result, ex) -> {
|
||
...
|
||
});
|
||
```
|
||
其中,Throwable类型的ex可以被转化为`KafkaProducerException`,该类型的failedProducerRecord属性可以获取发送失败的record。
|
||
|
||
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用`CompletableFuture.get`时,推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。
|
||
|
||
### 发送示例
|
||
如下展示了通过KafkaTemplate向broker发送消息的示例:
|
||
```java
|
||
// async
|
||
public void sendToKafka(final MyOutputData data) {
|
||
final ProducerRecord<String, String> record = createRecord(data);
|
||
|
||
try {
|
||
template.send(record).get(10, TimeUnit.SECONDS);
|
||
handleSuccess(data);
|
||
}
|
||
catch (ExecutionException e) {
|
||
handleFailure(data, record, e.getCause());
|
||
}
|
||
catch (TimeoutException | InterruptedException e) {
|
||
handleFailure(data, record, e);
|
||
}
|
||
}
|
||
```
|
||
```java
|
||
// sync
|
||
public void sendToKafka(final MyOutputData data) {
|
||
final ProducerRecord<String, String> record = createRecord(data);
|
||
|
||
try {
|
||
template.send(record).get(10, TimeUnit.SECONDS);
|
||
handleSuccess(data);
|
||
}
|
||
catch (ExecutionException e) {
|
||
handleFailure(data, record, e.getCause());
|
||
}
|
||
catch (TimeoutException | InterruptedException e) {
|
||
handleFailure(data, record, e);
|
||
}
|
||
}
|
||
```
|
||
|
||
### RoutingKafkaTemplate
|
||
从2.5版本开始,额可以通过RoutingKafkaTemplate在运行时选择producer实例,选择过程基于topic名称。
|
||
|
||
> RoutingKafkaTemplate不支持事务,也不支持execute、flush、metrics等方法,因为RoutingKafkaTemplate根据topic来选择producer,但是在执行这些操作时并不知道操作所属topic。
|
||
|
||
RoutingKafkaTemplate需要一个map,map的key为`java.util.regex.Pattern`,而value则是`ProducerFactory<Object, Object>`实例。该map必须是有序的(例如LinkedHashMap),该map需要按顺序遍历,顺序在前的key-value对会优先匹配。
|
||
|
||
如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息,实例中每个topic都使用不同的序列化方式。
|
||
```java
|
||
@SpringBootApplication
|
||
public class Application {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(Application.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
|
||
ProducerFactory<Object, Object> pf) {
|
||
|
||
// Clone the PF with a different Serializer, register with Spring for shutdown
|
||
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
|
||
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
|
||
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
|
||
|
||
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
|
||
map.put(Pattern.compile("two"), bytesPF);
|
||
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
|
||
return new RoutingKafkaTemplate(map);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
|
||
return args -> {
|
||
routingTemplate.send("one", "thing1");
|
||
routingTemplate.send("two", "thing2".getBytes());
|
||
};
|
||
}
|
||
|
||
}
|
||
```
|
||
### 使用DefaultKafkaProducerFactory
|
||
ProducerFactory是用于创建生产者实例的。当没有使用事务时,默认情况下,`DefaultKafkaFactory`会创建一个单例的生产者实例,所有客户端都会使用生产者实例。但是,如果在template中调用了flush方法,这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始,DefaultKafkaFactory有了新的`producerPerThread`属性,当该属性设置为true时,factory会针对每个线程都创建并缓存一个producer实例。
|
||
> 当`producerPerThread`被设置为true时,若线程中的producer不再被需要,那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭,并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。
|
||
|
||
当创建DefaultKafkaFactory时,key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时,可以选择向构造函数中传入serializer实例或是传入serializer supplier对象:
|
||
- 当传入serializer实例时,通过该factory创建的所有生产者实例都共享该serializer实例
|
||
- 当传入的是返回一个serializer的supplier时,可令通过该factory创建的producer实例都拥有属于自己的serializer
|
||
|
||
如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例:
|
||
```java
|
||
@Bean
|
||
public ProducerFactory<Integer, CustomValue> producerFactory() {
|
||
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
|
||
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
|
||
}
|
||
```
|
||
|
||
从2.5.10版本开始,可以在factory创建之后再更新factory的producer config属性。例如,可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例,故而需要调用factory的reset方法,在调用reset后所有现存producer实例都会被关闭,而之后新创建的producer都会使用新的属性配置。
|
||
> 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。
|
||
|
||
为了更新producer属性配置,factory提供了如下两个接口:
|
||
```java
|
||
void updateConfigs(Map<String, Object> updates);
|
||
|
||
void removeConfig(String configKey);
|
||
```
|
||
### ReplyingKafkaTemplate
|
||
从2.1.3版本开始,kafka引入了ReplyingKafkaTemplate,其是KafkaTemplate的一个子类,用于提供request/reply语义。该类相比父类含有两个额外的方法:
|
||
```java
|
||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
|
||
|
||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
|
||
Duration replyTimeout);
|
||
```
|
||
该方法的返回类型RequestReplyFuture继承了CompletableFuture,RequestReplyFuture会异步的注入该future的结果(可能正常返回,也可能是一个exception或者timeout)。
|
||
|
||
RequestReplyFuture含有一个sendFuture属性,该属性是调用kafkaTemplate的send方法发送消息的结果,类型为`CompletableFuture<SendResult<K,V>>`,可以通过该属性future来判断发送消息操作的结果。
|
||
|
||
如果在调用sendAndReceive方法时没有传递replyTimeout参数,或是指定replyTimeout参数为null,那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下,该超时属性为5s。
|
||
|
||
从2.8.8版本开始,该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用,避免当reply container尚未初始化完成时,发送消息对应的reply已经返回了。
|
||
|
||
如下展示了如何使用ReplyingKafkaTemplate:
|
||
```java
|
||
@SpringBootApplication
|
||
public class KRequestingApplication {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(KRequestingApplication.class, args).close();
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
|
||
return args -> {
|
||
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
|
||
throw new IllegalStateException("Reply container did not initialize");
|
||
}
|
||
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
|
||
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
|
||
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
|
||
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
|
||
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
|
||
System.out.println("Return value: " + consumerRecord.value());
|
||
};
|
||
}
|
||
|
||
@Bean
|
||
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
|
||
ProducerFactory<String, String> pf,
|
||
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
|
||
|
||
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> repliesContainer =
|
||
containerFactory.createContainer("kReplies");
|
||
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
|
||
repliesContainer.setAutoStartup(false);
|
||
return repliesContainer;
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kRequests() {
|
||
return TopicBuilder.name("kRequests")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kReplies() {
|
||
return TopicBuilder.name("kReplies")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
}
|
||
```
|
||
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
|
||
|
||
#### ReplyingKafkaTemplate header
|
||
在使用ReplyingKafkaHeader时,template会为消息设置一个header(KafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
|
||
|
||
server端返回correlation id的示例如下:
|
||
```java
|
||
@SpringBootApplication
|
||
public class KReplyingApplication {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(KReplyingApplication.class, args);
|
||
}
|
||
|
||
@KafkaListener(id="server", topics = "kRequests")
|
||
@SendTo // use default replyTo expression
|
||
public String listen(String in) {
|
||
System.out.println("Server received: " + in);
|
||
return in.toUpperCase();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kRequests() {
|
||
return TopicBuilder.name("kRequests")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
@Bean // not required if Jackson is on the classpath
|
||
public MessagingMessageConverter simpleMapperConverter() {
|
||
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
|
||
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
|
||
return messagingMessageConverter;
|
||
}
|
||
|
||
}
|
||
```
|
||
上述@KafkaListener结构会返回correlation id,并且决定返回消息发送的topic。
|
||
|
||
并且,ReplyingKafkaTemplate会使用header(KafkaHeaders.REPLY_TOPIC)来代表返回消息发送到的topic。
|
||
|
||
从版本2.2开始,该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset,该topic或topicpartitionoffset将会被用于设置reply header;但是如果reply container被配置监听多个topic或topicpartitionoffset,用户必须自己手动设置header。
|
||
|
||
为消息设置header的示例如下:
|
||
```java
|
||
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
|
||
```
|
||
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时,可以多个template共享一个reply topic,只要每个template监听的分区不同。
|
||
|
||
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时,那么每个template实例都必须拥有不同的group id,在这种情况下,所有的template实例都会接受到所有消息,但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便,但是所有的template都接收所有消息将会带来额外的网络通信开销,并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时,推荐将template中`sharedReplyTopic`属性设置为true,此时,将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
|
||
|
||
如下是使用共享topic配置template的示例:
|
||
```java
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String> replyContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
|
||
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
|
||
Properties props = new Properties();
|
||
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
|
||
container.getContainerProperties().setKafkaConsumerProperties(props);
|
||
return container;
|
||
}
|
||
```
|
||
|
||
默认情况下,ReplyingKafkaTemplate将会使用如下三种header:
|
||
- KafkaHeaders.CORRELATION_ID:用于关联发送消息和回复消息的关联id
|
||
- KafkaHeaders.REPLY_TOPIC:用于告诉接收消息的server将消息发送到哪个topic
|
||
- KafkaHeaders.REPLY_PARTITION:该header是可选的,可以通过该header告知server将消息发送到指定的分区
|
||
|
||
上述header将会被@KafkaListener结构用作返回消息的路由。
|
||
|
||
#### 通过Message<?>来发送请求和返回请求
|
||
在2.7版本中,对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message<?>`:
|
||
```java
|
||
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
|
||
|
||
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
|
||
ParameterizedTypeReference<P> returnType);
|
||
```
|
||
上述接口会使用template默认的replyTimeout,也存在接收timeout参数的重载版本。
|
||
|
||
如下示例展示了如何构建Message类型消息的发送和接收:
|
||
```java
|
||
// template 配置
|
||
@Bean
|
||
ReplyingKafkaTemplate<String, String, String> template(
|
||
ProducerFactory<String, String> pf,
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> replyContainer =
|
||
factory.createContainer("replies");
|
||
replyContainer.getContainerProperties().setGroupId("request.replies");
|
||
ReplyingKafkaTemplate<String, String, String> template =
|
||
new ReplyingKafkaTemplate<>(pf, replyContainer);
|
||
template.setMessageConverter(new ByteArrayJsonMessageConverter());
|
||
template.setDefaultTopic("requests");
|
||
return template;
|
||
}
|
||
|
||
// 通过template发送和接收消息
|
||
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
|
||
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
|
||
new ParameterizedTypeReference<Thing>() { });
|
||
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
|
||
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
|
||
log.info(thing.toString());
|
||
|
||
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
|
||
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
|
||
new ParameterizedTypeReference<List<Thing>>() { });
|
||
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
|
||
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
|
||
things.forEach(thing1 -> log.info(thing1.toString()));
|
||
|
||
// server端接收消息并且回复消息
|
||
@KafkaListener(id = "requestor", topics = "request")
|
||
@SendTo
|
||
public Message<?> messageReturn(String in) {
|
||
return MessageBuilder.withPayload(in.toUpperCase())
|
||
.setHeader(KafkaHeaders.TOPIC, replyTo)
|
||
.setHeader(KafkaHeaders.KEY, 42)
|
||
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
|
||
.build();
|
||
}
|
||
```
|
||
从2.5版本开始,若是返回的Message中没有指定header,framework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC(如果存在的话)。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS,也会将其填充到返回消息的header中。
|
||
|
||
> #### ErrorHandlingDeserializer
|
||
> 可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。
|
||
|
||
### kafka poison pill & ErrorHandlingDeserializer
|
||
poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败,不管重试过多少次之后仍然无法成功被消费。
|
||
|
||
poison pill可能在如下场景下产生:
|
||
- 该记录被损坏
|
||
- 该记录发序列化失败
|
||
|
||
在生产场景中,consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容,将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。
|
||
|
||
在现实场景中,可能因为如下缘故而遭遇poison pill:
|
||
- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息,这将会导致反序列化问题
|
||
- consumer的key或value deserializer配置错误
|
||
- 不同的生产者实例,使用不同的key或value serializer向topic中发送消息
|
||
|
||
在发生poison后,consumer在调用poll拉取数据时将无法反序列化record,调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理,针对该topic分区的消费会被阻塞(因为consumer offset一直无法向前移动)。并且,在consumer不停重试针对该消息的反序列化时,大量的反序列化失败日志将会被追加到日志文件中,磁盘占用量将会急剧增大。
|
||
|
||
#### ErrorHandlingDeserializer
|
||
为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。
|
||
|
||
#### replyErrorChecker
|
||
从版本2.6.7开始,可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker,当提供了checker方法时,template会自动调用该方法,如果该方法抛出异常,那么该reply message对应的future也会以失败状态完成。
|
||
|
||
replychecker使用示例如下:
|
||
```java
|
||
template.setReplyErrorChecker(record -> {
|
||
Header error = record.headers().lastHeader("serverSentAnError");
|
||
if (error != null) {
|
||
return new MyException(new String(error.value()));
|
||
}
|
||
else {
|
||
return null;
|
||
}
|
||
});
|
||
|
||
...
|
||
|
||
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
|
||
try {
|
||
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
|
||
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
|
||
...
|
||
}
|
||
catch (InterruptedException e) {
|
||
...
|
||
}
|
||
catch (ExecutionException e) {
|
||
if (e.getCause instanceof MyException) {
|
||
...
|
||
}
|
||
}
|
||
catch (TimeoutException e) {
|
||
...
|
||
}
|
||
```
|
||
|
||
### AggregatingReplyingKafkaTemplate
|
||
ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景,但如果对于发送一条消息,存在多个接收方,会返回多条消息的场景,则需要使用AggregatingReplyingKafkaTemplate。
|
||
|
||
像ReplyingKafkaTemplate一样,AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener container,listener container用于接收返回的消息。除此之外,AggregatingReplyingKafkaTemplate还会接收第三个参数`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`,该断言每次在接收到新消息时都会被计算。如果断言返回true,该`AggregatingReplyingKafkaTemplate.sendAndReceive`方法返回的Future对象将会被完成,并且Future中的值为断言中ConsumerRecord的集合。
|
||
|
||
从版本2.3.5开始,第三个参数为`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`类型,其会在每次接收到消息或是超时(replyTimeout超时)的情况下被调用,第二个参数传入的boolean即是断言的这次调用是否因为超时。**该断言可以针对ConsumerRecord进行修改**。
|
||
|
||
> #### returnPartialOnTimeout
|
||
> AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout,该属性值默认为false,如果该值被设置为true,那么当请求发生超时时,会返回已经接收到的部分ConsumerRecord集合。
|
||
>
|
||
> BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息,要想成功在超时场景下返回接收到的部分消息,不仅需要returnPartialOnTimeout设置为true,还需要BiPredicate断言在发生timeout的情况下返回值为true。
|
||
|
||
AggregatingReplyingKafkaTemplate使用示例如下:
|
||
```java
|
||
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
|
||
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
|
||
(coll, timeout) -> timeout || coll.size() == releaseSize);
|
||
...
|
||
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
|
||
template.sendAndReceive(record);
|
||
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
|
||
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
|
||
future.get(30, TimeUnit.SECONDS);
|
||
```
|
||
|
||
注意sendAndReceive方法返回的future,其值类型为`ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`类型,外层的ConsumerRecord并不是真正的返回消息,而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的,外层ConsumerRecord用于存储实际接收到的消息集合。
|
||
|
||
> #### ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>
|
||
> `ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`作为聚合返回消息集合的伪ConsumerRecord,其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal release(releaseStrategy返回为true)时,伪ConsumerRecord的topic name为`aggregatedResults`.
|
||
>
|
||
> 当获取该伪ConsumerRecord的原因是timeout(returnPartialOnTimeout被设置为true,并且发生timeout,并且至少获取到一条返回消息),那么伪ConsumerRecord的topic name将会被设置为`partialResultsAfterTimeout`.
|
||
|
||
template为上述伪topic name提供了静态变量:
|
||
```java
|
||
/**
|
||
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
|
||
* results in its value after a normal release by the release strategy.
|
||
*/
|
||
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
|
||
|
||
/**
|
||
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
|
||
* results in its value after a timeout.
|
||
*/
|
||
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
|
||
```
|
||
|
||
伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。
|
||
|
||
#### AggregatingReplyingKafkaTemplate配置要求
|
||
listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMMEDIATE`;consumer属性`enable.auto.commit`必须被设置伪false。为了避免任何丢失消息的可能,template只会在没有待处理请求的前提下提交offset,即最后一个未处理请求被releaseStrategy释放。
|
||
|
||
当consumer发生rebalance时,可能会造成返回消息被重复传递(由于template只会在没有待处理请求的情况下提交offset,如果在listener container接收到消息后,尚未提交offset,此时发生rebalance,那么未提交offset的消息将会被重复接收)。对于在途的请求,消息的重复传递将会被忽略(在途的消息还会存在多条消息聚合的过程);针对已经被releaseStrategy释放的返回消息,如果接收到多条重复的返回消息,那么会在log中看到error日志。
|
||
|
||
另外,如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`,那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null,并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。
|
||
|
||
对于AggregatingReplyingKafkaTemplate,`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。
|