阅读kafka consumer commitSync/commitAsync相关文档
This commit is contained in:
@@ -305,12 +305,25 @@ kafka采用的是拉取模式来进行消费,因为拉取模式可以很好的
|
||||
- `max.poll.records`:该参数用于限制poll()调用返回的最大消息条数。`该参数默认值为500`,调用poll方法时,最多返回500条缓存数据。`max.poll.records`参数用于指定poll行为,并不会对fetch行为造成影响。
|
||||
|
||||
##### 消费者api
|
||||
1. `public void subscribe(Collection<String> topics)`:
|
||||
1. `public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)`:
|
||||
|
||||
消费者调用该方法时会订阅给定的主题,并且会动态分配给该消费者分区。**主题的订阅并不是增量的,本次调用subscribe方法将会覆盖之前该消费者被分配的分区**。
|
||||
|
||||
如果给定topics为空,那么其效果相当于调用`unsubscribe()`。
|
||||
|
||||
> #### rebalance
|
||||
> 作为group management的一部分,组中的消费者实例会跟踪属于同一消费者组中的其他实例,如果发生了如下事件则是会触发rebalance(再平衡):
|
||||
> - 如果消费者组订阅的任一topic其分区数发生变化
|
||||
> - 一个订阅的topic被创建或是删除
|
||||
> - 消费者组中的一个实例发生宕机
|
||||
> - 一个新的消费者实例加入到消费者组
|
||||
> - group中成员单位订阅的topic发生变化(新增订阅或取消订阅)
|
||||
>
|
||||
> 当上述任一事件发生时,都会触发rebalance操作,此时,提供给subscribe接口的listener将会被调用,代表该消费者实例对应的分区分配被取消了。**并且,在接收到新的分区分配方案时,该listener接口会再次被调用**。
|
||||
>
|
||||
> rebalance只会在调用poll(Duration)接口时被触发,故而callback也只会在调用poll的时间内被触发
|
||||
|
||||
|
||||
2. `public void assign(Collection<TopicPartition> partitions)`:
|
||||
|
||||
手动将分区集合分配给消费者。该接口调用同样**不是增量的**。该接口调用会覆盖之前消费者被分配的分区。
|
||||
@@ -391,6 +404,11 @@ kafka中的分区策略通过`partition.assignment.strategy`参数来进行配
|
||||
>
|
||||
> - kafka commitAsync,相比于同步提交,commitAsync在调用后并不会阻塞,而是直接返回,此后可以继续调用poll来继续从broker拉取后续消息。
|
||||
|
||||
> #### kafka的手动提交重试机制
|
||||
> 针对kafka的手动提交,当使用`commitSync`进行同步提交时,如果提交失败,同步提交会无限次的进行重试,直到提交成功或是发生了不可恢复的异常。
|
||||
>
|
||||
> 但是,在使用`commitAsync`方法进行提交时,kafka消费者在提交失败之后则不会进行重试。在处理kafka commitAsync重试问题时,还需要考虑commit order。当消费者进行异步提交时,如果发现当前batch提交失败,此时可能位于当前batch之后的batch已经处理完成并进行提交(commitAsync并不会等待当前batch提交成功之后再拉取下一批,而是直接拉取下一批继续处理,故而下一批batch可能提交早于当前batch)。故而,如果对当前异常的batch进行重试提交,可能会之后批次的commit offset被覆盖,从而造成消息的重复消费。
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user