diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index e257276..5f51f47 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -1000,5 +1000,89 @@ public void listen(ConsumerRecord record) { ``` 如上图所示,可以在topicPartitions属性的`@TopicPartition的partitions属性`中或是`@TopicPartitionpation的partitionOffsets属性`中指定分区,但是无法同时在上述两处都指定同一个分区。 +从2.5.5版本开始,可以对所有手动分配的分区都指定一个初始offset: +```java +@KafkaListener(id = "thing3", topicPartitions = + { @TopicPartition(topic = "topic1", partitions = { "0", "1" }, + partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")) + }) +public void listen(ConsumerRecord record) { + ... +} +``` +再上述示例中,`*`通配符代表`partitions`属性锁指定的所有分区。在每个`@TopicPartition`注解中,只能有一个`@PartitionOffset`注解中带有该通配符。 + +当listener实现了`ConsumerSeekAware`时,`onPartitionAssigned`方法会被调用,即使使用手动分配(assign)。`onPartitionAssigned`允许在分区被分配时执行任意的seek操作。 + +从2.6.4版本开始,可以指定由`,`分隔的分区列表或分区区间,示例如下: +```java +@KafkaListener(id = "pp", autoStartup = "false", + topicPartitions = @TopicPartition(topic = "topic1", + partitions = "0-5, 7, 10-15")) +public void process(String in) { + ... +} +``` +该分区范围是闭区间,上述代表的区间是`0,1,2,3,4,5,7,10,11,12,13,14,15`. + +上述方式也可以用于指定分区初始的offset: +```java +@KafkaListener(id = "thing3", topicPartitions = + { @TopicPartition(topic = "topic1", + partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0")) + }) +public void listen(ConsumerRecord record) { + ... +} +``` + +#### 手动ack +当使用手动ack时,可以为listener method提供`Acknowledgment`。如下示例展示了如何使用一个不同的container factory: +```java +@KafkaListener(id = "cat", topics = "myTopic", + containerFactory = "kafkaManualAckListenerContainerFactory") +public void listen(String data, Acknowledgment ack) { + ... + ack.acknowledge(); +} +``` + +#### Consumer Record Metadata +record的元数据可以从message header中进行获取。可以使用如下header名称来获取消息header中的内容: +- KafkaHeaders.OFFSET +- KafkaHeaders.RECEIVED_KEY +- KafkaHeaders.RECEIVED_TOPIC +- KafkaHeaders.RECEIVED_PARTITION +- KafkaHeaders.RECEIVED_TIMESTAMP +- KafkaHeaders.TIMESTAMP_TYPE + +从2.5版本开始,如果接收到的消息key为null,那么`RECEIVED_KEY`在header中并不会出现;而在2.5版本之前,`RECEIVED_KEY`在header中存在并且值为null。 + +如下示例展示了如何使用header: +```java +@KafkaListener(id = "qux", topicPattern = "myTopic1") +public void listen(@Payload String foo, + @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts + ) { + ... +} +``` + +> 参数注解(`@Payload`、`@Header`)必须在listener method方法的具体实现上被指定,如果参数注解只指定在接口上,而没有在具体实现方法上指定,那么指定的注解将不会起作用。 + +从2.5版本开始,相比于上述示例中针对单条header内容进行接收,可以通过`ConsumerRecordMetadata`类型的参数来接收所有的header数据: +```java +@KafkaListener(...) +public void listen(String str, ConsumerRecordMetadata meta) { + ... +} +``` +上述`ConsumerRecordMetadata`类型中包含`ConsumerRecord`中所有的数据,除了record中的key和value。 + + +