diff --git a/mq/kafka/kafka-尚硅谷.md b/mq/kafka/kafka-尚硅谷.md index 0fb95e5..9f3fc7f 100644 --- a/mq/kafka/kafka-尚硅谷.md +++ b/mq/kafka/kafka-尚硅谷.md @@ -297,4 +297,33 @@ kafka采用的是拉取模式来进行消费,因为拉取模式可以很好的 在消费者通过fetch请求拉取到数据后,会将拉取的records缓存起来,然后在调用poll方法时返回缓存的record数据。 - `max.poll.records`:该参数用于限制poll()调用返回的最大消息条数。`该参数默认值为500`,调用poll方法时,最多返回500条缓存数据。`max.poll.records`参数用于指定poll行为,并不会对fetch行为造成影响。 +##### 消费者api +1. `public void subscribe(Collection topics)`: + + 消费者调用该方法时会订阅给定的主题,并且会动态分配给该消费者分区。**主题的订阅并不是增量的,本次调用subscribe方法将会覆盖之前该消费者被分配的分区**。 + + 如果给定topics为空,那么其效果相当于调用`unsubscribe()`。 + +2. `public void assign(Collection partitions)`: + + 手动将分区集合分配给消费者。该接口调用同样**不是增量的**。该接口调用会覆盖之前消费者被分配的分区。 + + 如果给定partitions为空,那么其效果相当于调用`unsubscribe()`。 + + 通过该方法进行手动主题分区分配并不会使用消费者组的管理功能,故而,**若当消费者组成员发生变化,或broker集群或主题元数据发生变化时,消费者组rebalance操作也不会被触发。** + + 如果开启了自动提交,那么在新的assignment代替旧assignment之前,async commit会被触发,async commit基于旧的assignment。 + +3. `public ConsumerRecords poll(final Duration timeout)`: + + + 从被分配的分区中拉取数据,如果在调用poll方法之前并没有被分配任何主题或分区,那么会抛异常。 + + 每次拉取数据时,消费者都会使用`last consumed offset`作为拉取数据的开始偏移量,并按顺序再拉取数据。`last consumed offset`可以通过` seek(TopicPartition, long)`方法手动设置,`last consumed offset`也会被自动设置为订阅分区列表中的最后一次提交偏移量。 + + 在存在可获取的记录时,该方法会立刻返回,否则其会等待timeout指定的时间,在超过该时间限制后,会返回一个空的record集合。 + + > poll方法调用在没有可用的消息集合时,会发送fetch请求从broker拉取数据 + +