64 KiB
Spring Kafka
连接到kafka
运行时切换bootstrap servers
从2.5版本开始,KafkaAdmin、ProducerFactory、ConsumerFactory都继承于KafkaResourceFactory抽象类。通过调用KafkaResourceFactory抽象类的setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)方法,可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
切换bootstrap后关闭旧consumer和producer
kafka consumer和producer通常都是基于长连接的,在调用setBootstrapServersSupplier在运行时切换bootstrap servers后,如果想要关闭现存的producer,可以调用
DefaultKafkaProducerFactory的reset方法。如果想要关闭现存的consumer,可以调用KafkaListenerEndpointRegistry的close方法(调用close后再调用start),或是调用其他listener container的close和start方法。
ABSwitchCluster
为了方便起见,framework提供了ABSwitchCluster类,该类支持两套bootstrap servers集合,在任一时刻,只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier<String>接口,将ABSwitchCluster对象提供给consumer factory, producer factory, KafkaAdmin后,如果想要切换bootstrap servers,可以调用ABSwitchCluster类的primary和secondary方法,并关闭生产者和消费者的旧实例(关闭生产者旧实例,在producer factory上调用reset方法,用于创建到新bootstrap servers的连接;对于消费者实例,可以对所有listener container先调用close方法再调用start方法,当使用@KafkaListener注解时,需要对KafkaListenerEndpointRegistrybean对象调用close和start方法。
Factory Listener
从2.5版本开始,DefaultKafkaProducerFactory和DefaultKafkaConsumerFactory都可以配置Listener,通过配置Listener可以监听生产者或消费者实例的创建和关闭。
// producer listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
// consumer listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
再上述接口中,id代表再factory bean对象名称后追加client-id属性,二者通过.分隔。
配置Topic
如果在当前应用上下文中定义了KafkaAdmin bean对象,kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加,可以定义一个NewTopic类型的bean对象,kafkaAdmin会自动将该topic添加到broker中。
为了方便topic的创建,2.3版本中引入了TopicBuilder类。
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
从2.6版本开始,创建NewTopic时可以省略partitions()和replicas()方法的调用,此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
从版本2.7开始,可以在KafkaAdmin.NewTopics的bean对象中声明多个NewTopic对象:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
当使用spring boot时,KafkaAdmin对象将会被自动注册,故而只需要定义NewTopic bean对象即可。
默认情况下,如果kafka broker不可用,会输出日志进行记录,但是此时context的载入还会继续,后续可以手动调用KafkaAdmin的initalize方法和进行重试。如果想要在kafka broker不可用时,停止context的载入,可以将kafka AdminfatalIfBrokerNotAvailable属性设置为true,此时context会初始化失败。
从版本2.7开始,KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic:
createOrModifyTopicsdescribeTopics
从版本2.9.10、3.0.9开始,KafkaAdmin提供了setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)接口,该接口接收一个Predicate<NewTopic>参数,通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象,每个kafkaAdmin对应不同的broker集群,在上下文中含有多个NewTopic对象时,可以通过predicate判断每个topic应该属性哪个amdin。
发送消息
KafkaTemplate类对KafkaProducer进行了包装,提供了如下接口用于向kafka topic发送消息。
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
其中,sendDefault接口需要向KafkaTemplate提供一个默认的topic。
kafkaTemplate中部分api接收timestamp作为参数,并且将timestamp存储到record中。接口中指定的timestamp参数如何存储,取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为CREATE_TIME,那么用户指定的timestamp参数将会被使用(如果用户没有指定timestamp,那么会自动创建timestamp,producer会在发送时将timestamp指定为System.currentTimeMillis())。如果topic中timstamp类型被配置为LOG_APPEND_TIME,那么用户指定的timestamp将会被丢弃,而broker则会负责为timestamp赋值。
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法,execute接口则是提供了对底层KafkaProducer的直接访问。
要使用KafkaTemplate,可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从2.5开始,创建KafkaTemplate时可以基于factory进行创建,但是覆盖factory中的配置属性,具体示例如下:
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
当使用KafkaTemplate接收Message\<?\>类型的参数时,可以将topic、partition、key和timestamp参数指定在Message的header中,header中包含如下条目:
- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP
除了调用发送方法获取CompletableFuture外,还可以为KafkaTemplate配置一个ProducerListener,从而在消息发送完成(成功或失败)后执行一个异步的回调。如下是ProducerListener接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,KafkaTemplate配置了一个LoggingProducerListener,会在发送失败时打印失败日志,在发送成功时并不做任何事。并且为了方便起见,方法的默认实现已经被提供,可以只覆盖其中一个方法。
send方法默认返回的是CompletableFuture类型,可以在发送完成之后为future注册一个回调:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
其中,Throwable类型的ex可以被转化为KafkaProducerException,该类型的failedProducerRecord属性可以获取发送失败的record。
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用CompletableFuture.get时,推荐使用带超时参数的方法。如果在Producer配置中指定了linger.ms,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。
发送示例
如下展示了通过KafkaTemplate向broker发送消息的示例:
// async
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
// sync
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
RoutingKafkaTemplate
从2.5版本开始,额可以通过RoutingKafkaTemplate在运行时选择producer实例,选择过程基于topic名称。
RoutingKafkaTemplate不支持事务,也不支持execute、flush、metrics等方法,因为RoutingKafkaTemplate根据topic来选择producer,但是在执行这些操作时并不知道操作所属topic。
RoutingKafkaTemplate需要一个map,map的key为java.util.regex.Pattern,而value则是ProducerFactory<Object, Object>实例。该map必须是有序的(例如LinkedHashMap),该map需要按顺序遍历,顺序在前的key-value对会优先匹配。
如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息,实例中每个topic都使用不同的序列化方式。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
使用DefaultKafkaProducerFactory
ProducerFactory是用于创建生产者实例的。当没有使用事务时,默认情况下,DefaultKafkaFactory会创建一个单例的生产者实例,所有客户端都会使用生产者实例。但是,如果在template中调用了flush方法,这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始,DefaultKafkaFactory有了新的producerPerThread属性,当该属性设置为true时,factory会针对每个线程都创建并缓存一个producer实例。
当
producerPerThread被设置为true时,若线程中的producer不再被需要,那么对factory必须手动调用closeThreadBoundProducer()。这将会物理上对producer进行关闭,并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。
当创建DefaultKafkaFactory时,key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时,可以选择向构造函数中传入serializer实例或是传入serializer supplier对象:
- 当传入serializer实例时,通过该factory创建的所有生产者实例都共享该serializer实例
- 当传入的是返回一个serializer的supplier时,可令通过该factory创建的producer实例都拥有属于自己的serializer
如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从2.5.10版本开始,可以在factory创建之后再更新factory的producer config属性。例如,可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例,故而需要调用factory的reset方法,在调用reset后所有现存producer实例都会被关闭,而之后新创建的producer都会使用新的属性配置。
在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。
为了更新producer属性配置,factory提供了如下两个接口:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
ReplyingKafkaTemplate
从2.1.3版本开始,kafka引入了ReplyingKafkaTemplate,其是KafkaTemplate的一个子类,用于提供request/reply语义。该类相比父类含有两个额外的方法:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
该方法的返回类型RequestReplyFuture继承了CompletableFuture,RequestReplyFuture会异步的注入该future的结果(可能正常返回,也可能是一个exception或者timeout)。
RequestReplyFuture含有一个sendFuture属性,该属性是调用kafkaTemplate的send方法发送消息的结果,类型为CompletableFuture<SendResult<K,V>>,可以通过该属性future来判断发送消息操作的结果。
如果在调用sendAndReceive方法时没有传递replyTimeout参数,或是指定replyTimeout参数为null,那么该template的defaultReplyTimeout属性将会被用作超时时间。默认情况下,该超时属性为5s。
从2.8.8版本开始,该template还有一个waitForAssingment方法。当reply container被配置为auto.offset.reset=latest时waitForAssingment方法相当有用,避免当reply container尚未初始化完成时,发送消息对应的reply已经返回了。
如下展示了如何使用ReplyingKafkaTemplate:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
在上述示例中采用了spring自动注入的containerFactory来创建reply container。
ReplyingKafkaTemplate header
在使用ReplyingKafkaHeader时,template会为消息设置一个header(KafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。
server端返回correlation id的示例如下:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
上述@KafkaListener结构会返回correlation id,并且决定返回消息发送的topic。
并且,ReplyingKafkaTemplate会使用header(KafkaHeaders.REPLY_TOPIC)来代表返回消息发送到的topic。
从版本2.2开始,该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset,该topic或topicpartitionoffset将会被用于设置reply header;但是如果reply container被配置监听多个topic或topicpartitionoffset,用户必须自己手动设置header。
为消息设置header的示例如下:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时,可以多个template共享一个reply topic,只要每个template监听的分区不同。
若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时,那么每个template实例都必须拥有不同的group id,在这种情况下,所有的template实例都会接受到所有消息,但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便,但是所有的template都接收所有消息将会带来额外的网络通信开销,并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时,推荐将template中sharedReplyTopic属性设置为true,此时,将会把收到不想要消息的日志级别从ERROR降低为DEBUG。
如下是使用共享topic配置template的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
默认情况下,ReplyingKafkaTemplate将会使用如下三种header:
- KafkaHeaders.CORRELATION_ID:用于关联发送消息和回复消息的关联id
- KafkaHeaders.REPLY_TOPIC:用于告诉接收消息的server将消息发送到哪个topic
- KafkaHeaders.REPLY_PARTITION:该header是可选的,可以通过该header告知server将消息发送到指定的分区
上述header将会被@KafkaListener结构用作返回消息的路由。
通过Message来发送请求和返回请求
在2.7版本中,对ReplyingKafkaTemplate加入了如下接口来发送和接收spring-messaging中的Message<?>:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
上述接口会使用template默认的replyTimeout,也存在接收timeout参数的重载版本。
如下示例展示了如何构建Message类型消息的发送和接收:
// template 配置
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
// 通过template发送和接收消息
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
// server端接收消息并且回复消息
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
从2.5版本开始,若是返回的Message中没有指定header,framework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC(如果存在的话)。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS,也会将其填充到返回消息的header中。
ErrorHandlingDeserializer
可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。
kafka poison pill & ErrorHandlingDeserializer
poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败,不管重试过多少次之后仍然无法成功被消费。
poison pill可能在如下场景下产生:
- 该记录被损坏
- 该记录发序列化失败
在生产场景中,consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容,将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。
在现实场景中,可能因为如下缘故而遭遇poison pill:
- 生产者改变了key或value的serializer并且持续向先前的topic中发送消息,这将会导致反序列化问题
- consumer的key或value deserializer配置错误
- 不同的生产者实例,使用不同的key或value serializer向topic中发送消息
在发生poison后,consumer在调用poll拉取数据时将无法反序列化record,调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理,针对该topic分区的消费会被阻塞(因为consumer offset一直无法向前移动)。并且,在consumer不停重试针对该消息的反序列化时,大量的反序列化失败日志将会被追加到日志文件中,磁盘占用量将会急剧增大。
ErrorHandlingDeserializer
为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。
replyErrorChecker
从版本2.6.7开始,可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker,当提供了checker方法时,template会自动调用该方法,如果该方法抛出异常,那么该reply message对应的future也会以失败状态完成。
replychecker使用示例如下:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
AggregatingReplyingKafkaTemplate
ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景,但如果对于发送一条消息,存在多个接收方,会返回多条消息的场景,则需要使用AggregatingReplyingKafkaTemplate。
像ReplyingKafkaTemplate一样,AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener container,listener container用于接收返回的消息。除此之外,AggregatingReplyingKafkaTemplate还会接收第三个参数BiPredicate<List<ConsumerRecord<K,v>>, Boolean>,该断言每次在接收到新消息时都会被计算。如果断言返回true,该AggregatingReplyingKafkaTemplate.sendAndReceive方法返回的Future对象将会被完成,并且Future中的值为断言中ConsumerRecord的集合。
从版本2.3.5开始,第三个参数为BiPredicate<List<ConsumerRecord<K,v>>, Boolean>类型,其会在每次接收到消息或是超时(replyTimeout超时)的情况下被调用,第二个参数传入的boolean即是断言的这次调用是否因为超时。该断言可以针对ConsumerRecord进行修改。
returnPartialOnTimeout
AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout,该属性值默认为false,如果该值被设置为true,那么当请求发生超时时,会返回已经接收到的部分ConsumerRecord集合。
BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息,要想成功在超时场景下返回接收到的部分消息,不仅需要returnPartialOnTimeout设置为true,还需要BiPredicate断言在发生timeout的情况下返回值为true。
AggregatingReplyingKafkaTemplate使用示例如下:
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
(coll, timeout) -> timeout || coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
注意sendAndReceive方法返回的future,其值类型为ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>类型,外层的ConsumerRecord并不是真正的返回消息,而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的,外层ConsumerRecord用于存储实际接收到的消息集合。
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>作为聚合返回消息集合的伪ConsumerRecord,其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal release(releaseStrategy返回为true)时,伪ConsumerRecord的topic name为aggregatedResults.当获取该伪ConsumerRecord的原因是timeout(returnPartialOnTimeout被设置为true,并且发生timeout,并且至少获取到一条返回消息),那么伪ConsumerRecord的topic name将会被设置为
partialResultsAfterTimeout.
template为上述伪topic name提供了静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。
AggregatingReplyingKafkaTemplate配置要求
listener container必须要配置为AckMode.MANUAL模式或AckMode.MANUAL_IMMEDIATE;consumer属性enable.auto.commit必须被设置伪false。为了避免任何丢失消息的可能,template只会在没有待处理请求的前提下提交offset,即最后一个未处理请求被releaseStrategy释放。
当consumer发生rebalance时,可能会造成返回消息被重复传递(由于template只会在没有待处理请求的情况下提交offset,如果在listener container接收到消息后,尚未提交offset,此时发生rebalance,那么未提交offset的消息将会被重复接收)。对于在途的请求,消息的重复传递将会被忽略(在途的消息还会存在多条消息聚合的过程);针对已经被releaseStrategy释放的返回消息,如果接收到多条重复的返回消息,那么会在log中看到error日志。
另外,如果AggregatingReplyingKafkaTemplate使用ErrorHandlingDeserializer,那么template将不会自动检测到反序列化异常。因为ErrorHandlingDeserializer在反序列化失败时会返回null,并且在返回的header中记录反序列化异常信息。推荐在应用中调用ReplyingKafkaTemplate.checkDeserialization()方法来判断是否存在反序列化异常。
对于AggregatingReplyingKafkaTemplate,replyErrorChecker也不会自动调用,需要针对每个元素手动调用checkForErrors方法。
接收消息
在使用spring kafka时,可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。
Message Listener
当使用Message Listener container时,必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口:
// interface-1
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
// interface-2
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// interface-3
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// interface-4
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// interface-5
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
// interface-6
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// interface-7
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// interface-8
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
- interface-1:当使用由容器管理的提交方法或自动提交时,可以使用该接口;该接口针对单条消息进行处理
- interface-2:当使用由容器管理的提交方法时,可以使用该接口;该接口针对单条消息进行处理
- interface-3:当使用由容器管理的提交方法或自动提交时,可以使用该接口;该接口针对单条消息进行处理;该接口提供对consumer实例的访问
- interface-4:当使用由容器管理的提交方法时,可以使用该接口;该接口针对单挑消息进行处理;该接口提供对consumer实例的访问
- interface-5:当使用自动提交或由容器管理的提交方法时,可以使用该接口;该接口针对
consumer.poll方法返回的所有消息进行处理;当使用该接口时,不支持AckMode.RECORD模式,因为所有的poll方法接收到的message batch都给了listener - interface-6:该当使用容器管理的提交方法时,可以使用该接口;该接口会处理由poll方法接收到的所有消息
- interface-7:当使用自动提交或由容器管理的提交方法时,可以使用该接口;该接口针对poll方法返回的所有消息进行处理;使用该接口时不支持
AckMode.RECORD;使用该接口可以针对consumer实例进行处理 - interface-8:当使用由容器管理的commit method时,可以使用该接口;该接口针对由poll方法返回的全部消息进行处理;该接口提供对consumer实例的访问
listener接口提供的consumer对象并不是线程安全的,故而必须在调用该listener的线程内访问该consumer中的方法
在对consumer中的方法进行访问时,不应该在listener中执行任何会修改consumer position或commit offset的方法;position或offset信息由container来进行管理。
MessageListenerContainer
spring kafka提供了两种MessageListenerContainer的实现:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
上述两种实现的区别如下:
- KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
- ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。
Interceptors
从2.2.7版本开始,可以为listener container添加RecordInterceptor;拦截器会在container调用listener之前被调用,可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空,那么后续listener将不会被调用。
从2.7版本开始,RecordInterceptor还增加了方法afterRecord,该方法在listener退出之后调用(正常退出或是抛异常退出)。并且从2.7版本开始,还新增了BatchInterceptor,为Batch Listener提供了类似的拦截器功能;ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。
如果拦截器对record进行了修改(同构创建一个新的record),那么topic、partition、offset必须都保持不变,从而避免类似消息丢失的副作用
为了调用多个interceptors,可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
默认情况下,从2.8版本开始,在使用事务时,拦截器会在事务开启之前被调用。可以将listener container的interceptBeforeTx属性修改为false,从而在事务开启之后调用拦截器方法。从2.9版本开始,上述会被应用于任何transaction manager,而不单单只用于KafkaAwareTransactionManager,这允许拦截器加入到由container开启的jdbc事务。
从2.3.8、2.4.6版本开始,ConcurrentMessageListenerContainer支持Static Membership当concurrency大于1时。而group.instance.id的后缀则是-n,n从1开始。与此同时,增加session.timeout.ms值,可以减少rebalance event的数量,例如,当应用实例重启时。
使用KafkaMessageListenerContainer
在创建KafkaMessageListenerContainer时,可以使用如下构造方法:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
上述构造函数接收一个consumerFactory,并且通过containerProperties接收topic和partition的信息,此外containerProperties中还包含其他配置信息。
containerProperties拥有如下构造方法:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造方法中接收一个TopicPartitionOffset数组,通过该数组可以显式的告知container应该使用哪些分区(通过调用consumer的assign方法)。并且,可以选择为TopicPartitionOffset指定初始的offset。当relativeToCurrent参数为false时(默认为false),如果指定offset为正数,默认代表绝对offset;如果offset为负数,默认代表相对于该分区最后位置的偏移量。如果relativeToCurrent为true,初始offset将会被设置为相对当前的consumer position。当容器启动时,设置的offset将会被应用。
第二个构造方法接收了一个topic数组,kafka则会根据group.id属性来分配分区,将分区在组内的订阅了该topic的消费者实例之间进行分配。
第三个构造方法则是接收一个topicPattern,通过该pattern来匹配分区。
在创建container时,为了将MessageListener分配给container,可以使用ContainerProperties.setMessageListener。如下展示了添加listener的示例:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
在上述示例中,创建DefaultKafkaConsumerFactory实例时,其构造方法只接受了一个ConsumerProperties属性,代表其会在属性中获取key和value的Deserializer.class并创建实例。同时,也可以通过调用DefaultKafkaConsumerFactory的其他构造方法,接收key和value的Deserializer实例,在这种情况下,所有consumer都会公用key和value的反序列化实例。另一种选择时提供Supplier<Deserializer>,这种情况下,每个consumer都会使用不同的反序列化器实例。
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
从2.2.1版本开始,提供了一个叫logContainerConfig的配置属性,当该属性设置为true并且info日志级别开启时,每个listener container都会写日志记录其配置。
默认情况下,会在debug的日志级别记录topic offset commit。从2.1.2开始,通过ContainerProperties中commitLogLevel的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info,可以调用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)方法。
从2.2版本开始,添加了missingTopicsFatal属性,该属性默认值为false。如果在container启动时,其配置的任一topic在broker中并不存在,那么其将会阻止container启动。如果container被配置为监听pattern,那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时,container线程将会循环调用consumer.poll方法等待topic出现,同时日志输出消息,除了日志之外,没有任何迹象显示出现了问题。
在2.8版本中,引入了authExceptionRetryInterval属性。该属性会导致container在kafkaConsumer得到AuthenticationException或AuthorizationException异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时,在等待authExceptionRetryInterval时间间隔后,可以重试获取消息,此时权限可能已经被授予。
在默认情况下,没有配置
authExceptionRetryInterval,AuthenticationException和AuthorizationException将被认为是致命的,这将会导致messageListenerContainer停止。
使用ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
除此之外,ConcurrentMessageListenerContainer还有一个concurrency属性,该属性代表并行度。如果调用container.setConcurrency(3)方法,其会创建3个KafkaMessageListenerContainer.
对于该构造方法,kafka使用组管理功能在消费者实例之间分配分区。
如果container通过TopicPartitonOffset配置时,ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上,每个委托实例负责监听不同的TopicPartitionOffset。
如果,6个TopicPartitionOffset被提供,并且concurrency被设置为3,每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供,并且concurrency被设置为3,那么两个委托容器实例将会被分配到2个分区,第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量,那么concurrency将会降低到和TopicPartitionOffset数量相同,每个委托容器实例被分配到一个分区。
从1.3版本开始,MessageListenerContainer提供了对底层KafkaConsumer的metrics信息访问。对于ConcurrentMessageListenerContainer,其metircs方法将会返回一个map,该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为Map<String, Map<MetricName, ? extends Metric>>,其中key为底层KafkaConsumer的client-id.
从2.3版本开始,ContainerProperties提供了idleBetweenPolls属性,允许listener container在调用consumer.poll()方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:idleBetweenPolls属性实际配置的值;max.poll.interval.ms值减去当前消息批处理时间的差值。
max.poll.interval.ms该值代表在使用消费者组管理功能时,两次
poll()调用之间最长时间间隔。该值代表在调用完poll方法后,在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后,poll方法仍然未被调用,那么该消费者示例将会被认为失败,并且消费者组会出发rebalance操作,将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的group.instance.id不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过session.timeout.ms时间后出发rebalance。group.instance.id不为空时,该消费者实例将会被认为是该消费者组的静态成员。默认情况下,
max.poll.interval.ms的默认值为5min。
Committing Offsets
对于offset提交,spring kafka提供了多个选项。如果enable.auto.commit属性在consumer properties中被指定为true,那么kafka会根据其配置对消息进行自动提交。如果enable.auto.commit被指定为false,那么MessageListenerContainer支持几种不同的AckMode设置。默认的AckMode设置为BATCH。从2.3版本开始,spring framework会自动将enable.auto.commit设置为false,在2.3之前则是默认设置为true。
consumer.poll方法会返回一条或多条ConsumerRecord,而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为:
RECORD:在listener处理完该消息并返回之后,提交offsetBATCH:当所有被poll方法调用返回的消息都处理完成之后,提交offsetTIME:当所有被poll方法返回的消息均被处理完成,或是从上次commit时起已超过ackTime时间COUNT:当所有被poll方法返回的消息都被处理完成,或是从上次commit时起已收到超过ackCount条记录COUNT_TIME:当TIME或COUNT任一条满足时,提交offsetMANUAL:messageListener负责对消息调用Acknowledgment.acknowledge()方法,调用之后语义和BATCH相同,会等待所有被poll方法调用返回的消息都处理完成,之后会批量提交offsetMANUAL_IMMEDIATE:当方法Acknowledgment.acknowledge()被listener调用之后,立刻会提交offset
当使用事务时,offsets将会被发送给事务,并且语义和RECRD或BATCH相同,基于listener的类型决定语义(看listener是record类型还是batch类型)
MANUAL或MANUAL_IMMEDIATE需要listener为AcknowledgingMessageListener或BatchAcknowledgingMessageListener
基于容器的syncCommits容器属性,提交offset时会使用commitSync或commitAsync方法。syncCommits属性默认为true,可以通过setSyncCommitTimeout设置commitSync时的超时时间。在syncCommits设置为false时,可以通过setCommitCallback来获取commitAsync的结果,默认情况下callback是LoggingCommitCallback,其会日志输出errors,并且以debug的级别打印成功日志。
由于listener container拥有自己的提交offset机制,故而更推荐将kafka consumr的ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG属性设置为false。2.3版本开始,会将自动提交属性设置为false,除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。
Acknowledgment接口具有如下方法:
public interface Acknowledgment {
void acknowledge();
}
该方法能够让container控制何时提交offset。
从2.3版本开始,Acknowledgment接口提供了两个额外的方法nack(long sleep)和nack(int index, long sleep)。第一个方法和record类型的listener一起使用,第二个方法和batch类型的listener一起使用。如果调用类型错误,会抛出IllegalStateException异常。
-
nack(long sleep):对当前record执行nack操作,丢弃poll请求中的remaining records并针对所有分区进行re-seek操作,将position恢复。故而,在指定sleep时间范围后,record将会被重新传递,该操作会阻塞整个message listener的读操作,阻塞时间为sleep,且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。 -
nack(int index, long sleep):对于batch中index位置的record进行nack操作,该操作会提交位于index之前record的offset,并且会re-seek所有分区,故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。
当想要提交batch中部分消息时,请使用nack(int index, long sleep)方法。当是使用事务时,设置AckMode为MUANAL;调用nack方法会将已成被成功处理的records offsets发送给事务。
nack只能在调用listener的consumer线程中进行处理。
在使用nack方法时,任何在途的offset都会被提交,而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作,故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似,当container中配置了一个DefaultErrorHandler时。
当使用batch listener时,若发生异常可以指定record在bathc中的index。调用nack时,位于index之前的record offset都会被提交,seek操作也会执行,在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。
从3.0.10版本开始,batch listener可以提交batch中的部分record offset,通过使用acknowledge(index)方法。当该方法被调用时,位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用acknowledge()方法,会针对剩余尚未被提交的records进行offset提交。在使用部分提交时,必须满足如下要求:
- AckMode.MANUAL_IMMEDIATE需要被开启
- 部分提交方法需要在listener thread中被调用
- listener必须批量消费recrods而不是消费单条record
- index必须要位于batch range中
- 重复调用部分提交接口时,后调用的index要比先调用的大
上述要求是强制的,在不满足时会抛出IllegalArgumentException和IllegalStateException异常。
上述调用nack后所说的阻塞均是指调用
KafkaConsumer.pause方法后造成的结果。在pause方法被调用后,任何后续调用的poll方法都不会返回任何records,直到resume方法被调用。pause方法的调用并不会造成分区的rebalance操作。调用pause方法并不会造成线程的阻塞,而是通过poll获取指定分区消息的阻塞。并且,rebalance操作后也不会保留pause/resume状态。
手动提交offset
通常,当使用AckMode.MANUAL或AckMode.MANUAL_IMMEDIATE时,ack必须按顺序确认,因为kafka并不是为每条record都维护提交状态,而是针对每个消费者组/分区只维护一个commit offset。
从2.8版本开始,可以设置container属性asyncAcks,该属性设置后,由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序(records可以被按任何顺序确认)的提交进行延迟,直到接收到缺失的ack。此时,消费者会处于paused状态,不会有新的records被传递,直到上次poll操作中所有的消息都被提交offset。
当启用asyncAcks特性后,可以异步的针对消息进行处理(Acknowledgment.acknowledge()方法并没有要求只能在listener线程中被调用)。但是,在对消息进行异步处理的时候,如果发生处理失败的情况,可能会增加record重复传输的可能性。(如果在开启asyncAcks时,如果后续消息已经提交ack,但是位于前面的消息处理发生异常,那么后续成功的record其offset将无法被提交,故而后续处理成功的消息将会被重新传递).
在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。
并且,当asyncAcks启用时,无法使用nack()方法来进行nack操作。
@KafkaListener
@KafkaListener会将一个bean方法标记为listener container的listener方法。该bean被封装在一个MessagingMessageListenerAdapter对象中,该Adapter对象的构造方法如下所示,封装了bean对象和bean对象方法:
public MessagingMessageListenerAdapter(Object bean, Method method) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
}
在配置@KafkaListener注解的绝大部分属性时,可以使用#{}形式的spel表达式或${}形式的属性占位符。
Record Listener
@KafkaListener注解为简单pojo的listenr提供了机制,如下展示了使用示例:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
该机制需要在@Configuration类上标注@EnableKafka注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的ConcurrentMessageListenerContainer。默认情况下,需要一个bean name为kafkaListenerContainerFactory的bean对象。如下示例展示了如何使用ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
在上述示例中,如果要设置container properties,必须要在container factory对象上调用getContainerProperties(),然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。
从版本2.1.1开始,对由该注解创建的消费者实例,可以设置client.id属性。client.id为clientIdPrefix-n格式,其中n为整数,代表在使用concurrency时的container number。
从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的concurrency和autoStartup属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
可以通过@KafkaListener注解为pojo listener配置显式的topic和分区(也可以为其指定一个初始offset)。如下示例展示了如何为@KafkaListener指定topic和offset:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如上图所示,可以在topicPartitions属性的@TopicPartition的partitions属性中或是@TopicPartitionpation的partitionOffsets属性中指定分区,但是无法同时在上述两处都指定同一个分区。
从2.5.5版本开始,可以对所有手动分配的分区都指定一个初始offset:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
再上述示例中,*通配符代表partitions属性锁指定的所有分区。在每个@TopicPartition注解中,只能有一个@PartitionOffset注解中带有该通配符。
当listener实现了ConsumerSeekAware时,onPartitionAssigned方法会被调用,即使使用手动分配(assign)。onPartitionAssigned允许在分区被分配时执行任意的seek操作。
从2.6.4版本开始,可以指定由,分隔的分区列表或分区区间,示例如下:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
该分区范围是闭区间,上述代表的区间是0,1,2,3,4,5,7,10,11,12,13,14,15.
上述方式也可以用于指定分区初始的offset:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
手动ack
当使用手动ack时,可以为listener method提供Acknowledgment。如下示例展示了如何使用一个不同的container factory:
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
Consumer Record Metadata
record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容:
- KafkaHeaders.OFFSET
- KafkaHeaders.RECEIVED_KEY
- KafkaHeaders.RECEIVED_TOPIC
- KafkaHeaders.RECEIVED_PARTITION
- KafkaHeaders.RECEIVED_TIMESTAMP
- KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始,如果接收到的消息key为null,那么RECEIVED_KEY在header中并不会出现;而在2.5版本之前,RECEIVED_KEY在header中存在并且值为null。
如下示例展示了如何使用header:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
参数注解(
@Payload、@Header)必须在listener method方法的具体实现上被指定,如果参数注解只指定在接口上,而没有在具体实现方法上指定,那么指定的注解将不会起作用。
从2.5版本开始,相比于上述示例中针对单条header内容进行接收,可以通过ConsumerRecordMetadata类型的参数来接收所有的header数据:
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
上述ConsumerRecordMetadata类型中包含ConsumerRecord中所有的数据,除了record中的key和value。