Files
rikako-note/spring/spring kafka/spring kafka.md

88 KiB
Raw Blame History

Spring Kafka

连接到kafka

运行时切换bootstrap servers

从2.5版本开始KafkaAdmin、ProducerFactory、ConsumerFactory都继承于KafkaResourceFactory抽象类。通过调用KafkaResourceFactory抽象类的setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)方法可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。

切换bootstrap后关闭旧consumer和producer

kafka consumer和producer通常都是基于长连接的在调用setBootstrapServersSupplier在运行时切换bootstrap servers后如果想要关闭现存的producer可以调用DefaultKafkaProducerFactoryreset方法。如果想要关闭现存的consumer可以调用KafkaListenerEndpointRegistryclose方法调用close后再调用start或是调用其他listener container的close和start方法。

ABSwitchCluster

为了方便起见framework提供了ABSwitchCluster该类支持两套bootstrap servers集合在任一时刻只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier<String>接口,将ABSwitchCluster对象提供给consumer factory, producer factory, KafkaAdmin后如果想要切换bootstrap servers可以调用ABSwitchCluster类的primarysecondary方法并关闭生产者和消费者的旧实例关闭生产者旧实例在producer factory上调用reset方法用于创建到新bootstrap servers的连接对于消费者实例可以对所有listener container先调用close方法再调用start方法当使用@KafkaListener注解时需要对KafkaListenerEndpointRegistrybean对象调用close和start方法。

Factory Listener

从2.5版本开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory都可以配置Listener通过配置Listener可以监听生产者或消费者实例的创建和关闭。

// producer listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
// consumer listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

再上述接口中id代表再factory bean对象名称后追加client-id属性二者通过.分隔。

配置Topic

如果在当前应用上下文中定义了KafkaAdmin bean对象kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加可以定义一个NewTopic类型的bean对象kafkaAdmin会自动将该topic添加到broker中。

为了方便topic的创建2.3版本中引入了TopicBuilder类。

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

从2.6版本开始创建NewTopic时可以省略partitions()和replicas()方法的调用此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

从版本2.7开始,可以在KafkaAdmin.NewTopics的bean对象中声明多个NewTopic对象

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

当使用spring boot时KafkaAdmin对象将会被自动注册故而只需要定义NewTopic bean对象即可。

默认情况下如果kafka broker不可用会输出日志进行记录但是此时context的载入还会继续后续可以手动调用KafkaAdmin的initalize方法和进行重试。如果想要在kafka broker不可用时停止context的载入可以将kafka AdminfatalIfBrokerNotAvailable属性设置为true此时context会初始化失败。

从版本2.7开始KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic

  • createOrModifyTopics
  • describeTopics

从版本2.9.10、3.0.9开始KafkaAdmin提供了setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)接口该接口接收一个Predicate<NewTopic>参数通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象每个kafkaAdmin对应不同的broker集群在上下文中含有多个NewTopic对象时可以通过predicate判断每个topic应该属性哪个amdin。

发送消息

KafkaTemplate类对KafkaProducer进行了包装提供了如下接口用于向kafka topic发送消息。

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

其中sendDefault接口需要向KafkaTemplate提供一个默认的topic。

kafkaTemplate中部分api接收timestamp作为参数并且将timestamp存储到record中。接口中指定的timestamp参数如何存储取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为CREATE_TIME那么用户指定的timestamp参数将会被使用如果用户没有指定timestamp那么会自动创建timestampproducer会在发送时将timestamp指定为System.currentTimeMillis()。如果topic中timstamp类型被配置为LOG_APPEND_TIME那么用户指定的timestamp将会被丢弃而broker则会负责为timestamp赋值。

mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法execute接口则是提供了对底层KafkaProducer的直接访问。

要使用KafkaTemplate可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从2.5开始创建KafkaTemplate时可以基于factory进行创建但是覆盖factory中的配置属性具体示例如下

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

当使用KafkaTemplate接收Message\<?\>类型的参数时可以将topic、partition、key和timestamp参数指定在Message的header中header中包含如下条目

  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP

除了调用发送方法获取CompletableFuture外还可以为KafkaTemplate配置一个ProducerListener从而在消息发送完成成功或失败后执行一个异步的回调。如下是ProducerListener接口的定义

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下KafkaTemplate配置了一个LoggingProducerListener会在发送失败时打印失败日志在发送成功时并不做任何事。并且为了方便起见方法的默认实现已经被提供可以只覆盖其中一个方法。

send方法默认返回的是CompletableFuture类型可以在发送完成之后为future注册一个回调

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

其中Throwable类型的ex可以被转化为KafkaProducerException该类型的failedProducerRecord属性可以获取发送失败的record。

如果想要同步调用KafkaTemplate的发送方法并且等待返回结果可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下调用CompletableFuture.get推荐使用带超时参数的方法。如果在Producer配置中指定了linger.ms那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便KafkaTemplate提供了带autoFlush参数的构造器版本如果设置autoFlush为truekafkaTemplate在每次发送消息时都会调用flush方法。

发送示例

如下展示了通过KafkaTemplate向broker发送消息的示例

// async
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
// sync
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

RoutingKafkaTemplate

从2.5版本开始额可以通过RoutingKafkaTemplate在运行时选择producer实例选择过程基于topic名称。

RoutingKafkaTemplate不支持事务也不支持execute、flush、metrics等方法因为RoutingKafkaTemplate根据topic来选择producer但是在执行这些操作时并不知道操作所属topic。

RoutingKafkaTemplate需要一个mapmap的key为java.util.regex.Pattern而value则是ProducerFactory<Object, Object>实例。该map必须是有序的例如LinkedHashMap该map需要按顺序遍历顺序在前的key-value对会优先匹配。

如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息实例中每个topic都使用不同的序列化方式。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

使用DefaultKafkaProducerFactory

