阅读spring kafka关于seek操作的文档

This commit is contained in:
asahi
2024-02-27 20:16:01 +08:00
parent 64b66a0b38
commit 3117618172

View File

@@ -47,6 +47,7 @@
- [MessageListener实现](#messagelistener实现)
- [Prototype Beans](#prototype-beans)
- [Topic/Partition初始offset](#topicpartition初始offset)
- [seek到指定offset](#seek到指定offset)
# Spring Kafka
@@ -1782,6 +1783,187 @@ applicationContext.getBean(MyPojo.class, "two", "topic3");
- 对于新的`group.id`新的消费者组initial offset将会由`auto.offset.reset`属性来决定earliest或者latest
- 对于已经存在的`group.id`当前消费者组已存在initial offset将会是当前group的offset。但是可以在初始化时通过seek将offset设置到指定位置
## seek到指定offset
如果想要在listener中执行seek操作Listener必须实现`ConsumerSeekAware`接口,该接口含有如下方法:
```java
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
```
在container启动或是partition被分配时`registerSeekCallback`方法都会被调用用于向listener中注册ConsumerSeekCallback对象。从而在初始化之后可以通过该callback执行seek操作。
> callback用于操作被分配的topic和partition故而当partition被重新分配时registerSeekCallback都会被重新调用。
应该在listener中对该callback进行存储如果多个container共同使用同一listener或是使用ConcurrentMessageListenerContainer那么应该将callback存储在ThreadLocal中或是以`Thread`为key的数据结构中。
当使用group manangement时`onPartitionsAssigned`将会在partition被分配时被调用。可以使用onPartitionsAssigned接口来进行分区初始化offset设置。同时也可以使用`onPartitionsAssigned`接口将thread对应的callback和线程被分配的分区一个线程对应一个consumer实例对应起来。在onPartitionsAssigned被调用时必须使用该方法接收到的callback而非是registerSeekCallback获取到的callback。
`onPartitionRevoked`在container stop或kafka撤销分区分配时被调用。在分配被取消分配后应该将该thread对应的callback丢弃并且移除任何指向该分区的引用。
`ConsumerSeekCallback`则是含有如下方法:
```java
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
```
`seekRelative`是在2.3版本加入的用于执行relative seek操作
- 当offset为负并且toCurrent参数为false时代表seek到相对分区尾部的偏移量
- 当offset为正并且toCurrent为false时代表seek到相对分区起始位置的偏移量
- 当offset为负并且toCurrent为true时代表seek到相对当前offset的位置rewind
- offset为正并且toCurrent为true时代表seek到相对当前offset的位置fast forward
在2.3版本中seekToTimestamp方法也被加入。
如下是一个如何使用callback的示例
```java
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
```
为了简化实现从2.3版本开始,加入了`AbstractConsumerSeekAware`该类保存了针对某个topic/partition应该使用哪个callback。如下示例展示了使用示例
```java
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
```
从2.6版本开始,为抽象类添加了如下方便起见的方法:
- seekToBeginning() : 对所有被分配的partition执行seek操作把offset重置到开始位置
- seekToEnd() 对所有被分配的分区都执行seekToEnd操作
- seekToTimestamp(long timestamp) 将所有被分配的分区offset都重置到timestamp所代表的位置
```java
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
```