167 lines
16 KiB
Markdown
167 lines
16 KiB
Markdown
# Kafka
|
||
## 简介
|
||
### 消息队列应用场景
|
||
缓存/削峰、解耦、异步通信
|
||
### 缓存/削峰
|
||
当生产端生产数据的速率大于消费端消费数据的速率时,消息队列可用于对消费不完的数据进行缓存
|
||
### 解耦
|
||
当数据生产方来源存在多个(例如数据库、网络接口等),且消费方也存在多种(Hadoop大数据平台、spring boot服务实例等)时,可以将消息队列中存储的消息作为数据从生产端到消费端的中间格式。
|
||
|
||
生产端负责将产生的数据转化成特定格式的消息发送到消息队列,而消费端负责消费消息队列中的消息。不同种类的生产端和消费端只用针对消息的队列中的消息各自进行适配即可。
|
||
|
||
### 异步通信
|
||
通过消息队列,可以实现多个服务实例之间的异步调用,服务调用方在将调用信息封装到消息并发送消息到mq后,即可返回,并不需要同步等待。被调用方可以异步的从mq中获取消息并进行消费。
|
||
|
||
### 消息队列模式
|
||
#### 点对点
|
||
一个消息队列可以对应多个生产者和多个消费者,**但消息只能被一个消费者进行消费**,消费者将数据拉取并消费后,向mq发送确认,mq中将被消费的消息删除
|
||
#### 发布订阅模式
|
||
**生产者产生的数据可以被多个消费者消费**,生产者将消息发送到topic,订阅该topic的消费者会拉取消息进行消费,消费完成后并不会删除该消息,该消息仍可被其他订阅该topic的消费者进行消费。(消息并不会永久保存在消息队列中,通常会设置超期时间,消息保存超过该时间后自动删除)
|
||
|
||
### kafka架构
|
||
#### 分区存储
|
||
kafka将一个topic分为了多个分区,用于分布式存储数据。而每个分区都跨多台服务器实例进行存储,从而增加容错性。对特定分区来说,其存在于多台服务器实例,其中一台为主机,其他服务器为从机,**主机会处理对该分区数据所有的读写请求**(由于读写操作全都在主机上发生,而每台服务器都担当了某些分区的主机和其他分区的从机,故而读写请求在不同的服务器之间进行了负载均衡),而从机只会被动的复制主机修改。
|
||
#### group消费
|
||
每个消费者实例都属于一个consume group,一个consume group则是可以保存多个消费者实例。对于发送到kafka mq topic的每一条消息,都会被广播给订阅了该topic的每个consume group。而consume group接收到消息后,只会将消息发送给group中的一个消费者实例来进行处理,故而在consume group中,实现了消息在不同消费实例之间的负载均衡。
|
||
> 在kafka mq中,topic的实际订阅者是consume group,kafka mq会将topic中的消息广播给所有订阅了该topic的consume group,而每条消息都只会被consume group中的一个消费者实例进行消费
|
||
>
|
||
> 在一个consume group中,每个消费者实例都负责某一topic中相应的分区,每个特定分区只有一个消费者实例负责。
|
||
|
||
#### zookeeper
|
||
zookeeper作为注册中心,用于记录存在多个kafka实例时,当前已上线且状态正常的kafka实例,以及各个分区leader实例的信息
|
||
|
||
#### 生产者
|
||
##### 分区
|
||
生产者客户端将会决定将消息发送到某个分区,可以通过负载均衡随机决定将消息发送到哪个分区,也可以提供一个key并通过算法决定将消息发送到哪个分区。
|
||
|
||
决定将消息发送到哪个分区后,生产者会直接将消息发送到该分区对应的leader broker中。
|
||
|
||
##### 批量发送
|
||
生产者并不会每次产生消息后都立即将消息发送到broker,而是会累积消息,**直到消息数据积累到特定大小(默认为16K)后**,才会将消息发送给broker。
|
||
> **消息发送条件**
|
||
>
|
||
> - batch.size: 当消息累计到特定大小(默认为16K)后,发送给broker。如果消息的大小大于`batch.size`,那么并不会对消息做累积操作。**发送到broker的请求将会包含多个batch,每个batch对应一个分区,不同分区的消息通过不同的batch进行发送**
|
||
> - linger.ms: 生产者将会把发送给broker的两个请求之间的消息都累积到一个batch里,该batch将会在下次发送请求给broker时发送。但是,对特定分区累积大小如果没有达到`batch.size`的限制,哪个通过linger.ms来控制该消息延迟发送的最长时间。`linger.ms`单位为ms,**其默认值为0,代表即使消息累积没有达到`batch.size`,也会立马发送给broker**。若`linger.ms`设置为1000,则代表没有累积到`batch.size`大小的消息也会在延迟1s后发送给broker
|
||
|
||
##### ack级别
|
||
生产者将消息发送到broker后,broker对生产者有三种应答级别:
|
||
- 0:生产者发送数据后无需等待broker返回ack应答
|
||
- 1:生产者发送消息到broker后,leader broker将消息持久化后向生产者返回ack
|
||
- -1:生产者发送过来的数据,leader broker和isr队列中所有的broker都持久化完数据后,返回ack
|
||
|
||
##### 异步发送
|
||
kafka生产者异步发送是指生产者调用kafka客户端接口将发送的消息传递给kafka的客户端,此时消息并未发送到broker,而异步调用接口即返回。调用异步接口可以通过指定回调来获取消息传递到的topic、分区等信息。
|
||
|
||
##### 同步发送
|
||
除了异步调用接口外,还可以调用同步调用的接口来发送消息。在调用异步接口发送消息时,仅仅将消息放到缓冲区后调用就返回,可能放到缓冲区的消息并没有发送到broker。可以通过调用同步发送消息的接口来发送消息,其会阻塞并等待broker将消息持久化后返回的异常或者ack应答。
|
||
|
||
想要同步发送消息,只需要对异步接口返回的future对象调用`.get`即可,其会等待future对象完成。
|
||
|
||
##### 生产者发送消息的分区策略
|
||
在生产者发送消息时,需要决定将消息发送到哪个分区,决定策略如下所示:
|
||
|
||
1. 如果发送消息时指定了要发送到哪个分区,那么发送到哪个分区
|
||
2. 如果发送消息时没有指定分区,但是指定了消息的key,那么对key进行散列,根据key散列后的值决定发送到哪个分区
|
||
3. 如果发送消息时没有指定发送到哪个分区,也没有指定key,那么根据黏性策略发送消息
|
||
> 黏性分区策略
|
||
>
|
||
> 黏性分区策略会随机选择一个分区进行发送,并尽可能的一直使用该分区,直到该分区的batch size已满,此时kafka会随机再选择另一个分区进行发送
|
||
|
||
除了上述分区策略外,kafka支持自定义分区策略。
|
||
|
||
##### kafka消息传递语义
|
||
1. 至少一次:acks=-1情况下,消息能被确保传递不丢失,但是可能会存在消息重复传递的问题
|
||
2. 至多一次:acks=0的情况下,kafka能保证消息最多只传递一次的问题,但是无法保证消息会发送到broker,可能存在消息丢失问题
|
||
3. 精确一次:数据既不会重复也不会丢失
|
||
|
||
为了保证精确一次,kafka引入了两个特性:`幂等性`和`事务`
|
||
|
||
> 幂等性
|
||
>
|
||
> 生产者在初始化时会被分配一个producer id,并且生产者在发送消息时,针对每条消息都存在序列号。生产者每次发送消息时,都会附带pid(生产者id)和消息的序列号。序列号是针对topic分区的,生产者向分区发送消息时,针对每个分区都维护了一套序列号。当消息被发送给broker后,其会比较当前分区来源PID和消息PID相同,并且已经被ack的最大序列号,如果当前消息的序列号小于已经ack的序列号,说明该消息已经发送过,属于重复消息,被丢弃。
|
||
>
|
||
> **由于每个新的生产者实例都会被分配一个新的producer id,那么只能在单个生产者会话中保证幂等。**
|
||
>
|
||
> 生产者可以配置`enable.idempotence`属性用于控制幂等是否开启,默认情况下该属性值为true,默认开启幂等性,生产者会保证对每条消息只有一个副本被写入
|
||
|
||
> 事务
|
||
> #### 概念
|
||
> kafka事务能够保证应用能够原子的向多个topic分区中写入数据,所有写入操作要么同时成功,要么同时失败。
|
||
>
|
||
> 另外,消费过程可以看作是对offset topic的写操作,那么kafka事务允许将消息的消费和消息的生成包含在一个原子的单元中。
|
||
>
|
||
> #### 保证
|
||
> 为了实现kafka事务,需要应用提供一个全局唯一的transactionId,该transactionId会稳定贯穿所有该应用的会话。当提供了transactionId后,kafka会做出如下保证:
|
||
>
|
||
> - 只有一个活跃的生产者具有指定transactionId。为了实现该保证,当具有相同transactionId的新生产者实例上线后,会隔离旧的实例
|
||
> - 在应用会话之间将会进行事务的恢复。如果应用实例宕机,那么那么下一个实例恢复工作前将会有一个干净的状态,任何尚未完成的事务都会处于完成状态(提交或异常退出)
|
||
>
|
||
> #### 事务协调器
|
||
> 每个生产者实例都被分配了一个事务协调器,分配producer id、管理事务等逻辑都由事务协调器负责
|
||
> #### Transaction Log
|
||
> Transaction Log是一个内部的kafka topic,类似于consumer offset topic,transaction log是一条持久化和主从复制的记录,记录了每个事务的状态。transaction log是对事务协调器的状态存储,最新版本log快照封装了当前每个活跃事务的状态。
|
||
> #### 控制消息
|
||
> 控制消息是写入用户topic中的特殊消息,由客户端进行处理,但是不暴露给用户。例如,其被用于broker告知消费者其先前拉取的消息是否已经被原子的提交。
|
||
> #### transactionId
|
||
> transactionlId用于唯一标识producer,并且transactionId是持久化的(producer id在生产者实例重启后会重新获取)。如果不同生产者实例具有相同的transactionId,那么会恢复或终止上一生产者实例所初始化的任何事务
|
||
>
|
||
> #### 数据流
|
||
> ##### 发现事务协调器
|
||
> 由于事务协调器是分配PID和管理事务的中心,首先生产者会发送请求给任一broker来询问其所属的事务协调器
|
||
> ##### 获取PID
|
||
> 查询到其所属事务协调器后,会获取producer id。如果transactionId已经被指定,那么在获取PID的请求中会包含transactionId,并且transactionId和PID的关联将会被记录到transaction log中,以便于之后如果上线具有相同transactionId的新实例,能够根据transactionId该映射返回旧实例的PID。
|
||
>
|
||
> 在上线新实例,返回具有相同transactionId的旧实例后,会执行如下操作:
|
||
>
|
||
> 1. 隔离旧实例,旧实例所属的事务无法再继续推进
|
||
> 2. 针对旧生产者实例所属的尚未完成的事务进行恢复操作
|
||
>
|
||
> 如果请求PID时transactionId尚未被指定,那么PID将会被分配,但是生产者实例只能在单个会话中使用幂等语义和事务语义
|
||
>
|
||
> #### 开启事务
|
||
> 可以通过beginTransaction()接口调用来开启事务,生产者会将本地状态改为事务已开启,但是事务协调器直到发送了第一条记录时才会将事务状态标记为已开启
|
||
> #### 消费-处理-生产
|
||
> 开启事务之后,生产者可以对消息进行消费和处理,并且产生新的消息,
|
||
> ##### AddPartitionsToTxnRequest
|
||
> 当某个topic分区第一次作为事务的一部分被执行写操作时,生产者将会发送AddPartitionsToTxnRequest请求到事务协调器。将该topic分区添加到事务的操作将会被事务协调器记录。可以通过该记录信息来对事务中的分区进行提交或回滚操作。如果第一个分区被添加到事务中,事务协调器将会开启事务计时器。
|
||
> ##### ProduceRequest
|
||
> 生产者会向topic分区发送多个ProduceRequest来写入消息,请求中包含PID、epoch、序列号。
|
||
>
|
||
> #### 提交或回滚事务
|
||
> 一旦消息被写入,用户必须调用 commitTransaction或abortTransaction方法来对事务进行提交或者回滚。
|
||
>
|
||
|
||
##### 数据乱序
|
||
kafka如果想要保证消息在分区内是有序的,那么需要开启幂等性(默认开启)并且`max.inflight.requests.per.connection`需要设置小于或等于5(默认情况下该值为5),在不开启幂等性的情况下如果想要保证分区内数据有序需要设置`max.inflight.requests.per.connection`的值为1.
|
||
|
||
> `max.inflight.requests.per.connect`
|
||
>
|
||
> 该值用于设置客户端向broker发送数据时,在阻塞前每个连接能发送的未收到ack的请求数目。如果该值被设置为大于1并且幂等性未开启,那么在消息发送重试时可能会导致消息的乱序,此时需要开启幂等性或者关闭消息重试机制才能重新保证幂等性。
|
||
>
|
||
> 并且,启用幂等性需要该值小于或等于5,如果该值大于5,且幂等性开启
|
||
> - 如果幂等性未显式指定开启,那么在检测到冲突设置后,幂等性会关闭
|
||
> - 如果显式指定幂等性开启并且该值大于5,那么会抛出ConfigException异常
|
||
|
||
#### broker
|
||
##### broker选举
|
||
每个broker节点中都存在controller模块,只有ISR中的节点才能够参与选举(acks=all时,也只需要ISR队列中所有节点都同步消息,才将消息视为已提交)。controller会通过watch来监听zookeeper中节点信息的变化,如果某个分区的leader broker宕机,那么controller模块在监听到变化后会重新开始选举,在ISR中重新选出一个节点作为leader。
|
||
|
||
##### partition reassign
|
||
如果kafka集群在运行时新增了新的节点,**此时节点中旧的topic分区并不会自动同步到新增节点中,如果要将旧topic分区存储到新增broker节点,可以调用重新分区的命令。**
|
||
|
||
可以通过`kafka-reassign-partitions.sh --generate`命令对分区在所有节点(包含新节点)之间进行重新分配,此命令调用后会生成一个topic分区在节点间重新分配的计划。
|
||
|
||
在确认了生成的重新分配计划后,可以调用`kafka-reassign-partitions.sh --execute`命令来执行新生成的重新分配计划。
|
||
|
||
如果想要在kafka集群中退役一台broker server,不能直接关闭该broker节点,也需要跟新增broker节点类似,重新生成一个分区分配计划,分配计划中移除待下线节点,然后执行该分配计划。执行分配计划后,该节点上便不再存储分区,此后该节点并可以安全的关闭。
|
||
|
||
##### broker宕机对分区造成的影响
|
||
broker宕机并不会导致分区的重新分配,例如一个分区的replica-factor为2,存在broker-1,broker-2,broker-3,分区存储在(1,3)服务器上,leader为1号服务器。如果broker-1宕机,那么分区的leader会自动切换为broker-3,但是分区存在的副本数量从2个变成了一个,ISR中也只有(3),此时并不会在未存储该分区的broker-2节点上新增一个副本。
|
||
|
||
如果宕机的broker-1重新再上线,那么broker-1会重新存储之前负责存储的分区,但是此时分区对应的leader节点仍然是broker-2,此时broker-2作为分区的leader会导致读写操作全都转移到broker-2节点,负载均衡会造成偏移。
|
||
|
||
> `auto.leader.rebalance.enable`
|
||
>
|
||
> 该属性默认设置为true,一个后台线程会定期(时间间隔为`leader.imbalance.check.interval.seconds`,默认为300s)检测分区leader是否为默认的perferred leader。如果分区leader不为preferred leader的数量超过一定的比率(` leader.imbalance.per.broker.percentage`,默认为10%),会触发将分区leader改为默认preferred leader的操作。
|
||
|