ProducerFactory是用于创建生产者实例的。当没有使用事务时默认情况下DefaultKafkaFactory会创建一个单例的生产者实例所有客户端都会使用生产者实例。但是如果在template中调用了flush方法这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始DefaultKafkaFactory有了新的producerPerThread属性当该属性设置为true时factory会针对每个线程都创建并缓存一个producer实例。

producerPerThread被设置为true时若线程中的producer不再被需要那么对factory必须手动调用closeThreadBoundProducer()。这将会物理上对producer进行关闭并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。

当创建DefaultKafkaFactory时key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时可以选择向构造函数中传入serializer实例或是传入serializer supplier对象

  • 当传入serializer实例时通过该factory创建的所有生产者实例都共享该serializer实例
  • 当传入的是返回一个serializer的supplier时可令通过该factory创建的producer实例都拥有属于自己的serializer

如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从2.5.10版本开始可以在factory创建之后再更新factory的producer config属性。例如可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例故而需要调用factory的reset方法在调用reset后所有现存producer实例都会被关闭而之后新创建的producer都会使用新的属性配置。

在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。

为了更新producer属性配置factory提供了如下两个接口

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

ReplyingKafkaTemplate

从2.1.3版本开始kafka引入了ReplyingKafkaTemplate其是KafkaTemplate的一个子类用于提供request/reply语义。该类相比父类含有两个额外的方法

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

该方法的返回类型RequestReplyFuture继承了CompletableFutureRequestReplyFuture会异步的注入该future的结果可能正常返回也可能是一个exception或者timeout

RequestReplyFuture含有一个sendFuture属性该属性是调用kafkaTemplate的send方法发送消息的结果类型为CompletableFuture<SendResult<K,V>>可以通过该属性future来判断发送消息操作的结果。

如果在调用sendAndReceive方法时没有传递replyTimeout参数或是指定replyTimeout参数为null那么该template的defaultReplyTimeout属性将会被用作超时时间。默认情况下该超时属性为5s。

从2.8.8版本开始该template还有一个waitForAssingment方法。当reply container被配置为auto.offset.reset=latest时waitForAssingment方法相当有用避免当reply container尚未初始化完成时发送消息对应的reply已经返回了。

如下展示了如何使用ReplyingKafkaTemplate:

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

在上述示例中采用了spring自动注入的containerFactory来创建reply container。

ReplyingKafkaTemplate header

在使用ReplyingKafkaHeader时template会为消息设置一个headerKafkaHeaders.CORRELATION_ID),该header必须被接受消息的server端返回。

server端返回correlation id的示例如下

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

上述@KafkaListener结构会返回correlation id并且决定返回消息发送的topic。

并且ReplyingKafkaTemplate会使用headerKafkaHeaders.REPLY_TOPIC来代表返回消息发送到的topic。

从版本2.2开始该template尝试通过配置的reply container中检测reply topic和reply 分区。如果container被配置为监听单个topic或TopicPartitionOffset该topic或topicpartitionoffset将会被用于设置reply header但是如果reply container被配置监听多个topic或topicpartitionoffset用户必须自己手动设置header。

为消息设置header的示例如下

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

若ConcurrentKafkaListenerContainerFactory通过TopicPartitionOffset来配置时可以多个template共享一个reply topic只要每个template监听的分区不同。

若ConcurrentKafkaListenerContainerFactory通过topic来进行配置时那么每个template实例都必须拥有不同的group id在这种情况下所有的template实例都会接受到所有消息但是只有发送那条消息的template实例才能匹配到correlation id。这样在弹性收缩template规模的时候会相当方便但是所有的template都接收所有消息将会带来额外的网络通信开销并且将不想收到的消息丢弃也会带来额外开销。当使用该设置时推荐将template中sharedReplyTopic属性设置为true此时将会把收到不想要消息的日志级别从ERROR降低为DEBUG。

如下是使用共享topic配置template的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}

默认情况下ReplyingKafkaTemplate将会使用如下三种header

  • KafkaHeaders.CORRELATION_ID用于关联发送消息和回复消息的关联id
  • KafkaHeaders.REPLY_TOPIC用于告诉接收消息的server将消息发送到哪个topic
  • KafkaHeaders.REPLY_PARTITION该header是可选的可以通过该header告知server将消息发送到指定的分区

上述header将会被@KafkaListener结构用作返回消息的路由。

通过Message来发送请求和返回请求

在2.7版本中对ReplyingKafkaTemplate加入了如下接口来发送和接收spring-messaging中的Message<?>

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

上述接口会使用template默认的replyTimeout也存在接收timeout参数的重载版本。

如下示例展示了如何构建Message类型消息的发送和接收

// template 配置
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}

// 通过template发送和接收消息
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));

// server端接收消息并且回复消息
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

从2.5版本开始若是返回的Message中没有指定headerframework将会自动检测header并且填充到返回消息的header中。检测到header信息的来源可以是@SendTo决定发送到的topic或是接收到消息的KafkaHeaders.REPLY_TOPIC如果存在的话。如果接收到消息中存在KafkaHeaders.CORRELATION_ID或是KafkaHeaders.REPLY_PARTITIONS也会将其填充到返回消息的header中。

ErrorHandlingDeserializer

可以考虑在reply container中使用ErrorHandlingDeserializer如果反序列化失败RequestReplyFuture将会以异常状态完成可以访问获取到的ExecutionException其cause属性中包含DeserializationException。

kafka poison pill & ErrorHandlingDeserializer

poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败不管重试过多少次之后仍然无法成功被消费。

poison pill可能在如下场景下产生

  • 该记录被损坏
  • 该记录发序列化失败

在生产场景中consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。

在现实场景中可能因为如下缘故而遭遇poison pill

  • 生产者改变了key或value的serializer并且持续向先前的topic中发送消息这将会导致反序列化问题
  • consumer的key或value deserializer配置错误
  • 不同的生产者实例使用不同的key或value serializer向topic中发送消息

