2395 lines
125 KiB
Markdown
2395 lines
125 KiB
Markdown
- [Spring Kafka](#spring-kafka)
|
||
- [连接到kafka](#连接到kafka)
|
||
- [运行时切换bootstrap servers](#运行时切换bootstrap-servers)
|
||
- [ABSwitchCluster](#abswitchcluster)
|
||
- [Factory Listener](#factory-listener)
|
||
- [配置Topic](#配置topic)
|
||
- [发送消息](#发送消息)
|
||
- [发送示例](#发送示例)
|
||
- [RoutingKafkaTemplate](#routingkafkatemplate)
|
||
- [使用DefaultKafkaProducerFactory](#使用defaultkafkaproducerfactory)
|
||
- [ReplyingKafkaTemplate](#replyingkafkatemplate)
|
||
- [ReplyingKafkaTemplate header](#replyingkafkatemplate-header)
|
||
- [通过Message\<?\>来发送请求和返回请求](#通过message来发送请求和返回请求)
|
||
- [kafka poison pill \& ErrorHandlingDeserializer](#kafka-poison-pill--errorhandlingdeserializer)
|
||
- [ErrorHandlingDeserializer](#errorhandlingdeserializer)
|
||
- [replyErrorChecker](#replyerrorchecker)
|
||
- [AggregatingReplyingKafkaTemplate](#aggregatingreplyingkafkatemplate)
|
||
- [AggregatingReplyingKafkaTemplate配置要求](#aggregatingreplyingkafkatemplate配置要求)
|
||
- [接收消息](#接收消息)
|
||
- [Message Listener](#message-listener)
|
||
- [MessageListenerContainer](#messagelistenercontainer)
|
||
- [Interceptors](#interceptors)
|
||
- [使用KafkaMessageListenerContainer](#使用kafkamessagelistenercontainer)
|
||
- [使用ConcurrentMessageListenerContainer](#使用concurrentmessagelistenercontainer)
|
||
- [Committing Offsets](#committing-offsets)
|
||
- [手动提交offset](#手动提交offset)
|
||
- [@KafkaListener](#kafkalistener)
|
||
- [Record Listener](#record-listener)
|
||
- [显式分区分配](#显式分区分配)
|
||
- [手动ack](#手动ack)
|
||
- [Consumer Record Metadata](#consumer-record-metadata)
|
||
- [Batch Listener](#batch-listener)
|
||
- [注解属性](#注解属性)
|
||
- [获取consumer的`group.id`](#获取consumer的groupid)
|
||
- [container线程命名](#container线程命名)
|
||
- [@KafkaListener用作元注解](#kafkalistener用作元注解)
|
||
- [在类级别使用@KafkaListener](#在类级别使用kafkalistener)
|
||
- [@KafkaListener属性修改](#kafkalistener属性修改)
|
||
- [@KafkaListener生命周期管理](#kafkalistener生命周期管理)
|
||
- [@KafkaListener @Payload校验](#kafkalistener-payload校验)
|
||
- [Rebalancing Listener](#rebalancing-listener)
|
||
- [强制触发consumer rebalance](#强制触发consumer-rebalance)
|
||
- [通过@SendTo注解发送listener结果](#通过sendto注解发送listener结果)
|
||
- [过滤消息](#过滤消息)
|
||
- [通过KafkaTemplate来接收消息](#通过kafkatemplate来接收消息)
|
||
- [动态创建container](#动态创建container)
|
||
- [MessageListener实现](#messagelistener实现)
|
||
- [Prototype Beans](#prototype-beans)
|
||
- [Topic/Partition初始offset](#topicpartition初始offset)
|
||
- [seek到指定offset](#seek到指定offset)
|
||
- [Container Factory](#container-factory)
|
||
- [线程安全](#线程安全)
|
||
- [监控](#监控)
|
||
- [监控listener性能](#监控listener性能)
|
||
- [监控KafkaTemplate性能](#监控kafkatemplate性能)
|
||
- [Micrometer Native Metrics](#micrometer-native-metrics)
|
||
- [事务](#事务)
|
||
- [使用KafkaTransactionManager](#使用kafkatransactionmanager)
|
||
- [事务同步](#事务同步)
|
||
- [使用由Consumer发起的事务](#使用由consumer发起的事务)
|
||
- [KafkaTemplate本地事务](#kafkatemplate本地事务)
|
||
- [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布)
|
||
- [事务结合BatchListener使用](#事务结合batchlistener使用)
|
||
- [Exactly Once语义](#exactly-once语义)
|
||
- [Pausing and Resuming Listener Containers](#pausing-and-resuming-listener-containers)
|
||
- [异常处理](#异常处理)
|
||
- [Listener Error Handler](#listener-error-handler)
|
||
- [Container Handler](#container-handler)
|
||
|
||
|
||
# Spring Kafka
|
||
## 连接到kafka
|
||
### 运行时切换bootstrap servers
|
||
从2.5版本开始,KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)`方法,可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
|
||
|
||
> #### 切换bootstrap后关闭旧consumer和producer
|
||
> kafka consumer和producer通常都是基于长连接的,在调用setBootstrapServersSupplier在运行时切换bootstrap servers后,如果想要关闭现存的producer,可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer,可以调用`KafkaListenerEndpointRegistry`的`close`方法(调用close后再调用start),或是调用其他listener container的close和start方法。
|
||
|
||
#### ABSwitchCluster
|
||
为了方便起见,framework提供了`ABSwitchCluster`类,该类支持两套bootstrap servers集合,在任一时刻,只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\<String\>接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后,如果想要切换bootstrap servers,可以调用ABSwitchCluster类的`primary`和`secondary`方法,并关闭生产者和消费者的旧实例(关闭生产者旧实例,在producer factory上调用reset方法,用于创建到新bootstrap servers的连接;对于消费者实例,可以对所有listener container先调用close方法再调用start方法,当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。
|
||
|
||
### Factory Listener
|
||
从2.5版本开始,`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`都可以配置Listener,通过配置Listener可以监听生产者或消费者实例的创建和关闭。
|
||
|
||
```java
|
||
// producer listener
|
||
interface Listener<K, V> {
|
||
|
||
default void producerAdded(String id, Producer<K, V> producer) {
|
||
}
|
||
|
||
default void producerRemoved(String id, Producer<K, V> producer) {
|
||
}
|
||
|
||
}
|
||
```
|
||
```java
|
||
// consumer listener
|
||
interface Listener<K, V> {
|
||
|
||
default void consumerAdded(String id, Consumer<K, V> consumer) {
|
||
}
|
||
|
||
default void consumerRemoved(String id, Consumer<K, V> consumer) {
|
||
}
|
||
|
||
}
|
||
```
|
||
再上述接口中,id代表再factory bean对象名称后追加client-id属性,二者通过`.`分隔。
|
||
|
||
## 配置Topic
|
||
如果在当前应用上下文中定义了KafkaAdmin bean对象,kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加,可以定义一个`NewTopic`类型的bean对象,kafkaAdmin会自动将该topic添加到broker中。
|
||
|
||
为了方便topic的创建,2.3版本中引入了TopicBuilder类。
|
||
```java
|
||
@Bean
|
||
public KafkaAdmin admin() {
|
||
Map<String, Object> configs = new HashMap<>();
|
||
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||
return new KafkaAdmin(configs);
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic1() {
|
||
return TopicBuilder.name("thing1")
|
||
.partitions(10)
|
||
.replicas(3)
|
||
.compact()
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic2() {
|
||
return TopicBuilder.name("thing2")
|
||
.partitions(10)
|
||
.replicas(3)
|
||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic3() {
|
||
return TopicBuilder.name("thing3")
|
||
.assignReplicas(0, List.of(0, 1))
|
||
.assignReplicas(1, List.of(1, 2))
|
||
.assignReplicas(2, List.of(2, 0))
|
||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||
.build();
|
||
}
|
||
```
|
||
|
||
从2.6版本开始,创建NewTopic时可以省略partitions()和replicas()方法的调用,此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
|
||
|
||
```java
|
||
@Bean
|
||
public NewTopic topic4() {
|
||
return TopicBuilder.name("defaultBoth")
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic5() {
|
||
return TopicBuilder.name("defaultPart")
|
||
.replicas(1)
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic6() {
|
||
return TopicBuilder.name("defaultRepl")
|
||
.partitions(3)
|
||
.build();
|
||
}
|
||
```
|
||
从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象:
|
||
```java
|
||
@Bean
|
||
public KafkaAdmin.NewTopics topics456() {
|
||
return new NewTopics(
|
||
TopicBuilder.name("defaultBoth")
|
||
.build(),
|
||
TopicBuilder.name("defaultPart")
|
||
.replicas(1)
|
||
.build(),
|
||
TopicBuilder.name("defaultRepl")
|
||
.partitions(3)
|
||
.build());
|
||
}
|
||
```
|
||
> 当使用spring boot时,KafkaAdmin对象将会被自动注册,故而只需要定义NewTopic bean对象即可。
|
||
|
||
默认情况下,如果kafka broker不可用,会输出日志进行记录,但是此时context的载入还会继续,后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时,停止context的载入,可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true,此时context会初始化失败。
|
||
|
||
从版本2.7开始,KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic:
|
||
- `createOrModifyTopics`
|
||
- `describeTopics`
|
||
|
||
从版本2.9.10、3.0.9开始,KafkaAdmin提供了`setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)`接口,该接口接收一个Predicate\<NewTopic\>参数,通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象,每个kafkaAdmin对应不同的broker集群,在上下文中含有多个NewTopic对象时,可以通过predicate判断每个topic应该属性哪个amdin。
|
||
|
||
## 发送消息
|
||
KafkaTemplate类对KafkaProducer进行了包装,提供了如下接口用于向kafka topic发送消息。
|
||
```java
|
||
CompletableFuture<SendResult<K, V>> sendDefault(V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
|
||
|
||
CompletableFuture<SendResult<K, V>> send(Message<?> message);
|
||
|
||
Map<MetricName, ? extends Metric> metrics();
|
||
|
||
List<PartitionInfo> partitionsFor(String topic);
|
||
|
||
<T> T execute(ProducerCallback<K, V, T> callback);
|
||
|
||
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
|
||
|
||
// Flush the producer.
|
||
void flush();
|
||
|
||
interface ProducerCallback<K, V, T> {
|
||
|
||
T doInKafka(Producer<K, V> producer);
|
||
|
||
}
|
||
|
||
interface OperationsCallback<K, V, T> {
|
||
|
||
T doInOperations(KafkaOperations<K, V> operations);
|
||
|
||
}
|
||
```
|
||
其中,sendDefault接口需要向KafkaTemplate提供一个默认的topic。
|
||
|
||
kafkaTemplate中部分api接收timestamp作为参数,并且将timestamp存储到record中。接口中指定的timestamp参数如何存储,取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`,那么用户指定的timestamp参数将会被使用(如果用户没有指定timestamp,那么会自动创建timestamp,producer会在发送时将timestamp指定为System.currentTimeMillis())。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`,那么用户指定的timestamp将会被丢弃,而broker则会负责为timestamp赋值。
|
||
|
||
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法,execute接口则是提供了对底层KafkaProducer的直接访问。
|
||
|
||
要使用KafkaTemplate,可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate:
|
||
```java
|
||
@Bean
|
||
public ProducerFactory<Integer, String> producerFactory() {
|
||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||
}
|
||
|
||
@Bean
|
||
public Map<String, Object> producerConfigs() {
|
||
Map<String, Object> props = new HashMap<>();
|
||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
|
||
return props;
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<Integer, String> kafkaTemplate() {
|
||
return new KafkaTemplate<Integer, String>(producerFactory());
|
||
}
|
||
```
|
||
从2.5开始,创建KafkaTemplate时可以基于factory进行创建,但是覆盖factory中的配置属性,具体示例如下:
|
||
```java
|
||
@Bean
|
||
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
|
||
return new KafkaTemplate<>(pf);
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
|
||
return new KafkaTemplate<>(pf,
|
||
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
|
||
}
|
||
```
|
||
|
||
当使用KafkaTemplate接收`Message\<?\>`类型的参数时,可以将topic、partition、key和timestamp参数指定在Message的header中,header中包含如下条目:
|
||
- KafkaHeaders.TOPIC
|
||
- KafkaHeaders.PARTITION
|
||
- KafkaHeaders.KEY
|
||
- KafkaHeaders.TIMESTAMP
|
||
|
||
除了调用发送方法获取CompletableFuture外,还可以为KafkaTemplate配置一个ProducerListener,从而在消息发送完成(成功或失败)后执行一个异步的回调。如下是ProducerListener接口的定义:
|
||
```java
|
||
public interface ProducerListener<K, V> {
|
||
|
||
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
|
||
|
||
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
|
||
Exception exception);
|
||
|
||
}
|
||
```
|
||
默认情况下,KafkaTemplate配置了一个LoggingProducerListener,会在发送失败时打印失败日志,在发送成功时并不做任何事。并且为了方便起见,方法的默认实现已经被提供,可以只覆盖其中一个方法。
|
||
|
||
send方法默认返回的是CompletableFuture类型,可以在发送完成之后为future注册一个回调:
|
||
```java
|
||
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
|
||
future.whenComplete((result, ex) -> {
|
||
...
|
||
});
|
||
```
|
||
其中,Throwable类型的ex可以被转化为`KafkaProducerException`,该类型的failedProducerRecord属性可以获取发送失败的record。
|
||
|
||
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用`CompletableFuture.get`时,推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。
|
||
|
||
### 发送示例
|
||
如下展示了通过KafkaTemplate向broker发送消息的示例:
|
||
```java
|
||
// async
|
||
public void sendToKafka(final MyOutputData data) {
|
||
final ProducerRecord<String, String> record = createRecord(data);
|
||
|
||
try {
|
||
template.send(record).get(10, TimeUnit.SECONDS);
|
||
handleSuccess(data);
|
||
}
|
||
catch (ExecutionException e) {
|
||
handleFailure(data, record, e.getCause());
|
||
}
|
||
catch (TimeoutException | InterruptedException e) {
|
||
handleFailure(data, record, e);
|
||
}
|
||
}
|
||
```
|
||
```java
|
||
// sync
|
||
public void sendToKafka(final MyOutputData data) {
|
||
final ProducerRecord<String, String> record = createRecord(data);
|
||
|
||
try {
|
||
template.send(record).get(10, TimeUnit.SECONDS);
|
||
handleSuccess(data);
|
||
}
|
||
catch (ExecutionException e) {
|
||
handleFailure(data, record, e.getCause());
|
||
}
|
||
catch (TimeoutException | InterruptedException e) {
|
||
handleFailure(data, record, e);
|
||
}
|
||
}
|
||
```
|
||
|
||
### RoutingKafkaTemplate
|
||
从2.5版本开始,额可以通过RoutingKafkaTemplate在运行时选择producer实例,选择过程基于topic名称。
|
||
|
||
> RoutingKafkaTemplate不支持事务,也不支持execute、flush、metrics等方法,因为RoutingKafkaTemplate根据topic来选择producer,但是在执行这些操作时并不知道操作所属topic。
|
||
|
||
RoutingKafkaTemplate需要一个map,map的key为`java.util.regex.Pattern`,而value则是`ProducerFactory<Object, Object>`实例。该map必须是有序的(例如LinkedHashMap),该map需要按顺序遍历,顺序在前的key-value对会优先匹配。
|
||
|
||
如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息,实例中每个topic都使用不同的序列化方式。
|
||
```java
|
||
@SpringBootApplication
|
||
public class Application {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(Application.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
|
||
ProducerFactory<Object, Object> pf) {
|
||
|
||
// Clone the PF with a different Serializer, register with Spring for shutdown
|
||
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
|
||
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
|
||
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
|
||
|
||
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
|
||
map.put(Pattern.compile("two"), bytesPF);
|
||
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
|
||
return new RoutingKafkaTemplate(map);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
|
||
return args -> {
|
||
routingTemplate.send("one", "thing1");
|
||
routingTemplate.send("two", "thing2".getBytes());
|
||
};
|
||
}
|
||
|
||
}
|
||
```
|
||
### 使用DefaultKafkaProducerFactory
|
||
ProducerFactory是用于创建生产者实例的。当没有使用事务时,默认情况下,`DefaultKafkaFactory`会创建一个单例的生产者实例,所有客户端都会使用生产者实例。但是,如果在template中调用了flush方法,这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始,DefaultKafkaFactory有了新的`producerPerThread`属性,当该属性设置为true时,factory会针对每个线程都创建并缓存一个producer实例。
|
||
> 当`producerPerThread`被设置为true时,若线程中的producer不再被需要,那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭,并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。
|
||
|
||
当创建DefaultKafkaFactory时,key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时,可以选择向构造函数中传入serializer实例或是传入serializer supplier对象:
|
||
- 当传入serializer实例时,通过该factory创建的所有生产者实例都共享该serializer实例
|
||
- 当传入的是返回一个serializer的supplier时,可令通过该factory创建的producer实例都拥有属于自己的serializer
|
||
|
||
如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例:
|
||
```java
|
||
@Bean
|
||
public ProducerFactory<Integer, CustomValue> producerFactory() {
|
||
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
|
||
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
|
||
}
|
||
```
|
||
|
||
从2.5.10版本开始,可以在factory创建之后再更新factory的producer config属性。例如,可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例,故而需要调用factory的reset方法,在调用reset后所有现存producer实例都会被关闭,而之后新创建的producer都会使用新的属性配置。
|
||
> 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。
|
||
|
||
为了更新producer属性配置,factory提供了如下两个接口:
|
||
```java
|
||
void updateConfigs(Map<String, Object> updates);
|
||
|
||
void removeConfig(String configKey);
|
||
```
|
||
### ReplyingKafkaTemplate
|
||
从2.1.3版本开始,kafka引入了ReplyingKafkaTemplate,其是KafkaTemplate的一个子类,用于提供request/reply语义。该类相比父类含有两个额外的方法:
|
||
```java
|
||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
|
||
|
||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
|
||
Duration replyTimeout);
|
||
```
|
||
该方法的返回类型RequestReplyFuture继承了CompletableFuture,RequestReplyFuture会异步的注入该future的结果(可能正常返回,也可能是一个exception或者timeout)。
|
||
|
||
RequestReplyFuture含有一个sendFuture属性,该属性是调用kafkaTemplate的send方法发送消息的结果,类型为`CompletableFuture<SendResult<K,V>>`,可以通过该属性future来判断发送消息操作的结果。
|
||
|
||
如果在调用sendAndReceive方法时没有传递replyTimeout参数,或是指定replyTimeout参数为null,那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下,该超时属性为5s。
|
||
|
||
从2.8.8版本开始,该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用,避免当reply container尚未初始化完成时,发送消息对应的reply已经返回了。
|
||
|
||
如下展示了如何使用ReplyingKafkaTemplate:
|
||
```java
|
||
@SpringBootApplication
|
||
public class KRequestingApplication {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(KRequestingApplication.class, args).close();
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
|
||
return args -> {
|
||
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
|
||
throw new IllegalStateException("Reply container did not initialize");
|
||
}
|
||
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
|
||
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
|
||
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
|
||
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
|
||
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
|
||
System.out.println("Return value: " + consumerRecord.value());
|
||
};
|
||
}
|
||
|
||
@Bean
|
||
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
|
||
ProducerFactory<String, String> pf,
|
||
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
|
||
|
||
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> repliesContainer =
|
||
containerFactory.createContainer("kReplies");
|
||
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
|
||
repliesContainer.setAutoStartup(false);
|
||
return repliesContainer;
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kRequests() {
|
||
return TopicBuilder.name("kRequests")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kReplies() {
|
||
return TopicBuilder.name("kReplies")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
}
|
||
```
|
||
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
|
||
|
||
#### ReplyingKafkaTemplate header
|
||
在使用ReplyingKafkaHeader时,template会为消息设置一个header(KafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
|
||
|
||
server端返回correlation id的示例如下:
|
||
```java
|
||
@SpringBootApplication
|
||
public class KReplyingApplication {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(KReplyingApplication.class, args);
|
||
}
|
||
|
||
@KafkaListener(id="server", topics = "kRequests")
|
||
@SendTo // use default replyTo expression
|
||
public String listen(String in) {
|
||
System.out.println("Server received: " + in);
|
||
return in.toUpperCase();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic kRequests() {
|
||
return TopicBuilder.name("kRequests")
|
||
.partitions(10)
|
||
.replicas(2)
|
||
.build();
|
||
}
|
||
|
||
@Bean // not required if Jackson is on the classpath
|
||
public MessagingMessageConverter simpleMapperConverter() {
|
||
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
|
||
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
|
||
return messagingMessageConverter;
|
||
}
|
||
|
||
}
|
||
```
|
||
上述@KafkaListener结构会返回correlation id,并且决定返回消息发送的topic。
|
||
|
||
并且,ReplyingKafkaTemplate会使用header(KafkaHeaders.REPLY_TOPIC)来代表返回消息发送到的topic。
|
||
|
||
从版本2.2开始,该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset,该topic或topicpartitionoffset将会被用于设置reply header;但是如果reply container被配置监听多个topic或topicpartitionoffset,用户必须自己手动设置header。
|
||
|
||
为消息设置header的示例如下:
|
||
```java
|
||
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
|
||
```
|
||
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时,可以多个template共享一个reply topic,只要每个template监听的分区不同。
|
||
|
||
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时,那么每个template实例都必须拥有不同的group id,在这种情况下,所有的template实例都会接受到所有消息,但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便,但是所有的template都接收所有消息将会带来额外的网络通信开销,并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时,推荐将template中`sharedReplyTopic`属性设置为true,此时,将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
|
||
|
||
如下是使用共享topic配置template的示例:
|
||
```java
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String> replyContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
|
||
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
|
||
Properties props = new Properties();
|
||
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
|
||
container.getContainerProperties().setKafkaConsumerProperties(props);
|
||
return container;
|
||
}
|
||
```
|
||
|
||
默认情况下,ReplyingKafkaTemplate将会使用如下三种header:
|
||
- KafkaHeaders.CORRELATION_ID:用于关联发送消息和回复消息的关联id
|
||
- KafkaHeaders.REPLY_TOPIC:用于告诉接收消息的server将消息发送到哪个topic
|
||
- KafkaHeaders.REPLY_PARTITION:该header是可选的,可以通过该header告知server将消息发送到指定的分区
|
||
|
||
上述header将会被@KafkaListener结构用作返回消息的路由。
|
||
|
||
#### 通过Message<?>来发送请求和返回请求
|
||
在2.7版本中,对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message<?>`:
|
||
```java
|
||
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
|
||
|
||
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
|
||
ParameterizedTypeReference<P> returnType);
|
||
```
|
||
上述接口会使用template默认的replyTimeout,也存在接收timeout参数的重载版本。
|
||
|
||
如下示例展示了如何构建Message类型消息的发送和接收:
|
||
```java
|
||
// template 配置
|
||
@Bean
|
||
ReplyingKafkaTemplate<String, String, String> template(
|
||
ProducerFactory<String, String> pf,
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> replyContainer =
|
||
factory.createContainer("replies");
|
||
replyContainer.getContainerProperties().setGroupId("request.replies");
|
||
ReplyingKafkaTemplate<String, String, String> template =
|
||
new ReplyingKafkaTemplate<>(pf, replyContainer);
|
||
template.setMessageConverter(new ByteArrayJsonMessageConverter());
|
||
template.setDefaultTopic("requests");
|
||
return template;
|
||
}
|
||
|
||
// 通过template发送和接收消息
|
||
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
|
||
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
|
||
new ParameterizedTypeReference<Thing>() { });
|
||
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
|
||
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
|
||
log.info(thing.toString());
|
||
|
||
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
|
||
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
|
||
new ParameterizedTypeReference<List<Thing>>() { });
|
||
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
|
||
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
|
||
things.forEach(thing1 -> log.info(thing1.toString()));
|
||
|
||
// server端接收消息并且回复消息
|
||
@KafkaListener(id = "requestor", topics = "request")
|
||
@SendTo
|
||
public Message<?> messageReturn(String in) {
|
||
return MessageBuilder.withPayload(in.toUpperCase())
|
||
.setHeader(KafkaHeaders.TOPIC, replyTo)
|
||
.setHeader(KafkaHeaders.KEY, 42)
|
||
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
|
||
.build();
|
||
}
|
||
```
|
||
从2.5版本开始,若是返回的Message中没有指定header,framework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC(如果存在的话)。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS,也会将其填充到返回消息的header中。
|
||
|
||
> #### ErrorHandlingDeserializer
|
||
> 可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。
|
||
|
||
### kafka poison pill & ErrorHandlingDeserializer
|
||
poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败,不管重试过多少次之后仍然无法成功被消费。
|
||
|
||
poison pill可能在如下场景下产生:
|
||
- 该记录被损坏
|
||
- 该记录发序列化失败
|
||
|
||
在生产场景中,consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容,将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。
|
||
|
||
在现实场景中,可能因为如下缘故而遭遇poison pill:
|
||
- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息,这将会导致反序列化问题
|
||
- consumer的key或value deserializer配置错误
|
||
- 不同的生产者实例,使用不同的key或value serializer向topic中发送消息
|
||
|
||
在发生poison后,consumer在调用poll拉取数据时将无法反序列化record,调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理,针对该topic分区的消费会被阻塞(因为consumer offset一直无法向前移动)。并且,在consumer不停重试针对该消息的反序列化时,大量的反序列化失败日志将会被追加到日志文件中,磁盘占用量将会急剧增大。
|
||
|
||
#### ErrorHandlingDeserializer
|
||
为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。
|
||
|
||
#### replyErrorChecker
|
||
从版本2.6.7开始,可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker,当提供了checker方法时,template会自动调用该方法,如果该方法抛出异常,那么该reply message对应的future也会以失败状态完成。
|
||
|
||
replychecker使用示例如下:
|
||
```java
|
||
template.setReplyErrorChecker(record -> {
|
||
Header error = record.headers().lastHeader("serverSentAnError");
|
||
if (error != null) {
|
||
return new MyException(new String(error.value()));
|
||
}
|
||
else {
|
||
return null;
|
||
}
|
||
});
|
||
|
||
...
|
||
|
||
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
|
||
try {
|
||
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
|
||
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
|
||
...
|
||
}
|
||
catch (InterruptedException e) {
|
||
...
|
||
}
|
||
catch (ExecutionException e) {
|
||
if (e.getCause instanceof MyException) {
|
||
...
|
||
}
|
||
}
|
||
catch (TimeoutException e) {
|
||
...
|
||
}
|
||
```
|
||
|
||
### AggregatingReplyingKafkaTemplate
|
||
ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景,但如果对于发送一条消息,存在多个接收方,会返回多条消息的场景,则需要使用AggregatingReplyingKafkaTemplate。
|
||
|
||
像ReplyingKafkaTemplate一样,AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener container,listener container用于接收返回的消息。除此之外,AggregatingReplyingKafkaTemplate还会接收第三个参数`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`,该断言每次在接收到新消息时都会被计算。如果断言返回true,该`AggregatingReplyingKafkaTemplate.sendAndReceive`方法返回的Future对象将会被完成,并且Future中的值为断言中ConsumerRecord的集合。
|
||
|
||
从版本2.3.5开始,第三个参数为`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`类型,其会在每次接收到消息或是超时(replyTimeout超时)的情况下被调用,第二个参数传入的boolean即是断言的这次调用是否因为超时。**该断言可以针对ConsumerRecord进行修改**。
|
||
|
||
> #### returnPartialOnTimeout
|
||
> AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout,该属性值默认为false,如果该值被设置为true,那么当请求发生超时时,会返回已经接收到的部分ConsumerRecord集合。
|
||
>
|
||
> BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息,要想成功在超时场景下返回接收到的部分消息,不仅需要returnPartialOnTimeout设置为true,还需要BiPredicate断言在发生timeout的情况下返回值为true。
|
||
|
||
AggregatingReplyingKafkaTemplate使用示例如下:
|
||
```java
|
||
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
|
||
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
|
||
(coll, timeout) -> timeout || coll.size() == releaseSize);
|
||
...
|
||
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
|
||
template.sendAndReceive(record);
|
||
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
|
||
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
|
||
future.get(30, TimeUnit.SECONDS);
|
||
```
|
||
|
||
注意sendAndReceive方法返回的future,其值类型为`ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`类型,外层的ConsumerRecord并不是真正的返回消息,而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的,外层ConsumerRecord用于存储实际接收到的消息集合。
|
||
|
||
> #### ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>
|
||
> `ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`作为聚合返回消息集合的伪ConsumerRecord,其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal release(releaseStrategy返回为true)时,伪ConsumerRecord的topic name为`aggregatedResults`.
|
||
>
|
||
> 当获取该伪ConsumerRecord的原因是timeout(returnPartialOnTimeout被设置为true,并且发生timeout,并且至少获取到一条返回消息),那么伪ConsumerRecord的topic name将会被设置为`partialResultsAfterTimeout`.
|
||
|
||
template为上述伪topic name提供了静态变量:
|
||
```java
|
||
/**
|
||
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
|
||
* results in its value after a normal release by the release strategy.
|
||
*/
|
||
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
|
||
|
||
/**
|
||
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
|
||
* results in its value after a timeout.
|
||
*/
|
||
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
|
||
```
|
||
|
||
伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。
|
||
|
||
#### AggregatingReplyingKafkaTemplate配置要求
|
||
listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMMEDIATE`;consumer属性`enable.auto.commit`必须被设置伪false。为了避免任何丢失消息的可能,template只会在没有待处理请求的前提下提交offset,即最后一个未处理请求被releaseStrategy释放。
|
||
|
||
当consumer发生rebalance时,可能会造成返回消息被重复传递(由于template只会在没有待处理请求的情况下提交offset,如果在listener container接收到消息后,尚未提交offset,此时发生rebalance,那么未提交offset的消息将会被重复接收)。对于在途的请求,消息的重复传递将会被忽略(在途的消息还会存在多条消息聚合的过程);针对已经被releaseStrategy释放的返回消息,如果接收到多条重复的返回消息,那么会在log中看到error日志。
|
||
|
||
另外,如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`,那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null,并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。
|
||
|
||
对于AggregatingReplyingKafkaTemplate,`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。
|
||
|
||
## 接收消息
|
||
在使用spring kafka时,可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。
|
||
|
||
### Message Listener
|
||
当使用Message Listener container时,必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口:
|
||
```java
|
||
// interface-1
|
||
public interface MessageListener<K, V> {
|
||
|
||
void onMessage(ConsumerRecord<K, V> data);
|
||
|
||
}
|
||
|
||
// interface-2
|
||
public interface AcknowledgingMessageListener<K, V> {
|
||
|
||
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
|
||
|
||
}
|
||
|
||
// interface-3
|
||
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
|
||
|
||
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
|
||
|
||
}
|
||
|
||
// interface-4
|
||
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
|
||
|
||
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
|
||
|
||
}
|
||
|
||
// interface-5
|
||
public interface BatchMessageListener<K, V> {
|
||
|
||
void onMessage(List<ConsumerRecord<K, V>> data);
|
||
|
||
}
|
||
|
||
// interface-6
|
||
public interface BatchAcknowledgingMessageListener<K, V> {
|
||
|
||
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
|
||
|
||
}
|
||
|
||
// interface-7
|
||
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
|
||
|
||
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
|
||
|
||
}
|
||
|
||
// interface-8
|
||
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
|
||
|
||
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
|
||
|
||
}
|
||
```
|
||
1. interface-1:当使用由容器管理的提交方法或自动提交时,可以使用该接口;该接口针对单条消息进行处理
|
||
2. interface-2:当使用由容器管理的提交方法时,可以使用该接口;该接口针对单条消息进行处理
|
||
3. interface-3:当使用由容器管理的提交方法或自动提交时,可以使用该接口;该接口针对单条消息进行处理;该接口提供对consumer实例的访问
|
||
4. interface-4:当使用由容器管理的提交方法时,可以使用该接口;该接口针对单挑消息进行处理;该接口提供对consumer实例的访问
|
||
5. interface-5:当使用自动提交或由容器管理的提交方法时,可以使用该接口;该接口针对`consumer.poll`方法返回的所有消息进行处理;当使用该接口时,不支持`AckMode.RECORD`模式,因为所有的poll方法接收到的message batch都给了listener
|
||
6. interface-6:该当使用容器管理的提交方法时,可以使用该接口;该接口会处理由poll方法接收到的所有消息
|
||
7. interface-7:当使用自动提交或由容器管理的提交方法时,可以使用该接口;该接口针对poll方法返回的所有消息进行处理;使用该接口时不支持`AckMode.RECORD`;使用该接口可以针对consumer实例进行处理
|
||
8. interface-8:当使用由容器管理的commit method时,可以使用该接口;该接口针对由poll方法返回的全部消息进行处理;该接口提供对consumer实例的访问
|
||
|
||
> listener接口提供的consumer对象并不是线程安全的,故而必须在调用该listener的线程内访问该consumer中的方法
|
||
|
||
> 在对consumer中的方法进行访问时,不应该在listener中执行任何会修改consumer position或commit offset的方法;position或offset信息由container来进行管理。
|
||
|
||
### MessageListenerContainer
|
||
spring kafka提供了两种MessageListenerContainer的实现:
|
||
- KafkaMessageListenerContainer
|
||
- ConcurrentMessageListenerContainer
|
||
|
||
上述两种实现的区别如下:
|
||
- KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
|
||
- ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。
|
||
|
||
#### Interceptors
|
||
从2.2.7版本开始,可以为listener container添加`RecordInterceptor`;拦截器会在container调用listener之前被调用,可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空,那么后续listener将不会被调用。
|
||
|
||
从2.7版本开始,RecordInterceptor还增加了方法`afterRecord`,该方法在listener退出之后调用(正常退出或是抛异常退出)。并且从2.7版本开始,还新增了`BatchInterceptor`,为Batch Listener提供了类似的拦截器功能;ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。
|
||
|
||
> 如果拦截器对record进行了修改(同构创建一个新的record),那么topic、partition、offset必须都保持不变,从而避免类似消息丢失的副作用
|
||
|
||
为了调用多个interceptors,可以使用`CompositeRecordInterceptor`和`CompositeBatchInterceptor`。
|
||
|
||
默认情况下,从2.8版本开始,在使用事务时,拦截器会在事务开启之前被调用。可以将listener container的`interceptBeforeTx`属性修改为`false`,从而在事务开启之后调用拦截器方法。从2.9版本开始,上述会被应用于任何transaction manager,而不单单只用于`KafkaAwareTransactionManager`,这允许拦截器加入到由container开启的jdbc事务。
|
||
|
||
从2.3.8、2.4.6版本开始,`ConcurrentMessageListenerContainer`支持Static Membership当concurrency大于1时。而`group.instance.id`的后缀则是`-n`,`n`从1开始。与此同时,增加`session.timeout.ms`值,可以减少rebalance event的数量,例如,当应用实例重启时。
|
||
|
||
#### 使用KafkaMessageListenerContainer
|
||
在创建KafkaMessageListenerContainer时,可以使用如下构造方法:
|
||
```java
|
||
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
|
||
ContainerProperties containerProperties)
|
||
```
|
||
上述构造函数接收一个consumerFactory,并且通过containerProperties接收topic和partition的信息,此外containerProperties中还包含其他配置信息。
|
||
|
||
containerProperties拥有如下构造方法:
|
||
```java
|
||
public ContainerProperties(TopicPartitionOffset... topicPartitions)
|
||
|
||
public ContainerProperties(String... topics)
|
||
|
||
public ContainerProperties(Pattern topicPattern)
|
||
```
|
||
第一个构造方法中接收一个`TopicPartitionOffset`数组,通过该数组可以显式的告知container应该使用哪些分区(通过调用consumer的assign方法)。并且,可以选择为TopicPartitionOffset指定初始的offset。当`relativeToCurrent`参数为false时(默认为false),如果指定offset为正数,默认代表绝对offset;如果offset为负数,默认代表相对于该分区最后位置的偏移量。如果`relativeToCurrent`为true,初始offset将会被设置为相对当前的consumer position。当容器启动时,设置的offset将会被应用。
|
||
|
||
第二个构造方法接收了一个topic数组,kafka则会根据`group.id`属性来分配分区,将分区在组内的订阅了该topic的消费者实例之间进行分配。
|
||
|
||
第三个构造方法则是接收一个topicPattern,通过该pattern来匹配分区。
|
||
|
||
在创建container时,为了将MessageListener分配给container,可以使用`ContainerProperties.setMessageListener`。如下展示了添加listener的示例:
|
||
```java
|
||
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
|
||
containerProps.setMessageListener(new MessageListener<Integer, String>() {
|
||
...
|
||
});
|
||
DefaultKafkaConsumerFactory<Integer, String> cf =
|
||
new DefaultKafkaConsumerFactory<>(consumerProps());
|
||
KafkaMessageListenerContainer<Integer, String> container =
|
||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||
return container;
|
||
```
|
||
|
||
在上述示例中,创建DefaultKafkaConsumerFactory实例时,其构造方法只接受了一个ConsumerProperties属性,代表其会在属性中获取key和value的`Deserializer.class`并创建实例。同时,也可以通过调用DefaultKafkaConsumerFactory的其他构造方法,接收key和value的Deserializer实例,在这种情况下,所有consumer都会公用key和value的反序列化实例。另一种选择时提供`Supplier<Deserializer>`,这种情况下,每个consumer都会使用不同的反序列化器实例。
|
||
|
||
```java
|
||
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
|
||
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
|
||
KafkaMessageListenerContainer<Integer, String> container =
|
||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||
return container;
|
||
```
|
||
|
||
从2.2.1版本开始,提供了一个叫`logContainerConfig`的配置属性,当该属性设置为true并且info日志级别开启时,每个listener container都会写日志记录其配置。
|
||
|
||
默认情况下,会在debug的日志级别记录topic offset commit。从2.1.2开始,通过`ContainerProperties`中`commitLogLevel`的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info,可以调用`containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)`方法。
|
||
|
||
从2.2版本开始,添加了`missingTopicsFatal`属性,该属性默认值为false。如果在container启动时,其配置的任一topic在broker中并不存在,那么其将会阻止container启动。如果container被配置为监听pattern,那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时,container线程将会循环调用`consumer.poll`方法等待topic出现,同时日志输出消息,除了日志之外,没有任何迹象显示出现了问题。
|
||
|
||
在2.8版本中,引入了`authExceptionRetryInterval`属性。该属性会导致container在kafkaConsumer得到`AuthenticationException`或`AuthorizationException`异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时,在等待`authExceptionRetryInterval`时间间隔后,可以重试获取消息,此时权限可能已经被授予。
|
||
> 在默认情况下,没有配置`authExceptionRetryInterval`,AuthenticationException和AuthorizationException将被认为是致命的,这将会导致messageListenerContainer停止。
|
||
|
||
#### 使用ConcurrentMessageListenerContainer
|
||
ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer:
|
||
```java
|
||
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
|
||
ContainerProperties containerProperties)
|
||
```
|
||
除此之外,ConcurrentMessageListenerContainer还有一个concurrency属性,该属性代表并行度。如果调用`container.setConcurrency(3)`方法,其会创建3个`KafkaMessageListenerContainer`.
|
||
|
||
对于该构造方法,kafka使用组管理功能在消费者实例之间分配分区。
|
||
|
||
如果container通过TopicPartitonOffset配置时,ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上,每个委托实例负责监听不同的TopicPartitionOffset。
|
||
|
||
如果,6个TopicPartitionOffset被提供,并且concurrency被设置为3,每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供,并且concurrency被设置为3,那么两个委托容器实例将会被分配到2个分区,第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量,那么concurrency将会降低到和TopicPartitionOffset数量相同,每个委托容器实例被分配到一个分区。
|
||
|
||
从1.3版本开始,`MessageListenerContainer`提供了对底层`KafkaConsumer`的metrics信息访问。对于`ConcurrentMessageListenerContainer`,其metircs方法将会返回一个map,该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为`Map<String, Map<MetricName, ? extends Metric>>`,其中key为底层KafkaConsumer的`client-id`.
|
||
|
||
从2.3版本开始,ContainerProperties提供了`idleBetweenPolls`属性,允许listener container在调用`consumer.poll()`方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:`idleBetweenPolls`属性实际配置的值;`max.poll.interval.ms`值减去当前消息批处理时间的差值。
|
||
|
||
> `max.poll.interval.ms`
|
||
>
|
||
> 该值代表在使用消费者组管理功能时,两次`poll()`调用之间最长时间间隔。该值代表在调用完poll方法后,在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后,poll方法仍然未被调用,那么该消费者示例将会被认为失败,并且消费者组会出发rebalance操作,将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的`group.instance.id`不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过`session.timeout.ms`时间后出发rebalance。`group.instance.id`不为空时,该消费者实例将会被认为是该消费者组的静态成员。
|
||
>
|
||
> 默认情况下,`max.poll.interval.ms`的默认值为5min。
|
||
|
||
#### Committing Offsets
|
||
对于offset提交,spring kafka提供了多个选项。如果`enable.auto.commit`属性在consumer properties中被指定为true,那么kafka会根据其配置对消息进行自动提交。如果`enable.auto.commit`被指定为false,那么MessageListenerContainer支持几种不同的`AckMode`设置。默认的`AckMode`设置为`BATCH`。从2.3版本开始,spring framework会自动将`enable.auto.commit`设置为false,在2.3之前则是默认设置为true。
|
||
|
||
`consumer.poll`方法会返回一条或多条`ConsumerRecord`,而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为:
|
||
- `RECORD`:在listener处理完该消息并返回之后,提交offset
|
||
- `BATCH`:当所有被poll方法调用返回的消息都处理完成之后,提交offset
|
||
- `TIME`:当所有被poll方法返回的消息均被处理完成,或是从上次commit时起已超过`ackTime`时间
|
||
- `COUNT`:当所有被poll方法返回的消息都被处理完成,或是从上次commit时起已收到超过`ackCount`条记录
|
||
- `COUNT_TIME`:当TIME或COUNT任一条满足时,提交offset
|
||
- `MANUAL`:messageListener负责对消息调用`Acknowledgment.acknowledge()`方法,调用之后语义和BATCH相同,会等待所有被poll方法调用返回的消息都处理完成,之后会批量提交offset
|
||
- `MANUAL_IMMEDIATE`:当方法`Acknowledgment.acknowledge()`被listener调用之后,立刻会提交offset
|
||
|
||
当使用事务时,offsets将会被发送给事务,并且语义和RECRD或BATCH相同,基于listener的类型决定语义(看listener是record类型还是batch类型)
|
||
|
||
> `MANUAL`或`MANUAL_IMMEDIATE`需要listener为`AcknowledgingMessageListener `或`BatchAcknowledgingMessageListener`
|
||
|
||
基于容器的`syncCommits`容器属性,提交offset时会使用`commitSync`或`commitAsync`方法。`syncCommits`属性默认为true,可以通过`setSyncCommitTimeout`设置commitSync时的超时时间。在syncCommits设置为false时,可以通过`setCommitCallback`来获取commitAsync的结果,默认情况下callback是`LoggingCommitCallback`,其会日志输出errors,并且以debug的级别打印成功日志。
|
||
|
||
由于listener container拥有自己的提交offset机制,故而更推荐将kafka consumr的`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`属性设置为false。2.3版本开始,会将自动提交属性设置为false,除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。
|
||
|
||
Acknowledgment接口具有如下方法:
|
||
```java
|
||
public interface Acknowledgment {
|
||
|
||
void acknowledge();
|
||
|
||
}
|
||
```
|
||
该方法能够让container控制何时提交offset。
|
||
|
||
从2.3版本开始,`Acknowledgment`接口提供了两个额外的方法`nack(long sleep)`和`nack(int index, long sleep)`。第一个方法和record类型的listener一起使用,第二个方法和batch类型的listener一起使用。如果调用类型错误,会抛出`IllegalStateException`异常。
|
||
|
||
- `nack(long sleep)`:对当前record执行nack操作,丢弃poll请求中的remaining records并针对所有分区进行re-seek操作,将position恢复。故而,在指定sleep时间范围后,record将会被重新传递,该操作会阻塞整个message listener的读操作,阻塞时间为sleep,且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。
|
||
|
||
- `nack(int index, long sleep)`:对于batch中index位置的record进行nack操作,该操作会提交位于index之前record的offset,并且会re-seek所有分区,故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。
|
||
|
||
当想要提交batch中部分消息时,请使用`nack(int index, long sleep)`方法。当是使用事务时,设置AckMode为MUANAL;调用nack方法会将已成被成功处理的records offsets发送给事务。
|
||
|
||
nack只能在调用listener的consumer线程中进行处理。
|
||
|
||
在使用nack方法时,任何在途的offset都会被提交,而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作,故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似,当container中配置了一个DefaultErrorHandler时。
|
||
|
||
当使用batch listener时,若发生异常可以指定record在bathc中的index。调用nack时,位于index之前的record offset都会被提交,seek操作也会执行,在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。
|
||
|
||
从3.0.10版本开始,batch listener可以提交batch中的部分record offset,通过使用`acknowledge(index)`方法。当该方法被调用时,位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用`acknowledge()`方法,会针对剩余尚未被提交的records进行offset提交。在使用部分提交时,必须满足如下要求:
|
||
- AckMode.MANUAL_IMMEDIATE需要被开启
|
||
- 部分提交方法需要在listener thread中被调用
|
||
- listener必须批量消费recrods而不是消费单条record
|
||
- index必须要位于batch range中
|
||
- 重复调用部分提交接口时,后调用的index要比先调用的大
|
||
|
||
上述要求是强制的,在不满足时会抛出IllegalArgumentException和IllegalStateException异常。
|
||
|
||
> 上述调用nack后所说的阻塞均是指调用`KafkaConsumer.pause`方法后造成的结果。在pause方法被调用后,任何后续调用的poll方法都不会返回任何records,直到`resume`方法被调用。pause方法的调用并不会造成分区的rebalance操作。**调用pause方法并不会造成线程的阻塞,而是通过poll获取指定分区消息的阻塞。**
|
||
>
|
||
> 并且,rebalance操作后也不会保留pause/resume状态。
|
||
|
||
### 手动提交offset
|
||
通常,当使用`AckMode.MANUAL`或`AckMode.MANUAL_IMMEDIATE`时,ack必须按顺序确认,因为kafka并不是为每条record都维护提交状态,而是针对每个消费者组/分区只维护一个commit offset。
|
||
|
||
从2.8版本开始,可以设置container属性`asyncAcks`,该属性设置后,由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序(records可以被按任何顺序确认)的提交进行延迟,直到接收到缺失的ack。此时,消费者会处于paused状态,不会有新的records被传递,直到上次poll操作中所有的消息都被提交offset。
|
||
|
||
当启用`asyncAcks`特性后,可以异步的针对消息进行处理(`Acknowledgment.acknowledge()`方法并没有要求只能在listener线程中被调用)。但是,在对消息进行异步处理的时候,如果发生处理失败的情况,可能会增加record重复传输的可能性。(如果在开启asyncAcks时,如果后续消息已经提交ack,但是位于前面的消息处理发生异常,那么后续成功的record其offset将无法被提交,故而后续处理成功的消息将会被重新传递).
|
||
|
||
> 在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。
|
||
|
||
并且,当`asyncAcks`启用时,无法使用`nack()`方法来进行nack操作。
|
||
|
||
### @KafkaListener
|
||
`@KafkaListener`会将一个bean方法标记为listener container的listener方法。该bean被封装在一个`MessagingMessageListenerAdapter`对象中,该Adapter对象的构造方法如下所示,封装了bean对象和bean对象方法:
|
||
```java
|
||
public MessagingMessageListenerAdapter(Object bean, Method method) {
|
||
this.bean = bean;
|
||
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
|
||
}
|
||
```
|
||
|
||
在配置@KafkaListener注解的绝大部分属性时,可以使用`#{}`形式的spel表达式或`${}`形式的属性占位符。
|
||
|
||
#### Record Listener
|
||
@KafkaListener注解为简单pojo的listenr提供了机制,如下展示了使用示例:
|
||
```java
|
||
public class Listener {
|
||
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
|
||
public void listen(String data) {
|
||
...
|
||
}
|
||
|
||
}
|
||
```
|
||
该机制需要在@Configuration类上标注`@EnableKafka`注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的`ConcurrentMessageListenerContainer`。默认情况下,需要一个bean name为`kafkaListenerContainerFactory`的bean对象。如下示例展示了如何使用`ConcurrentMessageListenerContainer`:
|
||
```java
|
||
@Configuration
|
||
@EnableKafka
|
||
public class KafkaConfig {
|
||
|
||
@Bean
|
||
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
|
||
kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
factory.setConcurrency(3);
|
||
factory.getContainerProperties().setPollTimeout(3000);
|
||
return factory;
|
||
}
|
||
|
||
@Bean
|
||
public ConsumerFactory<Integer, String> consumerFactory() {
|
||
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
|
||
}
|
||
|
||
@Bean
|
||
public Map<String, Object> consumerConfigs() {
|
||
Map<String, Object> props = new HashMap<>();
|
||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
|
||
...
|
||
return props;
|
||
}
|
||
}
|
||
```
|
||
在上述示例中,如果要设置container properties,必须要在container factory对象上调用getContainerProperties(),然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。
|
||
|
||
从版本2.1.1开始,对由该注解创建的消费者实例,可以设置`client.id`属性。`client.id`为`clientIdPrefix-n`格式,其中n为整数,代表在使用concurrency时的container number。
|
||
|
||
从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的`concurrency`和`autoStartup`属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示:
|
||
```java
|
||
@KafkaListener(id = "myListener", topics = "myTopic",
|
||
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
|
||
public void listen(String data) {
|
||
...
|
||
}
|
||
```
|
||
|
||
#### 显式分区分配
|
||
可以通过@KafkaListener注解为pojo listener配置显式的topic和分区(也可以为其指定一个初始offset)。如下示例展示了如何为@KafkaListener指定topic和offset:
|
||
```java
|
||
@KafkaListener(id = "thing2", topicPartitions =
|
||
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
|
||
@TopicPartition(topic = "topic2", partitions = "0",
|
||
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
|
||
})
|
||
public void listen(ConsumerRecord<?, ?> record) {
|
||
...
|
||
}
|
||
```
|
||
如上图所示,可以在topicPartitions属性的`@TopicPartition的partitions属性`中或是`@TopicPartitionpation的partitionOffsets属性`中指定分区,但是无法同时在上述两处都指定同一个分区。
|
||
|
||
从2.5.5版本开始,可以对所有手动分配的分区都指定一个初始offset:
|
||
```java
|
||
@KafkaListener(id = "thing3", topicPartitions =
|
||
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
|
||
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
|
||
})
|
||
public void listen(ConsumerRecord<?, ?> record) {
|
||
...
|
||
}
|
||
```
|
||
再上述示例中,`*`通配符代表`partitions`属性锁指定的所有分区。在每个`@TopicPartition`注解中,只能有一个`@PartitionOffset`注解中带有该通配符。
|
||
|
||
当listener实现了`ConsumerSeekAware`时,`onPartitionAssigned`方法会被调用,即使使用手动分配(assign)。`onPartitionAssigned`允许在分区被分配时执行任意的seek操作。
|
||
|
||
从2.6.4版本开始,可以指定由`,`分隔的分区列表或分区区间,示例如下:
|
||
```java
|
||
@KafkaListener(id = "pp", autoStartup = "false",
|
||
topicPartitions = @TopicPartition(topic = "topic1",
|
||
partitions = "0-5, 7, 10-15"))
|
||
public void process(String in) {
|
||
...
|
||
}
|
||
```
|
||
该分区范围是闭区间,上述代表的区间是`0,1,2,3,4,5,7,10,11,12,13,14,15`.
|
||
|
||
上述方式也可以用于指定分区初始的offset:
|
||
```java
|
||
@KafkaListener(id = "thing3", topicPartitions =
|
||
{ @TopicPartition(topic = "topic1",
|
||
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
|
||
})
|
||
public void listen(ConsumerRecord<?, ?> record) {
|
||
...
|
||
}
|
||
```
|
||
|
||
#### 手动ack
|
||
当使用手动ack时,可以为listener method提供`Acknowledgment`。如下示例展示了如何使用一个不同的container factory:
|
||
```java
|
||
@KafkaListener(id = "cat", topics = "myTopic",
|
||
containerFactory = "kafkaManualAckListenerContainerFactory")
|
||
public void listen(String data, Acknowledgment ack) {
|
||
...
|
||
ack.acknowledge();
|
||
}
|
||
```
|
||
|
||
#### Consumer Record Metadata
|
||
record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容:
|
||
- KafkaHeaders.OFFSET
|
||
- KafkaHeaders.RECEIVED_KEY
|
||
- KafkaHeaders.RECEIVED_TOPIC
|
||
- KafkaHeaders.RECEIVED_PARTITION
|
||
- KafkaHeaders.RECEIVED_TIMESTAMP
|
||
- KafkaHeaders.TIMESTAMP_TYPE
|
||
|
||
从2.5版本开始,如果接收到的消息key为null,那么`RECEIVED_KEY`在header中并不会出现;而在2.5版本之前,`RECEIVED_KEY`在header中存在并且值为null。
|
||
|
||
如下示例展示了如何使用header:
|
||
```java
|
||
@KafkaListener(id = "qux", topicPattern = "myTopic1")
|
||
public void listen(@Payload String foo,
|
||
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
|
||
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
|
||
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
|
||
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
|
||
) {
|
||
...
|
||
}
|
||
```
|
||
|
||
> 参数注解(`@Payload`、`@Header`)必须在listener method方法的具体实现上被指定,如果参数注解只指定在接口上,而没有在具体实现方法上指定,那么指定的注解将不会起作用。
|
||
|
||
从2.5版本开始,相比于上述示例中针对单条header内容进行接收,可以通过`ConsumerRecordMetadata`类型的参数来接收所有的header数据:
|
||
```java
|
||
@KafkaListener(...)
|
||
public void listen(String str, ConsumerRecordMetadata meta) {
|
||
...
|
||
}
|
||
```
|
||
上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据,除了record中的key和value。
|
||
|
||
#### Batch Listener
|
||
从版本1.1开始,可以通过@KafkaListener方法来接收record batch。
|
||
|
||
为了配置listener container factory支持batch listener,需要设置container factory的`batchListener`属性。如下展示了配置示例:
|
||
```java
|
||
@Bean
|
||
public KafkaListenerContainerFactory<?> batchFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
|
||
return factory;
|
||
}
|
||
```
|
||
|
||
> 从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的`batchListener`属性,只需要为@KafkaListener注解指定`batch`属性。
|
||
|
||
> 从2.9.6版本开始,container factory对于`recordMessageConverter`和`batchMessageConverter`有不同的setter。在2.9.6之前的版本中,只有`messageConverter`的setter,该converter被同时用于批量和单条场景。
|
||
|
||
如下示例展示了如何接收list类型的payload:
|
||
```java
|
||
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen(List<String> list,
|
||
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
|
||
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
|
||
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
|
||
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
|
||
...
|
||
}
|
||
```
|
||
record batch的topic、分区、offset等信息也可以批量获取,如下展示了如何批量获取header:
|
||
```java
|
||
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen(List<String> list,
|
||
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
|
||
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
|
||
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
|
||
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
|
||
...
|
||
}
|
||
```
|
||
除了上述方法外,还可以通过`List of Message<?>`来批量接收消息,但是,除了`Acknowledgment`或`Consumer<?, ?>`外,`List of Message<?>`应该是方法中唯一的参数。如下示例展示了如何使用:
|
||
```java
|
||
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen1(List<Message<?>> list) {
|
||
...
|
||
}
|
||
|
||
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen2(List<Message<?>> list, Acknowledgment ack) {
|
||
...
|
||
}
|
||
|
||
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
|
||
...
|
||
}
|
||
```
|
||
在上述示例中,并不会对payload执行任何转换。
|
||
|
||
如果当前存在`BatchMessagingMessageConverter`(`BatchMessagingMessageConverter`通过`RecordMessageConverter`配置),则可以为`Message<?>`添加一个泛型类型,payload将会被转化为该类型。
|
||
|
||
除了上述两种方式外,还可以接收`List<ConsumerRecord<?, ?>>`类型的参数,但是其必须为listener method的唯一参数(`Acknowledgment`类型和`Consumer<?, ?>`类型参数除外)。如下实例展示了如何使用:
|
||
```java
|
||
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen(List<ConsumerRecord<Integer, String>> list) {
|
||
...
|
||
}
|
||
|
||
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
|
||
...
|
||
}
|
||
```
|
||
从2.2版本开始,listener可以接收由poll方法返回的完整的`ConsumerRecords<?, ?>`对象(该record中封装了一个record的list)。从而允许listener访问额外的方法,例如`partitions()`方法,返回TopicPartition对象的集合。方法的使用示例如下:
|
||
```java
|
||
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
|
||
public void pollResults(ConsumerRecords<?, ?> records) {
|
||
...
|
||
}
|
||
```
|
||
|
||
#### 注解属性
|
||
从2.0版本开始,`id`属性被用作kafka consumer group的`group.id`属性,其会覆盖consumer factory中配置的属性。如果不想使用该行为,可以显式的另外设置`groupId`属性;或是将`idIsGroup`属性设置为false,此时`group.id`仍然会使用consumer factory中配置的属性。
|
||
|
||
对于@KafkaListener注解,可以在绝大多数属性中使用spel表达式和placeholder:
|
||
```java
|
||
@KafkaListener(topics = "${some.property}")
|
||
|
||
@KafkaListener(topics = "#{someBean.someProperty}",
|
||
groupId = "#{someBean.someProperty}.group")
|
||
```
|
||
|
||
从2.1.2版本开始,spel表达式支持`__listener`。该伪bean name代表当前该注解所位于的bean。
|
||
|
||
例如如下示例:
|
||
```java
|
||
@Bean
|
||
public Listener listener1() {
|
||
return new Listener("topic1");
|
||
}
|
||
|
||
@Bean
|
||
public Listener listener2() {
|
||
return new Listener("topic2");
|
||
}
|
||
|
||
public class Listener {
|
||
|
||
private final String topic;
|
||
|
||
public Listener(String topic) {
|
||
this.topic = topic;
|
||
}
|
||
|
||
@KafkaListener(topics = "#{__listener.topic}",
|
||
groupId = "#{__listener.topic}.group")
|
||
public void listen(...) {
|
||
...
|
||
}
|
||
|
||
public String getTopic() {
|
||
return this.topic;
|
||
}
|
||
|
||
}
|
||
```
|
||
如果在项目中,已经存在了名为`__listener`的bean实例,那么可以通过`beanRef`属性来对属性表达式进行修改。如下示例展示了如何使用`beanRef`:
|
||
```java
|
||
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
|
||
```
|
||
从版本2.2.4开始,可以直接通过@KafkaListener注解指定consumer属性,通过注解指定的consumer属性会覆盖在consumer factory中指定的属性。
|
||
|
||
> 但是,无法通过`@KafkaListener`注解的上述方法来指定`group.id`和`client.id`属性,通过上述方法指定这两个属性时,指定会被忽略;需要使用`groupId`和`clientIdPrefix`注解属性来指定consumer属性中的`group.id`和`client.id`属性
|
||
|
||
在通过@KafkaListener来指定consumer属性时,通过独立的字符串按照`foo:bar`、`foo bar`、`foo=bar`的格式进行指定,示例如下所示:
|
||
```java
|
||
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
|
||
"max.poll.interval.ms:60000",
|
||
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
|
||
})
|
||
|
||
@KafkaListener(id = "one", topics = "one")
|
||
public void listen1(String in) {
|
||
System.out.println("1: " + in);
|
||
}
|
||
|
||
@KafkaListener(id = "two", topics = "two",
|
||
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
|
||
public void listen2(byte[] in) {
|
||
System.out.println("2: " + new String(in));
|
||
}
|
||
```
|
||
### 获取consumer的`group.id`
|
||
在不同container中运行相同listener代码时,需要识别该消息来源于哪个container(通过`group.id`标识)。
|
||
|
||
在需要获取container的groupId时,可以在listener线程中调用`KafkaUtils.getConsumerGroupId()`方法;也可以在方法参数中访问`group.id`属性:
|
||
|
||
```java
|
||
@KafkaListener(id = "id", topicPattern = "someTopic")
|
||
public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
|
||
...
|
||
}
|
||
```
|
||
### container线程命名
|
||
框架通过一个TaskExecutor来调用consumer和listener。可以在container properties中通过设置`consumerExecutor`属性来设置一个自定义的executor。当使用pooled executor时,需要确保线程池中含有足够的线程来处理来自所有container的并发。当使用`ConcurrentMessageListenerContainer`时,executor中的线程会被所有consumer(concurrency)使用。
|
||
|
||
如果没有为container提供consumer executor,那么会为每个container提供`SimpleAsyncTaskExecutor`。
|
||
|
||
> SimpleAsyncTaskExecutor会为所有的task都开启一个新线程,故而该类型的executor并不会对线程进行重用。
|
||
>
|
||
> SimpleAsyncTaskExecutor在创建线程时,executor创建线程的名称按照`<beanname>-C-<n>`的格式。对于`ConcurrentMessageListenerContainer`,`<beanname>`部分将会被换成`<beanname>-m`,m代表消费者实例。当每次container启动时,`<n>`都会增加。
|
||
>
|
||
> 故而,对于name为`container`的bean对象,其executor中的线程名称在container第一次启动后为`container-0-C-1`、`container-1-C-1`(n均为1,m为0和1).当containerr再次启动时,线程名称则是会变为`container-0-C-2`、`container-1-C-2 etc`。
|
||
|
||
从3.0.1版本开始,可以修改线程的名称。如果将`AbstractMessageListenerContainer.changeConsumerThreadName`属性设置为true,将会调用`AbstractMessageListenerContainer.threadNameSupplier`来获取线程名称。
|
||
|
||
### @KafkaListener用作元注解
|
||
从2.2版本开始,可以将@KafkaListener用作元注解,如下展示了使用示例:
|
||
```java
|
||
@Target(ElementType.METHOD)
|
||
@Retention(RetentionPolicy.RUNTIME)
|
||
@KafkaListener
|
||
public @interface MyThreeConsumersListener {
|
||
|
||
@AliasFor(annotation = KafkaListener.class, attribute = "id")
|
||
String id();
|
||
|
||
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
|
||
String[] topics();
|
||
|
||
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
|
||
String concurrency() default "3";
|
||
|
||
}
|
||
```
|
||
|
||
在将@KafkaListener用作元注解时,必须在自定义注解中针对`topics`、`topicPattern`、`topicPartitions`中的至少一个进行alias操作。
|
||
|
||
如下展示了如何使用自定义注解:
|
||
```java
|
||
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
|
||
public void listen1(String in) {
|
||
...
|
||
}
|
||
```
|
||
|
||
### 在类级别使用@KafkaListener
|
||
当在类级别使用@KafkaListener时,必须要在方法级别指定@KafkaHandler。当消息被传递到时,消息payload转化为的类型将会用于决定调用哪个handler方法。示例如下所示:
|
||
```java
|
||
@KafkaListener(id = "multi", topics = "myTopic")
|
||
static class MultiListenerBean {
|
||
|
||
@KafkaHandler
|
||
public void listen(String foo) {
|
||
...
|
||
}
|
||
|
||
@KafkaHandler
|
||
public void listen(Integer bar) {
|
||
...
|
||
}
|
||
|
||
@KafkaHandler(isDefault = true)
|
||
public void listenDefault(Object object) {
|
||
...
|
||
}
|
||
|
||
}
|
||
```
|
||
从2.1.3版本开始,可以指定一个@KafkaHandler方法作为默认方法,当类型匹配不到其他handler时,默认handler方法将会被调用。并且,最多只有有一个方法被指定为默认handler方法。
|
||
|
||
当使用@KafkaHandler时,必须要求payload已经被转化为目标对象类型,如此匹配才能进行。
|
||
|
||
基于spring解析方法参数的限制,default kafkahandler无法接收header中的单条内容,其只能通过`ConsumerRecordMetadata`类型来获取header中的内容。
|
||
|
||
例如,如下方式在Object类型为String时不起作用:
|
||
```java
|
||
@KafkaHandler(isDefault = true)
|
||
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
|
||
...
|
||
}
|
||
```
|
||
如果object类型为String,那么topic也会指向object,而不会获取到header中的topic内容。
|
||
|
||
如果要在default handler method中访问元数据,需要按如下方式:
|
||
```java
|
||
@KafkaHandler(isDefault = true)
|
||
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
|
||
String topic = meta.topic();
|
||
...
|
||
}
|
||
```
|
||
|
||
### @KafkaListener属性修改
|
||
从2.7.2版本开始,可以在container创建之前,通过编码的方式来修改注解属性。为了实现改功能,可以在spring context中添加一个或者多个`KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer`。`AnnotationEnhancer`是个`BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>`类型的二元函数,返回一个Map<String,Object>类型的值。属性值可能会包含spel表达式或者properties placeholder,而enhancer则是在spel表达式和属性占位符被解析之前调用。如果当前spring容器存在多个enhancer,且enhancer实现了Ordered接口,那么enhancer将会按照顺序被调用:
|
||
```java
|
||
@Bean
|
||
public static AnnotationEnhancer groupIdEnhancer() {
|
||
return (attrs, element) -> {
|
||
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
|
||
? ((Class<?>) element).getSimpleName()
|
||
: ((Method) element).getDeclaringClass().getSimpleName()
|
||
+ "." + ((Method) element).getName()));
|
||
return attrs;
|
||
};
|
||
}
|
||
```
|
||
|
||
`AnnotationEnhancer`bean定义必须要被声明为static,该bean在spring context生命周期中非常早期的时间点被需要。
|
||
|
||
### @KafkaListener生命周期管理
|
||
为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中,该bean对象会自动被spring framework创建,并且管理listener container的生命周期。kafkaListenerEndpointRegistry对象会启动所有`autoStartup`属性设置为true的container。所有container factory都在同一阶段创建所有container。通过registry,可以通过编程的方式来管理container的生命周期。对registry执行start或stop操作将会对registry中所有的container都执行start或stop操作。同时,也可以通过container的id属性来获取单独的container对象。
|
||
|
||
可以通过@KafkaListener注解来设置container的autoStartup属性,通过注解指定的autoStartup值将会覆盖在container factory中指定的autoStartup属性。
|
||
|
||
如果要通过registry对象来管理注册的container,可以通过bean注入的方式来获取registry:
|
||
|
||
```java
|
||
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
|
||
public void listen(...) { ... }
|
||
|
||
/**
|
||
* 通过bean注入来获取registry
|
||
*
|
||
**/
|
||
@Autowired
|
||
private KafkaListenerEndpointRegistry registry;
|
||
|
||
...
|
||
|
||
this.registry.getListenerContainer("myContainer").start();
|
||
|
||
...
|
||
```
|
||
regisry只维护其管理container的生命周期;如果以bean形式声明的container,其并不由registry进行管理,而是可以从spring容器中获取。可以调用registry对象的`getListenerContainers`方法来获取其管理的container集合。
|
||
|
||
从2.2.5版本开始,registry新增了一个新的方法`getAllListenerContainers()`,通过该方法可以获取所有的container集合,集合中包括由registry管理的container和以bean对象形式生命的container。该返回集合中将会包含任何prototype的已初始化bean对象,但是集合中不会对懒加载的bean对象进行加载操作。
|
||
|
||
endpoint将会在spring容器被refreshed之后被注册到registry中,并且,endpoint将会立马被启动,不论其autoStartup属性值是什么。
|
||
|
||
### @KafkaListener @Payload校验
|
||
从2.2版本开始,可以更加方便的为@KafkaListener @Payload参数添加validator。在之前的版本中必须要自定义配置一个`DefaultMessageHandlerMethodFactory`并且将其添加到registrar中。现在,可以为registrar中添加validator。如下代码展示了使用示例:
|
||
```java
|
||
@Configuration
|
||
@EnableKafka
|
||
public class Config implements KafkaListenerConfigurer {
|
||
|
||
...
|
||
|
||
@Override
|
||
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
|
||
registrar.setValidator(new MyValidator());
|
||
}
|
||
|
||
}
|
||
```
|
||
|
||
当使用spring boot并且存在有validation的starter时,`LocalValidatorFactoryBean`将会被自动装配,装配逻辑和如下代码类似:
|
||
```java
|
||
@Configuration
|
||
@EnableKafka
|
||
public class Config implements KafkaListenerConfigurer {
|
||
|
||
@Autowired
|
||
private LocalValidatorFactoryBean validator;
|
||
...
|
||
|
||
@Override
|
||
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
|
||
registrar.setValidator(this.validator);
|
||
}
|
||
}
|
||
```
|
||
如下代码示例展示了如果通过validator校验payload:
|
||
```java
|
||
public static class ValidatedClass {
|
||
|
||
@Max(10)
|
||
private int bar;
|
||
|
||
public int getBar() {
|
||
return this.bar;
|
||
}
|
||
|
||
public void setBar(int bar) {
|
||
this.bar = bar;
|
||
}
|
||
|
||
}
|
||
```
|
||
|
||
```java
|
||
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
|
||
containerFactory = "kafkaJsonListenerContainerFactory")
|
||
public void validatedListener(@Payload @Valid ValidatedClass val) {
|
||
...
|
||
}
|
||
|
||
@Bean
|
||
public KafkaListenerErrorHandler validationErrorHandler() {
|
||
return (m, e) -> {
|
||
...
|
||
};
|
||
}
|
||
```
|
||
|
||
从3.1版本开始,可以在`ErrorHandlingDeserializer`上执行validation操作。
|
||
|
||
### Rebalancing Listener
|
||
`ContainerProperties`中存在一个`consumerRebalanceListener`属性,该属性会接收一个`ConsumerRebalanceListener`接口的实现。如果该属性没有被提供,那么container将会自动装配一个logging listener,该listener将会将rebalance event打印到日志中,日志级别为`info`。spring kafka框架还提供了了一个`ConsumerAwareRebalanceListener`接口,如下展示了接口的定义:
|
||
```java
|
||
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
|
||
|
||
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
|
||
|
||
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
|
||
|
||
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
|
||
|
||
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
|
||
|
||
}
|
||
```
|
||
与`ConsumerRebalanceListener`接口不同的是,`ConsumerAwareRebalanceListener`针对revoke存在两个回调方法,`beforeCommit`和`afterCommit`。第一次将会马上被调用,第二次则是会在所有阻塞的offset都提交后再调用。如果想要再外部系统中维护offset,这将非常有用:
|
||
```java
|
||
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
|
||
|
||
@Override
|
||
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
||
// acknowledge any pending Acknowledgments (if using manual acks)
|
||
}
|
||
|
||
@Override
|
||
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
||
// ...
|
||
store(consumer.position(partition));
|
||
// ...
|
||
}
|
||
|
||
@Override
|
||
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||
// ...
|
||
consumer.seek(partition, offsetTracker.getOffset() + 1);
|
||
// ...
|
||
}
|
||
});
|
||
```
|
||
### 强制触发consumer rebalance
|
||
kafka client目前支持触发enforced rebalance。从3.1.2版本开始,spring kafka支持通过message listener container来调用kafka consumer的enforce rebalance api。当调用该api时,其会提醒kafka consumer触发一个enforced rebalance,实际rebalance将会作为下一次poll操作的一部分。当正在发生rebalance时,调用该api将不会触发任何操作。调用方必须等待当前rebalance操作完成之后,再调用触发另一个rebalance。
|
||
|
||
如下示例展示了如何触发一个enforced rebalance:
|
||
```java
|
||
@KafkaListener(id = "my.id", topics = "my-topic")
|
||
void listen(ConsumerRecord<String, String> in) {
|
||
System.out.println("From KafkaListener: " + in);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
|
||
return args -> {
|
||
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
|
||
System.out.println("Enforcing a rebalance");
|
||
Thread.sleep(5_000);
|
||
listenerContainer.enforceRebalance();
|
||
Thread.sleep(5_000);
|
||
};
|
||
}
|
||
```
|
||
再上述代码中,应用通过使用`KafkaListenerEndpointRegistry`来访问message listener container,并调用MessageListenerContainer的enforceRebalance方法。当调用container的enforceRebalance方法时,其会委托调用底层consumer的enforceRebalance方法。而consumer则是会在下次poll操作时触发rebalance。
|
||
|
||
### 通过@SendTo注解发送listener结果
|
||
从2.0版本开始,如果将@SendTo和@KafkaListener一起使用,并且被注解的方法存在返回值,那么方法的返回值将会被发送给@SendTo注解中指定的topic。
|
||
|
||
`@SendTo`值可以存在如下几种格式:
|
||
- `@SendTo("someTopic")`:返回值将会被发送给`someTopic`主题
|
||
- `@SendTo("#{someExpression}")`:返回值将会被发送到表达式计算出的topic,`表达式将会在应用上下文初始化时被计算`
|
||
- `@SendTo("!{someExpression}")`:返回值将会被发送到表达式计算出的topic,`表达式将会在运行时被计算,并且表达式的`#root`对象有3个属性:
|
||
- `request`:该方法接收到的ConsumerRecord(在batch listener时request代表ConsumerRecords)
|
||
- `source`:request转化为的`org.springframework.messaging.Message<?>`
|
||
- `result`:该方法的返回值
|
||
- `@SendTo`:当没有为@SendTo注解指定value时,value默认会被当做`!{source.headers['kafka_replyTopic']}`
|
||
|
||
从2.1.11和2.2.1开始,属性占位符(property placeholder)也会被@SendTo解析。
|
||
|
||
为@SendTo注解指定的表达式,其返回值必须为一个String,该String代表topic name。如下展示了使用@SendTo的不同方式:
|
||
```java
|
||
@KafkaListener(topics = "annotated21")
|
||
@SendTo("!{request.value()}") // runtime SpEL
|
||
public String replyingListener(String in) {
|
||
...
|
||
}
|
||
|
||
@KafkaListener(topics = "${some.property:annotated22}")
|
||
@SendTo("#{myBean.replyTopic}") // config time SpEL
|
||
public Collection<String> replyingBatchListener(List<String> in) {
|
||
...
|
||
}
|
||
|
||
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
|
||
@SendTo("annotated23reply") // static reply topic definition
|
||
public String replyingListenerWithErrorHandler(String in) {
|
||
...
|
||
}
|
||
...
|
||
@KafkaListener(topics = "annotated25")
|
||
@SendTo("annotated25reply1")
|
||
public class MultiListenerSendTo {
|
||
|
||
@KafkaHandler
|
||
public String foo(String in) {
|
||
...
|
||
}
|
||
|
||
@KafkaHandler
|
||
@SendTo("!{'annotated25reply2'}")
|
||
public String bar(@Payload(required = false) KafkaNull nul,
|
||
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
|
||
...
|
||
}
|
||
|
||
}
|
||
```
|
||
|
||
> 为了支持使用@SendTo,listener container factory必须被提供KafkaTemplate(通过`replyTemplate`属性指定),该template将会被用于发送消息。当使用spring boot时,会自动将template注入到container factory中。
|
||
|
||
从2.2版本开始,可以向listener container factory中添加`ReplyHeadersConfigurer`,通过其可以设置接收消息中的哪些header可以被拷贝到返回消息的header中,使用示例如下所示:
|
||
```java
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(cf());
|
||
factory.setReplyTemplate(template());
|
||
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
|
||
return factory;
|
||
}
|
||
```
|
||
如果在拷贝header外,还想为reply message指定额外的header,可以通过如下方式来实现:
|
||
```java
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(cf());
|
||
factory.setReplyTemplate(template());
|
||
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
|
||
|
||
@Override
|
||
public boolean shouldCopy(String headerName, Object headerValue) {
|
||
return false;
|
||
}
|
||
|
||
@Override
|
||
public Map<String, Object> additionalHeaders() {
|
||
return Collections.singletonMap("qux", "fiz");
|
||
}
|
||
|
||
});
|
||
return factory;
|
||
}
|
||
```
|
||
在additionHeaders方法返回的map中,如果有key和已经存在的header key重复,那么map中的key-value将会覆盖现存header
|
||
|
||
在使用@SendTo时,必须为`ConcurrentKafkaListenerContainerFactory`配置一个`KafkaTemplate`,需要配置的属性为`replyTemplate`。spring boot将会将自动装配的template注入到replyTemplate属性中。
|
||
|
||
可以将@SendTo添加到无返回值的方法上,通过指定errorHandler属性,当且仅当@SendTo注解的方法发生异常时,将异常消息作为messaage发送给特定topic,使用示例如下:
|
||
```java
|
||
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
|
||
errorHandler = "voidSendToErrorHandler")
|
||
@SendTo("failures")
|
||
public void voidListenerWithReplyingErrorHandler(String in) {
|
||
throw new RuntimeException("fail");
|
||
}
|
||
|
||
@Bean
|
||
public KafkaListenerErrorHandler voidSendToErrorHandler() {
|
||
return (m, e) -> {
|
||
return ... // some information about the failure and input data
|
||
};
|
||
}
|
||
```
|
||
### 过滤消息
|
||
在某些特定场景下,例如发生rebalance,已经被处理过的消息将会被重复传递。对此,spring kafka提供了类`FilteringMessageListenerAdapter`,其可以对`MessageListener`进行包装。FilteringMessageListenerAdapter类接收一个`RecordFilterStrategy`的实现,`RecordFilterStrategy.filter`方法为一个predicate,接收一条消息并返回boolean。若predicate返回为true,则代表该消息是重复传递的,应当被丢弃。
|
||
|
||
`FilteringMessageListenerAdapter`类存在一个名为`ackDiscarded`的属性,若该属性被设置为true,代表adapter也会针对被丢弃的消息执行ack操作。
|
||
|
||
当使用`@KafkaListener`时,若针对`KafkaListenerContainerFactory`设置`RecordFilterStrategy`或`ackDiscarded`属性,listener将会被包装在特定的adapter中。
|
||
|
||
同样的,spring kafka也提供了`FilteringBatchMessageListenerAdapter`类,用于批量消息的过滤。
|
||
|
||
从2.8.4开始,可以通过`@KafkaListener`注解的filter属性来覆盖container factory的`RecordFilterStrategy`属性。
|
||
|
||
```java
|
||
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
|
||
public void listen(Thing thing) {
|
||
...
|
||
}
|
||
```
|
||
|
||
### 通过KafkaTemplate来接收消息
|
||
从2.8版本开始,kafka template有4个方法用于接收消息:
|
||
```java
|
||
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
|
||
|
||
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
|
||
|
||
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
|
||
|
||
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
|
||
```
|
||
## 动态创建container
|
||
spring kafka提供了一些方式用于在运行时动态的创建container。
|
||
|
||
### MessageListener实现
|
||
如果实现了自己的MessageListener,那么可以通过container factory来为listener创建container:
|
||
```java
|
||
public class MyListener implements MessageListener<String, String> {
|
||
|
||
@Override
|
||
public void onMessage(ConsumerRecord<String, String> data) {
|
||
// ...
|
||
}
|
||
|
||
}
|
||
|
||
private ConcurrentMessageListenerContainer<String, String> createContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
|
||
container.getContainerProperties().setMessageListener(new MyListener());
|
||
container.getContainerProperties().setGroupId(group);
|
||
container.setBeanName(group);
|
||
container.start();
|
||
return container;
|
||
}
|
||
```
|
||
### Prototype Beans
|
||
如果将bean对象scope声明为prototype,那么被`@KafkaListener`注解的方法其对应container将会被动态的创建。
|
||
|
||
使用实例如下所示:
|
||
```java
|
||
public class MyPojo {
|
||
|
||
private final String id;
|
||
|
||
private final String topic;
|
||
|
||
public MyPojo(String id, String topic) {
|
||
this.id = id;
|
||
this.topic = topic;
|
||
}
|
||
|
||
public String getId() {
|
||
return this.id;
|
||
}
|
||
|
||
public String getTopic() {
|
||
return this.topic;
|
||
}
|
||
|
||
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
|
||
public void listen(String in) {
|
||
System.out.println(in);
|
||
}
|
||
|
||
}
|
||
|
||
@Bean
|
||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||
MyPojo pojo(String id, String topic) {
|
||
return new MyPojo(id, topic);
|
||
}
|
||
|
||
applicationContext.getBean(MyPojo.class, "one", "topic2");
|
||
applicationContext.getBean(MyPojo.class, "two", "topic3");
|
||
```
|
||
|
||
> 每个listener都有对应的container,listener必须含有唯一的id,id为该listener对应container的唯一标识符。从2.8.9版本开始,`KafkaListenerEndpointRegistry`包含一个名为`unregisterListenerContainer(String id)`的新方法允许来重用id。对container执行unregister操作并不会stop容器,如果需要对容器执行stop操作,需要手动对container调用stop方法。
|
||
|
||
## Topic/Partition初始offset
|
||
存在如下方式为partition设置初始offset。
|
||
|
||
当手动对分区进行assign时,可以手动设置其初始offset(通过指定TopicPartitionOffset参数)。同时,可以在任何时间调用`seek`方法来将设置到指定的offset。
|
||
|
||
当使用group management时,分区将会由broker来进行分配:
|
||
- 对于新的`group.id`(新的消费者组),initial offset将会由`auto.offset.reset`属性来决定(earliest或者latest)
|
||
- 对于已经存在的`group.id`(当前消费者组已存在),initial offset将会是当前group的offset。但是,可以在初始化时通过seek将offset设置到指定位置
|
||
|
||
## seek到指定offset
|
||
如果想要在listener中执行seek操作,Listener必须实现`ConsumerSeekAware`接口,该接口含有如下方法:
|
||
```java
|
||
void registerSeekCallback(ConsumerSeekCallback callback);
|
||
|
||
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
|
||
|
||
void onPartitionsRevoked(Collection<TopicPartition> partitions)
|
||
|
||
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
|
||
```
|
||
|
||
在container启动或是partition被分配时,`registerSeekCallback`方法都会被调用,用于向listener中注册ConsumerSeekCallback对象。从而在初始化之后,可以通过该callback执行seek操作。
|
||
|
||
> callback用于操作被分配的topic和partition,故而当partition被重新分配时,registerSeekCallback都会被重新调用。
|
||
|
||
应该在listener中对该callback进行存储,如果多个container共同使用同一listener(或是使用ConcurrentMessageListenerContainer),那么应该将callback存储在ThreadLocal中,或是以`Thread`为key的数据结构中。
|
||
|
||
当使用group manangement时,`onPartitionsAssigned`将会在partition被分配时被调用。可以使用onPartitionsAssigned接口来进行分区初始化offset设置。同时,也可以使用`onPartitionsAssigned`接口将thread对应的callback和线程被分配的分区(一个线程对应一个consumer实例)对应起来。在onPartitionsAssigned被调用时,必须使用该方法接收到的callback,而非是registerSeekCallback获取到的callback。
|
||
|
||
`onPartitionRevoked`在container stop或kafka撤销分区分配时被调用。在分配被取消分配后,应该将该thread对应的callback丢弃,并且移除任何指向该分区的引用。
|
||
|
||
`ConsumerSeekCallback`则是含有如下方法:
|
||
```java
|
||
void seek(String topic, int partition, long offset);
|
||
|
||
void seekToBeginning(String topic, int partition);
|
||
|
||
void seekToBeginning(Collection<TopicPartitions> partitions);
|
||
|
||
void seekToEnd(String topic, int partition);
|
||
|
||
void seekToEnd(Collection<TopicPartitions> partitions);
|
||
|
||
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
|
||
|
||
void seekToTimestamp(String topic, int partition, long timestamp);
|
||
|
||
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
|
||
```
|
||
|
||
`seekRelative`是在2.3版本加入的,用于执行relative seek操作:
|
||
- 当offset为负并且toCurrent参数为false时,代表seek到相对分区尾部的偏移量
|
||
- 当offset为正并且toCurrent为false时,代表seek到相对分区起始位置的偏移量
|
||
- 当offset为负并且toCurrent为true时,代表seek到相对当前offset的位置(rewind)
|
||
- offset为正并且toCurrent为true时,代表seek到相对当前offset的位置(fast forward)
|
||
|
||
在2.3版本中,seekToTimestamp方法也被加入。
|
||
|
||
如下是一个如何使用callback的示例:
|
||
```java
|
||
@SpringBootApplication
|
||
public class SeekExampleApplication {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(SeekExampleApplication.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
|
||
return args -> {
|
||
IntStream.range(0, 10).forEach(i -> template.send(
|
||
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
|
||
while (true) {
|
||
System.in.read();
|
||
listener.seekToStart();
|
||
}
|
||
};
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic() {
|
||
return new NewTopic("seekExample", 3, (short) 1);
|
||
}
|
||
|
||
}
|
||
|
||
@Component
|
||
class Listener implements ConsumerSeekAware {
|
||
|
||
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
|
||
|
||
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
|
||
|
||
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
|
||
|
||
@Override
|
||
public void registerSeekCallback(ConsumerSeekCallback callback) {
|
||
this.callbackForThread.set(callback);
|
||
}
|
||
|
||
@Override
|
||
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
|
||
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
|
||
}
|
||
|
||
@Override
|
||
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||
partitions.forEach(tp -> this.callbacks.remove(tp));
|
||
this.callbackForThread.remove();
|
||
}
|
||
|
||
@Override
|
||
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
|
||
}
|
||
|
||
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
|
||
public void listen(ConsumerRecord<String, String> in) {
|
||
logger.info(in.toString());
|
||
}
|
||
|
||
public void seekToStart() {
|
||
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
|
||
}
|
||
|
||
}
|
||
```
|
||
为了简化实现,从2.3版本开始,加入了`AbstractConsumerSeekAware`,该类保存了针对某个topic/partition应该使用哪个callback。如下示例展示了使用示例:
|
||
```java
|
||
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
|
||
|
||
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
|
||
public void listen(String in) {
|
||
...
|
||
}
|
||
|
||
@Override
|
||
public void onIdleContainer(Map<TopicPartition, Long> assignments,
|
||
ConsumerSeekCallback callback) {
|
||
|
||
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
|
||
}
|
||
|
||
/**
|
||
* Rewind all partitions one record.
|
||
*/
|
||
public void rewindAllOneRecord() {
|
||
getSeekCallbacks()
|
||
.forEach((tp, callback) ->
|
||
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
|
||
}
|
||
|
||
/**
|
||
* Rewind one partition one record.
|
||
*/
|
||
public void rewindOnePartitionOneRecord(String topic, int partition) {
|
||
getSeekCallbackFor(new TopicPartition(topic, partition))
|
||
.seekRelative(topic, partition, -1, true);
|
||
}
|
||
|
||
}
|
||
```
|
||
从2.6版本开始,为抽象类添加了如下方便起见的方法:
|
||
- seekToBeginning() : 对所有被分配的partition执行seek操作,把offset重置到开始位置
|
||
- seekToEnd() : 对所有被分配的分区都执行seekToEnd操作
|
||
- seekToTimestamp(long timestamp) :将所有被分配的分区offset都重置到timestamp所代表的位置
|
||
|
||
```java
|
||
public class MyListener extends AbstractConsumerSeekAware {
|
||
|
||
@KafkaListener(...)
|
||
void listn(...) {
|
||
...
|
||
}
|
||
}
|
||
|
||
public class SomeOtherBean {
|
||
|
||
MyListener listener;
|
||
|
||
...
|
||
|
||
void someMethod() {
|
||
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
|
||
}
|
||
|
||
}
|
||
```
|
||
## Container Factory
|
||
和之前在`@KafkaListener`中提到的一样,对于被`@KafkaListener`注解的方法,`ConcurrentKafkaListenerContainerFactory`将会被用于创建@KafkaListener方法对应的container。
|
||
|
||
从2.2版本开始,可以使用相同的factory来创建任何ConcurrentMessageListenerContainer。一旦container创建后,可以进一步改变container的属性,大多数属性都可以通过`container.getContainerProperties()`来进行修改。如下示例配置了`ConcurrentMessageListenerContainer`:
|
||
```java
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String>(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
|
||
|
||
ConcurrentMessageListenerContainer<String, String> container =
|
||
factory.createContainer("topic1", "topic2");
|
||
container.setMessageListener(m -> { ... } );
|
||
return container;
|
||
}
|
||
```
|
||
> 通过上述@Bean方式创建的container并不会被加入到endpoint registry中,而是会作为bean对象被注入到spring容器中。
|
||
>
|
||
> 而通过@KafkaListener创建的container,则是并不会被注册为bean对象,而是统一通过endpoint registry进行管理。
|
||
|
||
从2.3.4版本开始,可以添加为factory添加`ContainerCustomizer`,在factory已经被配置后进一步的对container进行自定义设置。
|
||
```java
|
||
@Bean
|
||
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
...
|
||
factory.setContainerCustomizer(container -> { /* customize the container */ });
|
||
return factory;
|
||
}
|
||
```
|
||
|
||
## 线程安全
|
||
当使用concurrent message listener container时,单个listener将会在所有consumer thread中被调用。故而listener需要是线程安全的,为了保证listener的线程安全,更倾向于使用无状态的listener(stateless)。如果无法将listener变为线程安全的,或是通过synchronization进行同步会大幅降性能,可以使用如下的技术之一:
|
||
- 使用n个container,每个container的concurrency为1,并且container中的MessageListener其scope为prototype,故而每个container都独占一个listener(在使用@KafkaListener时,该方案无法实现)
|
||
- 将状态保存在ThreadLocal中
|
||
|
||
为了促进线程状态的清理,从2.2版本开始,每个线程退出时,listener container将会发布`ConsumerStoppedEvent`事件。可以通过`ApplicationListener`或是`@EventListener`来消费该事件,从而对`ThreadLocal`对象调用remove方法来清除线程状态。
|
||
|
||
> 默认情况下,spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor,那么清理操作将没有作用。
|
||
|
||
## 监控
|
||
### 监控listener性能
|
||
从2.3版本开始,如果在classpath中检测到`Micrometer`,并且spring容器中只有一个`MeterRegistry`实例,listener container会为listener自动创建和更新Micrometer timer。如果想要禁用micrometer timer,可以将ContainerProperties中的`micrometerEnabled`设置为false。
|
||
|
||
container会为listener维护两个timer,一个记录成功的调用,一个记录失败的调用。
|
||
|
||
timer根据`spring.kafka.listener`命名,并且含有如下的tag:
|
||
- name : (container bean name)
|
||
- result : success or failure(两个timer分开统计)
|
||
- exception: none or ListenerExecutionFailedException
|
||
|
||
可以使用`ContainerProperties`的`micrometerTags`属性来添加额外的tag。
|
||
|
||
从2.9.8、3.0.6版本开始,可以为`ContainerProperties`中的`micrometerTagsProvider`属性中提供一个方法,改方法获取`ConsumerRecord<?, ?>`并且基于record返回tags,并且将tag与`micrometerTags`属性中的任意static tags合并。
|
||
|
||
### 监控KafkaTemplate性能
|
||
从2.5版本开始,如果在classpath中检测到`Micrometer`,并且spring容器中只有一个`MeterRegistry`实例,template会自动的创建并且更新`Micrometer Timer`。如果想要禁用micrometer timer,可以将template的`micrometerEnabled`设置为false。
|
||
|
||
同样会为template维护两个timer,一个记录成功的调用,一个记录失败的调用。
|
||
|
||
timer根据`spring.kafka.template`命名,并且含有如下tag:
|
||
- name : (template bean name)
|
||
- result : success or failure(两个timer分开统计)
|
||
- exception: none or ListenerExecutionFailedException
|
||
|
||
### Micrometer Native Metrics
|
||
从2.5版本开始,spring kafka框架提供了`Factory Listeners`用于管理Micrometer `KafkaClientMetrics`实例,用于监听producer或consumer的创建或关闭。
|
||
|
||
用于启用该属性,只需要为producer或consumer factory添加listener即可:
|
||
```java
|
||
@Bean
|
||
public ConsumerFactory<String, String> myConsumerFactory() {
|
||
Map<String, Object> configs = consumerConfigs();
|
||
...
|
||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
|
||
...
|
||
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
|
||
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
|
||
...
|
||
return cf;
|
||
}
|
||
|
||
@Bean
|
||
public ProducerFactory<String, String> myProducerFactory() {
|
||
Map<String, Object> configs = producerConfigs();
|
||
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
|
||
...
|
||
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
|
||
...
|
||
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
|
||
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
|
||
...
|
||
return pf;
|
||
}
|
||
```
|
||
consumer/producer id将会作为`spring.id`被传递到tag中。
|
||
|
||
获取metrics的示例如下:
|
||
```java
|
||
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
|
||
.tag("customTag", "customTagValue")
|
||
.tag("spring.id", "myProducerFactory.myClientId-1")
|
||
.functionCounter()
|
||
.count();
|
||
```
|
||
## 事务
|
||
spring kafka通过如下方式添加了对事务的支持:
|
||
- KafkaTransactionManager:和spring transaction支持类似(@Transaction、TransactionTemplate)
|
||
- 使用Transactional KafkaMessageListenerContainer
|
||
- kafkaTemplate支持本地事务
|
||
- 和其他transactionManager同步
|
||
|
||
当为DefaultKafkaProducerFactory提供了`transactionPrefixId`属性时,事务是默认启用的。在指定了事务前缀id的情况下,DefaultKafkaProducerFactory并不是只维护一个transactional producer,而是维护了一个transactional producers cache。当在producer上调用close时,producer将会被归还到cache中并在后续操作中被重用。对于每个producer来说,`transaction.id`属性为`transactionPrefixId + n`,n从0开始并且对每个producer依次递增。
|
||
|
||
> 如果当前服务存在多个实例,那么各个实例的`transactionPrefixId`属性必须都不同。
|
||
|
||
对于使用了springboot的程序,只需要为producer factory设置`spring.kafka.producer.transaction-id-prefix`属性即可,spring boot会自动装配`KafkaTransactionManager`bean 对象并将其注入到listener container。
|
||
|
||
### 使用KafkaTransactionManager
|
||
`KafkaTransactionManager`是`PlatformTransactionManager`的一个实现,其构造器中提供了一个到producer factory的引用参数。如果为该参数指定了自定义的producer factory,其必须支持事务。
|
||
|
||
可以将KafkaTransactionManager和spring transaction support结合使用,如果事务被启用,那么在事务范围内的KafkaTemplate操作将会使用transactional producer。取决事务执行成功或者失败,transaction manager将会被事务进行提交或回滚。KafkaTemplate必须和ProducerFactory使用相同的事务管理器。
|
||
|
||
### 事务同步
|
||
本节只涉及到由producer发起的事务(不包含由listener container发起的事务)。
|
||
|
||
如果想要在发送消息到kafka的同时执行一些数据库更新操作,可以使用正常的spring transaction management,例如DataSourceTransactionManager。
|
||
```java
|
||
@Transactional
|
||
public void process(List<Thing> things) {
|
||
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
|
||
updateDb(things);
|
||
}
|
||
```
|
||
@Transactional注解的拦截器将会开启一个事务,并且KafkaTemplate将会与该transaction manager同步一个事务,每次发送record的操作都会加入到该事务。当方法退出时,database transaction提交之后,kafka transaction才会被提交。
|
||
|
||
如果想要让kafka事务先被提交,应该嵌套使用@Transactional注解。外部方法的@Transactional其transactionManager属性应该被配置为DataSourceTransactionManager,内部方法的@Transactional注解其transactionManager属性被配置为kafkaTransactionManager。
|
||
|
||
### 使用由Consumer发起的事务
|
||
从2.7版本开始,`ChainedKafkaTransactionManager`被废弃。container中会通过KafkaTransactionMananger来启用一个事务,并且在listener method上可以加上@Transactional来开启其他非kafka事务。
|
||
|
||
在spring boot中listener container会自动注入kafkaTransactionManager。listener container会开启kafka transaction,并且可以通过@Transactional注解来开启db transaction。
|
||
|
||
db事务会在kafka事务之前提交,如果kafka事务提交失败,那么该recrod会被重新传递,故而数据库操作必须是幂等的。
|
||
|
||
使用示例如下:
|
||
```java
|
||
@SpringBootApplication
|
||
public class Application {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(Application.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
|
||
return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
|
||
}
|
||
|
||
@Bean
|
||
public DataSourceTransactionManager dstm(DataSource dataSource) {
|
||
return new DataSourceTransactionManager(dataSource);
|
||
}
|
||
|
||
@Component
|
||
public static class Listener {
|
||
|
||
private final JdbcTemplate jdbcTemplate;
|
||
|
||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||
|
||
public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
|
||
this.jdbcTemplate = jdbcTemplate;
|
||
this.kafkaTemplate = kafkaTemplate;
|
||
}
|
||
|
||
@KafkaListener(id = "group1", topics = "topic1")
|
||
@Transactional("dstm")
|
||
public void listen1(String in) {
|
||
this.kafkaTemplate.send("topic2", in.toUpperCase());
|
||
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
|
||
}
|
||
|
||
@KafkaListener(id = "group2", topics = "topic2")
|
||
public void listen2(String in) {
|
||
System.out.println(in);
|
||
}
|
||
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic1() {
|
||
return TopicBuilder.name("topic1").build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic2() {
|
||
return TopicBuilder.name("topic2").build();
|
||
}
|
||
|
||
}
|
||
```
|
||
|
||
```properties
|
||
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
|
||
spring.datasource.username=root
|
||
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
|
||
|
||
spring.kafka.consumer.auto-offset-reset=earliest
|
||
spring.kafka.consumer.enable-auto-commit=false
|
||
spring.kafka.consumer.properties.isolation.level=read_committed
|
||
|
||
spring.kafka.producer.transaction-id-prefix=tx-
|
||
|
||
#logging.level.org.springframework.transaction=trace
|
||
#logging.level.org.springframework.kafka.transaction=debug
|
||
#logging.level.org.springframework.jdbc=debug
|
||
```
|
||
|
||
对于只存在于producer中的transaction,可以使用transaction synchronize:
|
||
```java
|
||
@Transactional("dstm")
|
||
public void someMethod(String in) {
|
||
this.kafkaTemplate.send("topic2", in.toUpperCase());
|
||
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
|
||
}
|
||
```
|
||
kafkaTemplate将会将kafka transaction和db transaction进行同步,并且kafka事务在db事务之后才执行commit或rollback。
|
||
|
||
如果想要先提交kafka transaction,再提交db transaction,可以使用嵌套@Transactional,示例如下所示:
|
||
```java
|
||
@Transactional("dstm")
|
||
public void someMethod(String in) {
|
||
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
|
||
sendToKafka(in);
|
||
}
|
||
|
||
@Transactional("kafkaTransactionManager")
|
||
public void sendToKafka(String in) {
|
||
this.kafkaTemplate.send("topic2", in.toUpperCase());
|
||
}
|
||
```
|
||
### KafkaTemplate本地事务
|
||
可以通过kafkaTemplate在local transaction中执行一系列操作。如下为使用示例:
|
||
```java
|
||
boolean result = template.executeInTransaction(t -> {
|
||
t.sendDefault("thing1", "thing2");
|
||
t.sendDefault("cat", "hat");
|
||
return true;
|
||
});
|
||
```
|
||
上述示例中,callback的参数代表template本身。如果该callback正常退出,那么事务则执行commit操作;如果callback抛出异常,那么transaction将会回滚。
|
||
|
||
> 如果在调用executeInTransaction时,已经存在KafkaTransactionManager事务或synchronized事务,那么kafkaTransactionManager事务不会被使用,而会使用由executeInTransaction方法开启的事务
|
||
|
||
### KafkaTemplate事务发布和非事务发布
|
||
一般来说,当KafkaTemplate是事务的(producer factory配置了transactionIdPrefix),那么提交record的操作应该是事务的。开启事务可以通过transactionManager、@Transactional方法,executeInTransaction调用,或由listener container开启。任何在事务范围内使用template的行为将会导致抛出`IllegalStateException`。从2.4.3版本开始,可以设置template的`allowNonTransactional`属性从而允许kafkaTemplate执行非事务操作。
|
||
|
||
在allowNonTransactional属性被设置为true时,将会调用producer container的createNonTransactionalProducer方法创建非事务producer,该nontransactional producer创建后也会被缓存,或与线程相绑定,从而进行重用。
|
||
|
||
### 事务结合BatchListener使用
|
||
当listener在事务存在时执行失败,rollback触发之后会调用AfterRollbackProcessor来执行一些操作。在recordListener使用默认的AfterRollbackProcessor时,会执行seek操作,故而失败的消息将会被重新传递。
|
||
|
||
在使用batch listener时,如果处理失败,那么整个batch中的消息都会被重新传递,因为framwork不知道在batch中具体那条record处理失败。
|
||
|
||
## Exactly Once语义
|
||
可以为ListenerContainer提供KafkaAwareTransactionManager实例,当这样配置时,container会在调用listener之前开启一个事务。
|
||
|
||
任何由listener执行的KafkaTemplate操作将会加入该事务。如果listener成功的处理了该条record(或通过BatchMessageListener处理了多条请求),container将会在transaction manager提交事务前调用`producer.sendOffsetsToTransaction()`将offset发送给事务。
|
||
|
||
如果listener抛出一个异常,那么kafka trasaction将会回滚,并且consumer会将position进行回退,被回滚的records将会在下次poll操作时被重新传递。
|
||
|
||
当使用事务时,会开启exactly-once语义(EOS:exactly-once-semantics)。其代表在一个`read -> process -> write`序列中,该序列只会被完成一次。
|
||
|
||
> eos代表`read->process->write`序列只会被执行完成一次,但是`read->process`流程可能会执行多次,拥有at-least-once的语义。
|
||
|
||
## Pausing and Resuming Listener Containers
|
||
2.1.3版本对listener container添加了`pause`和`resume`方法,在2.1.3之前的版本中,可以在ConsumerAwareMessageListener中对container执行pause操作,在ListenerContainerIdleEvent中对container执行resume操作,`ConsumerAwareMessageListener`和`ListenerContainerIdleEvent`中都提供了对Consumer的访问。
|
||
|
||
调用container中的pause和resume方法是线程安全的,调用pause方法之后,其在下次调用poll前生效;调用resume方法后,其会在本次poll调用返回后生效。当在container上调用pause后,其会持续调用poll方法,从而避免在使用group management后发生rebalance,但是在处于paused状态时其不会获取任何records。
|
||
|
||
从2.1.5版本开始,可以通过调用`isPauseRequested`来得知是否`pause`方法被调用。但是,调用pause后,其会在下次调用poll前才生效,当前可能并未生效,可以通过`isConsumerPaused()`方法来获取是否当前consumer实际处于paused状态。
|
||
|
||
如下展示了如何通过registry来对@KafkaListener对应的container执行pause和resume操作:
|
||
```java
|
||
@SpringBootApplication
|
||
public class Application implements ApplicationListener<KafkaEvent> {
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(Application.class, args).close();
|
||
}
|
||
|
||
@Override
|
||
public void onApplicationEvent(KafkaEvent event) {
|
||
System.out.println(event);
|
||
}
|
||
|
||
@Bean
|
||
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
|
||
KafkaTemplate<String, String> template) {
|
||
return args -> {
|
||
template.send("pause.resume.topic", "thing1");
|
||
Thread.sleep(10_000);
|
||
System.out.println("pausing");
|
||
registry.getListenerContainer("pause.resume").pause();
|
||
Thread.sleep(10_000);
|
||
template.send("pause.resume.topic", "thing2");
|
||
Thread.sleep(10_000);
|
||
System.out.println("resuming");
|
||
registry.getListenerContainer("pause.resume").resume();
|
||
Thread.sleep(10_000);
|
||
};
|
||
}
|
||
|
||
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
|
||
public void listen(String in) {
|
||
System.out.println(in);
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic topic() {
|
||
return TopicBuilder.name("pause.resume.topic")
|
||
.partitions(2)
|
||
.replicas(1)
|
||
.build();
|
||
}
|
||
|
||
}
|
||
```
|
||
## 异常处理
|
||
### Listener Error Handler
|
||
从版本2.0开始,`@KafkaListener`注解含有一个新的属性,`errorHandler`.
|
||
|
||
可以为errorHandler属性提供一个`KafkaListenerErrorHandler`实现的bean name,该接口为函数式接口,仅含有一个方法,接口定义如下所示:
|
||
```java
|
||
@FunctionalInterface
|
||
public interface KafkaListenerErrorHandler {
|
||
|
||
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
|
||
|
||
}
|
||
```
|
||
通过该接口,可以访问由message converter生成的spring message对象和由listener抛出的异常,该异常被包装在ListenerExecutionFailedException中。error handler可以抛出原来的异常或抛出一个新异常,抛出的异常将会被抛给container。由该listener方法返回的任何内容都会被忽略。
|
||
|
||
从2.7版本开始,可以为MessagingMessageConverter和BatchMessagingMessageConverter设置rawRecordHeader属性,其会导致原始的`ConsumerRecord`对象将会被设置到converted message中的`KafkaHeaders.RAW_DATA` header上。使用示例如下所示:
|
||
```java
|
||
@Bean
|
||
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
|
||
return (msg, ex) -> {
|
||
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
|
||
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
|
||
return "FAILED";
|
||
}
|
||
throw ex;
|
||
};
|
||
}
|
||
```
|
||
其还含有一个子接口`ConsumerAwareListenerErrorHandler`,该接口可以访问consumer对象。
|
||
```java
|
||
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
|
||
```
|
||
|
||
另一个子接口`ManualAckListenerErrorHandler`则是可以在手动ack时使用:
|
||
```java
|
||
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
|
||
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
|
||
```
|
||
### Container Handler
|
||
从2.8版本开始,`ErrorHandler`接口和`BatchErrorHandler`都由`CommonErrorHandler`取代。该接口可以为单条和批量record处理异常。
|
||
|
||
当使用kafka事务时,默认情况下不会配置任何error handler,故而异常将会导致事务的回滚。transactional container抛出的异常将会由`AfterRollbackProcessor`进行处理。当在使用事务时,如果提供了自定义的error handler,在想要事务回滚时必须让异常被抛出。
|
||
|
||
CommonErrorHandler拥有一个default method `isAckAfterHandle`,该方法在container调用error handler完成后(error handler没有抛出异常)被调用,用于判断是否应该提交offset,默认情况下,其会返回true。
|
||
|
||
通常,由framework提供的error handler会抛出接收的异常。默认情况下,异常将会由container在`error`级别日志打出。
|
||
|
||
可以通过listener container factory指定全局的common listener,示例如下所示:
|
||
```java
|
||
@Bean
|
||
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
|
||
kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
...
|
||
factory.setCommonErrorHandler(myErrorHandler);
|
||
...
|
||
return factory;
|
||
}
|
||
```
|
||
在调用error handler之前,container将会提交所有pending offset。
|
||
|
||
在使用spring boot时,只需要将error handler声明为bean,spring boot会将其添加到自动装配的container factory中。(用于factory生成的所有container)。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|