阅读spring kafka commit offset文档
This commit is contained in:
@@ -851,3 +851,61 @@ public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
|
||||
> 该值代表在使用消费者组管理功能时,两次`poll()`调用之间最长时间间隔。该值代表在调用完poll方法后,在下次调用poll方法之前消费者示例可以闲置的最长时间。如果超过该时间限制之后,poll方法仍然未被调用,那么该消费者示例将会被认为失败,并且消费者组会出发rebalance操作,将分区在消费者组中的其他成员之间进行再分配。**在消费者示例使用的`group.instance.id`不为空时,如果超时,分区将不会立马被重新分配,而是消费者示例停止发送心跳包,分区会在停止发送心跳包超过`session.timeout.ms`时间后出发rebalance。`group.instance.id`不为空时,该消费者实例将会被认为是该消费者组的静态成员。
|
||||
>
|
||||
> 默认情况下,`max.poll.interval.ms`的默认值为5min。
|
||||
|
||||
#### Committing Offsets
|
||||
对于offset提交,spring kafka提供了多个选项。如果`enable.auto.commit`属性在consumer properties中被指定为true,那么kafka会根据其配置对消息进行自动提交。如果`enable.auto.commit`被指定为false,那么MessageListenerContainer支持几种不同的`AckMode`设置。默认的`AckMode`设置为`BATCH`。从2.3版本开始,spring framework会自动将`enable.auto.commit`设置为false,在2.3之前则是默认设置为true。
|
||||
|
||||
`consumer.poll`方法会返回一条或多条`ConsumerRecord`,而MessageListener则是会针对每条ConsumerRecord进行调用。如下列举了container针对每种AckMode会执行的行为:
|
||||
- `RECORD`:在listener处理完该消息并返回之后,提交offset
|
||||
- `BATCH`:当所有被poll方法调用返回的消息都处理完成之后,提交offset
|
||||
- `TIME`:当所有被poll方法返回的消息均被处理完成,或是从上次commit时起已超过`ackTime`时间
|
||||
- `COUNT`:当所有被poll方法返回的消息都被处理完成,或是从上次commit时起已收到超过`ackCount`条记录
|
||||
- `COUNT_TIME`:当TIME或COUNT任一条满足时,提交offset
|
||||
- `MANUAL`:messageListener负责对消息调用`Acknowledgment.acknowledge()`方法,调用之后语义和BATCH相同,会等待所有被poll方法调用返回的消息都处理完成,之后会批量提交offset
|
||||
- `MANUAL_IMMEDIATE`:当方法`Acknowledgment.acknowledge()`被listener调用之后,立刻会提交offset
|
||||
|
||||
当使用事务时,offsets将会被发送给事务,并且语义和RECRD或BATCH相同,基于listener的类型决定语义(看listener是record类型还是batch类型)
|
||||
|
||||
> `MANUAL`或`MANUAL_IMMEDIATE`需要listener为`AcknowledgingMessageListener `或`BatchAcknowledgingMessageListener`
|
||||
|
||||
基于容器的`syncCommits`容器属性,提交offset时会使用`commitSync`或`commitAsync`方法。`syncCommits`属性默认为true,可以通过`setSyncCommitTimeout`设置commitSync时的超时时间。在syncCommits设置为false时,可以通过`setCommitCallback`来获取commitAsync的结果,默认情况下callback是`LoggingCommitCallback`,其会日志输出errors,并且以debug的级别打印成功日志。
|
||||
|
||||
由于listener container拥有自己的提交offset机制,故而更推荐将kafka consumr的`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`属性设置为false。2.3版本开始,会将自动提交属性设置为false,除非显式在consumer factory或container的consumer properties中将自动提交属性覆盖。
|
||||
|
||||
Acknowledgment接口具有如下方法:
|
||||
```java
|
||||
public interface Acknowledgment {
|
||||
|
||||
void acknowledge();
|
||||
|
||||
}
|
||||
```
|
||||
该方法能够让container控制何时提交offset。
|
||||
|
||||
从2.3版本开始,`Acknowledgment`接口提供了两个额外的方法`nack(long sleep)`和`nack(int index, long sleep)`。第一个方法和record类型的listener一起使用,第二个方法和batch类型的listener一起使用。如果调用类型错误,会抛出`IllegalStateException`异常。
|
||||
|
||||
- `nack(long sleep)`:对当前record执行nack操作,丢弃poll请求中的remaining records并针对所有分区进行re-seek操作,将position恢复。故而,在指定sleep时间范围后,record将会被重新传递,该操作会阻塞整个message listener的读操作,阻塞时间为sleep,且阻塞范围不限于单个分区。该操作必须在consumer thread中被调用。
|
||||
|
||||
- `nack(int index, long sleep)`:对于batch中index位置的record进行nack操作,该操作会提交位于index之前record的offset,并且会re-seek所有分区,故而位于index和index之后的消息都会在sleep时间后被重新传输。该操作必须在consumer thread中被调用。
|
||||
|
||||
当想要提交batch中部分消息时,请使用`nack(int index, long sleep)`方法。当是使用事务时,设置AckMode为MUANAL;调用nack方法会将已成被成功处理的records offsets发送给事务。
|
||||
|
||||
nack只能在调用listener的consumer线程中进行处理。
|
||||
|
||||
在使用nack方法时,任何在途的offset都会被提交,而从上次poll起的remaining records都会被丢弃。并且在所有分区都会执行seek操作,故而当前失败的那条record和所有未处理的record都会在下次poll时重新拉取。该操作和抛异常类似,当container中配置了一个DefaultErrorHandler时。
|
||||
|
||||
当使用batch listener时,若发生异常可以指定record在bathc中的index。调用nack时,位于index之前的record offset都会被提交,seek操作也会执行,在下次poll操作时也会丢失败和被丢弃的消息进行重新拉取。
|
||||
|
||||
从3.0.10版本开始,batch listener可以提交batch中的部分record offset,通过使用`acknowledge(index)`方法。当该方法被调用时,位于index的record和位于index之前的records都会被提交offset。在调用部分提交之后再调用`acknowledge()`方法,会针对剩余尚未被提交的records进行offset提交。在使用部分提交时,必须满足如下要求:
|
||||
- AckMode.MANUAL_IMMEDIATE需要被开启
|
||||
- 部分提交方法需要在listener thread中被调用
|
||||
- listener必须批量消费recrods而不是消费单条record
|
||||
- index必须要位于batch range中
|
||||
- 重复调用部分提交接口时,后调用的index要比先调用的大
|
||||
|
||||
上述要求是强制的,在不满足时会抛出IllegalArgumentException和IllegalStateException异常。
|
||||
|
||||
> 上述调用nack后所说的阻塞均是指调用`KafkaConsumer.pause`方法后造成的结果。在pause方法被调用后,任何后续调用的poll方法都不会返回任何records,直到`resume`方法被调用。pause方法的调用并不会造成分区的rebalance操作。**调用pause方法并不会造成线程的阻塞,而是通过poll获取指定分区消息的阻塞。**
|
||||
>
|
||||
> 并且,rebalance操作后也不会保留pause/resume状态。
|
||||
|
||||
|
||||
Reference in New Issue
Block a user