在发生poison后consumer在调用poll拉取数据时将无法反序列化record调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理针对该topic分区的消费会被阻塞因为consumer offset一直无法向前移动。并且在consumer不停重试针对该消息的反序列化时大量的反序列化失败日志将会被追加到日志文件中磁盘占用量将会急剧增大。

ErrorHandlingDeserializer

为了解决poison pill问题spring引入了ErrorHandlingDeserializer该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败那么ErrorHandlingDeserializer将会返回一个null并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。

replyErrorChecker

从版本2.6.7开始可以为ReplyingKafkaTemplate设置一个ReplyErrorChecker当提供了checker方法时template会自动调用该方法如果该方法抛出异常那么该reply message对应的future也会以失败状态完成。

replychecker使用示例如下

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

AggregatingReplyingKafkaTemplate

ReplyingKafkaTemplate针对的是发送一条消息针对一条回复的场景但如果对于发送一条消息存在多个接收方会返回多条消息的场景则需要使用AggregatingReplyingKafkaTemplate。

像ReplyingKafkaTemplate一样AggregatingReplyingKafkaTemplate构造方法也接收一个producer factory和一个listener containerlistener container用于接收返回的消息。除此之外AggregatingReplyingKafkaTemplate还会接收第三个参数BiPredicate<List<ConsumerRecord<K,v>>, Boolean>该断言每次在接收到新消息时都会被计算。如果断言返回trueAggregatingReplyingKafkaTemplate.sendAndReceive方法返回的Future对象将会被完成并且Future中的值为断言中ConsumerRecord的集合。

从版本2.3.5开始,第三个参数为BiPredicate<List<ConsumerRecord<K,v>>, Boolean>类型其会在每次接收到消息或是超时replyTimeout超时的情况下被调用第二个参数传入的boolean即是断言的这次调用是否因为超时。该断言可以针对ConsumerRecord进行修改

returnPartialOnTimeout

AggregatingReplyingKafkaTemplate拥有一个属性returnPatialOnTimeout该属性值默认为false如果该值被设置为true那么当请求发生超时时会返回已经接收到的部分ConsumerRecord集合。

BiPredicate参数和returnPartialOnTimeout属性共同决定了在发生超时时是否返回已接收的部分返回消息要想成功在超时场景下返回接收到的部分消息不仅需要returnPartialOnTimeout设置为true还需要BiPredicate断言在发生timeout的情况下返回值为true。

AggregatingReplyingKafkaTemplate使用示例如下

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        (coll, timeout) -> timeout || coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

注意sendAndReceive方法返回的future其值类型为ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>类型外层的ConsumerRecord并不是真正的返回消息而是由AggregatingReplyingKafkaTemplate将返回消息聚合而成的外层ConsumerRecord用于存储实际接收到的消息集合。

ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>

ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>>作为聚合返回消息集合的伪ConsumerRecord其topic name也并非实际存在的。当获取该伪ConsumerRecord的原因是因为normal releasereleaseStrategy返回为true伪ConsumerRecord的topic name为aggregatedResults.

当获取该伪ConsumerRecord的原因是timeoutreturnPartialOnTimeout被设置为true并且发生timeout并且至少获取到一条返回消息那么伪ConsumerRecord的topic name将会被设置为partialResultsAfterTimeout.

template为上述伪topic name提供了静态变量

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

伪ConsumerRecord中存储的ConsumerRecord集合则是AggregatingReplyingKafkaTemplate实际接收到的ConsumerRecord。

AggregatingReplyingKafkaTemplate配置要求

listener container必须要配置为AckMode.MANUAL模式或AckMode.MANUAL_IMMEDIATEconsumer属性enable.auto.commit必须被设置伪false。为了避免任何丢失消息的可能template只会在没有待处理请求的前提下提交offset即最后一个未处理请求被releaseStrategy释放。

当consumer发生rebalance时可能会造成返回消息被重复传递由于template只会在没有待处理请求的情况下提交offset如果在listener container接收到消息后尚未提交offset此时发生rebalance那么未提交offset的消息将会被重复接收。对于在途的请求消息的重复传递将会被忽略在途的消息还会存在多条消息聚合的过程针对已经被releaseStrategy释放的返回消息如果接收到多条重复的返回消息那么会在log中看到error日志。

另外如果AggregatingReplyingKafkaTemplate使用ErrorHandlingDeserializer那么template将不会自动检测到反序列化异常。因为ErrorHandlingDeserializer在反序列化失败时会返回null并且在返回的header中记录反序列化异常信息。推荐在应用中调用ReplyingKafkaTemplate.checkDeserialization()方法来判断是否存在反序列化异常。

对于AggregatingReplyingKafkaTemplatereplyErrorChecker也不会自动调用,需要针对每个元素手动调用checkForErrors方法。

接收消息

在使用spring kafka时可以通过为MessageListenerContainer提供listener或是通过@KafkaListener注解来从kafka broker接收消息。

Message Listener

当使用Message Listener container时必须为其提供一个listener用于接收消息。如下是8个用于接收消息的接口

// interface-1
public interface MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data);

}

// interface-2
public interface AcknowledgingMessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

// interface-3
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

// interface-4
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

// interface-5
public interface BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data);

}

// interface-6
public interface BatchAcknowledgingMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

// interface-7
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

// interface-8
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
  1. interface-1当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理
  2. interface-2当使用由容器管理的提交方法时可以使用该接口该接口针对单条消息进行处理
  3. interface-3当使用由容器管理的提交方法或自动提交时可以使用该接口该接口针对单条消息进行处理该接口提供对consumer实例的访问
  4. interface-4当使用由容器管理的提交方法时可以使用该接口该接口针对单挑消息进行处理该接口提供对consumer实例的访问
  5. interface-5当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对consumer.poll方法返回的所有消息进行处理;当使用该接口时,不支持AckMode.RECORD模式因为所有的poll方法接收到的message batch都给了listener
  6. interface-6该当使用容器管理的提交方法时可以使用该接口该接口会处理由poll方法接收到的所有消息
  7. interface-7当使用自动提交或由容器管理的提交方法时可以使用该接口该接口针对poll方法返回的所有消息进行处理使用该接口时不支持AckMode.RECORD使用该接口可以针对consumer实例进行处理
  8. interface-8当使用由容器管理的commit method时可以使用该接口该接口针对由poll方法返回的全部消息进行处理该接口提供对consumer实例的访问

