From 3117618172d54a7017e333f3e84aeb791fc86dea Mon Sep 17 00:00:00 2001 From: asahi Date: Tue, 27 Feb 2024 20:16:01 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBspring=20kafka=E5=85=B3?= =?UTF-8?q?=E4=BA=8Eseek=E6=93=8D=E4=BD=9C=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 182 ++++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 6afb02e..40babb5 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -47,6 +47,7 @@ - [MessageListener实现](#messagelistener实现) - [Prototype Beans](#prototype-beans) - [Topic/Partition初始offset](#topicpartition初始offset) + - [seek到指定offset](#seek到指定offset) # Spring Kafka @@ -1782,6 +1783,187 @@ applicationContext.getBean(MyPojo.class, "two", "topic3"); - 对于新的`group.id`(新的消费者组),initial offset将会由`auto.offset.reset`属性来决定(earliest或者latest) - 对于已经存在的`group.id`(当前消费者组已存在),initial offset将会是当前group的offset。但是,可以在初始化时通过seek将offset设置到指定位置 +## seek到指定offset +如果想要在listener中执行seek操作,Listener必须实现`ConsumerSeekAware`接口,该接口含有如下方法: +```java +void registerSeekCallback(ConsumerSeekCallback callback); + +void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback); + +void onPartitionsRevoked(Collection partitions) + +void onIdleContainer(Map assignments, ConsumerSeekCallback callback); +``` + +在container启动或是partition被分配时,`registerSeekCallback`方法都会被调用,用于向listener中注册ConsumerSeekCallback对象。从而在初始化之后,可以通过该callback执行seek操作。 + +> callback用于操作被分配的topic和partition,故而当partition被重新分配时,registerSeekCallback都会被重新调用。 + +应该在listener中对该callback进行存储,如果多个container共同使用同一listener(或是使用ConcurrentMessageListenerContainer),那么应该将callback存储在ThreadLocal中,或是以`Thread`为key的数据结构中。 + +当使用group manangement时,`onPartitionsAssigned`将会在partition被分配时被调用。可以使用onPartitionsAssigned接口来进行分区初始化offset设置。同时,也可以使用`onPartitionsAssigned`接口将thread对应的callback和线程被分配的分区(一个线程对应一个consumer实例)对应起来。在onPartitionsAssigned被调用时,必须使用该方法接收到的callback,而非是registerSeekCallback获取到的callback。 + +`onPartitionRevoked`在container stop或kafka撤销分区分配时被调用。在分配被取消分配后,应该将该thread对应的callback丢弃,并且移除任何指向该分区的引用。 + +`ConsumerSeekCallback`则是含有如下方法: +```java +void seek(String topic, int partition, long offset); + +void seekToBeginning(String topic, int partition); + +void seekToBeginning(Collection partitions); + +void seekToEnd(String topic, int partition); + +void seekToEnd(Collection partitions); + +void seekRelative(String topic, int partition, long offset, boolean toCurrent); + +void seekToTimestamp(String topic, int partition, long timestamp); + +void seekToTimestamp(Collection topicPartitions, long timestamp); +``` + +`seekRelative`是在2.3版本加入的,用于执行relative seek操作: +- 当offset为负并且toCurrent参数为false时,代表seek到相对分区尾部的偏移量 +- 当offset为正并且toCurrent为false时,代表seek到相对分区起始位置的偏移量 +- 当offset为负并且toCurrent为true时,代表seek到相对当前offset的位置(rewind) +- offset为正并且toCurrent为true时,代表seek到相对当前offset的位置(fast forward) + +在2.3版本中,seekToTimestamp方法也被加入。 + +如下是一个如何使用callback的示例: +```java +@SpringBootApplication +public class SeekExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(SeekExampleApplication.class, args); + } + + @Bean + public ApplicationRunner runner(Listener listener, KafkaTemplate template) { + return args -> { + IntStream.range(0, 10).forEach(i -> template.send( + new ProducerRecord<>("seekExample", i % 3, "foo", "bar"))); + while (true) { + System.in.read(); + listener.seekToStart(); + } + }; + } + + @Bean + public NewTopic topic() { + return new NewTopic("seekExample", 3, (short) 1); + } + +} + +@Component +class Listener implements ConsumerSeekAware { + + private static final Logger logger = LoggerFactory.getLogger(Listener.class); + + private final ThreadLocal callbackForThread = new ThreadLocal<>(); + + private final Map callbacks = new ConcurrentHashMap<>(); + + @Override + public void registerSeekCallback(ConsumerSeekCallback callback) { + this.callbackForThread.set(callback); + } + + @Override + public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { + assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get())); + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + partitions.forEach(tp -> this.callbacks.remove(tp)); + this.callbackForThread.remove(); + } + + @Override + public void onIdleContainer(Map assignments, ConsumerSeekCallback callback) { + } + + @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3") + public void listen(ConsumerRecord in) { + logger.info(in.toString()); + } + + public void seekToStart() { + this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition())); + } + +} +``` +为了简化实现,从2.3版本开始,加入了`AbstractConsumerSeekAware`,该类保存了针对某个topic/partition应该使用哪个callback。如下示例展示了使用示例: +```java +public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware { + + @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle") + public void listen(String in) { + ... + } + + @Override + public void onIdleContainer(Map assignments, + ConsumerSeekCallback callback) { + + assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true)); + } + + /** + * Rewind all partitions one record. + */ + public void rewindAllOneRecord() { + getSeekCallbacks() + .forEach((tp, callback) -> + callback.seekRelative(tp.topic(), tp.partition(), -1, true)); + } + + /** + * Rewind one partition one record. + */ + public void rewindOnePartitionOneRecord(String topic, int partition) { + getSeekCallbackFor(new TopicPartition(topic, partition)) + .seekRelative(topic, partition, -1, true); + } + +} +``` +从2.6版本开始,为抽象类添加了如下方便起见的方法: +- seekToBeginning() : 对所有被分配的partition执行seek操作,把offset重置到开始位置 +- seekToEnd() : 对所有被分配的分区都执行seekToEnd操作 +- seekToTimestamp(long timestamp) :将所有被分配的分区offset都重置到timestamp所代表的位置 + +```java +public class MyListener extends AbstractConsumerSeekAware { + + @KafkaListener(...) + void listn(...) { + ... + } +} + +public class SomeOtherBean { + + MyListener listener; + + ... + + void someMethod() { + this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000); + } + +} +``` + + +