kafka consumer相关文档阅读
This commit is contained in:
@@ -297,4 +297,33 @@ kafka采用的是拉取模式来进行消费,因为拉取模式可以很好的
|
||||
在消费者通过fetch请求拉取到数据后,会将拉取的records缓存起来,然后在调用poll方法时返回缓存的record数据。
|
||||
- `max.poll.records`:该参数用于限制poll()调用返回的最大消息条数。`该参数默认值为500`,调用poll方法时,最多返回500条缓存数据。`max.poll.records`参数用于指定poll行为,并不会对fetch行为造成影响。
|
||||
|
||||
##### 消费者api
|
||||
1. `public void subscribe(Collection<String> topics)`:
|
||||
|
||||
消费者调用该方法时会订阅给定的主题,并且会动态分配给该消费者分区。**主题的订阅并不是增量的,本次调用subscribe方法将会覆盖之前该消费者被分配的分区**。
|
||||
|
||||
如果给定topics为空,那么其效果相当于调用`unsubscribe()`。
|
||||
|
||||
2. `public void assign(Collection<TopicPartition> partitions)`:
|
||||
|
||||
手动将分区集合分配给消费者。该接口调用同样**不是增量的**。该接口调用会覆盖之前消费者被分配的分区。
|
||||
|
||||
如果给定partitions为空,那么其效果相当于调用`unsubscribe()`。
|
||||
|
||||
通过该方法进行手动主题分区分配并不会使用消费者组的管理功能,故而,**若当消费者组成员发生变化,或broker集群或主题元数据发生变化时,消费者组rebalance操作也不会被触发。**
|
||||
|
||||
如果开启了自动提交,那么在新的assignment代替旧assignment之前,async commit会被触发,async commit基于旧的assignment。
|
||||
|
||||
3. `public ConsumerRecords<K, V> poll(final Duration timeout)`:
|
||||
|
||||
|
||||
从被分配的分区中拉取数据,如果在调用poll方法之前并没有被分配任何主题或分区,那么会抛异常。
|
||||
|
||||
每次拉取数据时,消费者都会使用`last consumed offset`作为拉取数据的开始偏移量,并按顺序再拉取数据。`last consumed offset`可以通过` seek(TopicPartition, long)`方法手动设置,`last consumed offset`也会被自动设置为订阅分区列表中的最后一次提交偏移量。
|
||||
|
||||
在存在可获取的记录时,该方法会立刻返回,否则其会等待timeout指定的时间,在超过该时间限制后,会返回一个空的record集合。
|
||||
|
||||
> poll方法调用在没有可用的消息集合时,会发送fetch请求从broker拉取数据
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user