listener接口提供的consumer对象并不是线程安全的故而必须在调用该listener的线程内访问该consumer中的方法

在对consumer中的方法进行访问时不应该在listener中执行任何会修改consumer position或commit offset的方法position或offset信息由container来进行管理。

MessageListenerContainer

spring kafka提供了两种MessageListenerContainer的实现

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

上述两种实现的区别如下:

  • KafkaMessageListenerContainer在单个线程中对来源所有topic所有分区的消息进行接收
  • ConcurrentMessageListenerContainer则是通过将接收消息的任务委托给多个KafkaMessageListenerContainer实例来实现多线程消费。

Interceptors

从2.2.7版本开始可以为listener container添加RecordInterceptor拦截器会在container调用listener之前被调用可以通过拦截器来查看或是修改消息内容。如果interceptor在接收到消息或返回为空那么后续listener将不会被调用。

从2.7版本开始RecordInterceptor还增加了方法afterRecord该方法在listener退出之后调用正常退出或是抛异常退出。并且从2.7版本开始,还新增了BatchInterceptor为Batch Listener提供了类似的拦截器功能ConsumerAwareRecordInterceptor也在提供拦截器功能的基础上提供了对consumer实例的访问。可以通过kafka consumer来访问consumer metrics。

如果拦截器对record进行了修改同构创建一个新的record那么topic、partition、offset必须都保持不变从而避免类似消息丢失的副作用

为了调用多个interceptors可以使用CompositeRecordInterceptorCompositeBatchInterceptor

默认情况下从2.8版本开始在使用事务时拦截器会在事务开启之前被调用。可以将listener container的interceptBeforeTx属性修改为false从而在事务开启之后调用拦截器方法。从2.9版本开始上述会被应用于任何transaction manager而不单单只用于KafkaAwareTransactionManager这允许拦截器加入到由container开启的jdbc事务。

从2.3.8、2.4.6版本开始,ConcurrentMessageListenerContainer支持Static Membership当concurrency大于1时。而group.instance.id的后缀则是-nn从1开始。与此同时增加session.timeout.ms可以减少rebalance event的数量例如当应用实例重启时。

使用KafkaMessageListenerContainer

在创建KafkaMessageListenerContainer时可以使用如下构造方法

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

上述构造函数接收一个consumerFactory并且通过containerProperties接收topic和partition的信息此外containerProperties中还包含其他配置信息。

containerProperties拥有如下构造方法

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造方法中接收一个TopicPartitionOffset数组通过该数组可以显式的告知container应该使用哪些分区通过调用consumer的assign方法。并且可以选择为TopicPartitionOffset指定初始的offset。当relativeToCurrent参数为false时默认为false如果指定offset为正数默认代表绝对offset如果offset为负数默认代表相对于该分区最后位置的偏移量。如果relativeToCurrent为true初始offset将会被设置为相对当前的consumer position。当容器启动时设置的offset将会被应用。

第二个构造方法接收了一个topic数组kafka则会根据group.id属性来分配分区将分区在组内的订阅了该topic的消费者实例之间进行分配。

第三个构造方法则是接收一个topicPattern通过该pattern来匹配分区。

在创建container时为了将MessageListener分配给container可以使用ContainerProperties.setMessageListener。如下展示了添加listener的示例

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

在上述示例中创建DefaultKafkaConsumerFactory实例时其构造方法只接受了一个ConsumerProperties属性代表其会在属性中获取key和value的Deserializer.class并创建实例。同时也可以通过调用DefaultKafkaConsumerFactory的其他构造方法接收key和value的Deserializer实例在这种情况下所有consumer都会公用key和value的反序列化实例。另一种选择时提供Supplier<Deserializer>这种情况下每个consumer都会使用不同的反序列化器实例。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

从2.2.1版本开始,提供了一个叫logContainerConfig的配置属性当该属性设置为true并且info日志级别开启时每个listener container都会写日志记录其配置。

默认情况下会在debug的日志级别记录topic offset commit。从2.1.2开始,通过ContainerPropertiescommitLogLevel的属性可以指定commit offset的日志级别。如果要将offset commit的日志级别改为info可以调用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)方法。

从2.2版本开始,添加了missingTopicsFatal属性该属性默认值为false。如果在container启动时其配置的任一topic在broker中并不存在那么其将会阻止container启动。如果container被配置为监听pattern那么missingTopicsFatal配置将不会被应用。在未指定missingTopicsFatal时container线程将会循环调用consumer.poll方法等待topic出现同时日志输出消息除了日志之外没有任何迹象显示出现了问题。

在2.8版本中,引入了authExceptionRetryInterval属性。该属性会导致container在kafkaConsumer得到AuthenticationExceptionAuthorizationException异常后重试fetch消息。该属性用于kafka consumer获取topic消息权限认证失败时在等待authExceptionRetryInterval时间间隔后,可以重试获取消息,此时权限可能已经被授予。

在默认情况下,没有配置authExceptionRetryIntervalAuthenticationException和AuthorizationException将被认为是致命的这将会导致messageListenerContainer停止。

使用ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer的构造函数类似于KafkaMessageListenerContainer

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

除此之外ConcurrentMessageListenerContainer还有一个concurrency属性该属性代表并行度。如果调用container.setConcurrency(3)方法其会创建3个KafkaMessageListenerContainer.

