阅读spring kafka文档

This commit is contained in:
asahi
2024-02-26 18:55:02 +08:00
parent 3d89f7a47e
commit 64b66a0b38

View File

@@ -43,6 +43,10 @@
- [通过@SendTo注解发送listener结果](#通过sendto注解发送listener结果)
- [过滤消息](#过滤消息)
- [通过KafkaTemplate来接收消息](#通过kafkatemplate来接收消息)
- [动态创建container](#动态创建container)
- [MessageListener实现](#messagelistener实现)
- [Prototype Beans](#prototype-beans)
- [Topic/Partition初始offset](#topicpartition初始offset)
# Spring Kafka
@@ -1700,6 +1704,84 @@ ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
```
## 动态创建container
spring kafka提供了一些方式用于在运行时动态的创建container。
### MessageListener实现
如果实现了自己的MessageListener那么可以通过container factory来为listener创建container
```java
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
```
### Prototype Beans
如果将bean对象scope声明为prototype那么被`@KafkaListener`注解的方法其对应container将会被动态的创建。
使用实例如下所示:
```java
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
```
> 每个listener都有对应的containerlistener必须含有唯一的idid为该listener对应container的唯一标识符。从2.8.9版本开始,`KafkaListenerEndpointRegistry`包含一个名为`unregisterListenerContainer(String id)`的新方法允许来重用id。对container执行unregister操作并不会stop容器如果需要对容器执行stop操作需要手动对container调用stop方法。
## Topic/Partition初始offset
存在如下方式为partition设置初始offset。
当手动对分区进行assign时可以手动设置其初始offset通过指定TopicPartitionOffset参数。同时可以在任何时间调用`seek`方法来将设置到指定的offset。
当使用group management时分区将会由broker来进行分配
- 对于新的`group.id`新的消费者组initial offset将会由`auto.offset.reset`属性来决定earliest或者latest
- 对于已经存在的`group.id`当前消费者组已存在initial offset将会是当前group的offset。但是可以在初始化时通过seek将offset设置到指定位置