继续关于Kafka文档的阅读

This commit is contained in:
2023-02-28 16:45:40 +08:00
parent 63a9ed1bca
commit 5400fe07ee

View File

@@ -382,7 +382,246 @@ public class Application {
> DefaultKafkaProducerFactory有一个属性producerPerThread当该属性被设置为true时Kafka会对每一个线程都创建一个producer以此来避免这个问题。 > DefaultKafkaProducerFactory有一个属性producerPerThread当该属性被设置为true时Kafka会对每一个线程都创建一个producer以此来避免这个问题。
> 当producerPerThread被设置为true时若producer不再需要用户代码必须在factory上调用closeThreadBoundProducer方法这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer > 当producerPerThread被设置为true时若producer不再需要用户代码必须在factory上调用closeThreadBoundProducer方法这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer
当创建DefaultKafkaProducerFactory时key和value的serializer class可以通过配置来制定配置的信息会通过接收一个Map的构造函数传递给DefaultKafkaProducerFactory。
serializer实例也可以作为参数传递给DefaultKafkaProducerFactory构造函数此时所有生产者都会共享同一个serializer实例。 可选的,也可以提供一个`Supplier<Serializer>`给构造函数此时每个生产者都会调用该Supplier获取一个独立的Serializer。
```java
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
可以在factory创建之后对producer properties进行更新。 @Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
```
可以在factory创建之后对producer properties进行更新。这些更新并不会影响现存的生产者可以调用reset方法来关闭所有现存的生产者新的生产者会根据新配置项来创建。
> 但是无法将事务的producer factory修改为非事务的反之亦然无法将非事务的producer修改为事物的
目前提供如下两个方法对producer properties进行更新
```java
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
```
##### ReplyingKafkaTemplate
ReplyingKafkaTemplate作为KafkaTemplate的子类提供了请求、回复的语义。相对于KafkaTemplateReplyingKafkaTemplate具有两个额外的方法
```java
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
```
方法的返回结果是一个CompletableFuture实际结果以异步的方式填充到其中。结果含有一个sendFuture属性是调用kafkaTemplate.send方法的结果。可以用该future对象来获知send操作的返回结果。
如果使用第一个方法或是replyTimeout传递参数为null那么会使用默认的replyTimeout默认值为5s。
该template含有一个新的方法`waitForAssignment`如果reply container通过`auto.offset.reset=latest`来进行配置时可以避免发送了一个请求并且结果被返回但是container还尚未被初始化。
如下实例展示了如何使用ReplyingKafkaTemplate
```java
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
```
上述通过spring boot自动配置的container factory来创建了一个reply container。
ReplyingKafkaTemplate会设置一个name为KafkaHeaders.CORRELATION_ID的header并且该header必须被server端消费者端返回。
在这种情况下,如果`@KafkaListener`应用会返回:
```java
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
```
上述`@KafkaListener`结构会回应correlation id并且决定reply topic。template会使用默认header `KafKaHeaders.REPLY_TOPIC`来告知消费者应该将回复发送到哪个topic中。
template会根据配置的reply container来探知reply topic或是分区。如果容器被配置监听单个topic或是单个TopicPartitionOffset会将监听的topic或是分区设置到reply header中。如果容器通过其他方式配置如监听多个topic那么用户必须显式设置reply header。
如下展示了用户如何设置`KafkaHeaders.REPLY_TOPIC`
```java
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
```
如果只设置了一个reply TopicPartitionOffset如果每个实例监听一个不同的分区那么可以多个template共用一个reply topic。
如果只配置了一个reply topic每个实例必须要有不同的`group.id`。在这种情况下每个实例都会接收到每个请求但是只有发送请求的实例能够找到correlation id。这种情况能够自动扩容但是会带来额外的网络负载每个实例接收到不想要消息时的丢弃操作也会带来开销。在这种情况下推荐将template的sharedReplyTopic设置为true将非预期reply的日志级别从info降低为debug。
如下为配置一个shared reply topic容器的示例
```java
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
```
> 如果没有按照上述的方法设置template那么当存在多个template实例时每个实例都需要一个确定的reply topic。
> 一个可选的替代方案是显式设置`KafkaHeaders.REPLY_PARTITION`并且对每个实例使用特定的分区。此时server必须显式使用该header将reply路由到正确的分区中@KafkaListener会做。在这种情况下reply container必须不使用kafka group特性并且被配置监听一个固定的分区。
默认情况下会使用3个header
- KafkaHeaders.CORRELATION_ID用于将reply关联到请求
- KafkaHeaders.REPLY_TOPIC用于告知server写入哪个topic
- KafkaHeaders.REPLY_PARTITION该header是可选的用于告知reply会写入到哪个分区
上述的header会被@KafkaListener使用用于路由reply。
#### Receiving Messages
可以通过配置MessageListenerContainer并提供一个message listener来接收消息也可以通过使用@KafkaListener注解来监听消息
当使用MessageListenerContainer时可以提供一个listener来接收数据。message listener目前有如下接口
```java
// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用自动提交或一个由容器管理的提交方法
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
// 该接口用于处理单独的ConsumerRecord实例该实例从Kafka消费者的poll
// 操作获取,当使用一个手动提交方法
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// 该接口和1类似但是该接口可以访问提供的consumer对象
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// 该接口和2类似但是该接口可以访问提供的consumer对象
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// 该接口用于处理所有由kafka消费者的poll操作接收的ConsumerRecord当使用自动
// 提交或容器管理的提交方法时。当使用该接口时,不支持`ACKMODE.RECORD`,该传递
// 给该listener的batch数据是已完成的
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
// 该接口用于处理所有由kafka消费者的poll操作接收到的ConsumerRecord当使用手动
// 提交时
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
// 该接口和5类似但是可以访问consumer对象
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
// 该接口和6类似但是可以访问consumer对象
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
> Consumer对象并不是线程安全的故而只能够在调用该listener的线程中调用consumer的方法
##### Message Listener Container
为MessageListenerContainer提供了两个实现
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例从而提供多线程的消费。
可以添加一个RecordInterceptor到ListenerContainer在调用listener之前interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null那么listener则不会被调用。而且其还有一个额外的方法可以在listener退出之后被调用退出之后指正常退出或以抛出异常的形式退出
还有一个BatchInterceptor提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
如果想要调用多个Interceptor则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
默认情况下当使用事务时interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
ConcurrentMessageListenerContainer支持“静态成员”即固定消费者即使消费者实例重启如此可以降低事件在消费者之间重新负载均衡的开销当并发量大于1时。`group.instance.id`的后缀为`-n`其中n从1开始。