对于该构造方法kafka使用组管理功能在消费者实例之间分配分区。

如果container通过TopicPartitonOffset配置时ConcurrentMessageListenerContainer将TopicPartitionOffset实例分配到多个委托的KafkaMessageListenerContainer实例上每个委托实例负责监听不同的TopicPartitionOffset。

如果6个TopicPartitionOffset被提供并且concurrency被设置为3每个委托实例将会被分配2个TopicPartitionOffset。如果5个TopicPartitionOffset被提供并且concurrency被设置为3那么两个委托容器实例将会被分配到2个分区第三个委托容器将被分配到1个分区。如果concurrency数大于TopicPartitionOffset数量那么concurrency将会降低到和TopicPartitionOffset数量相同每个委托容器实例被分配到一个分区。

从1.3版本开始,MessageListenerContainer提供了对底层KafkaConsumer的metrics信息访问。对于ConcurrentMessageListenerContainer其metircs方法将会返回一个map该map中记录了所有委托KafkaMessageListenerContainer实例的metics信息。Map类型为Map<String, Map<MetricName, ? extends Metric>>,其中key为底层KafkaConsumer的client-id.

从2.3版本开始ContainerProperties提供了idleBetweenPolls属性允许listener container在调用consumer.poll()方法循环拉取数据时睡眠一段时间。实际的睡眠时间将会取如下两个值中较小的值:idleBetweenPolls属性实际配置的值;max.poll.interval.ms值减去当前消息批处理时间的差值。

max.poll.interval.ms

该值代表在使用消费者组管理功能时,两次poll()调用之间最长时间间隔。该值代表在调用完poll方法后在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后poll方法仍然未被调用那么该消费者示例将会被认为失败并且消费者组会出发rebalance操作将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的group.instance.id不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过session.timeout.ms时间后出发rebalance。group.instance.id不为空时,该消费者实例将会被认为是该消费者组的静态成员。

默认情况下,max.poll.interval.ms的默认值为5min。

Committing Offsets

对于offset提交spring kafka提供了多个选项。如果enable.auto.commit属性在consumer properties中被指定为true那么kafka会根据其配置对消息进行自动提交。如果enable.auto.commit被指定为false那么MessageListenerContainer支持几种不同的AckMode设置。默认的AckMode设置为BATCH。从2.3版本开始spring framework会自动将enable.auto.commit设置为false在2.3之前则是默认设置为true。

consumer.poll方法会返回一条或多条ConsumerRecord而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为

  • RECORD在listener处理完该消息并返回之后提交offset
  • BATCH当所有被poll方法调用返回的消息都处理完成之后提交offset
  • TIME当所有被poll方法返回的消息均被处理完成或是从上次commit时起已超过ackTime时间
  • COUNT当所有被poll方法返回的消息都被处理完成或是从上次commit时起已收到超过ackCount条记录
  • COUNT_TIME当TIME或COUNT任一条满足时提交offset
  • MANUALmessageListener负责对消息调用Acknowledgment.acknowledge()方法调用之后语义和BATCH相同会等待所有被poll方法调用返回的消息都处理完成之后会批量提交offset
  • MANUAL_IMMEDIATE:当方法Acknowledgment.acknowledge()被listener调用之后立刻会提交offset

当使用事务时offsets将会被发送给事务并且语义和RECRD或BATCH相同基于listener的类型决定语义看listener是record类型还是batch类型

MANUALMANUAL_IMMEDIATE需要listener为AcknowledgingMessageListener BatchAcknowledgingMessageListener

基于容器的syncCommits容器属性提交offset时会使用commitSynccommitAsync方法。syncCommits属性默认为true可以通过setSyncCommitTimeout设置commitSync时的超时时间。在syncCommits设置为false时可以通过setCommitCallback来获取commitAsync的结果默认情况下callback是LoggingCommitCallback其会日志输出errors并且以debug的级别打印成功日志。

由于listener container拥有自己的提交offset机制故而更推荐将kafka consumr的ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG属性设置为false。2.3版本开始会将自动提交属性设置为false除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。

Acknowledgment接口具有如下方法

public interface Acknowledgment {

    void acknowledge();

}

该方法能够让container控制何时提交offset。

从2.3版本开始,Acknowledgment接口提供了两个额外的方法nack(long sleep)nack(int index, long sleep)。第一个方法和record类型的listener一起使用第二个方法和batch类型的listener一起使用。如果调用类型错误会抛出IllegalStateException异常。

  • nack(long sleep)对当前record执行nack操作丢弃poll请求中的remaining records并针对所有分区进行re-seek操作将position恢复。故而在指定sleep时间范围后record将会被重新传递该操作会阻塞整个message listener的读操作阻塞时间为sleep且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。

  • nack(int index, long sleep)对于batch中index位置的record进行nack操作该操作会提交位于index之前record的offset并且会re-seek所有分区故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。

当想要提交batch中部分消息时请使用nack(int index, long sleep)方法。当是使用事务时设置AckMode为MUANAL调用nack方法会将已成被成功处理的records offsets发送给事务。

nack只能在调用listener的consumer线程中进行处理。

在使用nack方法时任何在途的offset都会被提交而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似当container中配置了一个DefaultErrorHandler时。

当使用batch listener时若发生异常可以指定record在bathc中的index。调用nack时位于index之前的record offset都会被提交seek操作也会执行在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。

从3.0.10版本开始batch listener可以提交batch中的部分record offset通过使用acknowledge(index)方法。当该方法被调用时位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用acknowledge()方法会针对剩余尚未被提交的records进行offset提交。在使用部分提交时必须满足如下要求

  • AckMode.MANUAL_IMMEDIATE需要被开启
  • 部分提交方法需要在listener thread中被调用
  • listener必须批量消费recrods而不是消费单条record
  • index必须要位于batch range中
  • 重复调用部分提交接口时后调用的index要比先调用的大

