阅读spring kafka listener container手动提交的文档
This commit is contained in:
@@ -909,3 +909,16 @@ nack只能在调用listener的consumer线程中进行处理。
|
|||||||
>
|
>
|
||||||
> 并且,rebalance操作后也不会保留pause/resume状态。
|
> 并且,rebalance操作后也不会保留pause/resume状态。
|
||||||
|
|
||||||
|
### 手动提交offset
|
||||||
|
通常,当使用`AckMode.MANUAL`或`AckMode.MANUAL_IMMEDIATE`时,ack必须按顺序确认,因为kafka并不是为每条record都维护提交状态,而是针对每个消费者组/分区只维护一个commit offset。
|
||||||
|
|
||||||
|
从2.8版本开始,可以设置container属性`asyncAcks`,该属性设置后,由poll返回的records可以被以任何顺序确认。设置属性后listener container会将无序(records可以被按任何顺序确认)的提交进行延迟,直到接收到缺失的ack。此时,消费者会处于paused状态,不会有新的records被传递,直到上次poll操作中所有的消息都被提交offset。
|
||||||
|
|
||||||
|
当启用`asyncAcks`特性后,可以异步的针对消息进行处理(`Acknowledgment.acknowledge()`方法并没有要求只能在listener线程中被调用)。但是,在对消息进行异步处理的时候,如果发生处理失败的情况,可能会增加record重复传输的可能性。(如果在开启asyncAcks时,如果后续消息已经提交ack,但是位于前面的消息处理发生异常,那么后续成功的record其offset将无法被提交,故而后续处理成功的消息将会被重新传递).
|
||||||
|
|
||||||
|
> 在处理消息时抛出异常,那么抛出异常的消息和抛出异常之后的消息都会被重复拉取。
|
||||||
|
|
||||||
|
并且,当`asyncAcks`启用时,无法使用`nack()`方法来进行nack操作。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user