Files
rikako-note/mq/kafka/Spring for Apache Kafka.md

701 lines
34 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring for Apache Kafka
## Introduce
### 依赖
可以通过如下方式来添加Kafka的依赖包
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
### Consumer Demo
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
```
application.properties文件配置示例如下
```properties
spring.kafka.consumer.auto-offset-reset=earliest
```
NewTopic的bean对象会导致broker中创建该topic如果该topic已经存在则该topic不需要。
### Producer Demo
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
```
## Reference
### Using Spring for Apache Kafka
#### 连接到Kafka
- Kafka Admin
- ProducerFactory
- ConsumerFactory
上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers同Kafka集群建立连接的ip:port通过`setBootstrapServersSupplier(() → …​)`向其传递一个`Supplier<String>`传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用`DefaultKafkaProducerFactory``reset()`方法;如果要关闭现存的所有消费者,可以调用`KafkaListenerEndpointRegistry``stop()`方法调用stop方法之后再调用start方法或者再任何其他的listener容器bean中调用stop和start方法。
为了方便,框架提供了`ABSwitchCluster`来支持两套bootstrap servers集合再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时可以调用`primary()`方法或`secondary()`方法并且在producer factory上调用`reset`方法用于建立新的连接对于消费者在所有listener容器中调用`stop``start`方法。当使用`@KafkaListener`注解时,在`KafkaListenerEndpointRegistry`上调用stop和start方法。
##### Factory Listener
`DefaultKafkaProducerFactory``DefaultKafkaConsumerFactory`可以配置`Listener`来接收通知当生产者或消费者被创建或关闭时Listener接收到提醒。
```java
// Producer Factory Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
```
```java
// Consumer Factory Listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
```
在上述场景中,`id`创建时都是将`client-id`添加到factory bean-name之后之间用`.`隔开。
#### 设置Topic
如果你定义了一个`KafkaAdmin`的bean对象那么其就能够自动添加topic到broker中。为了创建topic可以为每个topic在应用容器中创建一个`NewTopic`bean对象。可以通过`TopicBuilder`工具类来使创建topic bean对象的过程更加简单。
创建Topic的示例如下所示
```java
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
```
可以省略partitions()方法和replicas()方法在省略的情况下broker默认设置值将会被采用
```java
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
```
可以定义一个NewTopics类型的bean对象NewTopics对象中包含多个NewTopic对象
```java
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
```
> 当使用spring boot时KafkaAdmin对象是自动注册的故而只需要注册NewTopic类型的bean对象即可。
默认情况下如果broker不可访问那么只会打出日志信息会继续加载上下文。如果想要将这种情况标记为fatal需要设置admin的`fatalIfBrokerNotAvailable`属性为true设置完属性后如果broker不可访问上下文加载失败。
KafkaAdmin提供了方法在运行时动态创建topic
- `createOrModifyTopics`
- `describeTopics`
如果想要使用更多特性可以直接使用AdminClient
```java
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
```
#### Sending Messages
##### KafkaTemplate
`KafkaTemplate`类包装了生产者并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法
```java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
```
`sendDefault`方法需要一个提供给KafkaTemplate的默认Topic。
上述API会接收一个时间戳参数如果时间戳参数没有指定则自动产生一个时间戳并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为`CREATE_TIME`那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为`LOG_APPEND_TIME`那么用户指定的时间戳将会被忽略并且由broker进行添加broker会添加时间戳为local broker time
`metrics`方法和`partitionsFor`方法将会被委托给Producer中的同名方法`execute`方法则是会提供对底层Producer的直接访问。
如果要使用KafkaTemplate需要配置一个Producer并且将其传递给KafkaTemplate的构造函数
```java
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
```
可以通过如下方式覆盖factory的ProducerConfig根据相同的factory创建不同Producer config的template对象
```java
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
```
一个类型为`ProducerFactory<?, ?>`的bean对象可以被任何泛型类型引用。该bean对象已经被spring boot自动配置
当使用带`Message<?>`参数的方法时topic、partition、key information都会在message header中被提供
- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP
该消息的载荷是data。
可以为KafkaTemplate配置ProducerListener来为发送的结果成功或失败提供一个异步回调而不是通过CompletableFuture。如下是`ProducerListener`接口的定义:
```java
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
```
默认情况下template配置了一个`LoggingProducerListener`,会打印发送失败到日志,但是发送成功时不会做任何事。
为了方便默认的方法实现已经在接口中提供了提供了如果只需要覆盖一个方法只需要对onSuccess或onError进行Override即可。
发送消息会返回一个`CompletableFuture<SendResult>`对象。可以为异步操作注册一个listener
```java
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
```
SendResult拥有两个属性ProducerRecord和RecordMetaData。
Throwable可以被转化为KafkaProducerException该异常的failedProducerRecord可以包含失败的record。
当为producer config设置了`linger.ms`时,如果待发送的消息没有达到`batch.size`会延迟发送消息等待更多的消息出现并被批量发送。默认情况下linger.ms为0不会有延迟但是如果linger.ms有值那么在发送消息之后如果希望消息立马发送需要手动调用flush方法。
如下示例展示了如何通过kafkaTemplate向broker发送消息
```java
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
// Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
上述示例中ex为KafkaProducerException类型并且有failedProducerRecord属性。
##### RoutingKafkaTemplate
可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。
> routing template不支持事务、execute、flush、metrics等操作因为这些操作不知道topic名称
routing template需要一个key和value分别为`java.util.regex.Pattern``ProducerFactory<Object, Object>`的map实例。该map应该是有序的因为map会按顺序遍历例如LinkedHashMap集合。并且应该在map最开始的时候指定更加具体的pattern。
如下示例会展示如何通过一个template向不同的topic发送消息并且值的序列化会采用不同的序列化器
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
```
##### DefaultKafkaProducerFactory
在上述template的使用示例中创建一个KafkaTemplate需要使用ProducerFactory。
当不使用transaction时默认情况下DefaultKafkaProducerFactory创建一个单例producer单例producer由所有客户端使用。但是如果在template上调用了flush方法可能会造成其使用同一producer的其他线程的延迟。
> DefaultKafkaProducerFactory有一个属性producerPerThread当该属性被设置为true时Kafka会对每一个线程都创建一个producer以此来避免这个问题。
> 当producerPerThread被设置为true时若producer不再需要用户代码必须在factory上调用closeThreadBoundProducer方法这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer
当创建DefaultKafkaProducerFactory时key和value的serializer class可以通过配置来制定配置的信息会通过接收一个Map的构造函数传递给DefaultKafkaProducerFactory。
serializer实例也可以作为参数传递给DefaultKafkaProducerFactory构造函数此时所有生产者都会共享同一个serializer实例。 可选的,也可以提供一个`Supplier<Serializer>`给构造函数此时每个生产者都会调用该Supplier获取一个独立的Serializer。
```java
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
```
可以在factory创建之后对producer properties进行更新。这些更新并不会影响现存的生产者可以调用reset方法来关闭所有现存的生产者新的生产者会根据新配置项来创建。
> 但是无法将事务的producer factory修改为非事务的反之亦然无法将非事务的producer修改为事物的
目前提供如下两个方法对producer properties进行更新
```java
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
```
##### ReplyingKafkaTemplate
ReplyingKafkaTemplate作为KafkaTemplate的子类提供了请求、回复的语义。相对于KafkaTemplateReplyingKafkaTemplate具有两个额外的方法
```java
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
```
方法的返回结果是一个CompletableFuture实际结果以异步的方式填充到其中。结果含有一个sendFuture属性是调用kafkaTemplate.send方法的结果。可以用该future对象来获知send操作的返回结果。
如果使用第一个方法或是replyTimeout传递参数为null那么会使用默认的replyTimeout默认值为5s。
该template含有一个新的方法`waitForAssignment`如果reply container通过`auto.offset.reset=latest`来进行配置时可以避免发送了一个请求并且结果被返回但是container还尚未被初始化。
如下实例展示了如何使用ReplyingKafkaTemplate
```java
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
```
上述通过spring boot自动配置的container factory来创建了一个reply container。
ReplyingKafkaTemplate会设置一个name为KafkaHeaders.CORRELATION_ID的header并且该header必须被server端消费者端返回。
在这种情况下,如果`@KafkaListener`应用会返回:
```java
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
```
上述`@KafkaListener`结构会回应correlation id并且决定reply topic。template会使用默认header `KafKaHeaders.REPLY_TOPIC`来告知消费者应该将回复发送到哪个topic中。
template会根据配置的reply container来探知reply topic或是分区。如果容器被配置监听单个topic或是单个TopicPartitionOffset会将监听的topic或是分区设置到reply header中。如果容器通过其他方式配置如监听多个topic那么用户必须显式设置reply header。
如下展示了用户如何设置`KafkaHeaders.REPLY_TOPIC`
```java
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
```
如果只设置了一个reply TopicPartitionOffset如果每个实例监听一个不同的分区那么可以多个template共用一个reply topic。
如果只配置了一个reply topic每个实例必须要有不同的`group.id`。在这种情况下每个实例都会接收到每个请求但是只有发送请求的实例能够找到correlation id。这种情况能够自动扩容但是会带来额外的网络负载每个实例接收到不想要消息时的丢弃操作也会带来开销。在这种情况下推荐将template的sharedReplyTopic设置为true将非预期reply的日志级别从info降低为debug。
如下为配置一个shared reply topic容器的示例
```java
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
```
> 如果没有按照上述的方法设置template那么当存在多个template实例时每个实例都需要一个确定的reply topic。
> 一个可选的替代方案是显式设置`KafkaHeaders.REPLY_PARTITION`并且对每个实例使用特定的分区。此时server必须显式使用该header将reply路由到正确的分区中@KafkaListener会做。在这种情况下reply container必须不使用kafka group特性并且被配置监听一个固定的分区。
默认情况下会使用3个header
- KafkaHeaders.CORRELATION_ID用于将reply关联到请求
- KafkaHeaders.REPLY_TOPIC用于告知server写入哪个topic
- KafkaHeaders.REPLY_PARTITION该header是可选的用于告知reply会写入到哪个分区
上述的header会被@KafkaListener使用用于路由reply。
#### Receiving Messages
可以通过配置MessageListenerContainer并提供一个message listener来接收消息也可以通过使用@KafkaListener注解来监听消息
当使用MessageListenerContainer时可以提供一个listener来接收数据。message listener目前有如下接口
```java
// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用自动提交或一个由容器管理的提交方法
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用一个手动提交方法
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// 该接口和1类似但是该接口可以访问提供的consumer对象
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// 该接口和2类似但是该接口可以访问提供的consumer对象
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// 该接口用于处理所有由kafka消费者的poll操作接收的ConsumerRecord当使用自动
// 提交或容器管理的提交方法时。当使用该接口时,不支持`ACKMODE.RECORD`,该传递
// 给该listener的batch数据是已完成的
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
// 该接口用于处理所有由kafka消费者的poll操作接收到的ConsumerRecord当使用手动
// 提交时
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// 该接口和5类似但是可以访问consumer对象
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// 该接口和6类似但是可以访问consumer对象
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
> Consumer对象并不是线程安全的故而只能够在调用该listener的线程中调用consumer的方法
##### Message Listener Container
为MessageListenerContainer提供了两个实现
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例从而提供多线程的消费。
**RecordInterceptor**
可以添加一个RecordInterceptor到ListenerContainer在调用listener之前interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null那么listener则不会被调用。而且其还有一个额外的方法可以在listener退出之后被调用退出之后指正常退出或以抛出异常的形式退出
还有一个BatchInterceptor提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
如果想要调用多个Interceptor则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
默认情况下当使用事务时interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
ConcurrentMessageListenerContainer支持“静态成员”即固定消费者即使消费者实例重启如此可以降低事件在消费者之间重新负载均衡的开销当并发量大于1时。`group.instance.id`的后缀为`-n`其中n从1开始。
**KafkaMessageListenerContainer**
可以通过如下构造函数来使用MessageListenerContainer
```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
```
该构造方法接收一个ConsumerFactory并通过ContainerProperties实例来接收其他配置。ContainerProperties拥有如下的构造函数
```java
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
- 第一个构造方法接收一个TopicPartitionOffset的数组作为参数来显式指定container使用哪些分区通过consumer的assign方法并可以附带一个可选的初始offset。默认情况下如果offset为正值代表其是绝对的offset。若offset为负值则offset代表在默认分区中相对于current last offset的相对位置。并且对于TopicPartitionOffset类其提供了一个接收额外boolean参数的构造方法如果该值设置为true无论init offset为正值或者负值都是相对于consumer当前位置的相对值。当容器启动时offset将会被使用。
- 第二个构造方法接收一个topic数组并且kafka根据`group.id`属性来分配分区在group中对分区进行分配
- 第三个构造方法接收一个正则表达式根据正则表达式来选中topic
想要为容器指定一个MessageListener可以在创建容器时使用ContainerProps.setMessageListener方法。如下是创建容器时设置MessageListener的示例
```java
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
`missingTopicsFatal`可以控制topic不存在时container是否启动`missingTopicsFatal`的值默认为false。如果任一topic在broker中不存在那么container的启动会被终止。
**ConcurrentMessageListenerContainer**
`ConcurrentMessageListenerContainer`唯一的构造方法和`KafkaListenerContainer`类似,如下显示了构造方法的签名:
```java
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
```
ConcurrentMessageListenerContainer还有一个concurrency属性例如`container.setConcurrency(3)`会创建3个`KafkaMessageListenerContainer`实例。
kafka会根据其组管理功能再消费者之间分配分区。
> **kafka分配策略**
> 当监听多个topic时默认的分区分配策略可能并不是想要的。例如有三个topic每个topic有5个分区那么将concurrency设置为15时只会看到5个活跃的consumer
> **RangeAssignor**
> 默认情况下kafka的分区分配策略是通过RangeAssignor来进行分配的其会将每个topic的分区在消费者group中所有消费者实例之间进行分配如果消费者实例数大于分区数则是会将消费者按字典顺序排序且分配分区给排序靠前的人。
> 故而在RangeAssignor策略下只有字典排序靠前的消费者实例才能在每个topic中分配到一个分区前5个消费者每个实例3个分区后面的10个消费者完全空闲。
> **RoundRobinAssignor**
> 相对于RangeAssignorRoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区在上述情况下15个消费者实例每个都会分配到一个分区。
>
> 想要改变`PartitionAssignor`可以再提供给DefaultKafkaConsumerFactory的参数中设置`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`属性。
> 如果使用Spring Boot可以按如下方式来配置
> ```properties
> spring.kafka.consumer.properties.partition.assignment.strategy=
> org.apache.kafka.clients.consumer.RoundRobinAssignor
> ```
如果container属性通过`TopicPartitionOffset`配置,`ConcurrentMessageListenerContainer`将会把TopicPartitionOffset分发给所有其委托的KafkaMessageListenerContainer实例。
> 如果提供了六个TopicPartitionOffset实例并且concurrency被设置为3那么每个实例都会获取到2个分区。
> 如果5个TopicPartitionOffset实例被提供且concurrency被设置为2那么两个实例会获取到2个分区一个实例会获取到一个分区。
> 如果concurrency比TopicPartitionOffset的数量更大那么concurrency会向下进行调整让每个实例都获取一个分区。
**Committing Offsets**
如果消费者属性`enable.auto.commit`被设置为trueKafka会对offset进行自动提交。如果该选项被设置为falsecontainer支持一些`AckMode`设置。默认的`AckMode`值为`Batch``enable.auto.commit`被默认设置为false如果想要开启自动提交可以手动将该属性设置为true。
消费者的poll方法会返回一个或者多个ConsumerRecord。MessageListener对于每条record都会被调用。
根据`AckMode`的值container会执行如下操作
- RECORD当listener处理完record返回之后会提交Offset
- BATCH当poll方法返回的所有record都被处理之后提交Offset
- TIME当poll方法返回的所有record都被处理之后提交Offset或者自上次提交之后已经超过了ackTime也会提交Offset
- COUNT当poll方法返回的所有record都被处理之后提交Offset或者自从上次提交之后已经接收了超过ackCount条record也会被提交
- COUNT_TIME和TIME和COUNT类似但是只要TIME和COUNT任一满足也会提交Offset
- MANNUALmessage listener负责调用`Acknowledgment.acknowledge()`方法。除此之外和BATCH语义相同
- MANNUAL_IMMEDIATE当listener调用`Acknowledgment.acknowledge()`之后对Offset进行提交