上述要求是强制的在不满足时会抛出IllegalArgumentException和IllegalStateException异常。

上述调用nack后所说的阻塞均是指调用KafkaConsumer.pause方法后造成的结果。在pause方法被调用后任何后续调用的poll方法都不会返回任何records直到resume方法被调用。pause方法的调用并不会造成分区的rebalance操作。调用pause方法并不会造成线程的阻塞而是通过poll获取指定分区消息的阻塞。

并且rebalance操作后也不会保留pause/resume状态。

手动提交offset

通常,当使用AckMode.MANUALAckMode.MANUAL_IMMEDIATEack必须按顺序确认因为kafka并不是为每条record都维护提交状态而是针对每个消费者组/分区只维护一个commit offset。

从2.8版本开始可以设置container属性asyncAcks该属性设置后由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序records可以被按任何顺序确认的提交进行延迟直到接收到缺失的ack。此时消费者会处于paused状态不会有新的records被传递直到上次poll操作中所有的消息都被提交offset。

当启用asyncAcks特性后,可以异步的针对消息进行处理(Acknowledgment.acknowledge()方法并没有要求只能在listener线程中被调用。但是在对消息进行异步处理的时候如果发生处理失败的情况可能会增加record重复传输的可能性。如果在开启asyncAcks时如果后续消息已经提交ack但是位于前面的消息处理发生异常那么后续成功的record其offset将无法被提交故而后续处理成功的消息将会被重新传递.

在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。

并且,当asyncAcks启用时,无法使用nack()方法来进行nack操作。

@KafkaListener

@KafkaListener会将一个bean方法标记为listener container的listener方法。该bean被封装在一个MessagingMessageListenerAdapter对象中该Adapter对象的构造方法如下所示封装了bean对象和bean对象方法

public MessagingMessageListenerAdapter(Object bean, Method method) {
		this.bean = bean;
		this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
	}

在配置@KafkaListener注解的绝大部分属性时可以使用#{}形式的spel表达式或${}形式的属性占位符。

Record Listener

@KafkaListener注解为简单pojo的listenr提供了机制如下展示了使用示例

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

该机制需要在@Configuration类上标注@EnableKafka注解,并且需要一个 listener container factory bean对象。该listener container用于配置底层的ConcurrentMessageListenerContainer。默认情况下需要一个bean name为kafkaListenerContainerFactory的bean对象。如下示例展示了如何使用ConcurrentMessageListenerContainer

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

在上述示例中如果要设置container properties必须要在container factory对象上调用getContainerProperties()然后再设置。该被返回的container properties将会被作为实际被注入到container对象中properties的模板。

从版本2.1.1开始,对由该注解创建的消费者实例,可以设置client.id属性。client.idclientIdPrefix-n格式其中n为整数代表在使用concurrency时的container number。

从2.2版本开始,可以通过@KafkaListener注解覆盖container factory的concurrencyautoStartup属性。为注解属性赋的值可以是simple value、spel表达式或property placeholder使用示例如下所示

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

显式分区分配

可以通过@KafkaListener注解为pojo listener配置显式的topic和分区也可以为其指定一个初始offset。如下示例展示了如何为@KafkaListener指定topic和offset

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

如上图所示可以在topicPartitions属性的@TopicPartition的partitions属性中或是@TopicPartitionpation的partitionOffsets属性中指定分区,但是无法同时在上述两处都指定同一个分区。

从2.5.5版本开始可以对所有手动分配的分区都指定一个初始offset

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

再上述示例中,*通配符代表partitions属性锁指定的所有分区。在每个@TopicPartition注解中,只能有一个@PartitionOffset注解中带有该通配符。

当listener实现了ConsumerSeekAware时,onPartitionAssigned方法会被调用,即使使用手动分配(assign)。onPartitionAssigned允许在分区被分配时执行任意的seek操作。

从2.6.4版本开始,可以指定由,分隔的分区列表或分区区间,示例如下:

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

该分区范围是闭区间,上述代表的区间是0,1,2,3,4,5,7,10,11,12,13,14,15.

上述方式也可以用于指定分区初始的offset

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

手动ack

当使用手动ack时可以为listener method提供Acknowledgment。如下示例展示了如何使用一个不同的container factory

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

Consumer Record Metadata

record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容

  • KafkaHeaders.OFFSET
  • KafkaHeaders.RECEIVED_KEY
  • KafkaHeaders.RECEIVED_TOPIC
  • KafkaHeaders.RECEIVED_PARTITION
  • KafkaHeaders.RECEIVED_TIMESTAMP
  • KafkaHeaders.TIMESTAMP_TYPE

从2.5版本开始如果接收到的消息key为null那么RECEIVED_KEY在header中并不会出现而在2.5版本之前,RECEIVED_KEY在header中存在并且值为null。

如下示例展示了如何使用header

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

参数注解(@Payload@Header必须在listener method方法的具体实现上被指定如果参数注解只指定在接口上而没有在具体实现方法上指定那么指定的注解将不会起作用。

从2.5版本开始相比于上述示例中针对单条header内容进行接收可以通过ConsumerRecordMetadata类型的参数来接收所有的header数据

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

上述ConsumerRecordMetadata类型中包含ConsumerRecord中所有的数据除了record中的key和value。

Batch Listener

从版本1.1开始,可以通过@KafkaListener方法来接收record batch。

为了配置listener container factory支持batch listener需要设置container factory的batchListener属性。如下展示了配置示例:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
   return factory;
}

从2.8版本开始,可以通过@KafkaListener属性来覆盖container factory的batchListener属性,只需要为@KafkaListener注解指定batch属性。

从2.9.6版本开始container factory对于recordMessageConverterbatchMessageConverter有不同的setter。在2.9.6之前的版本中,只有messageConverter的setter该converter被同时用于批量和单条场景。

如下示例展示了如何接收list类型的payload

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

record batch的topic、分区、offset等信息也可以批量获取如下展示了如何批量获取header

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

除了上述方法外,还可以通过List of Message<?>来批量接收消息,但是,除了AcknowledgmentConsumer<?, ?>外,List of Message<?>应该是方法中唯一的参数。如下示例展示了如何使用:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在上述示例中并不会对payload执行任何转换。

如果当前存在BatchMessagingMessageConverterBatchMessagingMessageConverter通过RecordMessageConverter配置),则可以为Message<?>添加一个泛型类型payload将会被转化为该类型。

