diff --git a/mq/kafka/kafka-尚硅谷.md b/mq/kafka/kafka-尚硅谷.md index fc2988a..040dc5e 100644 --- a/mq/kafka/kafka-尚硅谷.md +++ b/mq/kafka/kafka-尚硅谷.md @@ -164,3 +164,86 @@ broker宕机并不会导致分区的重新分配,例如一个分区的replica- > > 该属性默认设置为true,一个后台线程会定期(时间间隔为`leader.imbalance.check.interval.seconds`,默认为300s)检测分区leader是否为默认的perferred leader。如果分区leader不为preferred leader的数量超过一定的比率(` leader.imbalance.per.broker.percentage`,默认为10%),会触发将分区leader改为默认preferred leader的操作。 +##### broker AR & OSR +kafka中副本的默认数量为1个,通常生产环境将其配置为2个或2个以上,以便在leader副本宕机后follower副本能够继续服务,避免数据丢失。 + +- AR: AR代表集群中分区对应的所有副本集合,all-replicas +- ISR: 代表集群中处于同步状态的副本集合,in-sync-replicas 如果ISR中的节点长期未从leader中同步数据,会被剔除出ISR,最长未同步的时间由`replica.lag.time.max.ms`控制,默认为30s。如果leader对应的broker宕机,那么新leader将会从ISR中选举产生 +- OSR:代表集群中不处于同步状态的副本集合,为AR - ISR + +##### leader选举机制 +leader选举根据AR中节点的排序来决定,能够成为leader的节点需要再ISR中存活,然后ISR中存活节点在AR中排序最靠前的将会被选举为leader。如果leader发生宕机,那么由ISR中存活且AR中排序最靠前的的节点成为新leader。 + +> 选举机制中leader决定是按照AR中broker节点的顺序进行决定的,每一个分区都有一个默认的preferred leader。如果某些节点宕机后再恢复,ISR中节点的顺序将会发生变化,但是AR中的节点顺序并不变,并且preferred leader也不会发生变化。 + +##### follower故障恢复细节 +follower故障恢复细节中,涉及到如下两个概念: +- LEO(Log End Offset):每个副本结束offset的下一个位置,类似于java中的List.size() +- HW(High WaterMark):当前ISR队列中所有副本最小的HOW(水位线,类似于木桶效应中最短的那一根木板) + +如果follower发生故障,那么会按顺序发生如下: +1. 故障follower会被剔除出ISR +2. follower故障期间ISR中剩余的follower节点和leader节点会继续接受数据 +3. 故障follower如果恢复,会读取宕机前记录的旧集群HW,并且将大于等于该旧HW的log记录全部都删除,并且从leader重新同步记录 +4. 当故障恢复的follower从旧HW位置同步消息到当前集群中的新HW位置时,此时故障恢复的log区数据已经同步到新HW的水位线水平,此时故障恢复的follower节点可以重新加入到ISR中 + +##### leader宕机的细节 +如果leader broker发生宕机,那么新选举成为leader的follower将会成为leader,并且所有节点会将大于等于HW的数据丢弃(由于宕机的是leader,LEO最大,故而leader宕机不会对HW造成影响),并且重新从新选举的leader中同步数据。 + +由上可知,如果leader宕机前,其他follower尚未同步完leader中全部的消息,那么leader宕机后可能会发生消息的丢失。如果需要确保消息不丢失,需要设置生产者的acks为all,确保消息再提交前同步到ISR中所有的节点中。 + +##### kafka分区存储机制 +kafka中topic是一个逻辑概念,topic由分区组成,每个分区则是可以看作一个log文件,log中存放生产者产生的数据。生产者产生的消息会被追加到log文件的末端,由于是线性追加,不涉及到平衡树等数据结构,故而**kafka追加消息时,不管当前分区数据存储量大小,追加数据的开销都是相同的,追加操作不会随着数据量变大而变慢**。 + +为了防止分区对应的log文件过大而导致的数据定位效率低下,kafka采用了分片和索引的机制,将每个分区分为了多个segment。单个segment默认存储的数据量的大小为1G,且单个segment由如下文件组成: +- .log文件:日志文件,用于存储消息(log文件的命名以当前segment中第一条消息再分区中的offset来命名) +- .index:偏移量索引文件,用于存储偏移量和position的映射关系,用于快速定位消息 +- .timeindex:时间戳索引文件,该文件用于存储消息对应的时间戳信息,kafka中的消息默认保存7天后丢弃,通过时间戳信息来决定消息是否应该丢弃 + +> index索引 +> +> kafka中的索引为稀疏索引,默认每往log中写入4KB的数据,.index文件中会记录一条偏移量索引信息。 +> +> 可以通过`log.index.interval.bytes`来配置索引记录的密度,每写入多少数据才记录一条索引 + +##### kafka文件清除策略 +kafka中默认消息保存的时间为7天,可以通过修改如下配置来对默认保存时间进行修改: +- log.retention.hours:消息默认保存小时:默认为168(24 * 7) +- log.retention.minutes:消息默认保存时间,按分钟计 +- log.retention.ms:消息设置默认保存时间,按ms计 + +上述设置中,优先级为`ms`>`minutes`>`hours`,如果优先级较大的被设置,那么取优先级高的设置,在高优先级条目没有被设置时,才取低优先级设置。 + +> 如果想要设置消息不超时,可以将`log.retention.ms`设置为-1 + +`log.retention.check.interval.ms`参数可以检查消息是否超时的周期,默认情况下该值设置为5min。 + +kafka中的日志清除策略由如下两种: +- 删除 +- 压缩 + +> 删除 +> +> 当`log.cleanup.policy`默认值为delete,当该值被设置为delete时,存储超期日志的文件将会被删除。 +> +> - 基于时间:默认打开。如果segment中所有记录中时间戳最大的记录(最新插入的记录)超过最长时间设置,那么会将该segment删除。(以segment中最新消息时间戳作为segment文件时间戳) +> - 基于大小:基于大小的删除默认是关闭的。`log.retention.bytes`默认值为-1,表示对log文件最大的限制,如果单个log文件的大小超过该大小限制,那么会删除log文件对应的最早的segment + +> 压缩 +> +> 当`log.cleanup.policy`值设置为compact时,会对超时的segment进行压缩(指segment最新一条插入的消息超时),对于相同key的消息,只会保留最后插入的一条消息,演示如下。 +> +> | k2 | k1 | k3 | k1 | k2 | +> |:-:|:-:|:-:|:-:|:-:| +> | 1 | 2 | 3 | 4 | 5 | +> +> 会被压缩为 +> +> | k3 | k1 | k2 | +> |:-:|:-:|:-:| +> | 3 | 4 | 5 | +> +> 压缩后offset可能并不连续,此时若想要消费的offset不存在,那么会拿到比预期offset大的offset的消息 + + +