From 5400fe07eec664d9913206593109545a11d32b1b Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Tue, 28 Feb 2023 16:45:40 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=85=B3=E4=BA=8EKafka?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E7=9A=84=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/Spring for Apache Kafka.md | 241 +++++++++++++++++++++++++++- 1 file changed, 240 insertions(+), 1 deletion(-) diff --git a/mq/kafka/Spring for Apache Kafka.md b/mq/kafka/Spring for Apache Kafka.md index ad4b746..d0307d3 100644 --- a/mq/kafka/Spring for Apache Kafka.md +++ b/mq/kafka/Spring for Apache Kafka.md @@ -382,7 +382,246 @@ public class Application { > DefaultKafkaProducerFactory有一个属性producerPerThread,当该属性被设置为true时,Kafka会对每一个线程都创建一个producer,以此来避免这个问题。 > 当producerPerThread被设置为true时,若producer不再需要,用户代码必须在factory上调用closeThreadBoundProducer方法,这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer + +当创建DefaultKafkaProducerFactory时,key和value的serializer class可以通过配置来制定,配置的信息会通过接收一个Map的构造函数传递给DefaultKafkaProducerFactory。 +serializer实例也可以作为参数传递给DefaultKafkaProducerFactory构造函数,此时所有生产者都会共享同一个serializer实例。 可选的,也可以提供一个`Supplier`给构造函数,此时每个生产者都会调用该Supplier获取一个独立的Serializer。 +```java +@Bean +public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); +} -可以在factory创建之后,对producer properties进行更新。 +@Bean +public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); +} +``` +可以在factory创建之后,对producer properties进行更新。这些更新并不会影响现存的生产者,可以调用reset方法来关闭所有现存的生产者,新的生产者会根据新配置项来创建。 +> 但是,无法将事务的producer factory修改为非事务的,反之亦然,无法将非事务的producer修改为事物的 +目前提供如下两个方法对producer properties进行更新: +```java +void updateConfigs(Map updates); + +void removeConfig(String configKey); +``` +##### ReplyingKafkaTemplate +ReplyingKafkaTemplate作为KafkaTemplate的子类,提供了请求、回复的语义。相对于KafkaTemplate,ReplyingKafkaTemplate具有两个额外的方法: +```java +RequestReplyFuture sendAndReceive(ProducerRecord record); + +RequestReplyFuture sendAndReceive(ProducerRecord record, + Duration replyTimeout); +``` + +方法的返回结果是一个CompletableFuture,实际结果以异步的方式填充到其中。结果含有一个sendFuture属性,是调用kafkaTemplate.send方法的结果。可以用该future对象来获知send操作的返回结果。 +如果使用第一个方法,或是replyTimeout传递参数为null,那么会使用默认的replyTimeout,默认值为5s。 +该template含有一个新的方法`waitForAssignment`,如果reply container通过`auto.offset.reset=latest`来进行配置时,可以避免发送了一个请求并且结果被返回,但是container还尚未被初始化。 +如下实例展示了如何使用ReplyingKafkaTemplate: +```java +@SpringBootApplication +public class KRequestingApplication { + + public static void main(String[] args) { + SpringApplication.run(KRequestingApplication.class, args).close(); + } + + @Bean + public ApplicationRunner runner(ReplyingKafkaTemplate template) { + return args -> { + if (!template.waitForAssignment(Duration.ofSeconds(10))) { + throw new IllegalStateException("Reply container did not initialize"); + } + ProducerRecord record = new ProducerRecord<>("kRequests", "foo"); + RequestReplyFuture replyFuture = template.sendAndReceive(record); + SendResult sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS); + System.out.println("Sent ok: " + sendResult.getRecordMetadata()); + ConsumerRecord consumerRecord = replyFuture.get(10, TimeUnit.SECONDS); + System.out.println("Return value: " + consumerRecord.value()); + }; + } + + @Bean + public ReplyingKafkaTemplate replyingTemplate( + ProducerFactory pf, + ConcurrentMessageListenerContainer repliesContainer) { + + return new ReplyingKafkaTemplate<>(pf, repliesContainer); + } + + @Bean + public ConcurrentMessageListenerContainer repliesContainer( + ConcurrentKafkaListenerContainerFactory containerFactory) { + + ConcurrentMessageListenerContainer repliesContainer = + containerFactory.createContainer("kReplies"); + repliesContainer.getContainerProperties().setGroupId("repliesGroup"); + repliesContainer.setAutoStartup(false); + return repliesContainer; + } + + @Bean + public NewTopic kRequests() { + return TopicBuilder.name("kRequests") + .partitions(10) + .replicas(2) + .build(); + } + + @Bean + public NewTopic kReplies() { + return TopicBuilder.name("kReplies") + .partitions(10) + .replicas(2) + .build(); + } + +} +``` +上述通过spring boot自动配置的container factory来创建了一个reply container。 +ReplyingKafkaTemplate会设置一个name为KafkaHeaders.CORRELATION_ID的header,并且该header必须被server端(消费者端)返回。 +在这种情况下,如果`@KafkaListener`应用会返回: +```java +@SpringBootApplication +public class KReplyingApplication { + + public static void main(String[] args) { + SpringApplication.run(KReplyingApplication.class, args); + } + + @KafkaListener(id="server", topics = "kRequests") + @SendTo // use default replyTo expression + public String listen(String in) { + System.out.println("Server received: " + in); + return in.toUpperCase(); + } + + @Bean + public NewTopic kRequests() { + return TopicBuilder.name("kRequests") + .partitions(10) + .replicas(2) + .build(); + } + + @Bean // not required if Jackson is on the classpath + public MessagingMessageConverter simpleMapperConverter() { + MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter(); + messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper()); + return messagingMessageConverter; + } + +} +``` +上述`@KafkaListener`结构会回应correlation id并且决定reply topic。template会使用默认header `KafKaHeaders.REPLY_TOPIC`来告知消费者应该将回复发送到哪个topic中。 +template会根据配置的reply container来探知reply topic或是分区。如果容器被配置监听单个topic或是单个TopicPartitionOffset,会将监听的topic或是分区设置到reply header中。如果容器通过其他方式配置(如监听多个topic),那么用户必须显式设置reply header。 +如下展示了用户如何设置`KafkaHeaders.REPLY_TOPIC`: +```java +record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes())); +``` +如果只设置了一个reply TopicPartitionOffset,如果每个实例监听一个不同的分区,那么可以多个template共用一个reply topic。 +如果只配置了一个reply topic,每个实例必须要有不同的`group.id`。在这种情况下,每个实例都会接收到每个请求,但是只有发送请求的实例能够找到correlation id。这种情况能够自动扩容,但是会带来额外的网络负载,每个实例接收到不想要消息时的丢弃操作也会带来开销。在这种情况下,推荐将template的sharedReplyTopic设置为true,将非预期reply的日志级别从info降低为debug。 +如下为配置一个shared reply topic容器的示例: +```java +@Bean +public ConcurrentMessageListenerContainer replyContainer( + ConcurrentKafkaListenerContainerFactory containerFactory) { + + ConcurrentMessageListenerContainer container = containerFactory.createContainer("topic2"); + container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique + Properties props = new Properties(); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies + container.getContainerProperties().setKafkaConsumerProperties(props); + return container; +} +``` +> 如果没有按照上述的方法设置template,那么当存在多个template实例时,每个实例都需要一个确定的reply topic。 +> 一个可选的替代方案是显式设置`KafkaHeaders.REPLY_PARTITION`,并且对每个实例使用特定的分区。此时server必须显式使用该header将reply路由到正确的分区中(@KafkaListener会做)。在这种情况下,reply container必须不使用kafka group特性,并且被配置监听一个固定的分区。 + +默认情况下,会使用3个header: +- KafkaHeaders.CORRELATION_ID:用于将reply关联到请求 +- KafkaHeaders.REPLY_TOPIC:用于告知server写入哪个topic +- KafkaHeaders.REPLY_PARTITION:该header是可选的,用于告知reply会写入到哪个分区 + +上述的header会被@KafkaListener使用,用于路由reply。 + +#### Receiving Messages + +可以通过配置MessageListenerContainer并提供一个message listener来接收消息,也可以通过使用@KafkaListener注解来监听消息。 +当使用MessageListenerContainer时,可以提供一个listener来接收数据。message listener目前有如下接口: +```java +// 该接口用于处理单独的ConsumerRecord实例,该实例从Kafka消费者的poll +// 操作获取,当使用自动提交或一个由容器管理的提交方法 +public interface MessageListener { (1) + + void onMessage(ConsumerRecord data); + +} + +// 该接口用于处理单独的ConsumerRecord实例,该实例从Kafka消费者的poll +// 操作获取,当使用一个手动提交方法 +public interface AcknowledgingMessageListener { (2) + + void onMessage(ConsumerRecord data, Acknowledgment acknowledgment); + +} + +// 该接口和(1)类似,但是该接口可以访问提供的consumer对象 +public interface ConsumerAwareMessageListener extends MessageListener { (3) + + void onMessage(ConsumerRecord data, Consumer consumer); + +} + +// 该接口和(2)类似,但是该接口可以访问提供的consumer对象 +public interface AcknowledgingConsumerAwareMessageListener extends MessageListener { (4) + + void onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer consumer); + +} + +// 该接口用于处理所有由kafka消费者的poll操作接收的ConsumerRecord,当使用自动 +// 提交或容器管理的提交方法时。当使用该接口时,不支持`ACKMODE.RECORD`,该传递 +// 给该listener的batch数据是已完成的 +public interface BatchMessageListener { (5) + + void onMessage(List> data); + +} + +// 该接口用于处理所有由kafka消费者的poll操作接收到的ConsumerRecord,当使用手动 +// 提交时 +public interface BatchAcknowledgingMessageListener { (6) + + void onMessage(List> data, Acknowledgment acknowledgment); + +} + +// 该接口和(5)类似,但是可以访问consumer对象 +public interface BatchConsumerAwareMessageListener extends BatchMessageListener { (7) + + void onMessage(List> data, Consumer consumer); + +} + +// 该接口和(6)类似,但是可以访问consumer对象 +public interface BatchAcknowledgingConsumerAwareMessageListener extends BatchMessageListener { (8) + + void onMessage(List> data, Acknowledgment acknowledgment, Consumer consumer); + +} +``` +> Consumer对象并不是线程安全的,故而只能够在调用该listener的线程中调用consumer的方法 + +##### Message Listener Container +为MessageListenerContainer提供了两个实现: +- KafkaMessageListenerContainer +- ConcurrentMessageListenerContainer + +KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例,从而提供多线程的消费。 +可以添加一个RecordInterceptor到ListenerContainer,在调用listener之前,interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null,那么listener则不会被调用。而且,其还有一个额外的方法,可以在listener退出之后被调用(退出之后,指正常退出或以抛出异常的形式退出)。 +还有一个BatchInterceptor,提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。 +如果想要调用多个Interceptor,则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。 +默认情况下,当使用事务时,interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。 +ConcurrentMessageListenerContainer支持“静态成员”(即固定消费者,即使消费者实例重启,如此可以降低事件在消费者之间重新负载均衡的开销),当并发量大于1时。`group.instance.id`的后缀为`-n`,其中n从1开始。