除了上述两种方式外,还可以接收List<ConsumerRecord<?, ?>>类型的参数但是其必须为listener method的唯一参数Acknowledgment类型和Consumer<?, ?>类型参数除外)。如下实例展示了如何使用:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从2.2版本开始listener可以接收由poll方法返回的完整的ConsumerRecords<?, ?>对象该record中封装了一个record的list。从而允许listener访问额外的方法例如partitions()方法返回TopicPartition对象的集合。方法的使用示例如下

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

注解属性

从2.0版本开始,id属性被用作kafka consumer group的group.id属性其会覆盖consumer factory中配置的属性。如果不想使用该行为可以显式的另外设置groupId属性;或是将idIsGroup属性设置为false此时group.id仍然会使用consumer factory中配置的属性。

对于@KafkaListener注解可以在绝大多数属性中使用spel表达式和placeholder

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

从2.1.2版本开始spel表达式支持__listener。该伪bean name代表当前该注解所位于的bean。

例如如下示例:

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

如果在项目中,已经存在了名为__listener的bean实例那么可以通过beanRef属性来对属性表达式进行修改。如下示例展示了如何使用beanRef

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")

从版本2.2.4开始,可以直接通过@KafkaListener注解指定consumer属性通过注解指定的consumer属性会覆盖在consumer factory中指定的属性。

但是,无法通过@KafkaListener注解的上述方法来指定group.idclient.id属性,通过上述方法指定这两个属性时,指定会被忽略;需要使用groupIdclientIdPrefix注解属性来指定consumer属性中的group.idclient.id属性

在通过@KafkaListener来指定consumer属性时通过独立的字符串按照foo:barfoo barfoo=bar的格式进行指定,示例如下所示:

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}

获取consumer的group.id

在不同container中运行相同listener代码时需要识别该消息来源于哪个container通过group.id标识)。

在需要获取container的groupId时可以在listener线程中调用KafkaUtils.getConsumerGroupId()方法;也可以在方法参数中访问group.id属性:

@KafkaListener(id = "id", topicPattern = "someTopic")
public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
    ...
}

container线程命名

框架通过一个TaskExecutor来调用consumer和listener。可以在container properties中通过设置consumerExecutor属性来设置一个自定义的executor。当使用pooled executor时需要确保线程池中含有足够的线程来处理来自所有container的并发。当使用ConcurrentMessageListenerContainerexecutor中的线程会被所有consumerconcurrency使用。

如果没有为container提供consumer executor那么会为每个container提供SimpleAsyncTaskExecutor

SimpleAsyncTaskExecutor会为所有的task都开启一个新线程故而该类型的executor并不会对线程进行重用。

SimpleAsyncTaskExecutor在创建线程时executor创建线程的名称按照<beanname>-C-<n>的格式。对于ConcurrentMessageListenerContainer<beanname>部分将会被换成<beanname>-mm代表消费者实例。当每次container启动时<n>都会增加。

故而对于name为container的bean对象其executor中的线程名称在container第一次启动后为container-0-C-1container-1-C-1n均为1m为0和1.当containerr再次启动时线程名称则是会变为container-0-C-2container-1-C-2 etc

从3.0.1版本开始,可以修改线程的名称。如果将AbstractMessageListenerContainer.changeConsumerThreadName属性设置为true将会调用AbstractMessageListenerContainer.threadNameSupplier来获取线程名称。

@KafkaListener用作元注解

从2.2版本开始,可以将@KafkaListener用作元注解如下展示了使用示例

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics();

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";

}

在将@KafkaListener用作元注解时必须在自定义注解中针对topicstopicPatterntopicPartitions中的至少一个进行alias操作。

如下展示了如何使用自定义注解:

@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
    ...
}

在类级别使用@KafkaListener

当在类级别使用@KafkaListener时必须要在方法级别指定@KafkaHandler。当消息被传递到时消息payload转化为的类型将会用于决定调用哪个handler方法。示例如下所示

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从2.1.3版本开始,可以指定一个@KafkaHandler方法作为默认方法当类型匹配不到其他handler时默认handler方法将会被调用。并且最多只有有一个方法被指定为默认handler方法。

当使用@KafkaHandler时必须要求payload已经被转化为目标对象类型如此匹配才能进行。

基于spring解析方法参数的限制default kafkahandler无法接收header中的单条内容其只能通过ConsumerRecordMetadata类型来获取header中的内容。

例如如下方式在Object类型为String时不起作用

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果object类型为String那么topic也会指向object而不会获取到header中的topic内容。

如果要在default handler method中访问元数据需要按如下方式

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}

@KafkaListener属性修改

从2.7.2版本开始可以在container创建之前通过编码的方式来修改注解属性。为了实现改功能可以在spring context中添加一个或者多个KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancerAnnotationEnhancer是个BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>类型的二元函数返回一个Map<String,Object>类型的值。属性值可能会包含spel表达式或者properties placeholder而enhancer则是在spel表达式和属性占位符被解析之前调用。如果当前spring容器存在多个enhancer且enhancer实现了Ordered接口那么enhancer将会按照顺序被调用

@Bean
public static AnnotationEnhancer groupIdEnhancer() {
    return (attrs, element) -> {
        attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
                ? ((Class<?>) element).getSimpleName()
                : ((Method) element).getDeclaringClass().getSimpleName()
                        +  "." + ((Method) element).getName()));
        return attrs;
    };
}

