Files
rikako-note/spring/spring kafka/spring kafka.md

1262 lines
72 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring Kafka
## 连接到kafka
### 运行时切换bootstrap servers
从2.5版本开始KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)`方法可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
> #### 切换bootstrap后关闭旧consumer和producer
> kafka consumer和producer通常都是基于长连接的在调用setBootstrapServersSupplier在运行时切换bootstrap servers后如果想要关闭现存的producer可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer可以调用`KafkaListenerEndpointRegistry`的`close`方法调用close后再调用start或是调用其他listener container的close和start方法。
#### ABSwitchCluster
为了方便起见framework提供了`ABSwitchCluster`该类支持两套bootstrap servers集合在任一时刻只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\<String\>接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后如果想要切换bootstrap servers可以调用ABSwitchCluster类的`primary``secondary`方法并关闭生产者和消费者的旧实例关闭生产者旧实例在producer factory上调用reset方法用于创建到新bootstrap servers的连接对于消费者实例可以对所有listener container先调用close方法再调用start方法当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。
### Factory Listener
从2.5版本开始,`DefaultKafkaProducerFactory``DefaultKafkaConsumerFactory`都可以配置Listener通过配置Listener可以监听生产者或消费者实例的创建和关闭。
```java
// producer listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
```
```java
// consumer listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
```
再上述接口中id代表再factory bean对象名称后追加client-id属性二者通过`.`分隔。
## 配置Topic
如果在当前应用上下文中定义了KafkaAdmin bean对象kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加可以定义一个`NewTopic`类型的bean对象kafkaAdmin会自动将该topic添加到broker中。
为了方便topic的创建2.3版本中引入了TopicBuilder类。
```java
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
```
从2.6版本开始创建NewTopic时可以省略partitions()和replicas()方法的调用此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
```java
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
```
从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象
```java
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
```
> 当使用spring boot时KafkaAdmin对象将会被自动注册故而只需要定义NewTopic bean对象即可。
默认情况下如果kafka broker不可用会输出日志进行记录但是此时context的载入还会继续后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时停止context的载入可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true此时context会初始化失败。
从版本2.7开始KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic
- `createOrModifyTopics`
- `describeTopics`
从版本2.9.10、3.0.9开始KafkaAdmin提供了`setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)`接口该接口接收一个Predicate\<NewTopic\>参数通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象每个kafkaAdmin对应不同的broker集群在上下文中含有多个NewTopic对象时可以通过predicate判断每个topic应该属性哪个amdin。
## 发送消息
KafkaTemplate类对KafkaProducer进行了包装提供了如下接口用于向kafka topic发送消息。
```java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
```
其中sendDefault接口需要向KafkaTemplate提供一个默认的topic。
kafkaTemplate中部分api接收timestamp作为参数并且将timestamp存储到record中。接口中指定的timestamp参数如何存储取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`那么用户指定的timestamp参数将会被使用如果用户没有指定timestamp那么会自动创建timestampproducer会在发送时将timestamp指定为System.currentTimeMillis()。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`那么用户指定的timestamp将会被丢弃而broker则会负责为timestamp赋值。
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法execute接口则是提供了对底层KafkaProducer的直接访问。
要使用KafkaTemplate可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate
```java
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
```
从2.5开始创建KafkaTemplate时可以基于factory进行创建但是覆盖factory中的配置属性具体示例如下
```java
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
```
当使用KafkaTemplate接收`Message\<?\>`类型的参数时可以将topic、partition、key和timestamp参数指定在Message的header中header中包含如下条目
- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP
除了调用发送方法获取CompletableFuture外还可以为KafkaTemplate配置一个ProducerListener从而在消息发送完成成功或失败后执行一个异步的回调。如下是ProducerListener接口的定义
```java
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
```
默认情况下KafkaTemplate配置了一个LoggingProducerListener会在发送失败时打印失败日志在发送成功时并不做任何事。并且为了方便起见方法的默认实现已经被提供可以只覆盖其中一个方法。
send方法默认返回的是CompletableFuture类型可以在发送完成之后为future注册一个回调
```java
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
```
其中Throwable类型的ex可以被转化为`KafkaProducerException`该类型的failedProducerRecord属性可以获取发送失败的record。
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下调用`CompletableFuture.get`推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便KafkaTemplate提供了带autoFlush参数的构造器版本如果设置autoFlush为truekafkaTemplate在每次发送消息时都会调用flush方法。
### 发送示例
如下展示了通过KafkaTemplate向broker发送消息的示例
```java
// async
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
```java
// sync
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
### RoutingKafkaTemplate
从2.5版本开始额可以通过RoutingKafkaTemplate在运行时选择producer实例选择过程基于topic名称。
> RoutingKafkaTemplate不支持事务也不支持execute、flush、metrics等方法因为RoutingKafkaTemplate根据topic来选择producer但是在执行这些操作时并不知道操作所属topic。
RoutingKafkaTemplate需要一个mapmap的key为`java.util.regex.Pattern`而value则是`ProducerFactory<Object, Object>`实例。该map必须是有序的例如LinkedHashMap该map需要按顺序遍历顺序在前的key-value对会优先匹配。
如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息实例中每个topic都使用不同的序列化方式。
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
```
### 使用DefaultKafkaProducerFactory
ProducerFactory是用于创建生产者实例的。当没有使用事务时默认情况下`DefaultKafkaFactory`会创建一个单例的生产者实例所有客户端都会使用生产者实例。但是如果在template中调用了flush方法这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始DefaultKafkaFactory有了新的`producerPerThread`属性当该属性设置为true时factory会针对每个线程都创建并缓存一个producer实例。
> 当`producerPerThread`被设置为true时若线程中的producer不再被需要那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。
当创建DefaultKafkaFactory时key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时可以选择向构造函数中传入serializer实例或是传入serializer supplier对象
- 当传入serializer实例时通过该factory创建的所有生产者实例都共享该serializer实例
- 当传入的是返回一个serializer的supplier时可令通过该factory创建的producer实例都拥有属于自己的serializer
如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例
```java
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
```
从2.5.10版本开始可以在factory创建之后再更新factory的producer config属性。例如可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例故而需要调用factory的reset方法在调用reset后所有现存producer实例都会被关闭而之后新创建的producer都会使用新的属性配置。
> 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。
为了更新producer属性配置factory提供了如下两个接口
```java
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
```
### ReplyingKafkaTemplate
从2.1.3版本开始kafka引入了ReplyingKafkaTemplate其是KafkaTemplate的一个子类用于提供request/reply语义。该类相比父类含有两个额外的方法
```java
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
```
该方法的返回类型RequestReplyFuture继承了CompletableFutureRequestReplyFuture会异步的注入该future的结果可能正常返回也可能是一个exception或者timeout
RequestReplyFuture含有一个sendFuture属性该属性是调用kafkaTemplate的send方法发送消息的结果类型为`CompletableFuture<SendResult<K,V>>`可以通过该属性future来判断发送消息操作的结果。
如果在调用sendAndReceive方法时没有传递replyTimeout参数或是指定replyTimeout参数为null那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下该超时属性为5s。
从2.8.8版本开始该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用避免当reply container尚未初始化完成时发送消息对应的reply已经返回了。
如下展示了如何使用ReplyingKafkaTemplate:
```java
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
```
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
#### ReplyingKafkaTemplate header
在使用ReplyingKafkaHeader时template会为消息设置一个headerKafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
server端返回correlation id的示例如下
```java
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
```
上述@KafkaListener结构会返回correlation id并且决定返回消息发送的topic。
并且ReplyingKafkaTemplate会使用headerKafkaHeaders.REPLY_TOPIC来代表返回消息发送到的topic。
从版本2.2开始该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset该topic或topicpartitionoffset将会被用于设置reply header但是如果reply container被配置监听多个topic或topicpartitionoffset用户必须自己手动设置header。
为消息设置header的示例如下
```java
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
```
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时可以多个template共享一个reply topic只要每个template监听的分区不同。
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时那么每个template实例都必须拥有不同的group id在这种情况下所有的template实例都会接受到所有消息但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便但是所有的template都接收所有消息将会带来额外的网络通信开销并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时推荐将template中`sharedReplyTopic`属性设置为true此时将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
如下是使用共享topic配置template的示例
```java
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
```
默认情况下ReplyingKafkaTemplate将会使用如下三种header
- KafkaHeaders.CORRELATION_ID用于关联发送消息和回复消息的关联id
- KafkaHeaders.REPLY_TOPIC用于告诉接收消息的server将消息发送到哪个topic
- KafkaHeaders.REPLY_PARTITION该header是可选的可以通过该header告知server将消息发送到指定的分区
上述header将会被@KafkaListener结构用作返回消息的路由
#### 通过Message<?>来发送请求和返回请求
在2.7版本中对ReplyingKafkaTemplate加入了如下接口来发送和接收`spring-messaging`中的`Message<?>`
```java
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
```
上述接口会使用template默认的replyTimeout也存在接收timeout参数的重载版本。
如下示例展示了如何构建Message类型消息的发送和接收
```java
// template 配置
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
// 通过template发送和接收消息
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
// server端接收消息并且回复消息
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
```
从2.5版本开始若是返回的Message中没有指定headerframework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC如果存在的话。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS也会将其填充到返回消息的header中。
> #### ErrorHandlingDeserializer
> 可以考虑在reply container中使用ErrorHandlingDeserializer如果反序列化失败RequestReplyFuture将会以异常状态完成可以访问获取到的ExecutionException其cause属性中包含DeserializationException。
### kafka poison pill & ErrorHandlingDeserializer
poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败不管重试过多少次之后仍然无法成功被消费。
poison pill可能在如下场景下产生
- 该记录被损坏
- 该记录发序列化失败
在生产场景中consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。
在现实场景中可能因为如下缘故而遭遇poison pill
- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息这将会导致反序列化问题
- consumer的key或value deserializer配置错误
- 不同的生产者实例使用不同的key或value serializer向topic中发送消息
在发生poison后consumer在调用poll拉取数据时将无法反序列化record调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理针对该topic分区的消费会被阻塞因为consumer offset一直无法向前移动。并且在consumer不停重试针对该消息的反序列化时大量的反序列化失败日志将会被追加到日志文件中磁盘占用量将会急剧增大。
#### ErrorHandlingDeserializer
为了解决poison pill问题spring引入了ErrorHandlingDeserializer该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败那么ErrorHandlingDeserializer将会返回一个null并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。
#### replyErrorChecker
从版本2.6.7开始可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker当提供了checker方法时template会自动调用该方法如果该方法抛出异常那么该reply message对应的future也会以失败状态完成。
replychecker使用示例如下
```java
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
```
### AggregatingReplyingKafkaTemplate
ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景但如果对于发送一条消息存在多个接收方会返回多条消息的场景则需要使用AggregatingReplyingKafkaTemplate。
像ReplyingKafkaTemplate一样AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener containerlistener container用于接收返回的消息。除此之外AggregatingReplyingKafkaTemplate还会接收第三个参数`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`该断言每次在接收到新消息时都会被计算。如果断言返回true`AggregatingReplyingKafkaTemplate.sendAndReceive`方法返回的Future对象将会被完成并且Future中的值为断言中ConsumerRecord的集合。
从版本2.3.5开始,第三个参数为`BiPredicate<List<ConsumerRecord<K,v>>, Boolean>`类型其会在每次接收到消息或是超时replyTimeout超时的情况下被调用第二个参数传入的boolean即是断言的这次调用是否因为超时。**该断言可以针对ConsumerRecord进行修改**。
> #### returnPartialOnTimeout
> AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout该属性值默认为false如果该值被设置为true那么当请求发生超时时会返回已经接收到的部分ConsumerRecord集合。
>
> BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息要想成功在超时场景下返回接收到的部分消息不仅需要returnPartialOnTimeout设置为true还需要BiPredicate断言在发生timeout的情况下返回值为true。
AggregatingReplyingKafkaTemplate使用示例如下
```java
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
(coll, timeout) -> timeout || coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
```
注意sendAndReceive方法返回的future其值类型为`ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`类型外层的ConsumerRecord并不是真正的返回消息而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的外层ConsumerRecord用于存储实际接收到的消息集合。
> #### ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>
> `ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>`作为聚合返回消息集合的伪ConsumerRecord其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal releasereleaseStrategy返回为true伪ConsumerRecord的topic name为`aggregatedResults`.
>
> 当获取该伪ConsumerRecord的原因是timeoutreturnPartialOnTimeout被设置为true并且发生timeout并且至少获取到一条返回消息那么伪ConsumerRecord的topic name将会被设置为`partialResultsAfterTimeout`.
template为上述伪topic name提供了静态变量
```java
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
```
伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。
#### AggregatingReplyingKafkaTemplate配置要求
listener container必须要配置为`AckMode.MANUAL`模式或`AckMode.MANUAL_IMMEDIATE`consumer属性`enable.auto.commit`必须被设置伪false。为了避免任何丢失消息的可能template只会在没有待处理请求的前提下提交offset即最后一个未处理请求被releaseStrategy释放。
当consumer发生rebalance时可能会造成返回消息被重复传递由于template只会在没有待处理请求的情况下提交offset如果在listener container接收到消息后尚未提交offset此时发生rebalance那么未提交offset的消息将会被重复接收。对于在途的请求消息的重复传递将会被忽略在途的消息还会存在多条消息聚合的过程针对已经被releaseStrategy释放的返回消息如果接收到多条重复的返回消息那么会在log中看到error日志。
另外如果AggregatingReplyingKafkaTemplate使用`ErrorHandlingDeserializer`那么template将不会自动检测到反序列化异常。因为`ErrorHandlingDeserializer`在反序列化失败时会返回null并且在返回的header中记录反序列化异常信息。推荐在应用中调用`ReplyingKafkaTemplate.checkDeserialization()`方法来判断是否存在反序列化异常。
对于AggregatingReplyingKafkaTemplate`replyErrorChecker`也不会自动调用,需要针对每个元素手动调用`checkForErrors`方法。
## 接收消息
在使用spring kafka时可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。
### Message Listener
当使用Message Listener container时必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口
```java
// interface-1
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
// interface-2
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// interface-3
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// interface-4
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// interface-5
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
// interface-6
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// interface-7
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// interface-8
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
1. interface-1当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理
2. interface-2当使用由容器管理的提交方法时可以使用该接口该接口针对单条消息进行处理
3. interface-3当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理该接口提供对consumer实例的访问
4. interface-4当使用由容器管理的提交方法时可以使用该接口该接口针对单挑消息进行处理该接口提供对consumer实例的访问
5. interface-5当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对`consumer.poll`方法返回的所有消息进行处理;当使用该接口时,不支持`AckMode.RECORD`模式因为所有的poll方法接收到的message batch都给了listener
6. interface-6该当使用容器管理的提交方法时可以使用该接口该接口会处理由poll方法接收到的所有消息
7. interface-7当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对poll方法返回的所有消息进行处理使用该接口时不支持`AckMode.RECORD`使用该接口可以针对consumer实例进行处理
8. interface-8当使用由容器管理的commit method时可以使用该接口该接口针对由poll方法返回的全部消息进行处理该接口提供对consumer实例的访问
> listener接口提供的consumer对象并不是线程安全的故而必须在调用该listener的线程内访问该consumer中的方法
> 在对consumer中的方法进行访问时不应该在listener中执行任何会修改consumer position或commit offset的方法position或offset信息由container来进行管理。
### MessageListenerContainer
spring kafka提供了两种MessageListenerContainer的实现
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
上述两种实现的区别如下:
- KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
- ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。
#### Interceptors
从2.2.7版本开始可以为listener container添加`RecordInterceptor`拦截器会在container调用listener之前被调用可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空那么后续listener将不会被调用。
从2.7版本开始RecordInterceptor还增加了方法`afterRecord`该方法在listener退出之后调用正常退出或是抛异常退出。并且从2.7版本开始,还新增了`BatchInterceptor`为Batch Listener提供了类似的拦截器功能ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。
> 如果拦截器对record进行了修改同构创建一个新的record那么topic、partition、offset必须都保持不变从而避免类似消息丢失的副作用
为了调用多个interceptors可以使用`CompositeRecordInterceptor``CompositeBatchInterceptor`
默认情况下从2.8版本开始在使用事务时拦截器会在事务开启之前被调用。可以将listener container的`interceptBeforeTx`属性修改为`false`从而在事务开启之后调用拦截器方法。从2.9版本开始上述会被应用于任何transaction manager而不单单只用于`KafkaAwareTransactionManager`这允许拦截器加入到由container开启的jdbc事务。
从2.3.8、2.4.6版本开始,`ConcurrentMessageListenerContainer`支持Static Membership当concurrency大于1时。而`group.instance.id`的后缀则是`-n``n`从1开始。与此同时增加`session.timeout.ms`可以减少rebalance event的数量例如当应用实例重启时。
#### 使用KafkaMessageListenerContainer
在创建KafkaMessageListenerContainer时可以使用如下构造方法
```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
上述构造函数接收一个consumerFactory并且通过containerProperties接收topic和partition的信息此外containerProperties中还包含其他配置信息。
containerProperties拥有如下构造方法
```java
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
第一个构造方法中接收一个`TopicPartitionOffset`数组通过该数组可以显式的告知container应该使用哪些分区通过调用consumer的assign方法。并且可以选择为TopicPartitionOffset指定初始的offset。当`relativeToCurrent`参数为false时默认为false如果指定offset为正数默认代表绝对offset如果offset为负数默认代表相对于该分区最后位置的偏移量。如果`relativeToCurrent`为true初始offset将会被设置为相对当前的consumer position。当容器启动时设置的offset将会被应用。
第二个构造方法接收了一个topic数组kafka则会根据`group.id`属性来分配分区将分区在组内的订阅了该topic的消费者实例之间进行分配。
第三个构造方法则是接收一个topicPattern通过该pattern来匹配分区。
在创建container时为了将MessageListener分配给container可以使用`ContainerProperties.setMessageListener`。如下展示了添加listener的示例
```java
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
在上述示例中创建DefaultKafkaConsumerFactory实例时其构造方法只接受了一个ConsumerProperties属性代表其会在属性中获取key和value的`Deserializer.class`并创建实例。同时也可以通过调用DefaultKafkaConsumerFactory的其他构造方法接收key和value的Deserializer实例在这种情况下所有consumer都会公用key和value的反序列化实例。另一种选择时提供`Supplier<Deserializer>`这种情况下每个consumer都会使用不同的反序列化器实例。
```java
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
从2.2.1版本开始,提供了一个叫`logContainerConfig`的配置属性当该属性设置为true并且info日志级别开启时每个listener container都会写日志记录其配置。
默认情况下会在debug的日志级别记录topic offset commit。从2.1.2开始,通过`ContainerProperties``commitLogLevel`的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info可以调用`containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)`方法。
从2.2版本开始,添加了`missingTopicsFatal`属性该属性默认值为false。如果在container启动时其配置的任一topic在broker中并不存在那么其将会阻止container启动。如果container被配置为监听pattern那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时container线程将会循环调用`consumer.poll`方法等待topic出现同时日志输出消息除了日志之外没有任何迹象显示出现了问题。
在2.8版本中,引入了`authExceptionRetryInterval`属性。该属性会导致container在kafkaConsumer得到`AuthenticationException``AuthorizationException`异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时在等待`authExceptionRetryInterval`时间间隔后,可以重试获取消息,此时权限可能已经被授予。
> 在默认情况下,没有配置`authExceptionRetryInterval`AuthenticationException和AuthorizationException将被认为是致命的这将会导致messageListenerContainer停止。
#### 使用ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer
```java
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
```
除此之外ConcurrentMessageListenerContainer还有一个concurrency属性该属性代表并行度。如果调用`container.setConcurrency(3)`方法其会创建3个`KafkaMessageListenerContainer`.
对于该构造方法kafka使用组管理功能在消费者实例之间分配分区。
如果container通过TopicPartitonOffset配置时ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上每个委托实例负责监听不同的TopicPartitionOffset。
如果6个TopicPartitionOffset被提供并且concurrency被设置为3每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供并且concurrency被设置为3那么两个委托容器实例将会被分配到2个分区第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量那么concurrency将会降低到和TopicPartitionOffset数量相同每个委托容器实例被分配到一个分区。
从1.3版本开始,`MessageListenerContainer`提供了对底层`KafkaConsumer`的metrics信息访问。对于`ConcurrentMessageListenerContainer`其metircs方法将会返回一个map该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为`Map<String, Map<MetricName, ? extends Metric>>`,其中key为底层KafkaConsumer的`client-id`.
从2.3版本开始ContainerProperties提供了`idleBetweenPolls`属性允许listener container在调用`consumer.poll()`方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:`idleBetweenPolls`属性实际配置的值;`max.poll.interval.ms`值减去当前消息批处理时间的差值。
> `max.poll.interval.ms`
>
> 该值代表在使用消费者组管理功能时,两次`poll()`调用之间最长时间间隔。该值代表在调用完poll方法后在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后poll方法仍然未被调用那么该消费者示例将会被认为失败并且消费者组会出发rebalance操作将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的`group.instance.id`不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过`session.timeout.ms`时间后出发rebalance。`group.instance.id`不为空时,该消费者实例将会被认为是该消费者组的静态成员。
>
> 默认情况下,`max.poll.interval.ms`的默认值为5min。
#### Committing Offsets
对于offset提交spring kafka提供了多个选项。如果`enable.auto.commit`属性在consumer properties中被指定为true那么kafka会根据其配置对消息进行自动提交。如果`enable.auto.commit`被指定为false那么MessageListenerContainer支持几种不同的`AckMode`设置。默认的`AckMode`设置为`BATCH`。从2.3版本开始spring framework会自动将`enable.auto.commit`设置为false在2.3之前则是默认设置为true。
`consumer.poll`方法会返回一条或多条`ConsumerRecord`而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为
- `RECORD`在listener处理完该消息并返回之后提交offset
- `BATCH`当所有被poll方法调用返回的消息都处理完成之后提交offset
- `TIME`当所有被poll方法返回的消息均被处理完成或是从上次commit时起已超过`ackTime`时间
- `COUNT`当所有被poll方法返回的消息都被处理完成或是从上次commit时起已收到超过`ackCount`条记录
- `COUNT_TIME`当TIME或COUNT任一条满足时提交offset
- `MANUAL`messageListener负责对消息调用`Acknowledgment.acknowledge()`方法调用之后语义和BATCH相同会等待所有被poll方法调用返回的消息都处理完成之后会批量提交offset
- `MANUAL_IMMEDIATE`:当方法`Acknowledgment.acknowledge()`被listener调用之后立刻会提交offset
当使用事务时offsets将会被发送给事务并且语义和RECRD或BATCH相同基于listener的类型决定语义看listener是record类型还是batch类型
> `MANUAL`或`MANUAL_IMMEDIATE`需要listener为`AcknowledgingMessageListener `或`BatchAcknowledgingMessageListener`
基于容器的`syncCommits`容器属性提交offset时会使用`commitSync``commitAsync`方法。`syncCommits`属性默认为true可以通过`setSyncCommitTimeout`设置commitSync时的超时时间。在syncCommits设置为false时可以通过`setCommitCallback`来获取commitAsync的结果默认情况下callback是`LoggingCommitCallback`其会日志输出errors并且以debug的级别打印成功日志。
由于listener container拥有自己的提交offset机制故而更推荐将kafka consumr的`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`属性设置为false。2.3版本开始会将自动提交属性设置为false除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。
Acknowledgment接口具有如下方法
```java
public interface Acknowledgment {
void acknowledge();
}
```
该方法能够让container控制何时提交offset。
从2.3版本开始,`Acknowledgment`接口提供了两个额外的方法`nack(long sleep)``nack(int index, long sleep)`。第一个方法和record类型的listener一起使用第二个方法和batch类型的listener一起使用。如果调用类型错误会抛出`IllegalStateException`异常。
- `nack(long sleep)`对当前record执行nack操作丢弃poll请求中的remaining records并针对所有分区进行re-seek操作将position恢复。故而在指定sleep时间范围后record将会被重新传递该操作会阻塞整个message listener的读操作阻塞时间为sleep且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。
- `nack(int index, long sleep)`对于batch中index位置的record进行nack操作该操作会提交位于index之前record的offset并且会re-seek所有分区故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。
当想要提交batch中部分消息时请使用`nack(int index, long sleep)`方法。当是使用事务时设置AckMode为MUANAL调用nack方法会将已成被成功处理的records offsets发送给事务。
nack只能在调用listener的consumer线程中进行处理。
在使用nack方法时任何在途的offset都会被提交而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似当container中配置了一个DefaultErrorHandler时。
当使用batch listener时若发生异常可以指定record在bathc中的index。调用nack时位于index之前的record offset都会被提交seek操作也会执行在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。
从3.0.10版本开始batch listener可以提交batch中的部分record offset通过使用`acknowledge(index)`方法。当该方法被调用时位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用`acknowledge()`方法会针对剩余尚未被提交的records进行offset提交。在使用部分提交时必须满足如下要求
- AckMode.MANUAL_IMMEDIATE需要被开启
- 部分提交方法需要在listener thread中被调用
- listener必须批量消费recrods而不是消费单条record
- index必须要位于batch range中
- 重复调用部分提交接口时后调用的index要比先调用的大
上述要求是强制的在不满足时会抛出IllegalArgumentException和IllegalStateException异常。
> 上述调用nack后所说的阻塞均是指调用`KafkaConsumer.pause`方法后造成的结果。在pause方法被调用后任何后续调用的poll方法都不会返回任何records直到`resume`方法被调用。pause方法的调用并不会造成分区的rebalance操作。**调用pause方法并不会造成线程的阻塞而是通过poll获取指定分区消息的阻塞。**
>
> 并且rebalance操作后也不会保留pause/resume状态。
### 手动提交offset
通常,当使用`AckMode.MANUAL``AckMode.MANUAL_IMMEDIATE`ack必须按顺序确认因为kafka并不是为每条record都维护提交状态而是针对每个消费者组/分区只维护一个commit offset。
从2.8版本开始可以设置container属性`asyncAcks`该属性设置后由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序records可以被按任何顺序确认的提交进行延迟直到接收到缺失的ack。此时消费者会处于paused状态不会有新的records被传递直到上次poll操作中所有的消息都被提交offset。
当启用`asyncAcks`特性后,可以异步的针对消息进行处理(`Acknowledgment.acknowledge()`方法并没有要求只能在listener线程中被调用。但是在对消息进行异步处理的时候如果发生处理失败的情况可能会增加record重复传输的可能性。如果在开启asyncAcks时如果后续消息已经提交ack但是位于前面的消息处理发生异常那么后续成功的record其offset将无法被提交故而后续处理成功的消息将会被重新传递.
> 在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。
并且,当`asyncAcks`启用时,无法使用`nack()`方法来进行nack操作。
### @KafkaListener
`@KafkaListener`会将一个bean方法标记为listener container的listener方法。该bean被封装在一个`MessagingMessageListenerAdapter`对象中该Adapter对象的构造方法如下所示封装了bean对象和bean对象方法
```java
public MessagingMessageListenerAdapter(Object bean, Method method) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
}
```
在配置@KafkaListener注解的绝大部分属性时,可以使用`#{}`形式的spel表达式或`${}`形式的属性占位符。
#### Record Listener
@KafkaListener注解为简单pojo的listenr提供了机制,如下展示了使用示例:
```java
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
```
该机制需要在@Configuration类上标注`@EnableKafka`注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的`ConcurrentMessageListenerContainer`。默认情况下需要一个bean name为`kafkaListenerContainerFactory`的bean对象。如下示例展示了如何使用`ConcurrentMessageListenerContainer`
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
```
在上述示例中如果要设置container properties必须要在container factory对象上调用getContainerProperties()然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。
从版本2.1.1开始,对由该注解创建的消费者实例,可以设置`client.id`属性。`client.id``clientIdPrefix-n`格式其中n为整数代表在使用concurrency时的container number。
从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的`concurrency``autoStartup`属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示
```java
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
```
#### 显式分区分配
可以通过@KafkaListener注解为pojo listener配置显式的topic和分区也可以为其指定一个初始offset。如下示例展示了如何为@KafkaListener指定topic和offset
```java
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
如上图所示可以在topicPartitions属性的`@TopicPartition的partitions属性`中或是`@TopicPartitionpation的partitionOffsets属性`中指定分区,但是无法同时在上述两处都指定同一个分区。
从2.5.5版本开始可以对所有手动分配的分区都指定一个初始offset
```java
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
再上述示例中,`*`通配符代表`partitions`属性锁指定的所有分区。在每个`@TopicPartition`注解中,只能有一个`@PartitionOffset`注解中带有该通配符。
当listener实现了`ConsumerSeekAware`时,`onPartitionAssigned`方法会被调用,即使使用手动分配(assign)。`onPartitionAssigned`允许在分区被分配时执行任意的seek操作。
从2.6.4版本开始,可以指定由`,`分隔的分区列表或分区区间,示例如下:
```java
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
```
该分区范围是闭区间,上述代表的区间是`0,1,2,3,4,5,7,10,11,12,13,14,15`.
上述方式也可以用于指定分区初始的offset
```java
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
#### 手动ack
当使用手动ack时可以为listener method提供`Acknowledgment`。如下示例展示了如何使用一个不同的container factory
```java
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
```
#### Consumer Record Metadata
record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容
- KafkaHeaders.OFFSET
- KafkaHeaders.RECEIVED_KEY
- KafkaHeaders.RECEIVED_TOPIC
- KafkaHeaders.RECEIVED_PARTITION
- KafkaHeaders.RECEIVED_TIMESTAMP
- KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始如果接收到的消息key为null那么`RECEIVED_KEY`在header中并不会出现而在2.5版本之前,`RECEIVED_KEY`在header中存在并且值为null。
如下示例展示了如何使用header
```java
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
```
> 参数注解(`@Payload`、`@Header`必须在listener method方法的具体实现上被指定如果参数注解只指定在接口上而没有在具体实现方法上指定那么指定的注解将不会起作用。
从2.5版本开始相比于上述示例中针对单条header内容进行接收可以通过`ConsumerRecordMetadata`类型的参数来接收所有的header数据
```java
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
```
上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据除了record中的key和value。
#### Batch Listener
从版本1.1开始,可以通过@KafkaListener方法来接收record batch。
为了配置listener container factory支持batch listener需要设置container factory的`batchListener`属性。如下展示了配置示例:
```java
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
```
> 从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的`batchListener`属性,只需要为@KafkaListener注解指定`batch`属性。
> 从2.9.6版本开始container factory对于`recordMessageConverter`和`batchMessageConverter`有不同的setter。在2.9.6之前的版本中,只有`messageConverter`的setter该converter被同时用于批量和单条场景。
如下示例展示了如何接收list类型的payload
```java
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
```
record batch的topic、分区、offset等信息也可以批量获取如下展示了如何批量获取header
```java
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
```
除了上述方法外,还可以通过`List of Message<?>`来批量接收消息,但是,除了`Acknowledgment``Consumer<?, ?>`外,`List of Message<?>`应该是方法中唯一的参数。如下示例展示了如何使用:
```java
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
```
在上述示例中并不会对payload执行任何转换。
如果当前存在`BatchMessagingMessageConverter``BatchMessagingMessageConverter`通过`RecordMessageConverter`配置),则可以为`Message<?>`添加一个泛型类型payload将会被转化为该类型。
除了上述两种方式外,还可以接收`List<ConsumerRecord<?, ?>>`类型的参数但是其必须为listener method的唯一参数`Acknowledgment`类型和`Consumer<?, ?>`类型参数除外)。如下实例展示了如何使用:
```java
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
```
从2.2版本开始listener可以接收由poll方法返回的完整的`ConsumerRecords<?, ?>`对象该record中封装了一个record的list。从而允许listener访问额外的方法例如`partitions()`方法返回TopicPartition对象的集合。方法的使用示例如下
```java
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
```
#### 注解属性
从2.0版本开始,`id`属性被用作kafka consumer group的`group.id`属性其会覆盖consumer factory中配置的属性。如果不想使用该行为可以显式的另外设置`groupId`属性;或是将`idIsGroup`属性设置为false此时`group.id`仍然会使用consumer factory中配置的属性。
对于@KafkaListener注解可以在绝大多数属性中使用spel表达式和placeholder
```java
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
```
从2.1.2版本开始spel表达式支持`__listener`。该伪bean name代表当前该注解所位于的bean。
例如如下示例:
```java
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
```
如果在项目中,已经存在了名为`__listener`的bean实例那么可以通过`beanRef`属性来对属性表达式进行修改。如下示例展示了如何使用`beanRef`
```java
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
```
从版本2.2.4开始,可以直接通过@KafkaListener注解指定consumer属性通过注解指定的consumer属性会覆盖在consumer factory中指定的属性。
> 但是,无法通过`@KafkaListener`注解的上述方法来指定`group.id`和`client.id`属性,通过上述方法指定这两个属性时,指定会被忽略;需要使用`groupId`和`clientIdPrefix`注解属性来指定consumer属性中的`group.id`和`client.id`属性
在通过@KafkaListener来指定consumer属性时,通过独立的字符串按照`foo:bar``foo bar``foo=bar`的格式进行指定,示例如下所示:
```java
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
```
### 获取consumer的`group.id`
在不同container中运行相同listener代码时需要识别该消息来源于哪个container通过`group.id`标识)。
在需要获取container的groupId时可以在listener线程中调用`KafkaUtils.getConsumerGroupId()`方法;也可以在方法参数中访问`group.id`属性:
```java
@KafkaListener(id = "id", topicPattern = "someTopic")
public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
```
### container线程命名
框架通过一个TaskExecutor来调用consumer和listener。可以在container properties中通过设置`consumerExecutor`属性来设置一个自定义的executor。当使用pooled executor时需要确保线程池中含有足够的线程来处理来自所有container的并发。当使用`ConcurrentMessageListenerContainer`executor中的线程会被所有consumerconcurrency使用。
如果没有为container提供consumer executor那么会为每个container提供`SimpleAsyncTaskExecutor`
> SimpleAsyncTaskExecutor会为所有的task都开启一个新线程故而该类型的executor并不会对线程进行重用。