From 64b66a0b38b1134e249d11f05696bf6d101acbaa Mon Sep 17 00:00:00 2001 From: asahi Date: Mon, 26 Feb 2024 18:55:02 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBspring=20kafka=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 82 +++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index d5b49cb..6afb02e 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -43,6 +43,10 @@ - [通过@SendTo注解发送listener结果](#通过sendto注解发送listener结果) - [过滤消息](#过滤消息) - [通过KafkaTemplate来接收消息](#通过kafkatemplate来接收消息) + - [动态创建container](#动态创建container) + - [MessageListener实现](#messagelistener实现) + - [Prototype Beans](#prototype-beans) + - [Topic/Partition初始offset](#topicpartition初始offset) # Spring Kafka @@ -1700,6 +1704,84 @@ ConsumerRecords receive(Collection requested); ConsumerRecords receive(Collection requested, Duration pollTimeout); ``` +## 动态创建container +spring kafka提供了一些方式用于在运行时动态的创建container。 + +### MessageListener实现 +如果实现了自己的MessageListener,那么可以通过container factory来为listener创建container: +```java +public class MyListener implements MessageListener { + + @Override + public void onMessage(ConsumerRecord data) { + // ... + } + +} + +private ConcurrentMessageListenerContainer createContainer( + ConcurrentKafkaListenerContainerFactory factory, String topic, String group) { + + ConcurrentMessageListenerContainer container = factory.createContainer(topic); + container.getContainerProperties().setMessageListener(new MyListener()); + container.getContainerProperties().setGroupId(group); + container.setBeanName(group); + container.start(); + return container; +} +``` +### Prototype Beans +如果将bean对象scope声明为prototype,那么被`@KafkaListener`注解的方法其对应container将会被动态的创建。 + +使用实例如下所示: +```java +public class MyPojo { + + private final String id; + + private final String topic; + + public MyPojo(String id, String topic) { + this.id = id; + this.topic = topic; + } + + public String getId() { + return this.id; + } + + public String getTopic() { + return this.topic; + } + + @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}") + public void listen(String in) { + System.out.println(in); + } + +} + +@Bean +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +MyPojo pojo(String id, String topic) { + return new MyPojo(id, topic); +} + +applicationContext.getBean(MyPojo.class, "one", "topic2"); +applicationContext.getBean(MyPojo.class, "two", "topic3"); +``` + +> 每个listener都有对应的container,listener必须含有唯一的id,id为该listener对应container的唯一标识符。从2.8.9版本开始,`KafkaListenerEndpointRegistry`包含一个名为`unregisterListenerContainer(String id)`的新方法允许来重用id。对container执行unregister操作并不会stop容器,如果需要对容器执行stop操作,需要手动对container调用stop方法。 + +## Topic/Partition初始offset +存在如下方式为partition设置初始offset。 + +当手动对分区进行assign时,可以手动设置其初始offset(通过指定TopicPartitionOffset参数)。同时,可以在任何时间调用`seek`方法来将设置到指定的offset。 + +当使用group management时,分区将会由broker来进行分配: +- 对于新的`group.id`(新的消费者组),initial offset将会由`auto.offset.reset`属性来决定(earliest或者latest) +- 对于已经存在的`group.id`(当前消费者组已存在),initial offset将会是当前group的offset。但是,可以在初始化时通过seek将offset设置到指定位置 +