519 lines
46 KiB
Markdown
519 lines
46 KiB
Markdown
- [Kafka](#kafka)
|
||
- [简介](#简介)
|
||
- [消息队列应用场景](#消息队列应用场景)
|
||
- [缓存/削峰](#缓存削峰)
|
||
- [解耦](#解耦)
|
||
- [异步通信](#异步通信)
|
||
- [消息队列模式](#消息队列模式)
|
||
- [点对点](#点对点)
|
||
- [发布订阅模式](#发布订阅模式)
|
||
- [kafka架构](#kafka架构)
|
||
- [分区存储](#分区存储)
|
||
- [group消费](#group消费)
|
||
- [zookeeper](#zookeeper)
|
||
- [生产者](#生产者)
|
||
- [分区](#分区)
|
||
- [批量发送](#批量发送)
|
||
- [ack级别](#ack级别)
|
||
- [异步发送](#异步发送)
|
||
- [同步发送](#同步发送)
|
||
- [生产者发送消息的分区策略](#生产者发送消息的分区策略)
|
||
- [kafka消息传递语义](#kafka消息传递语义)
|
||
- [数据乱序](#数据乱序)
|
||
- [broker](#broker)
|
||
- [broker选举](#broker选举)
|
||
- [partition reassign](#partition-reassign)
|
||
- [broker宕机对分区造成的影响](#broker宕机对分区造成的影响)
|
||
- [broker AR \& OSR](#broker-ar--osr)
|
||
- [leader选举机制](#leader选举机制)
|
||
- [follower故障恢复细节](#follower故障恢复细节)
|
||
- [leader宕机的细节](#leader宕机的细节)
|
||
- [kafka分区存储机制](#kafka分区存储机制)
|
||
- [kafka文件清除策略](#kafka文件清除策略)
|
||
- [消费者](#消费者)
|
||
- [kafka消费者工作流程](#kafka消费者工作流程)
|
||
- [消费者组](#消费者组)
|
||
- [消费者实例拉取数据流程](#消费者实例拉取数据流程)
|
||
- [消费者api](#消费者api)
|
||
- [分区分配和再平衡](#分区分配和再平衡)
|
||
- [offset](#offset)
|
||
- [rebalance](#rebalance)
|
||
- [kafka消费起始offset](#kafka消费起始offset)
|
||
- [kafka exactly-once语义](#kafka-exactly-once语义)
|
||
- [apache kafka idempotence](#apache-kafka-idempotence)
|
||
- [事务-在多个分区之间进行原子写入](#事务-在多个分区之间进行原子写入)
|
||
- [transactional message \& non-transactional message](#transactional-message--non-transactional-message)
|
||
|
||
|
||
# Kafka
|
||
## 简介
|
||
### 消息队列应用场景
|
||
缓存/削峰、解耦、异步通信
|
||
### 缓存/削峰
|
||
当生产端生产数据的速率大于消费端消费数据的速率时,消息队列可用于对消费不完的数据进行缓存
|
||
### 解耦
|
||
当数据生产方来源存在多个(例如数据库、网络接口等),且消费方也存在多种(Hadoop大数据平台、spring boot服务实例等)时,可以将消息队列中存储的消息作为数据从生产端到消费端的中间格式。
|
||
|
||
生产端负责将产生的数据转化成特定格式的消息发送到消息队列,而消费端负责消费消息队列中的消息。不同种类的生产端和消费端只用针对消息的队列中的消息各自进行适配即可。
|
||
|
||
### 异步通信
|
||
通过消息队列,可以实现多个服务实例之间的异步调用,服务调用方在将调用信息封装到消息并发送消息到mq后,即可返回,并不需要同步等待。被调用方可以异步的从mq中获取消息并进行消费。
|
||
|
||
### 消息队列模式
|
||
#### 点对点
|
||
一个消息队列可以对应多个生产者和多个消费者,**但消息只能被一个消费者进行消费**,消费者将数据拉取并消费后,向mq发送确认,mq中将被消费的消息删除
|
||
#### 发布订阅模式
|
||
**生产者产生的数据可以被多个消费者消费**,生产者将消息发送到topic,订阅该topic的消费者会拉取消息进行消费,消费完成后并不会删除该消息,该消息仍可被其他订阅该topic的消费者进行消费。(消息并不会永久保存在消息队列中,通常会设置超期时间,消息保存超过该时间后自动删除)
|
||
|
||
### kafka架构
|
||
#### 分区存储
|
||
kafka将一个topic分为了多个分区,用于分布式存储数据。而每个分区都跨多台服务器实例进行存储,从而增加容错性。对特定分区来说,其存在于多台服务器实例,其中一台为主机,其他服务器为从机,**主机会处理对该分区数据所有的读写请求**(由于读写操作全都在主机上发生,而每台服务器都担当了某些分区的主机和其他分区的从机,故而读写请求在不同的服务器之间进行了负载均衡),而从机只会被动的复制主机修改。
|
||
#### group消费
|
||
每个消费者实例都属于一个consume group,一个consume group则是可以保存多个消费者实例。对于发送到kafka mq topic的每一条消息,都会被广播给订阅了该topic的每个consume group。而consume group接收到消息后,只会将消息发送给group中的一个消费者实例来进行处理,故而在consume group中,实现了消息在不同消费实例之间的负载均衡。
|
||
> 在kafka mq中,topic的实际订阅者是consume group,kafka mq会将topic中的消息广播给所有订阅了该topic的consume group,而每条消息都只会被consume group中的一个消费者实例进行消费
|
||
>
|
||
> 在一个consume group中,每个消费者实例都负责某一topic中相应的分区,每个特定分区只有一个消费者实例负责。
|
||
|
||
|
||
> #### 消费者订阅
|
||
> 消费者的订阅操作是在消费者实例中调用的,在同一消费者组中,**不同消费者实例可以订阅不同的topic集合**。topic的分区只会在订阅了该topic的实例之间进行分配。
|
||
>
|
||
> 例如存在消费者实例c1,c2,c3, 其中c1订阅了t1,c2订阅了t1和t2,c3订阅了t1, t2, t3, 那么t1的分区将会在c1, c2, c3之间进行发呢配,t2的分区只会在c2和c3之间进行分配,t3的分区只会被分配给c3
|
||
|
||
|
||
#### zookeeper
|
||
zookeeper作为注册中心,用于记录存在多个kafka实例时,当前已上线且状态正常的kafka实例,以及各个分区leader实例的信息
|
||
|
||
#### 生产者
|
||
##### 分区
|
||
生产者客户端将会决定将消息发送到某个分区,可以通过负载均衡随机决定将消息发送到哪个分区,也可以提供一个key并通过算法决定将消息发送到哪个分区。
|
||
|
||
决定将消息发送到哪个分区后,生产者会直接将消息发送到该分区对应的leader broker中。
|
||
|
||
##### 批量发送
|
||
生产者并不会每次产生消息后都立即将消息发送到broker,而是会累积消息,**直到消息数据积累到特定大小(默认为16K)后**,才会将消息发送给broker。
|
||
> **消息发送条件**
|
||
>
|
||
> - batch.size: 当消息累计到特定大小(默认为16K)后,发送给broker。如果消息的大小大于`batch.size`,那么并不会对消息做累积操作。**发送到broker的请求将会包含多个batch,每个batch对应一个分区,不同分区的消息通过不同的batch进行发送**
|
||
> - linger.ms: 生产者将会把发送给broker的两个请求之间的消息都累积到一个batch里,该batch将会在下次发送请求给broker时发送。但是,对特定分区累积大小如果没有达到`batch.size`的限制,哪个通过linger.ms来控制该消息延迟发送的最长时间。`linger.ms`单位为ms,**其默认值为0,代表即使消息累积没有达到`batch.size`,也会立马发送给broker**。若`linger.ms`设置为1000,则代表没有累积到`batch.size`大小的消息也会在延迟1s后发送给broker
|
||
|
||
##### ack级别
|
||
生产者将消息发送到broker后,broker对生产者有三种应答级别:
|
||
- 0:生产者发送数据后无需等待broker返回ack应答
|
||
- 1:生产者发送消息到broker后,leader broker将消息持久化后向生产者返回ack
|
||
- -1:生产者发送过来的数据,leader broker和isr队列中所有的broker都持久化完数据后,返回ack
|
||
|
||
##### 异步发送
|
||
kafka生产者异步发送是指生产者调用kafka客户端接口将发送的消息传递给kafka的客户端,此时消息并未发送到broker,而异步调用接口即返回。调用异步接口可以通过指定回调来获取消息传递到的topic、分区等信息。
|
||
|
||
##### 同步发送
|
||
除了异步调用接口外,还可以调用同步调用的接口来发送消息。在调用异步接口发送消息时,仅仅将消息放到缓冲区后调用就返回,可能放到缓冲区的消息并没有发送到broker。可以通过调用同步发送消息的接口来发送消息,其会阻塞并等待broker将消息持久化后返回的异常或者ack应答。
|
||
|
||
想要同步发送消息,只需要对异步接口返回的future对象调用`.get`即可,其会等待future对象完成。
|
||
|
||
##### 生产者发送消息的分区策略
|
||
在生产者发送消息时,需要决定将消息发送到哪个分区,决定策略如下所示:
|
||
|
||
1. 如果发送消息时指定了要发送到哪个分区,那么发送到哪个分区
|
||
2. 如果发送消息时没有指定分区,但是指定了消息的key,那么对key进行散列,根据key散列后的值决定发送到哪个分区
|
||
3. 如果发送消息时没有指定发送到哪个分区,也没有指定key,那么根据黏性策略发送消息
|
||
> 黏性分区策略
|
||
>
|
||
> 黏性分区策略会随机选择一个分区进行发送,并尽可能的一直使用该分区,直到该分区的batch size已满,此时kafka会随机再选择另一个分区进行发送
|
||
|
||
除了上述分区策略外,kafka支持自定义分区策略。
|
||
|
||
##### kafka消息传递语义
|
||
1. 至少一次:acks=-1情况下,消息能被确保传递不丢失,但是可能会存在消息重复传递的问题
|
||
2. 至多一次:acks=0的情况下,kafka能保证消息最多只传递一次的问题,但是无法保证消息会发送到broker,可能存在消息丢失问题
|
||
3. 精确一次:数据既不会重复也不会丢失
|
||
|
||
为了保证精确一次,kafka引入了两个特性:`幂等性`和`事务`
|
||
|
||
> 幂等性
|
||
>
|
||
> 生产者在初始化时会被分配一个producer id,并且生产者在发送消息时,针对每条消息都存在序列号。生产者每次发送消息时,都会附带pid(生产者id)和消息的序列号。序列号是针对topic分区的,生产者向分区发送消息时,针对每个分区都维护了一套序列号。当消息被发送给broker后,其会比较当前分区来源PID和消息PID相同,并且已经被ack的最大序列号,如果当前消息的序列号小于已经ack的序列号,说明该消息已经发送过,属于重复消息,被丢弃。
|
||
>
|
||
> **由于每个新的生产者实例都会被分配一个新的producer id,那么只能在单个生产者会话中保证幂等。**
|
||
>
|
||
> 生产者可以配置`enable.idempotence`属性用于控制幂等是否开启,默认情况下该属性值为true,默认开启幂等性,生产者会保证对每条消息只有一个副本被写入
|
||
|
||
> 事务
|
||
> #### 概念
|
||
> kafka事务能够保证应用能够原子的向多个topic分区中写入数据,所有写入操作要么同时成功,要么同时失败。
|
||
>
|
||
> 另外,消费过程可以看作是对offset topic的写操作,那么kafka事务允许将消息的消费和消息的生成包含在一个原子的单元中。
|
||
>
|
||
> #### 保证
|
||
> 为了实现kafka事务,需要应用提供一个全局唯一的transactionId,该transactionId会稳定贯穿所有该应用的会话。当提供了transactionId后,kafka会做出如下保证:
|
||
>
|
||
> - 只有一个活跃的生产者具有指定transactionId。为了实现该保证,当具有相同transactionId的新生产者实例上线后,会隔离旧的实例
|
||
> - 在应用会话之间将会进行事务的恢复。如果应用实例宕机,那么那么下一个实例恢复工作前将会有一个干净的状态,任何尚未完成的事务都会处于完成状态(提交或异常退出)
|
||
>
|
||
> #### 事务协调器
|
||
> 每个生产者实例都被分配了一个事务协调器,分配producer id、管理事务等逻辑都由事务协调器负责
|
||
> #### Transaction Log
|
||
> Transaction Log是一个内部的kafka topic,类似于consumer offset topic,transaction log是一条持久化和主从复制的记录,记录了每个事务的状态。transaction log是对事务协调器的状态存储,最新版本log快照封装了当前每个活跃事务的状态。
|
||
> #### 控制消息
|
||
> 控制消息是写入用户topic中的特殊消息,由客户端进行处理,但是不暴露给用户。例如,其被用于broker告知消费者其先前拉取的消息是否已经被原子的提交。
|
||
> #### transactionId
|
||
> transactionlId用于唯一标识producer,并且transactionId是持久化的(producer id在生产者实例重启后会重新获取)。如果不同生产者实例具有相同的transactionId,那么会恢复或终止上一生产者实例所初始化的任何事务
|
||
>
|
||
> #### 数据流
|
||
> ##### 发现事务协调器
|
||
> 由于事务协调器是分配PID和管理事务的中心,首先生产者会发送请求给任一broker来询问其所属的事务协调器
|
||
> ##### 获取PID
|
||
> 查询到其所属事务协调器后,会获取producer id。如果transactionId已经被指定,那么在获取PID的请求中会包含transactionId,并且transactionId和PID的关联将会被记录到transaction log中,以便于之后如果上线具有相同transactionId的新实例,能够根据transactionId该映射返回旧实例的PID。
|
||
>
|
||
> 在上线新实例,返回具有相同transactionId的旧实例后,会执行如下操作:
|
||
>
|
||
> 1. 隔离旧实例,旧实例所属的事务无法再继续推进
|
||
> 2. 针对旧生产者实例所属的尚未完成的事务进行恢复操作
|
||
>
|
||
> 如果请求PID时transactionId尚未被指定,那么PID将会被分配,但是生产者实例只能在单个会话中使用幂等语义和事务语义
|
||
>
|
||
> #### 开启事务
|
||
> 可以通过beginTransaction()接口调用来开启事务,生产者会将本地状态改为事务已开启,但是事务协调器直到发送了第一条记录时才会将事务状态标记为已开启
|
||
> #### 消费-处理-生产
|
||
> 开启事务之后,生产者可以对消息进行消费和处理,并且产生新的消息,
|
||
> ##### AddPartitionsToTxnRequest
|
||
> 当某个topic分区第一次作为事务的一部分被执行写操作时,生产者将会发送AddPartitionsToTxnRequest请求到事务协调器。将该topic分区添加到事务的操作将会被事务协调器记录。可以通过该记录信息来对事务中的分区进行提交或回滚操作。如果第一个分区被添加到事务中,事务协调器将会开启事务计时器。
|
||
> ##### ProduceRequest
|
||
> 生产者会向topic分区发送多个ProduceRequest来写入消息,请求中包含PID、epoch、序列号。
|
||
>
|
||
> #### 提交或回滚事务
|
||
> 一旦消息被写入,用户必须调用 commitTransaction或abortTransaction方法来对事务进行提交或者回滚。
|
||
>
|
||
|
||
##### 数据乱序
|
||
kafka如果想要保证消息在分区内是有序的,那么需要开启幂等性(默认开启)并且`max.inflight.requests.per.connection`需要设置小于或等于5(默认情况下该值为5),在不开启幂等性的情况下如果想要保证分区内数据有序需要设置`max.inflight.requests.per.connection`的值为1.
|
||
|
||
> `max.inflight.requests.per.connect`
|
||
>
|
||
> 该值用于设置客户端向broker发送数据时,在阻塞前每个连接能发送的未收到ack的请求数目。如果该值被设置为大于1并且幂等性未开启,那么在消息发送重试时可能会导致消息的乱序,此时需要开启幂等性或者关闭消息重试机制才能重新保证幂等性。
|
||
>
|
||
> 并且,启用幂等性需要该值小于或等于5,如果该值大于5,且幂等性开启
|
||
> - 如果幂等性未显式指定开启,那么在检测到冲突设置后,幂等性会关闭
|
||
> - 如果显式指定幂等性开启并且该值大于5,那么会抛出ConfigException异常
|
||
|
||
#### broker
|
||
##### broker选举
|
||
每个broker节点中都存在controller模块,只有ISR中的节点才能够参与选举(acks=all时,也只需要ISR队列中所有节点都同步消息,才将消息视为已提交)。controller会通过watch来监听zookeeper中节点信息的变化,如果某个分区的leader broker宕机,那么controller模块在监听到变化后会重新开始选举,在ISR中重新选出一个节点作为leader。
|
||
|
||
##### partition reassign
|
||
如果kafka集群在运行时新增了新的节点,**此时节点中旧的topic分区并不会自动同步到新增节点中,如果要将旧topic分区存储到新增broker节点,可以调用重新分区的命令。**
|
||
|
||
可以通过`kafka-reassign-partitions.sh --generate`命令对分区在所有节点(包含新节点)之间进行重新分配,此命令调用后会生成一个topic分区在节点间重新分配的计划。
|
||
|
||
在确认了生成的重新分配计划后,可以调用`kafka-reassign-partitions.sh --execute`命令来执行新生成的重新分配计划。
|
||
|
||
如果想要在kafka集群中退役一台broker server,不能直接关闭该broker节点,也需要跟新增broker节点类似,重新生成一个分区分配计划,分配计划中移除待下线节点,然后执行该分配计划。执行分配计划后,该节点上便不再存储分区,此后该节点并可以安全的关闭。
|
||
|
||
##### broker宕机对分区造成的影响
|
||
broker宕机并不会导致分区的重新分配,例如一个分区的replica-factor为2,存在broker-1,broker-2,broker-3,分区存储在(1,3)服务器上,leader为1号服务器。如果broker-1宕机,那么分区的leader会自动切换为broker-3,但是分区存在的副本数量从2个变成了一个,ISR中也只有(3),此时并不会在未存储该分区的broker-2节点上新增一个副本。
|
||
|
||
如果宕机的broker-1重新再上线,那么broker-1会重新存储之前负责存储的分区,但是此时分区对应的leader节点仍然是broker-2,此时broker-2作为分区的leader会导致读写操作全都转移到broker-2节点,负载均衡会造成偏移。
|
||
|
||
> `auto.leader.rebalance.enable`
|
||
>
|
||
> 该属性默认设置为true,一个后台线程会定期(时间间隔为`leader.imbalance.check.interval.seconds`,默认为300s)检测分区leader是否为默认的perferred leader。如果分区leader不为preferred leader的数量超过一定的比率(` leader.imbalance.per.broker.percentage`,默认为10%),会触发将分区leader改为默认preferred leader的操作。
|
||
|
||
##### broker AR & OSR
|
||
kafka中副本的默认数量为1个,通常生产环境将其配置为2个或2个以上,以便在leader副本宕机后follower副本能够继续服务,避免数据丢失。
|
||
|
||
- AR: AR代表集群中分区对应的所有副本集合,all-replicas
|
||
- ISR: 代表集群中处于同步状态的副本集合,in-sync-replicas 如果ISR中的节点长期未从leader中同步数据,会被剔除出ISR,最长未同步的时间由`replica.lag.time.max.ms`控制,默认为30s。如果leader对应的broker宕机,那么新leader将会从ISR中选举产生
|
||
- OSR:代表集群中不处于同步状态的副本集合,为AR - ISR
|
||
|
||
##### leader选举机制
|
||
leader选举根据AR中节点的排序来决定,能够成为leader的节点需要再ISR中存活,然后ISR中存活节点在AR中排序最靠前的将会被选举为leader。如果leader发生宕机,那么由ISR中存活且AR中排序最靠前的的节点成为新leader。
|
||
|
||
> 选举机制中leader决定是按照AR中broker节点的顺序进行决定的,每一个分区都有一个默认的preferred leader。如果某些节点宕机后再恢复,ISR中节点的顺序将会发生变化,但是AR中的节点顺序并不变,并且preferred leader也不会发生变化。
|
||
|
||
##### follower故障恢复细节
|
||
follower故障恢复细节中,涉及到如下两个概念:
|
||
- LEO(Log End Offset):每个副本结束offset的下一个位置,类似于java中的List.size()
|
||
- HW(High WaterMark):当前ISR队列中所有副本最小的HOW(水位线,类似于木桶效应中最短的那一根木板)
|
||
|
||
如果follower发生故障,那么会按顺序发生如下:
|
||
1. 故障follower会被剔除出ISR
|
||
2. follower故障期间ISR中剩余的follower节点和leader节点会继续接受数据
|
||
3. 故障follower如果恢复,会读取宕机前记录的旧集群HW,并且将大于等于该旧HW的log记录全部都删除,并且从leader重新同步记录
|
||
4. 当故障恢复的follower从旧HW位置同步消息到当前集群中的新HW位置时,此时故障恢复的log区数据已经同步到新HW的水位线水平,此时故障恢复的follower节点可以重新加入到ISR中
|
||
|
||
##### leader宕机的细节
|
||
如果leader broker发生宕机,那么新选举成为leader的follower将会成为leader,并且所有节点会将大于等于HW的数据丢弃(由于宕机的是leader,LEO最大,故而leader宕机不会对HW造成影响),并且重新从新选举的leader中同步数据。
|
||
|
||
由上可知,如果leader宕机前,其他follower尚未同步完leader中全部的消息,那么leader宕机后可能会发生消息的丢失。如果需要确保消息不丢失,需要设置生产者的acks为all,确保消息再提交前同步到ISR中所有的节点中。
|
||
|
||
##### kafka分区存储机制
|
||
kafka中topic是一个逻辑概念,topic由分区组成,每个分区则是可以看作一个log文件,log中存放生产者产生的数据。生产者产生的消息会被追加到log文件的末端,由于是线性追加,不涉及到平衡树等数据结构,故而**kafka追加消息时,不管当前分区数据存储量大小,追加数据的开销都是相同的,追加操作不会随着数据量变大而变慢**。
|
||
|
||
为了防止分区对应的log文件过大而导致的数据定位效率低下,kafka采用了分片和索引的机制,将每个分区分为了多个segment。单个segment默认存储的数据量的大小为1G,且单个segment由如下文件组成:
|
||
- .log文件:日志文件,用于存储消息(log文件的命名以当前segment中第一条消息再分区中的offset来命名)
|
||
- .index:偏移量索引文件,用于存储偏移量和position的映射关系,用于快速定位消息
|
||
- .timeindex:时间戳索引文件,该文件用于存储消息对应的时间戳信息,kafka中的消息默认保存7天后丢弃,通过时间戳信息来决定消息是否应该丢弃
|
||
|
||
> index索引
|
||
>
|
||
> kafka中的索引为稀疏索引,默认每往log中写入4KB的数据,.index文件中会记录一条偏移量索引信息。
|
||
>
|
||
> 可以通过`log.index.interval.bytes`来配置索引记录的密度,每写入多少数据才记录一条索引
|
||
|
||
##### kafka文件清除策略
|
||
kafka中默认消息保存的时间为7天,可以通过修改如下配置来对默认保存时间进行修改:
|
||
- log.retention.hours:消息默认保存小时:默认为168(24 * 7)
|
||
- log.retention.minutes:消息默认保存时间,按分钟计
|
||
- log.retention.ms:消息设置默认保存时间,按ms计
|
||
|
||
上述设置中,优先级为`ms`>`minutes`>`hours`,如果优先级较大的被设置,那么取优先级高的设置,在高优先级条目没有被设置时,才取低优先级设置。
|
||
|
||
> 如果想要设置消息不超时,可以将`log.retention.ms`设置为-1
|
||
|
||
`log.retention.check.interval.ms`参数可以检查消息是否超时的周期,默认情况下该值设置为5min。
|
||
|
||
kafka中的日志清除策略由如下两种:
|
||
- 删除
|
||
- 压缩
|
||
|
||
> 删除
|
||
>
|
||
> 当`log.cleanup.policy`默认值为delete,当该值被设置为delete时,存储超期日志的文件将会被删除。
|
||
>
|
||
> - 基于时间:默认打开。如果segment中所有记录中时间戳最大的记录(最新插入的记录)超过最长时间设置,那么会将该segment删除。(以segment中最新消息时间戳作为segment文件时间戳)
|
||
> - 基于大小:基于大小的删除默认是关闭的。`log.retention.bytes`默认值为-1,表示对log文件最大的限制,如果单个log文件的大小超过该大小限制,那么会删除log文件对应的最早的segment
|
||
|
||
> 压缩
|
||
>
|
||
> 当`log.cleanup.policy`值设置为compact时,会对超时的segment进行压缩(指segment最新一条插入的消息超时),对于相同key的消息,只会保留最后插入的一条消息,演示如下。
|
||
>
|
||
> | k2 | k1 | k3 | k1 | k2 |
|
||
> |:-:|:-:|:-:|:-:|:-:|
|
||
> | 1 | 2 | 3 | 4 | 5 |
|
||
>
|
||
> 会被压缩为
|
||
>
|
||
> | k3 | k1 | k2 |
|
||
> |:-:|:-:|:-:|
|
||
> | 3 | 4 | 5 |
|
||
>
|
||
> 压缩后offset可能并不连续,此时若想要消费的offset不存在,那么会拿到比预期offset大的offset的消息
|
||
|
||
#### 消费者
|
||
通过消息队列的消费方式分为两种:
|
||
- pull: 拉取
|
||
- push: 推送
|
||
|
||
kafka采用的是拉取模式来进行消费,因为拉取模式可以很好的兼容不同消费者实例的消费速率。各个消费者可以根据自己消费消息的速度来选择拉取消息的速度,如此能避免消息在多个消费者实例间的不合适分配,造成有的消费者实例空闲,而有的消费者实例消息堆积的情况。
|
||
|
||
##### kafka消费者工作流程
|
||
在kafka架构中,一个分区只能够被同一消费者组中的一个消费者实例进行消费,但是消费者组中的一个消费者实例能够对多个分区进行消费。
|
||
|
||
并且,任一分区中的消息都会被广播到所有订阅该topic的消费者组中,但是一条消息只能够被消费者组中的一个消费者实例进行消费。
|
||
|
||
> #### offset
|
||
> kafka中一个一个分区能够被多个实例进行消费(被多个位于不同消费者组中的消费者进行消费),kafka通过`offset`来记录每个消费者消费到分区的偏移量。
|
||
>
|
||
> offset被存储在kafka的一个主题中(_consumer_offsets),用于持久化offset数据。
|
||
|
||
##### 消费者组
|
||
消费者组是由若干个消费者实例组成的集合,位于同一消费者组中的消费者实例,其groupid都相同。
|
||
|
||
在同一消费者组中,一条消息只能由一个消费者实例进行消费。消费者组中的消费者实例负责消费同一topic中的不同分区,各实例负责的分区没有重叠部分。
|
||
|
||
> 当消费者组中的消费者实例大于分区数时,消费者组中将存在闲置的消费者实例,同一分区无法由同一消费者组中的多个消费者实例共同消费。
|
||
>
|
||
|
||
> ##### 消费者组初始化流程
|
||
> 在消费者组中,通过coordinator来协助消费者组实现初始化和分区分配操作,kafka集群中的每个broker实例都有一个对应的coordinator。
|
||
>
|
||
> 消费者组在选择通过哪个coordinator实例来协助进行初始化时,会将groupid根据_consumer_offset主题的分区数量进行取模(默认_consumer_offset主题默认有50个分区,故而默认为groupid%50),根据取模后的结果查找_consumer_offsets的第N号分区位于哪台broker上,该broker的coordinator即会协助消费者组进行初始化。(例如`groupid%50=4`,即会选取_consumer_offsets第四号分区所在的broker来作为协助初始化的broker,该broker上的coordinator会协助消费者组进行初始化。
|
||
>
|
||
> 在消费者组初始化时,组内每个消费者实例都会向选中的coordinator发送消息,请求加入消费者组,而coordinator则会选中其中一个消费者实例作为该消费者组的leader。
|
||
>
|
||
> 选中leader分区之后,coordinator会把broker集群中待消费的topic信息全部发送给leader消费者,leader消费者则是会指定分区分配计划,将topic分区在组内的多个消费者实例之间进行分配。
|
||
>
|
||
> 在leader指定完分区分配计划之后,会将分配计划发送给coordinator,coordinator收到分配方案后则会将分配方案分发给消费者组内的各个消费者实例。
|
||
|
||
> ##### 消费者组的心跳机制
|
||
> 消费者组中的每个消费者都会和coordinator维持长连接,消费者会向coordinator发送心跳包(心跳包默认时间为3s)。
|
||
>
|
||
> 一旦超过指定时间(`session.timeout.ms`,默认为45s)消费者没有向coordinator发送心跳包,那么该消费者将会被从消费者组中移除,并且会触发分区的再平衡,该宕机消费者实例负责的分区将会被分配给同组中别的消费者实例。
|
||
>
|
||
> 如果消费者实例处理消息的时间过长(`max.poll.interval.ms`,默认为5min),自从上一次拉取分区消息时起超过5min还未再次拉取数据,也会触发再平衡。
|
||
|
||
##### 消费者实例拉取数据流程
|
||
消费者实例在拉取消息时,首先会向broker发送一个fetch请求,从broker处批量拉取数据。其中,fetch过程能够通过如下参数自定义:
|
||
- `fetch.min.bytes`:单个fetch请求中,broker server返回的最小消息大小。如果消费者向broker发送fetch请求时,可获取的消息没有达到该参数限制的大小,那么broker会等待消息累积,直到累积消息达到`fetch.min.bytes`指定的大小,再将消息批量返回给消费者。`默认情况下,该参数的值为1`,代表fetch请求时消息会被立刻返回。增大该值会批量累积消息提升效率,但是也会增加fetch请求的延迟。
|
||
- `fetch.max.bytes`:单个fetch请求中,broker server返回消息最大数据量大小。fetch操作请求数据时,数据将会被broker批量返回。如果如果分区中待返回第一条record的大小大于`fetch.max.bytes`指定的大小,那么该消息大小也会被返回,即使消息大小大于限制。`默认情况下,该参数的默认值为50m`。
|
||
- `fetch.max.wait.ms`:在单个请求中,如果broker没有累积到`fetch.max.bytes`指定规模的消息数量,当broker阻塞时间超过`fetch.max.wait.ms`指定的超时时间后,也会将消息返回给消费者。`默认情况下,该参数默认值为500ms`。
|
||
|
||
在消费者通过fetch请求拉取到数据后,会将拉取的records缓存起来,然后在调用poll方法时返回缓存的record数据。
|
||
- `max.poll.records`:该参数用于限制poll()调用返回的最大消息条数。`该参数默认值为500`,调用poll方法时,最多返回500条缓存数据。`max.poll.records`参数用于指定poll行为,并不会对fetch行为造成影响。
|
||
|
||
##### 消费者api
|
||
1. `public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)`:
|
||
|
||
消费者调用该方法时会订阅给定的主题,并且会动态分配给该消费者分区。**主题的订阅并不是增量的,本次调用subscribe方法将会覆盖之前该消费者被分配的分区**。
|
||
|
||
如果给定topics为空,那么其效果相当于调用`unsubscribe()`。
|
||
|
||
> #### rebalance
|
||
> 作为group management的一部分,组中的消费者实例会跟踪属于同一消费者组中的其他实例,如果发生了如下事件则是会触发rebalance(再平衡):
|
||
> - 如果消费者组订阅的任一topic其分区数发生变化
|
||
> - 一个订阅的topic被创建或是删除
|
||
> - 消费者组中的一个实例发生宕机
|
||
> - 一个新的消费者实例加入到消费者组
|
||
> - group中成员单位订阅的topic发生变化(新增订阅或取消订阅)
|
||
>
|
||
> 当上述任一事件发生时,都会触发rebalance操作,此时,提供给subscribe接口的listener将会被调用,代表该消费者实例对应的分区分配被取消了。**并且,在接收到新的分区分配方案时,该listener接口会再次被调用**。
|
||
>
|
||
> rebalance只会在调用poll(Duration)接口时被触发,故而callback也只会在调用poll的时间内被触发
|
||
|
||
|
||
2. `public void assign(Collection<TopicPartition> partitions)`:
|
||
|
||
手动将分区集合分配给消费者。该接口调用同样**不是增量的**。该接口调用会覆盖之前消费者被分配的分区。
|
||
|
||
如果给定partitions为空,那么其效果相当于调用`unsubscribe()`。
|
||
|
||
通过该方法进行手动主题分区分配并不会使用消费者组的管理功能,故而,**若当消费者组成员发生变化,或broker集群或主题元数据发生变化时,消费者组rebalance操作也不会被触发。**
|
||
|
||
如果开启了自动提交,那么在新的assignment代替旧assignment之前,async commit会被触发,async commit基于旧的assignment。
|
||
|
||
3. `public ConsumerRecords<K, V> poll(final Duration timeout)`:
|
||
|
||
|
||
从被分配的分区中拉取数据,如果在调用poll方法之前并没有被分配任何主题或分区,那么会抛异常。
|
||
|
||
每次拉取数据时,消费者都会使用`last consumed offset`作为拉取数据的开始偏移量,并按顺序再拉取数据。`last consumed offset`可以通过` seek(TopicPartition, long)`方法手动设置,`last consumed offset`也会被自动设置为订阅分区列表中的最后一次提交偏移量。
|
||
|
||
在存在可获取的记录时,该方法会立刻返回,否则其会等待timeout指定的时间,在超过该时间限制后,会返回一个空的record集合。
|
||
|
||
> poll方法调用在没有可用的消息集合时,会发送fetch请求从broker拉取数据
|
||
|
||
##### 分区分配和再平衡
|
||
一个消费者组中含有多个消费者实例,而一个topic会包含多个分区,需要将topic中的分区在消费者组中的多个消费者实例之间进行分配。
|
||
|
||
kafka中包含如下分区策略:
|
||
- range
|
||
- RoundRobin
|
||
- Sticky
|
||
- CooperativeSticky
|
||
|
||
kafka中的分区策略通过`partition.assignment.strategy`参数来进行配置,kafka可以通过使用多个分区分配策略,多个策略会进行叠加。默认情况下,kafka采用`Range + CooperativeSticky`的策略来进行分区分配。
|
||
|
||
> #### Range策略及再平衡
|
||
> range策略会针对每个topic进行分配。如果一个topic中含有n个分区,消费者组中含有m个消费者实例,那么每个消费者实例将会分配到n/m(整除)个分区,多出来的n%m个分区将会被分配给实例名称靠前的n%m个实例。
|
||
>
|
||
> range策略将会有如下弊端,如果每个topic都无法对消费者实例整除,那么剩余的分区都会被分配给排序靠前的消费者实例,这样会造成分区在消费者实例之间分配的不平衡,排序靠前的消费者实例会负担更多的分区
|
||
>
|
||
> 在使用range策略时,如果某台消费者实例宕机,那么在宕机超过限定时间(45s内没有向broker coordinator发送心跳包),会将topic分区重新在消费组中的剩余实例之间进行再分配(完全重新生成分配方案,排序靠前的消费者实例在分区数量无法整除实例个数时,负担更多的分区消费)
|
||
|
||
> #### RoundRobin
|
||
> RoundRobin策略针对所有消费者组订阅的所有topic中的分区,会将所有分区根据hashcode进行排序,并且按轮询的顺序分配给消费者实例。例如,第一个分区分配给第一个消费者实例,第二个分区分配给第二个实例...第m+1个分区再次分配给第一个实例(实例总数为m)。RoundRobin将所有分区在所有消费者实例之间进行了平衡,而不像range一样只是针对单个topic中的分区。
|
||
>
|
||
> 在使用RoundRobin策略时,如果消费者中某一实例宕机,超过超时时间后,分区同样会在剩余的消费者实例之间进行重新分配。
|
||
|
||
> #### Sticky
|
||
> Sticky策略同样是针对所有topic的分区,类似于RoundRobin。但是,不同于RoundRobin的轮询,Sticky在分配策略时会遵循如下原则:
|
||
> - Sticky策略会尽可能均匀的在不同消费者实例之间分配分区
|
||
> - 如果因为某些原因(例如实例宕机)导致需要进行分区的重新分配,Sticky会尽可能的保证存活实例上已经产生的分区分配不被改变,并再次基础上令所有分区在实例间的分配尽量均衡
|
||
>
|
||
> 在使用Sticky策略时,即使某台实例宕机,再平衡后存活实例被分配的分区仍然不会变,只是会将宕机实例负责的分区在存活实例之间尽可能均衡的分配
|
||
|
||
##### offset
|
||
在kafka集群中,会保存各个分区的消费情况,将分区针对每个消费者组的偏移量存储在__consumer_offsets主题中。默认情况下,__consumer_offsets采用key/value的形式来存储数据,key为`groupid+topic+分区号`,value则是当前offset的值。
|
||
|
||
每个一段时间,kafka就会对该topic进行压缩。
|
||
|
||
> #### kafka offset自动提交
|
||
> kafka默认开启了自动提交功能,在使用kafka时可以专注消费的业务逻辑
|
||
>
|
||
> 自动提交相关参数如下:
|
||
> - `enable.auto.commit`:自动提交是否开启,该参数默认值为true
|
||
> - `auto.commit.interval.ms`:自动提交默认的间隔时间为5s
|
||
>
|
||
> 在开启自动提交时,每次消费者调用poll接口时,都会检查是否距离上次提交的时间间隔已超过5s,若超过则执行自动提交逻辑。
|
||
>
|
||
> 在自动提交场景下,可能会造成消息的重复消费,如果自动提交的间隔为10s,在上次提交完成后,过了6s,消费完100条消息,但是此时消费者宕机了,导致消费的100条消息没有提交offset;此时该宕机消费者负责的分区将会被分配给消费者组中的其他消费者,其他消费者消费时仍然会从最后一次提交的offset开始消费,导致100条消息会被重复消费。
|
||
|
||
> #### kafka手动提交
|
||
> kafka可以选择设置手动提交,只用把`enable.auto.commit`关闭。kafka手动提交分为如下两种场景:
|
||
> - commitSync:同步提交。当关闭自动提交时,可以调用`commitSync`接口来进行手动提交。手动提交的offset为最后一次调用poll方法返回的offset。该方法通常在消费完所有poll返回的消息后再调用,否则若commit后消息还没有消费完,消费者宕机,则会导致消息丢失,有些消费不会被消费。<br>
|
||
> 在同步提交的场景下,调用commitSync后,线程会一直阻塞,直到接收到kafka server返回的ack,ack代表broker对commit offsets的确认
|
||
> - commitAsync:由于同步提交会造成阻塞,一直等待broker返回ack,故而会影响消息消费的吞吐量。可以通过异步提交来解决吞吐量问题,但是,`异步提交可能会造成更多消息被重复消费`。
|
||
|
||
> #### kafka手动提交、自动提交的优劣
|
||
> - kafka自动提交可能会造成消息的丢失,自动提交默认间隔为5s,如果在上次poll后的5s内消息并未被消费完成,那么在kafka自动提交后,即使尚未被消费的消息后续未被消费,那么kafka也会将其视为已消费,从而造成消息丢失
|
||
>
|
||
> - kafka commitSync可手动同步提交offset,但是在调用commitSync接口后,会等待broker返回确认信息,在此之前消费者会一直阻塞。这样会影响消费者的吞吐量,故而,为了提高吞吐量,可以尽量减少commitSync的提交次数
|
||
>
|
||
> - kafka commitAsync,相比于同步提交,commitAsync在调用后并不会阻塞,而是直接返回,此后可以继续调用poll来继续从broker拉取后续消息。但是,相比同步提交,commitAsync可能在发生rebalance时造成重复消费的情况。
|
||
>
|
||
> 在使用异步提交时,如果在发生rebalance之前(rebalance只能发生在poll过程中),commitAsync提交失败,由于commitAsync不会失败重试,故而在分区重新分配后,新分配到该分区的消费者实例将会重新消费之前未提交成功的消息,因此产生了消息的重复消费)。
|
||
>
|
||
> 而同步提交时,commitSync在提交失败后会无限次重试,直到提交成功,故而在发生rebalance时(rebalance只能发生在poll的过程中),在发生rebalance之前,可以保证之前commitSync操作已经成功。
|
||
>
|
||
> #### kafka commitSync
|
||
> kafka commitSync方法调用可以指定一个超时时间,在超过该超时事件后,会抛出TimoutException。如果调用该api时没有指定超时时间,会默认使用`default.api.timeout.ms`来作为超时时间,`default.api.timeout.ms`的默认值为1min。
|
||
>
|
||
> **对于kafka commitSync方法,在超时前,都会一直对提交进行重试,直至提交成功或是发生不可恢复(unrecoverable)的异常。**
|
||
|
||
> #### kafka的手动提交重试机制
|
||
> 针对kafka的手动提交,当使用`commitSync`进行同步提交时,如果提交失败,同步提交会无限次的进行重试,直到提交成功或是发生了不可恢复的异常。
|
||
>
|
||
> 但是,在使用`commitAsync`方法进行提交时,kafka消费者在提交失败之后则不会进行重试。在处理kafka commitAsync重试问题时,还需要考虑commit order。当消费者进行异步提交时,如果发现当前batch提交失败,此时可能位于当前batch之后的batch已经处理完成并进行提交(commitAsync并不会等待当前batch提交成功之后再拉取下一批,而是直接拉取下一批继续处理,故而下一批batch可能提交早于当前batch)。故而,如果对当前异常的batch进行重试提交,可能会之后批次的commit offset被覆盖,从而造成消息的重复消费。
|
||
>
|
||
> ##### commitAsync接口提交失败后并不会抛出异常,也不会重试
|
||
|
||
##### rebalance
|
||
rebalance通常有两个阶段,`revocation`和`assignment`,即撤销当前消费者被分配的分区和重新分配给消费者新的分区。revocation方法会在rebalance之前被调用,且revocation是rebalance之前消费者最后一次提交offset的机会,可以重写revocation方法,在rebalance发生之前对offset进行同步提交。
|
||
|
||
而assignment则是发生在rebalance之后,可以重写assignment方法来初始化各分区的offset。
|
||
|
||
通常情况下,commitAsync相较commitSync是更不安全的,在宕机之前提交失败将会造成消息的重复消费。可以通过在回调中使用commitSync来减轻消息的重复消费风险。
|
||
|
||
##### kafka消费起始offset
|
||
通过`auto.offset.reset`属性,可以配置当kafka broker中没有存储分区与特定消费者组offset关系时,消费者消费的行为,其可配置值如下:
|
||
- earliest:从分区最开始的位置进行消费
|
||
- latest:默认值为latest,当offset不存在时,从最新的offset开始消费
|
||
- none:当消费者组针对该分区没有找到offset记录时,抛出异常
|
||
|
||
默认情况下,`auto.offset.reset`值为latest,故而当消费者组新订阅一个topic时,并不会从头开始消费分区中的历史消息,而是从分区最新offset开始,消费后续分区接收到的消息。
|
||
|
||
> ##### 自定义分区起始消费offset
|
||
> 除了配置上述属性指定消费起始位置外,还可以通过`KafkaConsumer#seek`接口来指定起始消费分区的offset位置。
|
||
|
||
> #### 消费指定时间开始的消息
|
||
> 如果在消费消息时,想要消费从指定时刻之后的消息,可以通过`kafkaConsumer#offsetsForTimes`接口,能根据传入的分区和时间戳来得到该分区下指定时刻消息的起始offset,返回offset为时间戳大于或等于指定时间戳的第一条消息对应offset
|
||
|
||
## kafka exactly-once语义
|
||
在一个基于发布/订阅的消息系统中,组成该消息系统的节点可能会发生故障。在kafka中,broker可能会宕机,在生产者向topic发送消息时也可能发生网络故障。基于生产者处理这些失败场景的方式,可能会有如下三种语义:
|
||
- At-least-once:如果生产者在向broker发送消息时,接收到broker返回的ack(调用同步接口发送消息),并且生产者配置`acks=all`(会等待消息写入到所有isr队列中),这代表该消息已经被写入到消息系统中。但如果生产者向broker发送消息后,消息已经被写入到broker集群中,但是broker集群尚未向生产者返回ack,此时broker leader宕机,那么生产者将不会收到ack。此时生产者会重试发送消息,这将会导致消息在broker集群中存在多条,同一消息也会被消费者消费多次。
|
||
- At-most-once semantics:如果生产者在向broker发送消息时,若出现等待ack超时或发生异常情况后,不进行重试发送,那么消息将最多被发送给broker一次,此时不会出现消息被重复发送的情况。
|
||
- exactly-once:即使生产者多次发送相同的消息到broker集群,也只会有一条消息被传递给消费者。exactly-once语义要求生产者、broker、消费者的协同才能实现。
|
||
|
||
### apache kafka idempotence
|
||
kafka生产者支持幂等性操作,即使相同的消息被多次发送给broker,broker也只会向分区中写入一条消息。除此之外,开启生产者的`enable.idempotence=true`属性,还代表生产者发送到同一分区的消息是顺序的。
|
||
|
||
生产者实现幂等性的原理和tcp类似,每次被批量发送给broker集群的消息都含有一个序列号,broker将会使用序列号进行去重操作。但是,和tcp不同的是,该序列号会持久化到replicate log中,故而即使broker leader宕机,重新成为leader的broker也能知道是否接受到的消息是重复发送。
|
||
|
||
并且,kafka生产者开启幂等性带来的开销很小,仅仅为批量提交的消息附加的序列号。
|
||
|
||
### 事务-在多个分区之间进行原子写入
|
||
通过事务api,kafka支持在多个分区之间原子的进行写入。事务特性支持生产者在向broker集群多个分区批量发送消息时,要么发送的所有消息都能被消费者可见,要么发送的消息对消费者都不可见。
|
||
|
||
事务特性允许在事务内对消息进行消费、处理、提交消息处理生成的结果的同时,在同一事务中对消息消费的offset进行提交。通过事务特性,在消息需要消费->处理->提交生成结果的场景下,能够支持exactly-once。
|
||
|
||
#### transactional message & non-transactional message
|
||
分区中存储的消费,可以分为事务消息和非事务消息。事务消息,又可以分为进行中事务的消息,事务提交成功的消息,事务提交失败的消息。
|
||
|
||
消费者在消费消息时,可以设置`isolation.level`:
|
||
- read_committed:在设置该隔离级别后,消费者只能消费非事务消息或事务提交成功后的消息。`poll`接口在该隔离级别时,会返回LSO之前所有的消息(LSO值第一个活跃事务对应消息的offset - 1)。任何offset位于LSO之后的消息都将会被broker保留,直到活跃的事务被提交。故而,在活跃事务提交前,消费者无法读取到LSO之后的消息。
|
||
- read_uncommitted:设置为该隔离级别之后,消费者可以读取到所有消息,即使该消息关联的事务尚未被提交,**或消息关联事务已经被废弃(aborted)**。
|
||
|
||
默认情况下,`isolation.level`的值为read_uncommitted。
|
||
|
||
|