Files
rikako-note/spring/spring kafka/spring kafka.md
2024-03-04 20:53:25 +08:00

2254 lines
118 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

- [Spring Kafka](#spring-kafka)
- [连接到kafka](#连接到kafka)
- [运行时切换bootstrap servers](#运行时切换bootstrap-servers)
- [ABSwitchCluster](#abswitchcluster)
- [Factory Listener](#factory-listener)
- [配置Topic](#配置topic)
- [发送消息](#发送消息)
- [发送示例](#发送示例)
- [RoutingKafkaTemplate](#routingkafkatemplate)
- [使用DefaultKafkaProducerFactory](#使用defaultkafkaproducerfactory)
- [ReplyingKafkaTemplate](#replyingkafkatemplate)
- [ReplyingKafkaTemplate header](#replyingkafkatemplate-header)
- [通过Message\<?\>来发送请求和返回请求](#通过message来发送请求和返回请求)
- [kafka poison pill \& ErrorHandlingDeserializer](#kafka-poison-pill--errorhandlingdeserializer)
- [ErrorHandlingDeserializer](#errorhandlingdeserializer)
- [replyErrorChecker](#replyerrorchecker)
- [AggregatingReplyingKafkaTemplate](#aggregatingreplyingkafkatemplate)
- [AggregatingReplyingKafkaTemplate配置要求](#aggregatingreplyingkafkatemplate配置要求)
- [接收消息](#接收消息)
- [Message Listener](#message-listener)
- [MessageListenerContainer](#messagelistenercontainer)
- [Interceptors](#interceptors)
- [使用KafkaMessageListenerContainer](#使用kafkamessagelistenercontainer)
- [使用ConcurrentMessageListenerContainer](#使用concurrentmessagelistenercontainer)
- [Committing Offsets](#committing-offsets)
- [手动提交offset](#手动提交offset)
- [@KafkaListener](#kafkalistener)
- [Record Listener](#record-listener)
- [显式分区分配](#显式分区分配)
- [手动ack](#手动ack)
- [Consumer Record Metadata](#consumer-record-metadata)
- [Batch Listener](#batch-listener)
- [注解属性](#注解属性)
- [获取consumer的`group.id`](#获取consumer的groupid)
- [container线程命名](#container线程命名)
- [@KafkaListener用作元注解](#kafkalistener用作元注解)
- [在类级别使用@KafkaListener](#在类级别使用kafkalistener)
- [@KafkaListener属性修改](#kafkalistener属性修改)
- [@KafkaListener生命周期管理](#kafkalistener生命周期管理)
- [@KafkaListener @Payload校验](#kafkalistener-payload校验)
- [Rebalancing Listener](#rebalancing-listener)
- [强制触发consumer rebalance](#强制触发consumer-rebalance)
- [通过@SendTo注解发送listener结果](#通过sendto注解发送listener结果)
- [过滤消息](#过滤消息)
- [通过KafkaTemplate来接收消息](#通过kafkatemplate来接收消息)
- [动态创建container](#动态创建container)
- [MessageListener实现](#messagelistener实现)
- [Prototype Beans](#prototype-beans)
- [Topic/Partition初始offset](#topicpartition初始offset)
- [seek到指定offset](#seek到指定offset)
- [Container Factory](#container-factory)
- [线程安全](#线程安全)
- [监控](#监控)
- [监控listener性能](#监控listener性能)
- [监控KafkaTemplate性能](#监控kafkatemplate性能)
- [Micrometer Native Metrics](#micrometer-native-metrics)
- [事务](#事务)
- [使用KafkaTransactionManager](#使用kafkatransactionmanager)
- [事务同步](#事务同步)
- [使用由Consumer发起的事务](#使用由consumer发起的事务)
- [KafkaTemplate本地事务](#kafkatemplate本地事务)
- [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布)
- [事务结合BatchListener使用](#事务结合batchlistener使用)
# Spring Kafka
## 连接到kafka
### 运行时切换bootstrap servers
从2.5版本开始KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)`方法可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
> #### 切换bootstrap后关闭旧consumer和producer
> kafka consumer和producer通常都是基于长连接的在调用setBootstrapServersSupplier在运行时切换bootstrap servers后如果想要关闭现存的producer可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer可以调用`KafkaListenerEndpointRegistry`的`close`方法调用close后再调用start或是调用其他listener container的close和start方法。
#### ABSwitchCluster
为了方便起见framework提供了`ABSwitchCluster`该类支持两套bootstrap servers集合在任一时刻只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\<String\>接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后如果想要切换bootstrap servers可以调用ABSwitchCluster类的`primary``secondary`方法并关闭生产者和消费者的旧实例关闭生产者旧实例在producer factory上调用reset方法用于创建到新bootstrap servers的连接对于消费者实例可以对所有listener container先调用close方法再调用start方法当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。
### Factory Listener
从2.5版本开始,`DefaultKafkaProducerFactory``DefaultKafkaConsumerFactory`都可以配置Listener通过配置Listener可以监听生产者或消费者实例的创建和关闭。
```java
// producer listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
```
```java
// consumer listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
```
再上述接口中id代表再factory bean对象名称后追加client-id属性二者通过`.`分隔。
## 配置Topic
如果在当前应用上下文中定义了KafkaAdmin bean对象kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加可以定义一个`NewTopic`类型的bean对象kafkaAdmin会自动将该topic添加到broker中。
为了方便topic的创建2.3版本中引入了TopicBuilder类。
```java
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
```
从2.6版本开始创建NewTopic时可以省略partitions()和replicas()方法的调用此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
```java
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
```
从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象
```java
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
```
> 当使用spring boot时KafkaAdmin对象将会被自动注册故而只需要定义NewTopic bean对象即可。
默认情况下如果kafka broker不可用会输出日志进行记录但是此时context的载入还会继续后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时停止context的载入可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true此时context会初始化失败。
从版本2.7开始KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic
- `createOrModifyTopics`
- `describeTopics`
从版本2.9.10、3.0.9开始KafkaAdmin提供了`setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)`接口该接口接收一个Predicate\<NewTopic\>参数通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象每个kafkaAdmin对应不同的broker集群在上下文中含有多个NewTopic对象时可以通过predicate判断每个topic应该属性哪个amdin。
## 发送消息
KafkaTemplate类对KafkaProducer进行了包装提供了如下接口用于向kafka topic发送消息。
```java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
```
其中sendDefault接口需要向KafkaTemplate提供一个默认的topic。
kafkaTemplate中部分api接收timestamp作为参数并且将timestamp存储到record中。接口中指定的timestamp参数如何存储取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`那么用户指定的timestamp参数将会被使用如果用户没有指定timestamp那么会自动创建timestampproducer会在发送时将timestamp指定为System.currentTimeMillis()。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`那么用户指定的timestamp将会被丢弃而broker则会负责为timestamp赋值。
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法execute接口则是提供了对底层KafkaProducer的直接访问。
要使用KafkaTemplate可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate
```java
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
```
从2.5开始创建KafkaTemplate时可以基于factory进行创建但是覆盖factory中的配置属性具体示例如下
```java
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
```
当使用KafkaTemplate接收`Message\<?\>`类型的参数时可以将topic、partition、key和timestamp参数指定在Message的header中header中包含如下条目
- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP
除了调用发送方法获取CompletableFuture外还可以为KafkaTemplate配置一个ProducerListener从而在消息发送完成成功或失败后执行一个异步的回调。如下是ProducerListener接口的定义
```java
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
```
默认情况下KafkaTemplate配置了一个LoggingProducerListener会在发送失败时打印失败日志在发送成功时并不做任何事。并且为了方便起见方法的默认实现已经被提供可以只覆盖其中一个方法。
send方法默认返回的是CompletableFuture类型可以在发送完成之后为future注册一个回调
```java
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
```
其中Throwable类型的ex可以被转化为`KafkaProducerException`该类型的failedProducerRecord属性可以获取发送失败的record。
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下调用`CompletableFuture.get`推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便KafkaTemplate提供了带autoFlush参数的构造器版本如果设置autoFlush为truekafkaTemplate在每次发送消息时都会调用flush方法。
### 发送示例
如下展示了通过KafkaTemplate向broker发送消息的示例
```java
// async
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
```java
// sync
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
### RoutingKafkaTemplate
从2.5版本开始额可以通过RoutingKafkaTemplate在运行时选择producer实例选择过程基于topic名称。
> RoutingKafkaTemplate不支持事务也不支持execute、flush、metrics等方法因为RoutingKafkaTemplate根据topic来选择producer但是在执行这些操作时并不知道操作所属topic。
RoutingKafkaTemplate需要一个mapmap的key为`java.util.regex.Pattern`而value则是`ProducerFactory<Object, Object>`实例。该map必须是有序的例如LinkedHashMap该map需要按顺序遍历顺序在前的key-value对会优先匹配。
如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息实例中每个topic都使用不同的序列化方式。
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
```
### 使用DefaultKafkaProducerFactory
ProducerFactory是用于创建生产者实例的。当没有使用事务时默认情况下`DefaultKafkaFactory`会创建一个单例的生产者实例所有客户端都会使用生产者实例。但是如果在template中调用了flush方法这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始DefaultKafkaFactory有了新的`producerPerThread`属性当该属性设置为true时factory会针对每个线程都创建并缓存一个producer实例。
> 当`producerPerThread`被设置为true时若线程中的producer不再被需要那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。
当创建DefaultKafkaFactory时key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时可以选择向构造函数中传入serializer实例或是传入serializer supplier对象
- 当传入serializer实例时通过该factory创建的所有生产者实例都共享该serializer实例
- 当传入的是返回一个serializer的supplier时可令通过该factory创建的producer实例都拥有属于自己的serializer
如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例
```java
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
```
从2.5.10版本开始可以在factory创建之后再更新factory的producer config属性。例如可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例故而需要调用factory的reset方法在调用reset后所有现存producer实例都会被关闭而之后新创建的producer都会使用新的属性配置。
> 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。
为了更新producer属性配置factory提供了如下两个接口
```java
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
```
### ReplyingKafkaTemplate
从2.1.3版本开始kafka引入了ReplyingKafkaTemplate其是KafkaTemplate的一个子类用于提供request/reply语义。该类相比父类含有两个额外的方法
```java
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
```
该方法的返回类型RequestReplyFuture继承了CompletableFutureRequestReplyFuture会异步的注入该future的结果可能正常返回也可能是一个exception或者timeout
RequestReplyFuture含有一个sendFuture属性该属性是调用kafkaTemplate的send方法发送消息的结果类型为`CompletableFuture<SendResult<K,V>>`可以通过该属性future来判断发送消息操作的结果。
如果在调用sendAndReceive方法时没有传递replyTimeout参数或是指定replyTimeout参数为null那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下该超时属性为5s。
从2.8.8版本开始该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用避免当reply container尚未初始化完成时发送消息对应的reply已经返回了。
如下展示了如何使用ReplyingKafkaTemplate:
```java
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
```
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
#### ReplyingKafkaTemplate header
在使用ReplyingKafkaHeader时template会为消息设置一个headerKafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
server端返回correlation id的示例如下
```java
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
```
上述@KafkaListener结构会返回correlation id并且决定返回消息发送的topic。
并且ReplyingKafkaTemplate会使用headerKafkaHeaders.REPLY_TOPIC来代表返回消息发送到的topic。
从版本2.2开始该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset该topic或topicpartitionoffset将会被用于设置reply header但是如果reply container被配置监听多个topic或topicpartitionoffset用户必须自己手动设置header。
为消息设置header的示例如下
```java
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
```
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时可以多个template共享一个reply topic只要每个template监听的分区不同。
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时那么每个template实例都必须拥有不同的group id在这种情况下所有的template实例都会接受到所有消息但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便但是所有的template都接收所有消息将会带来额外的网络通信开销并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时推荐将template中`sharedReplyTopic`属性设置为true此时将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
如下是使用共享topic配置template的示例
```java
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
```
默认情况下ReplyingKafkaTemplate将会使用如下三种header
- KafkaHeaders.CORRELATION_ID用于关联发送消息和回复消息的关联id
- KafkaHeaders.REPLY_TOPIC用于告诉接收消息的server将消息发送到哪个topic
- KafkaHeaders.REPLY_PARTITION该header是可选的可以通过该header告知server将消息发送到指定的分区
上述header将会被@KafkaListener结构用作返回消息的路由
#### 通过Message<?>来发送请求和返回请求
在2.7版本中对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message<?>`
```java
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
```
上述接口会使用template默认的replyTimeout也存在接收timeout参数的重载版本。
如下示例展示了如何构建Message类型消息的发送和接收
```java
// template 配置
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
// 通过template发送和接收消息
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
// server端接收消息并且回复消息
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
```
从2.5版本开始若是返回的Message中没有指定headerframework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC如果存在的话。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS也会将其填充到返回消息的header中。
> #### ErrorHandlingDeserializer
> 可以考虑在reply container中使用ErrorHandlingDeserializer如果反序列化失败RequestReplyFuture将会以异常状态完成可以访问获取到的ExecutionException其cause属性中包含DeserializationException。
### kafka poison pill & ErrorHandlingDeserializer
poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败不管重试过多少次之后仍然无法成功被消费。
poison pill可能在如下场景下产生
- 该记录被损坏
- 该记录发序列化失败
在生产场景中consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。
在现实场景中可能因为如下缘故而遭遇poison pill
- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息这将会导致反序列化问题
- consumer的key或value deserializer配置错误
- 不同的生产者实例使用不同的key或value serializer向topic中发送消息
在发生poison后consumer在调用poll拉取数据时将无法反序列化record调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理针对该topic分区的消费会被阻塞因为consumer offset一直无法向前移动。并且在consumer不停重试针对该消息的反序列化时大量的反序列化失败日志将会被追加到日志文件中磁盘占用量将会急剧增大。
#### ErrorHandlingDeserializer
为了解决poison pill问题spring引入了ErrorHandlingDeserializer该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败那么ErrorHandlingDeserializer将会返回一个null并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。
#### replyErrorChecker
从版本2.6.7开始可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker当提供了checker方法时template会自动调用该方法如果该方法抛出异常那么该reply message对应的future也会以失败状态完成。
replychecker使用示例如下
```java
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
```
### AggregatingReplyingKafkaTemplate
ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景但如果对于发送一条消息存在多个接收方会返回多条消息的场景则需要使用AggregatingReplyingKafkaTemplate。
像ReplyingKafkaTemplate一样AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener containerlistener container用于接收返回的消息。除此之外AggregatingReplyingKafkaTemplate还会接收第三个参数`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`该断言每次在接收到新消息时都会被计算。如果断言返回true`AggregatingReplyingKafkaTemplate.sendAndReceive`方法返回的Future对象将会被完成并且Future中的值为断言中ConsumerRecord的集合。
从版本2.3.5开始,第三个参数为`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`类型其会在每次接收到消息或是超时replyTimeout超时的情况下被调用第二个参数传入的boolean即是断言的这次调用是否因为超时。**该断言可以针对ConsumerRecord进行修改**。
> #### returnPartialOnTimeout
> AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout该属性值默认为false如果该值被设置为true那么当请求发生超时时会返回已经接收到的部分ConsumerRecord集合。
>
> BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息要想成功在超时场景下返回接收到的部分消息不仅需要returnPartialOnTimeout设置为true还需要BiPredicate断言在发生timeout的情况下返回值为true。
AggregatingReplyingKafkaTemplate使用示例如下
```java
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
(coll, timeout) -> timeout || coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
```
注意sendAndReceive方法返回的future其值类型为`ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`类型外层的ConsumerRecord并不是真正的返回消息而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的外层ConsumerRecord用于存储实际接收到的消息集合。
> #### ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>
> `ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`作为聚合返回消息集合的伪ConsumerRecord其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal releasereleaseStrategy返回为true伪ConsumerRecord的topic name为`aggregatedResults`.
>
> 当获取该伪ConsumerRecord的原因是timeoutreturnPartialOnTimeout被设置为true并且发生timeout并且至少获取到一条返回消息那么伪ConsumerRecord的topic name将会被设置为`partialResultsAfterTimeout`.
template为上述伪topic name提供了静态变量
```java
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
```
伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。
#### AggregatingReplyingKafkaTemplate配置要求
listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMMEDIATE`consumer属性`enable.auto.commit`必须被设置伪false。为了避免任何丢失消息的可能template只会在没有待处理请求的前提下提交offset即最后一个未处理请求被releaseStrategy释放。
当consumer发生rebalance时可能会造成返回消息被重复传递由于template只会在没有待处理请求的情况下提交offset如果在listener container接收到消息后尚未提交offset此时发生rebalance那么未提交offset的消息将会被重复接收。对于在途的请求消息的重复传递将会被忽略在途的消息还会存在多条消息聚合的过程针对已经被releaseStrategy释放的返回消息如果接收到多条重复的返回消息那么会在log中看到error日志。
另外如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。
对于AggregatingReplyingKafkaTemplate`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。
## 接收消息
在使用spring kafka时可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。
### Message Listener
当使用Message Listener container时必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口
```java
// interface-1
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
// interface-2
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// interface-3
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// interface-4
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// interface-5
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
// interface-6
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// interface-7
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// interface-8
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
1. interface-1当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理
2. interface-2当使用由容器管理的提交方法时可以使用该接口该接口针对单条消息进行处理
3. interface-3当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理该接口提供对consumer实例的访问
4. interface-4当使用由容器管理的提交方法时可以使用该接口该接口针对单挑消息进行处理该接口提供对consumer实例的访问
5. interface-5当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对`consumer.poll`方法返回的所有消息进行处理;当使用该接口时,不支持`AckMode.RECORD`模式因为所有的poll方法接收到的message batch都给了listener
6. interface-6该当使用容器管理的提交方法时可以使用该接口该接口会处理由poll方法接收到的所有消息
7. interface-7当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对poll方法返回的所有消息进行处理使用该接口时不支持`AckMode.RECORD`使用该接口可以针对consumer实例进行处理
8. interface-8当使用由容器管理的commit method时可以使用该接口该接口针对由poll方法返回的全部消息进行处理该接口提供对consumer实例的访问
> listener接口提供的consumer对象并不是线程安全的故而必须在调用该listener的线程内访问该consumer中的方法
> 在对consumer中的方法进行访问时不应该在listener中执行任何会修改consumer position或commit offset的方法position或offset信息由container来进行管理。
### MessageListenerContainer
spring kafka提供了两种MessageListenerContainer的实现
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
上述两种实现的区别如下:
- KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
- ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。
#### Interceptors
从2.2.7版本开始可以为listener container添加`RecordInterceptor`拦截器会在container调用listener之前被调用可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空那么后续listener将不会被调用。
从2.7版本开始RecordInterceptor还增加了方法`afterRecord`该方法在listener退出之后调用正常退出或是抛异常退出。并且从2.7版本开始,还新增了`BatchInterceptor`为Batch Listener提供了类似的拦截器功能ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。
> 如果拦截器对record进行了修改同构创建一个新的record那么topic、partition、offset必须都保持不变从而避免类似消息丢失的副作用
为了调用多个interceptors可以使用`CompositeRecordInterceptor``CompositeBatchInterceptor`
默认情况下从2.8版本开始在使用事务时拦截器会在事务开启之前被调用。可以将listener container的`interceptBeforeTx`属性修改为`false`从而在事务开启之后调用拦截器方法。从2.9版本开始上述会被应用于任何transaction manager而不单单只用于`KafkaAwareTransactionManager`这允许拦截器加入到由container开启的jdbc事务。
从2.3.8、2.4.6版本开始,`ConcurrentMessageListenerContainer`支持Static Membership当concurrency大于1时。而`group.instance.id`的后缀则是`-n``n`从1开始。与此同时增加`session.timeout.ms`可以减少rebalance event的数量例如当应用实例重启时。
#### 使用KafkaMessageListenerContainer
在创建KafkaMessageListenerContainer时可以使用如下构造方法
```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
上述构造函数接收一个consumerFactory并且通过containerProperties接收topic和partition的信息此外containerProperties中还包含其他配置信息。
containerProperties拥有如下构造方法
```java
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
第一个构造方法中接收一个`TopicPartitionOffset`数组通过该数组可以显式的告知container应该使用哪些分区通过调用consumer的assign方法。并且可以选择为TopicPartitionOffset指定初始的offset。当`relativeToCurrent`参数为false时默认为false如果指定offset为正数默认代表绝对offset如果offset为负数默认代表相对于该分区最后位置的偏移量。如果`relativeToCurrent`为true初始offset将会被设置为相对当前的consumer position。当容器启动时设置的offset将会被应用。
第二个构造方法接收了一个topic数组kafka则会根据`group.id`属性来分配分区将分区在组内的订阅了该topic的消费者实例之间进行分配。
第三个构造方法则是接收一个topicPattern通过该pattern来匹配分区。
在创建container时为了将MessageListener分配给container可以使用`ContainerProperties.setMessageListener`。如下展示了添加listener的示例
```java
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
在上述示例中创建DefaultKafkaConsumerFactory实例时其构造方法只接受了一个ConsumerProperties属性代表其会在属性中获取key和value的`Deserializer.class`并创建实例。同时也可以通过调用DefaultKafkaConsumerFactory的其他构造方法接收key和value的Deserializer实例在这种情况下所有consumer都会公用key和value的反序列化实例。另一种选择时提供`Supplier<Deserializer>`这种情况下每个consumer都会使用不同的反序列化器实例。
```java
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
从2.2.1版本开始,提供了一个叫`logContainerConfig`的配置属性当该属性设置为true并且info日志级别开启时每个listener container都会写日志记录其配置。
默认情况下会在debug的日志级别记录topic offset commit。从2.1.2开始,通过`ContainerProperties``commitLogLevel`的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info可以调用`containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)`方法。
从2.2版本开始,添加了`missingTopicsFatal`属性该属性默认值为false。如果在container启动时其配置的任一topic在broker中并不存在那么其将会阻止container启动。如果container被配置为监听pattern那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时container线程将会循环调用`consumer.poll`方法等待topic出现同时日志输出消息除了日志之外没有任何迹象显示出现了问题。
在2.8版本中,引入了`authExceptionRetryInterval`属性。该属性会导致container在kafkaConsumer得到`AuthenticationException``AuthorizationException`异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时在等待`authExceptionRetryInterval`时间间隔后,可以重试获取消息,此时权限可能已经被授予。
> 在默认情况下,没有配置`authExceptionRetryInterval`AuthenticationException和AuthorizationException将被认为是致命的这将会导致messageListenerContainer停止。
#### 使用ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer
```java
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
除此之外ConcurrentMessageListenerContainer还有一个concurrency属性该属性代表并行度。如果调用`container.setConcurrency(3)`方法其会创建3个`KafkaMessageListenerContainer`.
对于该构造方法kafka使用组管理功能在消费者实例之间分配分区。
如果container通过TopicPartitonOffset配置时ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上每个委托实例负责监听不同的TopicPartitionOffset。
如果6个TopicPartitionOffset被提供并且concurrency被设置为3每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供并且concurrency被设置为3那么两个委托容器实例将会被分配到2个分区第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量那么concurrency将会降低到和TopicPartitionOffset数量相同每个委托容器实例被分配到一个分区。
从1.3版本开始,`MessageListenerContainer`提供了对底层`KafkaConsumer`的metrics信息访问。对于`ConcurrentMessageListenerContainer`其metircs方法将会返回一个map该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为`Map<String, Map<MetricName, ? extends Metric>>`,其中key为底层KafkaConsumer的`client-id`.
从2.3版本开始ContainerProperties提供了`idleBetweenPolls`属性允许listener container在调用`consumer.poll()`方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:`idleBetweenPolls`属性实际配置的值;`max.poll.interval.ms`值减去当前消息批处理时间的差值。
> `max.poll.interval.ms`
>
> 该值代表在使用消费者组管理功能时,两次`poll()`调用之间最长时间间隔。该值代表在调用完poll方法后在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后poll方法仍然未被调用那么该消费者示例将会被认为失败并且消费者组会出发rebalance操作将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的`group.instance.id`不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过`session.timeout.ms`时间后出发rebalance。`group.instance.id`不为空时,该消费者实例将会被认为是该消费者组的静态成员。
>
> 默认情况下,`max.poll.interval.ms`的默认值为5min。
#### Committing Offsets
对于offset提交spring kafka提供了多个选项。如果`enable.auto.commit`属性在consumer properties中被指定为true那么kafka会根据其配置对消息进行自动提交。如果`enable.auto.commit`被指定为false那么MessageListenerContainer支持几种不同的`AckMode`设置。默认的`AckMode`设置为`BATCH`。从2.3版本开始spring framework会自动将`enable.auto.commit`设置为false在2.3之前则是默认设置为true。
`consumer.poll`方法会返回一条或多条`ConsumerRecord`而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为
- `RECORD`在listener处理完该消息并返回之后提交offset
- `BATCH`当所有被poll方法调用返回的消息都处理完成之后提交offset
- `TIME`当所有被poll方法返回的消息均被处理完成或是从上次commit时起已超过`ackTime`时间
- `COUNT`当所有被poll方法返回的消息都被处理完成或是从上次commit时起已收到超过`ackCount`条记录
- `COUNT_TIME`当TIME或COUNT任一条满足时提交offset
- `MANUAL`messageListener负责对消息调用`Acknowledgment.acknowledge()`方法调用之后语义和BATCH相同会等待所有被poll方法调用返回的消息都处理完成之后会批量提交offset
- `MANUAL_IMMEDIATE`:当方法`Acknowledgment.acknowledge()`被listener调用之后立刻会提交offset
当使用事务时offsets将会被发送给事务并且语义和RECRD或BATCH相同基于listener的类型决定语义看listener是record类型还是batch类型
> `MANUAL`或`MANUAL_IMMEDIATE`需要listener为`AcknowledgingMessageListener `或`BatchAcknowledgingMessageListener`
基于容器的`syncCommits`容器属性提交offset时会使用`commitSync``commitAsync`方法。`syncCommits`属性默认为true可以通过`setSyncCommitTimeout`设置commitSync时的超时时间。在syncCommits设置为false时可以通过`setCommitCallback`来获取commitAsync的结果默认情况下callback是`LoggingCommitCallback`其会日志输出errors并且以debug的级别打印成功日志。
由于listener container拥有自己的提交offset机制故而更推荐将kafka consumr的`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`属性设置为false。2.3版本开始会将自动提交属性设置为false除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。
Acknowledgment接口具有如下方法
```java
public interface Acknowledgment {
void acknowledge();
}
```
该方法能够让container控制何时提交offset。
从2.3版本开始,`Acknowledgment`接口提供了两个额外的方法`nack(long sleep)``nack(int index, long sleep)`。第一个方法和record类型的listener一起使用第二个方法和batch类型的listener一起使用。如果调用类型错误会抛出`IllegalStateException`异常。
- `nack(long sleep)`对当前record执行nack操作丢弃poll请求中的remaining records并针对所有分区进行re-seek操作将position恢复。故而在指定sleep时间范围后record将会被重新传递该操作会阻塞整个message listener的读操作阻塞时间为sleep且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。
- `nack(int index, long sleep)`对于batch中index位置的record进行nack操作该操作会提交位于index之前record的offset并且会re-seek所有分区故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。
当想要提交batch中部分消息时请使用`nack(int index, long sleep)`方法。当是使用事务时设置AckMode为MUANAL调用nack方法会将已成被成功处理的records offsets发送给事务。
nack只能在调用listener的consumer线程中进行处理。
在使用nack方法时任何在途的offset都会被提交而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似当container中配置了一个DefaultErrorHandler时。
当使用batch listener时若发生异常可以指定record在bathc中的index。调用nack时位于index之前的record offset都会被提交seek操作也会执行在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。
从3.0.10版本开始batch listener可以提交batch中的部分record offset通过使用`acknowledge(index)`方法。当该方法被调用时位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用`acknowledge()`方法会针对剩余尚未被提交的records进行offset提交。在使用部分提交时必须满足如下要求
- AckMode.MANUAL_IMMEDIATE需要被开启
- 部分提交方法需要在listener thread中被调用
- listener必须批量消费recrods而不是消费单条record
- index必须要位于batch range中
- 重复调用部分提交接口时后调用的index要比先调用的大
上述要求是强制的在不满足时会抛出IllegalArgumentException和IllegalStateException异常。
> 上述调用nack后所说的阻塞均是指调用`KafkaConsumer.pause`方法后造成的结果。在pause方法被调用后任何后续调用的poll方法都不会返回任何records直到`resume`方法被调用。pause方法的调用并不会造成分区的rebalance操作。**调用pause方法并不会造成线程的阻塞而是通过poll获取指定分区消息的阻塞。**
>
> 并且rebalance操作后也不会保留pause/resume状态。
### 手动提交offset
通常,当使用`AckMode.MANUAL``AckMode.MANUAL_IMMEDIATE`ack必须按顺序确认因为kafka并不是为每条record都维护提交状态而是针对每个消费者组/分区只维护一个commit offset。
从2.8版本开始可以设置container属性`asyncAcks`该属性设置后由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序records可以被按任何顺序确认的提交进行延迟直到接收到缺失的ack。此时消费者会处于paused状态不会有新的records被传递直到上次poll操作中所有的消息都被提交offset。
当启用`asyncAcks`特性后,可以异步的针对消息进行处理(`Acknowledgment.acknowledge()`方法并没有要求只能在listener线程中被调用。但是在对消息进行异步处理的时候如果发生处理失败的情况可能会增加record重复传输的可能性。如果在开启asyncAcks时如果后续消息已经提交ack但是位于前面的消息处理发生异常那么后续成功的record其offset将无法被提交故而后续处理成功的消息将会被重新传递.
> 在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。
并且,当`asyncAcks`启用时,无法使用`nack()`方法来进行nack操作。
### @KafkaListener
`@KafkaListener`会将一个bean方法标记为listener container的listener方法。该bean被封装在一个`MessagingMessageListenerAdapter`对象中该Adapter对象的构造方法如下所示封装了bean对象和bean对象方法
```java
public MessagingMessageListenerAdapter(Object bean, Method method) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
}
```
在配置@KafkaListener注解的绝大部分属性时,可以使用`#{}`形式的spel表达式或`${}`形式的属性占位符。
#### Record Listener
@KafkaListener注解为简单pojo的listenr提供了机制,如下展示了使用示例:
```java
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
```
该机制需要在@Configuration类上标注`@EnableKafka`注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的`ConcurrentMessageListenerContainer`。默认情况下需要一个bean name为`kafkaListenerContainerFactory`的bean对象。如下示例展示了如何使用`ConcurrentMessageListenerContainer`
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
```
在上述示例中如果要设置container properties必须要在container factory对象上调用getContainerProperties()然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。
从版本2.1.1开始,对由该注解创建的消费者实例,可以设置`client.id`属性。`client.id``clientIdPrefix-n`格式其中n为整数代表在使用concurrency时的container number。
从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的`concurrency``autoStartup`属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示
```java
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
```
#### 显式分区分配
可以通过@KafkaListener注解为pojo listener配置显式的topic和分区也可以为其指定一个初始offset。如下示例展示了如何为@KafkaListener指定topic和offset
```java
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
如上图所示可以在topicPartitions属性的`@TopicPartition的partitions属性`中或是`@TopicPartitionpation的partitionOffsets属性`中指定分区,但是无法同时在上述两处都指定同一个分区。
从2.5.5版本开始可以对所有手动分配的分区都指定一个初始offset
```java
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
再上述示例中,`*`通配符代表`partitions`属性锁指定的所有分区。在每个`@TopicPartition`注解中,只能有一个`@PartitionOffset`注解中带有该通配符。
当listener实现了`ConsumerSeekAware`时,`onPartitionAssigned`方法会被调用,即使使用手动分配(assign)。`onPartitionAssigned`允许在分区被分配时执行任意的seek操作。
从2.6.4版本开始,可以指定由`,`分隔的分区列表或分区区间,示例如下:
```java
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
```
该分区范围是闭区间,上述代表的区间是`0,1,2,3,4,5,7,10,11,12,13,14,15`.
上述方式也可以用于指定分区初始的offset
```java
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
#### 手动ack
当使用手动ack时可以为listener method提供`Acknowledgment`。如下示例展示了如何使用一个不同的container factory
```java
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
```
#### Consumer Record Metadata
record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容
- KafkaHeaders.OFFSET
- KafkaHeaders.RECEIVED_KEY
- KafkaHeaders.RECEIVED_TOPIC
- KafkaHeaders.RECEIVED_PARTITION
- KafkaHeaders.RECEIVED_TIMESTAMP
- KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始如果接收到的消息key为null那么`RECEIVED_KEY`在header中并不会出现而在2.5版本之前,`RECEIVED_KEY`在header中存在并且值为null。
如下示例展示了如何使用header
```java
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
```
> 参数注解(`@Payload`、`@Header`必须在listener method方法的具体实现上被指定如果参数注解只指定在接口上而没有在具体实现方法上指定那么指定的注解将不会起作用。
从2.5版本开始相比于上述示例中针对单条header内容进行接收可以通过`ConsumerRecordMetadata`类型的参数来接收所有的header数据
```java
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
```
上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据除了record中的key和value。
#### Batch Listener
从版本1.1开始,可以通过@KafkaListener方法来接收record batch。
为了配置listener container factory支持batch listener需要设置container factory的`batchListener`属性。如下展示了配置示例:
```java
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
```
> 从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的`batchListener`属性,只需要为@KafkaListener注解指定`batch`属性。
> 从2.9.6版本开始container factory对于`recordMessageConverter`和`batchMessageConverter`有不同的setter。在2.9.6之前的版本中,只有`messageConverter`的setter该converter被同时用于批量和单条场景。
如下示例展示了如何接收list类型的payload
```java
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
```
record batch的topic、分区、offset等信息也可以批量获取如下展示了如何批量获取header
```java
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
```
除了上述方法外,还可以通过`List of Message<?>`来批量接收消息,但是,除了`Acknowledgment``Consumer<?, ?>`外,`List of Message<?>`应该是方法中唯一的参数。如下示例展示了如何使用:
```java
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
```
在上述示例中并不会对payload执行任何转换。
如果当前存在`BatchMessagingMessageConverter``BatchMessagingMessageConverter`通过`RecordMessageConverter`配置),则可以为`Message<?>`添加一个泛型类型payload将会被转化为该类型。
除了上述两种方式外,还可以接收`List<ConsumerRecord<?, ?>>`类型的参数但是其必须为listener method的唯一参数`Acknowledgment`类型和`Consumer<?, ?>`类型参数除外)。如下实例展示了如何使用:
```java
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
```
从2.2版本开始listener可以接收由poll方法返回的完整的`ConsumerRecords<?, ?>`对象该record中封装了一个record的list。从而允许listener访问额外的方法例如`partitions()`方法返回TopicPartition对象的集合。方法的使用示例如下
```java
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
```
#### 注解属性
从2.0版本开始,`id`属性被用作kafka consumer group的`group.id`属性其会覆盖consumer factory中配置的属性。如果不想使用该行为可以显式的另外设置`groupId`属性;或是将`idIsGroup`属性设置为false此时`group.id`仍然会使用consumer factory中配置的属性。
对于@KafkaListener注解可以在绝大多数属性中使用spel表达式和placeholder
```java
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
```
从2.1.2版本开始spel表达式支持`__listener`。该伪bean name代表当前该注解所位于的bean。
例如如下示例:
```java
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
```
如果在项目中,已经存在了名为`__listener`的bean实例那么可以通过`beanRef`属性来对属性表达式进行修改。如下示例展示了如何使用`beanRef`
```java
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
```
从版本2.2.4开始,可以直接通过@KafkaListener注解指定consumer属性通过注解指定的consumer属性会覆盖在consumer factory中指定的属性。
> 但是,无法通过`@KafkaListener`注解的上述方法来指定`group.id`和`client.id`属性,通过上述方法指定这两个属性时,指定会被忽略;需要使用`groupId`和`clientIdPrefix`注解属性来指定consumer属性中的`group.id`和`client.id`属性
在通过@KafkaListener来指定consumer属性时,通过独立的字符串按照`foo:bar``foo bar``foo=bar`的格式进行指定,示例如下所示:
```java
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
```
### 获取consumer的`group.id`
在不同container中运行相同listener代码时需要识别该消息来源于哪个container通过`group.id`标识)。
在需要获取container的groupId时可以在listener线程中调用`KafkaUtils.getConsumerGroupId()`方法;也可以在方法参数中访问`group.id`属性:
```java
@KafkaListener(id = "id", topicPattern = "someTopic")
public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
```
### container线程命名
框架通过一个TaskExecutor来调用consumer和listener。可以在container properties中通过设置`consumerExecutor`属性来设置一个自定义的executor。当使用pooled executor时需要确保线程池中含有足够的线程来处理来自所有container的并发。当使用`ConcurrentMessageListenerContainer`executor中的线程会被所有consumerconcurrency使用。
如果没有为container提供consumer executor那么会为每个container提供`SimpleAsyncTaskExecutor`
> SimpleAsyncTaskExecutor会为所有的task都开启一个新线程故而该类型的executor并不会对线程进行重用。
>
> SimpleAsyncTaskExecutor在创建线程时executor创建线程的名称按照`<beanname>-C-<n>`的格式。对于`ConcurrentMessageListenerContainer``<beanname>`部分将会被换成`<beanname>-m`m代表消费者实例。当每次container启动时`<n>`都会增加。
>
> 故而对于name为`container`的bean对象其executor中的线程名称在container第一次启动后为`container-0-C-1`、`container-1-C-1`n均为1m为0和1.当containerr再次启动时线程名称则是会变为`container-0-C-2`、`container-1-C-2 etc`。
从3.0.1版本开始,可以修改线程的名称。如果将`AbstractMessageListenerContainer.changeConsumerThreadName`属性设置为true将会调用`AbstractMessageListenerContainer.threadNameSupplier`来获取线程名称。
### @KafkaListener用作元注解
从2.2版本开始,可以将@KafkaListener用作元注解,如下展示了使用示例:
```java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
```
在将@KafkaListener用作元注解时,必须在自定义注解中针对`topics``topicPattern``topicPartitions`中的至少一个进行alias操作。
如下展示了如何使用自定义注解:
```java
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
```
### 在类级别使用@KafkaListener
当在类级别使用@KafkaListener时,必须要在方法级别指定@KafkaHandler。当消息被传递到时消息payload转化为的类型将会用于决定调用哪个handler方法。示例如下所示
```java
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
```
从2.1.3版本开始,可以指定一个@KafkaHandler方法作为默认方法当类型匹配不到其他handler时默认handler方法将会被调用。并且最多只有有一个方法被指定为默认handler方法。
当使用@KafkaHandler时必须要求payload已经被转化为目标对象类型如此匹配才能进行。
基于spring解析方法参数的限制default kafkahandler无法接收header中的单条内容其只能通过`ConsumerRecordMetadata`类型来获取header中的内容。
例如如下方式在Object类型为String时不起作用
```java
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
```
如果object类型为String那么topic也会指向object而不会获取到header中的topic内容。
如果要在default handler method中访问元数据需要按如下方式
```java
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
```
### @KafkaListener属性修改
从2.7.2版本开始可以在container创建之前通过编码的方式来修改注解属性。为了实现改功能可以在spring context中添加一个或者多个`KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer``AnnotationEnhancer`是个`BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>`类型的二元函数返回一个Map<String,Object>类型的值。属性值可能会包含spel表达式或者properties placeholder而enhancer则是在spel表达式和属性占位符被解析之前调用。如果当前spring容器存在多个enhancer且enhancer实现了Ordered接口那么enhancer将会按照顺序被调用
```java
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
```
`AnnotationEnhancer`bean定义必须要被声明为static该bean在spring context生命周期中非常早期的时间点被需要。
### @KafkaListener生命周期管理
为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为`KafkaListenerEndpointRegistry`类型的bean对象中该bean对象会自动被spring framework创建并且管理listener container的生命周期。kafkaListenerEndpointRegistry对象会启动所有`autoStartup`属性设置为true的container。所有container factory都在同一阶段创建所有container。通过registry可以通过编程的方式来管理container的生命周期。对registry执行start或stop操作将会对registry中所有的container都执行start或stop操作。同时也可以通过container的id属性来获取单独的container对象。
可以通过@KafkaListener注解来设置container的autoStartup属性通过注解指定的autoStartup值将会覆盖在container factory中指定的autoStartup属性。
如果要通过registry对象来管理注册的container可以通过bean注入的方式来获取registry
```java
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
/**
* 通过bean注入来获取registry
*
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
```
regisry只维护其管理container的生命周期如果以bean形式声明的container其并不由registry进行管理而是可以从spring容器中获取。可以调用registry对象的`getListenerContainers`方法来获取其管理的container集合。
从2.2.5版本开始registry新增了一个新的方法`getAllListenerContainers()`通过该方法可以获取所有的container集合集合中包括由registry管理的container和以bean对象形式生命的container。该返回集合中将会包含任何prototype的已初始化bean对象但是集合中不会对懒加载的bean对象进行加载操作。
endpoint将会在spring容器被refreshed之后被注册到registry中并且endpoint将会立马被启动不论其autoStartup属性值是什么。
### @KafkaListener @Payload校验
从2.2版本开始,可以更加方便的为@KafkaListener @Payload参数添加validator。在之前的版本中必须要自定义配置一个`DefaultMessageHandlerMethodFactory`并且将其添加到registrar中。现在可以为registrar中添加validator。如下代码展示了使用示例
```java
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
```
当使用spring boot并且存在有validation的starter时`LocalValidatorFactoryBean`将会被自动装配,装配逻辑和如下代码类似:
```java
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
```
如下代码示例展示了如果通过validator校验payload
```java
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
```
```java
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
```
从3.1版本开始,可以在`ErrorHandlingDeserializer`上执行validation操作。
### Rebalancing Listener
`ContainerProperties`中存在一个`consumerRebalanceListener`属性,该属性会接收一个`ConsumerRebalanceListener`接口的实现。如果该属性没有被提供那么container将会自动装配一个logging listener该listener将会将rebalance event打印到日志中日志级别为`info`。spring kafka框架还提供了了一个`ConsumerAwareRebalanceListener`接口,如下展示了接口的定义:
```java
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
```
`ConsumerRebalanceListener`接口不同的是,`ConsumerAwareRebalanceListener`针对revoke存在两个回调方法`beforeCommit``afterCommit`。第一次将会马上被调用第二次则是会在所有阻塞的offset都提交后再调用。如果想要再外部系统中维护offset这将非常有用
```java
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
```
### 强制触发consumer rebalance
kafka client目前支持触发enforced rebalance。从3.1.2版本开始spring kafka支持通过message listener container来调用kafka consumer的enforce rebalance api。当调用该api时其会提醒kafka consumer触发一个enforced rebalance实际rebalance将会作为下一次poll操作的一部分。当正在发生rebalance时调用该api将不会触发任何操作。调用方必须等待当前rebalance操作完成之后再调用触发另一个rebalance。
如下示例展示了如何触发一个enforced rebalance
```java
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
```
再上述代码中,应用通过使用`KafkaListenerEndpointRegistry`来访问message listener container并调用MessageListenerContainer的enforceRebalance方法。当调用container的enforceRebalance方法时其会委托调用底层consumer的enforceRebalance方法。而consumer则是会在下次poll操作时触发rebalance。
### 通过@SendTo注解发送listener结果
从2.0版本开始,如果将@SendTo和@KafkaListener一起使用,并且被注解的方法存在返回值,那么方法的返回值将会被发送给@SendTo注解中指定的topic
`@SendTo`值可以存在如下几种格式:
- `@SendTo("someTopic")`:返回值将会被发送给`someTopic`主题
- `@SendTo("#{someExpression}")`返回值将会被发送到表达式计算出的topic`表达式将会在应用上下文初始化时被计算`
- `@SendTo("!{someExpression}")`返回值将会被发送到表达式计算出的topic`表达式将会在运行时被计算,并且表达式的`#root`对象有3个属性
- `request`该方法接收到的ConsumerRecord在batch listener时request代表ConsumerRecords
- `source`request转化为的`org.springframework.messaging.Message<?>`
- `result`:该方法的返回值
- `@SendTo`:当没有为@SendTo注解指定value时value默认会被当做`!{source.headers['kafka_replyTopic']}`
从2.1.11和2.2.1开始属性占位符property placeholder也会被@SendTo解析
@SendTo注解指定的表达式其返回值必须为一个String该String代表topic name。如下展示了使用@SendTo的不同方式
```java
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
```
> 为了支持使用@SendTolistener container factory必须被提供KafkaTemplate通过`replyTemplate`属性指定该template将会被用于发送消息。当使用spring boot时会自动将template注入到container factory中。
从2.2版本开始可以向listener container factory中添加`ReplyHeadersConfigurer`通过其可以设置接收消息中的哪些header可以被拷贝到返回消息的header中使用示例如下所示
```java
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
```
如果在拷贝header外还想为reply message指定额外的header可以通过如下方式来实现
```java
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
```
在additionHeaders方法返回的map中如果有key和已经存在的header key重复那么map中的key-value将会覆盖现存header
在使用@SendTo时,必须为`ConcurrentKafkaListenerContainerFactory`配置一个`KafkaTemplate`,需要配置的属性为`replyTemplate`。spring boot将会将自动装配的template注入到replyTemplate属性中。
可以将@SendTo添加到无返回值的方法上通过指定errorHandler属性当且仅当@SendTo注解的方法发生异常时将异常消息作为messaage发送给特定topic使用示例如下
```java
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
throw new RuntimeException("fail");
}
@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
return (m, e) -> {
return ... // some information about the failure and input data
};
}
```
### 过滤消息
在某些特定场景下例如发生rebalance已经被处理过的消息将会被重复传递。对此spring kafka提供了类`FilteringMessageListenerAdapter`,其可以对`MessageListener`进行包装。FilteringMessageListenerAdapter类接收一个`RecordFilterStrategy`的实现,`RecordFilterStrategy.filter`方法为一个predicate接收一条消息并返回boolean。若predicate返回为true则代表该消息是重复传递的应当被丢弃。
`FilteringMessageListenerAdapter`类存在一个名为`ackDiscarded`的属性若该属性被设置为true代表adapter也会针对被丢弃的消息执行ack操作。
当使用`@KafkaListener`时,若针对`KafkaListenerContainerFactory`设置`RecordFilterStrategy``ackDiscarded`属性listener将会被包装在特定的adapter中。
同样的spring kafka也提供了`FilteringBatchMessageListenerAdapter`类,用于批量消息的过滤。
从2.8.4开始,可以通过`@KafkaListener`注解的filter属性来覆盖container factory的`RecordFilterStrategy`属性。
```java
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
```
### 通过KafkaTemplate来接收消息
从2.8版本开始kafka template有4个方法用于接收消息
```java
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
```
## 动态创建container
spring kafka提供了一些方式用于在运行时动态的创建container。
### MessageListener实现
如果实现了自己的MessageListener那么可以通过container factory来为listener创建container
```java
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
```
### Prototype Beans
如果将bean对象scope声明为prototype那么被`@KafkaListener`注解的方法其对应container将会被动态的创建。
使用实例如下所示:
```java
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
```
> 每个listener都有对应的containerlistener必须含有唯一的idid为该listener对应container的唯一标识符。从2.8.9版本开始,`KafkaListenerEndpointRegistry`包含一个名为`unregisterListenerContainer(String id)`的新方法允许来重用id。对container执行unregister操作并不会stop容器如果需要对容器执行stop操作需要手动对container调用stop方法。
## Topic/Partition初始offset
存在如下方式为partition设置初始offset。
当手动对分区进行assign时可以手动设置其初始offset通过指定TopicPartitionOffset参数。同时可以在任何时间调用`seek`方法来将设置到指定的offset。
当使用group management时分区将会由broker来进行分配
- 对于新的`group.id`新的消费者组initial offset将会由`auto.offset.reset`属性来决定earliest或者latest
- 对于已经存在的`group.id`当前消费者组已存在initial offset将会是当前group的offset。但是可以在初始化时通过seek将offset设置到指定位置
## seek到指定offset
如果想要在listener中执行seek操作Listener必须实现`ConsumerSeekAware`接口,该接口含有如下方法:
```java
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
```
在container启动或是partition被分配时`registerSeekCallback`方法都会被调用用于向listener中注册ConsumerSeekCallback对象。从而在初始化之后可以通过该callback执行seek操作。
> callback用于操作被分配的topic和partition故而当partition被重新分配时registerSeekCallback都会被重新调用。
应该在listener中对该callback进行存储如果多个container共同使用同一listener或是使用ConcurrentMessageListenerContainer那么应该将callback存储在ThreadLocal中或是以`Thread`为key的数据结构中。
当使用group manangement时`onPartitionsAssigned`将会在partition被分配时被调用。可以使用onPartitionsAssigned接口来进行分区初始化offset设置。同时也可以使用`onPartitionsAssigned`接口将thread对应的callback和线程被分配的分区一个线程对应一个consumer实例对应起来。在onPartitionsAssigned被调用时必须使用该方法接收到的callback而非是registerSeekCallback获取到的callback。
`onPartitionRevoked`在container stop或kafka撤销分区分配时被调用。在分配被取消分配后应该将该thread对应的callback丢弃并且移除任何指向该分区的引用。
`ConsumerSeekCallback`则是含有如下方法:
```java
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
```
`seekRelative`是在2.3版本加入的用于执行relative seek操作
- 当offset为负并且toCurrent参数为false时代表seek到相对分区尾部的偏移量
- 当offset为正并且toCurrent为false时代表seek到相对分区起始位置的偏移量
- 当offset为负并且toCurrent为true时代表seek到相对当前offset的位置rewind
- offset为正并且toCurrent为true时代表seek到相对当前offset的位置fast forward
在2.3版本中seekToTimestamp方法也被加入。
如下是一个如何使用callback的示例
```java
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
```
为了简化实现从2.3版本开始,加入了`AbstractConsumerSeekAware`该类保存了针对某个topic/partition应该使用哪个callback。如下示例展示了使用示例
```java
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
```
从2.6版本开始,为抽象类添加了如下方便起见的方法:
- seekToBeginning() : 对所有被分配的partition执行seek操作把offset重置到开始位置
- seekToEnd() 对所有被分配的分区都执行seekToEnd操作
- seekToTimestamp(long timestamp) 将所有被分配的分区offset都重置到timestamp所代表的位置
```java
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
```
## Container Factory
和之前在`@KafkaListener`中提到的一样,对于被`@KafkaListener`注解的方法,`ConcurrentKafkaListenerContainerFactory`将会被用于创建@KafkaListener方法对应的container
从2.2版本开始可以使用相同的factory来创建任何ConcurrentMessageListenerContainer。一旦container创建后可以进一步改变container的属性大多数属性都可以通过`container.getContainerProperties()`来进行修改。如下示例配置了`ConcurrentMessageListenerContainer`
```java
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
```
> 通过上述@Bean方式创建的container并不会被加入到endpoint registry中而是会作为bean对象被注入到spring容器中。
>
> 而通过@KafkaListener创建的container则是并不会被注册为bean对象而是统一通过endpoint registry进行管理。
从2.3.4版本开始可以添加为factory添加`ContainerCustomizer`在factory已经被配置后进一步的对container进行自定义设置。
```java
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
```
## 线程安全
当使用concurrent message listener container时单个listener将会在所有consumer thread中被调用。故而listener需要是线程安全的为了保证listener的线程安全更倾向于使用无状态的listenerstateless。如果无法将listener变为线程安全的或是通过synchronization进行同步会大幅降性能可以使用如下的技术之一
- 使用n个container每个container的concurrency为1并且container中的MessageListener其scope为prototype故而每个container都独占一个listener在使用@KafkaListener时,该方案无法实现)
- 将状态保存在ThreadLocal中
为了促进线程状态的清理从2.2版本开始每个线程退出时listener container将会发布`ConsumerStoppedEvent`事件。可以通过`ApplicationListener`或是`@EventListener`来消费该事件,从而对`ThreadLocal`对象调用remove方法来清除线程状态。
> 默认情况下spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor那么清理操作将没有作用。
## 监控
### 监控listener性能
从2.3版本开始如果在classpath中检测到`Micrometer`并且spring容器中只有一个`MeterRegistry`实例listener container会为listener自动创建和更新Micrometer timer。如果想要禁用micrometer timer可以将ContainerProperties中的`micrometerEnabled`设置为false。
container会为listener维护两个timer一个记录成功的调用一个记录失败的调用。
timer根据`spring.kafka.listener`命名并且含有如下的tag
- name : (container bean name)
- result success or failure两个timer分开统计
- exception none or ListenerExecutionFailedException
可以使用`ContainerProperties``micrometerTags`属性来添加额外的tag。
从2.9.8、3.0.6版本开始,可以为`ContainerProperties`中的`micrometerTagsProvider`属性中提供一个方法,改方法获取`ConsumerRecord<?, ?>`并且基于record返回tags并且将tag与`micrometerTags`属性中的任意static tags合并。
### 监控KafkaTemplate性能
从2.5版本开始如果在classpath中检测到`Micrometer`并且spring容器中只有一个`MeterRegistry`实例template会自动的创建并且更新`Micrometer Timer`。如果想要禁用micrometer timer可以将template的`micrometerEnabled`设置为false。
同样会为template维护两个timer一个记录成功的调用一个记录失败的调用。
timer根据`spring.kafka.template`命名并且含有如下tag
- name : (template bean name)
- result success or failure两个timer分开统计
- exception none or ListenerExecutionFailedException
### Micrometer Native Metrics
从2.5版本开始spring kafka框架提供了`Factory Listeners`用于管理Micrometer `KafkaClientMetrics`实例用于监听producer或consumer的创建或关闭。
用于启用该属性只需要为producer或consumer factory添加listener即可
```java
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
```
consumer/producer id将会作为`spring.id`被传递到tag中。
获取metrics的示例如下
```java
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
```
## 事务
spring kafka通过如下方式添加了对事务的支持
- KafkaTransactionManager和spring transaction支持类似@Transaction、TransactionTemplate
- 使用Transactional KafkaMessageListenerContainer
- kafkaTemplate支持本地事务
- 和其他transactionManager同步
当为DefaultKafkaProducerFactory提供了`transactionPrefixId`属性时事务是默认启用的。在指定了事务前缀id的情况下DefaultKafkaProducerFactory并不是只维护一个transactional producer而是维护了一个transactional producers cache。当在producer上调用close时producer将会被归还到cache中并在后续操作中被重用。对于每个producer来说`transaction.id`属性为`transactionPrefixId + n`n从0开始并且对每个producer依次递增。
> 如果当前服务存在多个实例,那么各个实例的`transactionPrefixId`属性必须都不同。
对于使用了springboot的程序只需要为producer factory设置`spring.kafka.producer.transaction-id-prefix`属性即可spring boot会自动装配`KafkaTransactionManager`bean 对象并将其注入到listener container。
### 使用KafkaTransactionManager
`KafkaTransactionManager``PlatformTransactionManager`的一个实现其构造器中提供了一个到producer factory的引用参数。如果为该参数指定了自定义的producer factory其必须支持事务。
可以将KafkaTransactionManager和spring transaction support结合使用如果事务被启用那么在事务范围内的KafkaTemplate操作将会使用transactional producer。取决事务执行成功或者失败transaction manager将会被事务进行提交或回滚。KafkaTemplate必须和ProducerFactory使用相同的事务管理器。
### 事务同步
本节只涉及到由producer发起的事务不包含由listener container发起的事务
如果想要在发送消息到kafka的同时执行一些数据库更新操作可以使用正常的spring transaction management例如DataSourceTransactionManager。
```java
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
```
@Transactional注解的拦截器将会开启一个事务并且KafkaTemplate将会与该transaction manager同步一个事务每次发送record的操作都会加入到该事务。当方法退出时database transaction提交之后kafka transaction才会被提交。
如果想要让kafka事务先被提交应该嵌套使用@Transactional注解。外部方法的@Transactional其transactionManager属性应该被配置为DataSourceTransactionManager,内部方法的@Transactional注解其transactionManager属性被配置为kafkaTransactionManager
### 使用由Consumer发起的事务
从2.7版本开始,`ChainedKafkaTransactionManager`被废弃。container中会通过KafkaTransactionMananger来启用一个事务并且在listener method上可以加上@Transactional来开启其他非kafka事务
在spring boot中listener container会自动注入kafkaTransactionManager。listener container会开启kafka transaction并且可以通过@Transactional注解来开启db transaction。
db事务会在kafka事务之前提交如果kafka事务提交失败那么该recrod会被重新传递故而数据库操作必须是幂等的。
使用示例如下:
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
}
@Bean
public DataSourceTransactionManager dstm(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Component
public static class Listener {
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(id = "group1", topics = "topic1")
@Transactional("dstm")
public void listen1(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}
@KafkaListener(id = "group2", topics = "topic2")
public void listen2(String in) {
System.out.println(in);
}
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic1").build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic2").build();
}
}
```
```properties
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.producer.transaction-id-prefix=tx-
#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug
```
对于只存在于producer中的transaction可以使用transaction synchronize
```java
@Transactional("dstm")
public void someMethod(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}
```
kafkaTemplate将会将kafka transaction和db transaction进行同步并且kafka事务在db事务之后才执行commit或rollback。
如果想要先提交kafka transaction再提交db transaction可以使用嵌套@Transactional,示例如下所示:
```java
@Transactional("dstm")
public void someMethod(String in) {
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
sendToKafka(in);
}
@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
}
```
### KafkaTemplate本地事务
可以通过kafkaTemplate在local transaction中执行一系列操作。如下为使用示例
```java
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
```
上述示例中callback的参数代表template本身。如果该callback正常退出那么事务则执行commit操作如果callback抛出异常那么transaction将会回滚。
> 如果在调用executeInTransaction时已经存在KafkaTransactionManager事务或synchronized事务那么kafkaTransactionManager事务不会被使用而会使用由executeInTransaction方法开启的事务
### KafkaTemplate事务发布和非事务发布
一般来说当KafkaTemplate是事务的producer factory配置了transactionIdPrefix那么提交record的操作应该是事务的。开启事务可以通过transactionManager、@Transactional方法executeInTransaction调用或由listener container开启。任何在事务范围内使用template的行为将会导致抛出`IllegalStateException`。从2.4.3版本开始可以设置template的`allowNonTransactional`属性从而允许kafkaTemplate执行非事务操作。
在allowNonTransactional属性被设置为true时将会调用producer container的createNonTransactionalProducer方法创建非事务producer该nontransactional producer创建后也会被缓存或与线程相绑定从而进行重用。
### 事务结合BatchListener使用
当listener在事务存在时执行失败rollback触发之后会调用AfterRollbackProcessor来执行一些操作。在recordListener使用默认的AfterRollbackProcessor时会执行seek操作故而失败的消息将会被重新传递。
在使用batch listener时如果处理失败那么整个batch中的消息都会被重新传递因为framwork不知道在batch中具体那条record处理失败。