Files
rikako-note/mq/kafka/Spring for Apache Kafka.md

30 KiB
Raw Blame History

Spring for Apache Kafka

Introduce

依赖

可以通过如下方式来添加Kafka的依赖包

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Consumer Demo

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

application.properties文件配置示例如下

spring.kafka.consumer.auto-offset-reset=earliest

NewTopic的bean对象会导致broker中创建该topic如果该topic已经存在则该topic不需要。

Producer Demo

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

Reference

Using Spring for Apache Kafka

连接到Kafka

  • Kafka Admin
  • ProducerFactory
  • ConsumerFactory

上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers同Kafka集群建立连接的ip:port通过setBootstrapServersSupplier(() → …​)向其传递一个Supplier<String>传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用DefaultKafkaProducerFactoryreset()方法;如果要关闭现存的所有消费者,可以调用KafkaListenerEndpointRegistrystop()方法调用stop方法之后再调用start方法或者再任何其他的listener容器bean中调用stop和start方法。
为了方便,框架提供了ABSwitchCluster来支持两套bootstrap servers集合再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时可以调用primary()方法或secondary()方法并且在producer factory上调用reset方法用于建立新的连接对于消费者在所有listener容器中调用stopstart方法。当使用@KafkaListener注解时,在KafkaListenerEndpointRegistry上调用stop和start方法。

Factory Listener

DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory可以配置Listener来接收通知当生产者或消费者被创建或关闭时Listener接收到提醒。

// Producer Factory Listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
// Consumer Factory Listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在上述场景中,id创建时都是将client-id添加到factory bean-name之后之间用.隔开。

设置Topic

如果你定义了一个KafkaAdmin的bean对象那么其就能够自动添加topic到broker中。为了创建topic可以为每个topic在应用容器中创建一个NewTopicbean对象。可以通过TopicBuilder工具类来使创建topic bean对象的过程更加简单。
创建Topic的示例如下所示

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, Arrays.asList(0, 1))
            .assignReplicas(1, Arrays.asList(1, 2))
            .assignReplicas(2, Arrays.asList(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

可以省略partitions()方法和replicas()方法在省略的情况下broker默认设置值将会被采用

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

可以定义一个NewTopics类型的bean对象NewTopics对象中包含多个NewTopic对象

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

当使用spring boot时KafkaAdmin对象是自动注册的故而只需要注册NewTopic类型的bean对象即可。

默认情况下如果broker不可访问那么只会打出日志信息会继续加载上下文。如果想要将这种情况标记为fatal需要设置admin的fatalIfBrokerNotAvailable属性为true设置完属性后如果broker不可访问上下文加载失败。
KafkaAdmin提供了方法在运行时动态创建topic

  • createOrModifyTopics
  • describeTopics

如果想要使用更多特性可以直接使用AdminClient

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

Sending Messages

KafkaTemplate

KafkaTemplate类包装了生产者并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

sendDefault方法需要一个提供给KafkaTemplate的默认Topic。
上述API会接收一个时间戳参数如果时间戳参数没有指定则自动产生一个时间戳并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为CREATE_TIME那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为LOG_APPEND_TIME那么用户指定的时间戳将会被忽略并且由broker进行添加broker会添加时间戳为local broker timemetrics方法和partitionsFor方法将会被委托给Producer中的同名方法execute方法则是会提供对底层Producer的直接访问。
如果要使用KafkaTemplate需要配置一个Producer并且将其传递给KafkaTemplate的构造函数

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

可以通过如下方式覆盖factory的ProducerConfig根据相同的factory创建不同Producer config的template对象

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

一个类型为ProducerFactory<?, ?>的bean对象可以被任何泛型类型引用。该bean对象已经被spring boot自动配置
当使用带Message<?>参数的方法时topic、partition、key information都会在message header中被提供

  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP

该消息的载荷是data。
可以为KafkaTemplate配置ProducerListener来为发送的结果成功或失败提供一个异步回调而不是通过CompletableFuture。如下是ProducerListener接口的定义:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下template配置了一个LoggingProducerListener,会打印发送失败到日志,但是发送成功时不会做任何事。
为了方便默认的方法实现已经在接口中提供了提供了如果只需要覆盖一个方法只需要对onSuccess或onError进行Override即可。
发送消息会返回一个CompletableFuture<SendResult>对象。可以为异步操作注册一个listener

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult拥有两个属性ProducerRecord和RecordMetaData。
Throwable可以被转化为KafkaProducerException该异常的failedProducerRecord可以包含失败的record。
当为producer config设置了linger.ms时,如果待发送的消息没有达到batch.size会延迟发送消息等待更多的消息出现并被批量发送。默认情况下linger.ms为0不会有延迟但是如果linger.ms有值那么在发送消息之后如果希望消息立马发送需要手动调用flush方法。
如下示例展示了如何通过kafkaTemplate向broker发送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}

// Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

上述示例中ex为KafkaProducerException类型并且有failedProducerRecord属性。

RoutingKafkaTemplate

可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。

routing template不支持事务、execute、flush、metrics等操作因为这些操作不知道topic名称

routing template需要一个key和value分别为java.util.regex.PatternProducerFactory<Object, Object>的map实例。该map应该是有序的因为map会按顺序遍历例如LinkedHashMap集合。并且应该在map最开始的时候指定更加具体的pattern。
如下示例会展示如何通过一个template向不同的topic发送消息并且值的序列化会采用不同的序列化器

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}
DefaultKafkaProducerFactory