AnnotationEnhancerbean定义必须要被声明为static该bean在spring context生命周期中非常早期的时间点被需要。

@KafkaListener生命周期管理

为了@KafkaListener被创建的listener container并不是应用context中的bean对象。containers对象被注册在类型为KafkaListenerEndpointRegistry类型的bean对象中该bean对象会自动被spring framework创建并且管理listener container的生命周期。kafkaListenerEndpointRegistry对象会启动所有autoStartup属性设置为true的container。所有container factory都在同一阶段创建所有container。通过registry可以通过编程的方式来管理container的生命周期。对registry执行start或stop操作将会对registry中所有的container都执行start或stop操作。同时也可以通过container的id属性来获取单独的container对象。

可以通过@KafkaListener注解来设置container的autoStartup属性通过注解指定的autoStartup值将会覆盖在container factory中指定的autoStartup属性。

如果要通过registry对象来管理注册的container可以通过bean注入的方式来获取registry

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }

/**
 * 通过bean注入来获取registry
 * 
 **/
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

regisry只维护其管理container的生命周期如果以bean形式声明的container其并不由registry进行管理而是可以从spring容器中获取。可以调用registry对象的getListenerContainers方法来获取其管理的container集合。

从2.2.5版本开始registry新增了一个新的方法getAllListenerContainers()通过该方法可以获取所有的container集合集合中包括由registry管理的container和以bean对象形式生命的container。该返回集合中将会包含任何prototype的已初始化bean对象但是集合中不会对懒加载的bean对象进行加载操作。

endpoint将会在spring容器被refreshed之后被注册到registry中并且endpoint将会立马被启动不论其autoStartup属性值是什么。

@KafkaListener @Payload校验

从2.2版本开始,可以更加方便的为@KafkaListener @Payload参数添加validator。在之前的版本中必须要自定义配置一个DefaultMessageHandlerMethodFactory并且将其添加到registrar中。现在可以为registrar中添加validator。如下代码展示了使用示例

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }

}

当使用spring boot并且存在有validation的starter时LocalValidatorFactoryBean将会被自动装配,装配逻辑和如下代码类似:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

如下代码示例展示了如果通过validator校验payload

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

从3.1版本开始,可以在ErrorHandlingDeserializer上执行validation操作。

Rebalancing Listener

ContainerProperties中存在一个consumerRebalanceListener属性,该属性会接收一个ConsumerRebalanceListener接口的实现。如果该属性没有被提供那么container将会自动装配一个logging listener该listener将会将rebalance event打印到日志中日志级别为info。spring kafka框架还提供了了一个ConsumerAwareRebalanceListener接口,如下展示了接口的定义:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

ConsumerRebalanceListener接口不同的是,ConsumerAwareRebalanceListener针对revoke存在两个回调方法beforeCommitafterCommit。第一次将会马上被调用第二次则是会在所有阻塞的offset都提交后再调用。如果想要再外部系统中维护offset这将非常有用

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});

强制触发consumer rebalance

kafka client目前支持触发enforced rebalance。从3.1.2版本开始spring kafka支持通过message listener container来调用kafka consumer的enforce rebalance api。当调用该api时其会提醒kafka consumer触发一个enforced rebalance实际rebalance将会作为下一次poll操作的一部分。当正在发生rebalance时调用该api将不会触发任何操作。调用方必须等待当前rebalance操作完成之后再调用触发另一个rebalance。

如下示例展示了如何触发一个enforced rebalance

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

再上述代码中,应用通过使用KafkaListenerEndpointRegistry来访问message listener container并调用MessageListenerContainer的enforceRebalance方法。当调用container的enforceRebalance方法时其会委托调用底层consumer的enforceRebalance方法。而consumer则是会在下次poll操作时触发rebalance。

通过@SendTo注解发送listener结果

从2.0版本开始,如果将@SendTo和@KafkaListener一起使用并且被注解的方法存在返回值那么方法的返回值将会被发送给@SendTo注解中指定的topic。

@SendTo值可以存在如下几种格式:

  • @SendTo("someTopic"):返回值将会被发送给someTopic主题
  • @SendTo("#{someExpression}")返回值将会被发送到表达式计算出的topic表达式将会在应用上下文初始化时被计算
  • @SendTo("!{someExpression}")返回值将会被发送到表达式计算出的topic表达式将会在运行时被计算,并且表达式的#root`对象有3个属性
    • request该方法接收到的ConsumerRecord在batch listener时request代表ConsumerRecords
    • sourcerequest转化为的org.springframework.messaging.Message<?>
    • result:该方法的返回值
  • @SendTo:当没有为@SendTo注解指定value时value默认会被当做!{source.headers['kafka_replyTopic']}

从2.1.11和2.2.1开始属性占位符property placeholder也会被@SendTo解析。

为@SendTo注解指定的表达式其返回值必须为一个String该String代表topic name。如下展示了使用@SendTo的不同方式

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

为了支持使用@SendTolistener container factory必须被提供KafkaTemplate通过replyTemplate属性指定该template将会被用于发送消息。当使用spring boot时会自动将template注入到container factory中。

从2.2版本开始可以向listener container factory中添加ReplyHeadersConfigurer通过其可以设置接收消息中的哪些header可以被拷贝到返回消息的header中使用示例如下所示

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果在拷贝header外还想为reply message指定额外的header可以通过如下方式来实现

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

在additionHeaders方法返回的map中如果有key和已经存在的header key重复那么map中的key-value将会覆盖现存header

在使用@SendTo时必须为ConcurrentKafkaListenerContainerFactory配置一个KafkaTemplate,需要配置的属性为replyTemplate。spring boot将会将自动装配的template注入到replyTemplate属性中。

可以将@SendTo添加到无返回值的方法上通过指定errorHandler属性当且仅当@SendTo注解的方法发生异常时将异常消息作为messaage发送给特定topic使用示例如下

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}