在上述template的使用示例中创建一个KafkaTemplate需要使用ProducerFactory。
当不使用transaction时默认情况下DefaultKafkaProducerFactory创建一个单例producer单例producer由所有客户端使用。但是如果在template上调用了flush方法可能会造成其使用同一producer的其他线程的延迟。

DefaultKafkaProducerFactory有一个属性producerPerThread当该属性被设置为true时Kafka会对每一个线程都创建一个producer以此来避免这个问题。

当producerPerThread被设置为true时若producer不再需要用户代码必须在factory上调用closeThreadBoundProducer方法这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer

当创建DefaultKafkaProducerFactory时key和value的serializer class可以通过配置来制定配置的信息会通过接收一个Map的构造函数传递给DefaultKafkaProducerFactory。
serializer实例也可以作为参数传递给DefaultKafkaProducerFactory构造函数此时所有生产者都会共享同一个serializer实例。 可选的,也可以提供一个Supplier<Serializer>给构造函数此时每个生产者都会调用该Supplier获取一个独立的Serializer。

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

可以在factory创建之后对producer properties进行更新。这些更新并不会影响现存的生产者可以调用reset方法来关闭所有现存的生产者新的生产者会根据新配置项来创建。

但是无法将事务的producer factory修改为非事务的反之亦然无法将非事务的producer修改为事物的

目前提供如下两个方法对producer properties进行更新

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);
ReplyingKafkaTemplate

ReplyingKafkaTemplate作为KafkaTemplate的子类提供了请求、回复的语义。相对于KafkaTemplateReplyingKafkaTemplate具有两个额外的方法

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

方法的返回结果是一个CompletableFuture实际结果以异步的方式填充到其中。结果含有一个sendFuture属性是调用kafkaTemplate.send方法的结果。可以用该future对象来获知send操作的返回结果。
如果使用第一个方法或是replyTimeout传递参数为null那么会使用默认的replyTimeout默认值为5s。
该template含有一个新的方法waitForAssignment如果reply container通过auto.offset.reset=latest来进行配置时可以避免发送了一个请求并且结果被返回但是container还尚未被初始化。
如下实例展示了如何使用ReplyingKafkaTemplate

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

上述通过spring boot自动配置的container factory来创建了一个reply container。
ReplyingKafkaTemplate会设置一个name为KafkaHeaders.CORRELATION_ID的header并且该header必须被server端消费者端返回。
在这种情况下,如果@KafkaListener应用会返回:

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

上述@KafkaListener结构会回应correlation id并且决定reply topic。template会使用默认header KafKaHeaders.REPLY_TOPIC来告知消费者应该将回复发送到哪个topic中。
template会根据配置的reply container来探知reply topic或是分区。如果容器被配置监听单个topic或是单个TopicPartitionOffset会将监听的topic或是分区设置到reply header中。如果容器通过其他方式配置如监听多个topic那么用户必须显式设置reply header。
如下展示了用户如何设置KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

如果只设置了一个reply TopicPartitionOffset如果每个实例监听一个不同的分区那么可以多个template共用一个reply topic。
如果只配置了一个reply topic每个实例必须要有不同的group.id。在这种情况下每个实例都会接收到每个请求但是只有发送请求的实例能够找到correlation id。这种情况能够自动扩容但是会带来额外的网络负载每个实例接收到不想要消息时的丢弃操作也会带来开销。在这种情况下推荐将template的sharedReplyTopic设置为true将非预期reply的日志级别从info降低为debug。
如下为配置一个shared reply topic容器的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}

如果没有按照上述的方法设置template那么当存在多个template实例时每个实例都需要一个确定的reply topic。
一个可选的替代方案是显式设置KafkaHeaders.REPLY_PARTITION并且对每个实例使用特定的分区。此时server必须显式使用该header将reply路由到正确的分区中@KafkaListener会做。在这种情况下reply container必须不使用kafka group特性并且被配置监听一个固定的分区。

默认情况下会使用3个header

  • KafkaHeaders.CORRELATION_ID用于将reply关联到请求
  • KafkaHeaders.REPLY_TOPIC用于告知server写入哪个topic
  • KafkaHeaders.REPLY_PARTITION该header是可选的用于告知reply会写入到哪个分区

上述的header会被@KafkaListener使用用于路由reply。

Receiving Messages

可以通过配置MessageListenerContainer并提供一个message listener来接收消息也可以通过使用@KafkaListener注解来监听消息。
当使用MessageListenerContainer时可以提供一个listener来接收数据。message listener目前有如下接口

// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用自动提交或一个由容器管理的提交方法
public interface MessageListener<K, V> { (1)

    void onMessage(ConsumerRecord<K, V> data);

}

// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用一个手动提交方法
public interface AcknowledgingMessageListener<K, V> { (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

// 该接口和1类似但是该接口可以访问提供的consumer对象
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

// 该接口和2类似但是该接口可以访问提供的consumer对象
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

// 该接口用于处理所有由kafka消费者的poll操作接收的ConsumerRecord当使用自动
// 提交或容器管理的提交方法时。当使用该接口时,不支持`ACKMODE.RECORD`,该传递
// 给该listener的batch数据是已完成的
public interface BatchMessageListener<K, V> { (5)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

// 该接口用于处理所有由kafka消费者的poll操作接收到的ConsumerRecord当使用手动
// 提交时
public interface BatchAcknowledgingMessageListener<K, V> { (6)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

// 该接口和5类似但是可以访问consumer对象
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

// 该接口和6类似但是可以访问consumer对象
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

Consumer对象并不是线程安全的故而只能够在调用该listener的线程中调用consumer的方法

Message Listener Container

为MessageListenerContainer提供了两个实现

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例从而提供多线程的消费。
可以添加一个RecordInterceptor到ListenerContainer在调用listener之前interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null那么listener则不会被调用。而且其还有一个额外的方法可以在listener退出之后被调用退出之后指正常退出或以抛出异常的形式退出
还有一个BatchInterceptor提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
如果想要调用多个Interceptor则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
默认情况下当使用事务时interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
ConcurrentMessageListenerContainer支持“静态成员”即固定消费者即使消费者实例重启如此可以降低事件在消费者之间重新负载均衡的开销当并发量大于1时。group.instance.id的后缀为-n其中n从1开始。

可以通过如下构造函数来使用MessageListenerContainer

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

该构造方法接收一个ConsumerFactory并通过ContainerProperties实例来接收其他配置。ContainerProperties拥有如下的构造函数

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)
  • 第一个构造方法接收一个TopicPartitionOffset的数组作为参数来显式指定container使用哪些分区通过consumer的assign方法并可以附带一个可选的初始offset。默认情况下如果offset为正值代表其是绝对的offset。若offset为负值则offset代表在默认分区中相对于current last offset的相对位置。并且对于TopicPartitionOffset类其提供了一个接收额外boolean参数的构造方法如果该值设置为true无论init offset为正值或者负值都是相对于consumer当前位置的相对值。当容器启动时offset将会被使用。