From ebf8665806f04cc0cc3c68d2d738672c11000127 Mon Sep 17 00:00:00 2001 From: asahi <496063163@qq.com> Date: Wed, 20 Dec 2023 01:15:16 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBkafka=20exactly-once=E8=AF=AD?= =?UTF-8?q?=E4=B9=89=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/kafka-尚硅谷.md | 79 ++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/mq/kafka/kafka-尚硅谷.md b/mq/kafka/kafka-尚硅谷.md index 68954f2..bb9e04d 100644 --- a/mq/kafka/kafka-尚硅谷.md +++ b/mq/kafka/kafka-尚硅谷.md @@ -1,3 +1,50 @@ +- [Kafka](#kafka) + - [简介](#简介) + - [消息队列应用场景](#消息队列应用场景) + - [缓存/削峰](#缓存削峰) + - [解耦](#解耦) + - [异步通信](#异步通信) + - [消息队列模式](#消息队列模式) + - [点对点](#点对点) + - [发布订阅模式](#发布订阅模式) + - [kafka架构](#kafka架构) + - [分区存储](#分区存储) + - [group消费](#group消费) + - [zookeeper](#zookeeper) + - [生产者](#生产者) + - [分区](#分区) + - [批量发送](#批量发送) + - [ack级别](#ack级别) + - [异步发送](#异步发送) + - [同步发送](#同步发送) + - [生产者发送消息的分区策略](#生产者发送消息的分区策略) + - [kafka消息传递语义](#kafka消息传递语义) + - [数据乱序](#数据乱序) + - [broker](#broker) + - [broker选举](#broker选举) + - [partition reassign](#partition-reassign) + - [broker宕机对分区造成的影响](#broker宕机对分区造成的影响) + - [broker AR \& OSR](#broker-ar--osr) + - [leader选举机制](#leader选举机制) + - [follower故障恢复细节](#follower故障恢复细节) + - [leader宕机的细节](#leader宕机的细节) + - [kafka分区存储机制](#kafka分区存储机制) + - [kafka文件清除策略](#kafka文件清除策略) + - [消费者](#消费者) + - [kafka消费者工作流程](#kafka消费者工作流程) + - [消费者组](#消费者组) + - [消费者实例拉取数据流程](#消费者实例拉取数据流程) + - [消费者api](#消费者api) + - [分区分配和再平衡](#分区分配和再平衡) + - [offset](#offset) + - [rebalance](#rebalance) + - [kafka消费起始offset](#kafka消费起始offset) + - [kafka exactly-once语义](#kafka-exactly-once语义) + - [apache kafka idempotence](#apache-kafka-idempotence) + - [事务-在多个分区之间进行原子写入](#事务-在多个分区之间进行原子写入) + - [transactional message \& non-transactional message](#transactional-message--non-transactional-message) + + # Kafka ## 简介 ### 消息队列应用场景 @@ -412,6 +459,8 @@ kafka中的分区策略通过`partition.assignment.strategy`参数来进行配 > 针对kafka的手动提交,当使用`commitSync`进行同步提交时,如果提交失败,同步提交会无限次的进行重试,直到提交成功或是发生了不可恢复的异常。 > > 但是,在使用`commitAsync`方法进行提交时,kafka消费者在提交失败之后则不会进行重试。在处理kafka commitAsync重试问题时,还需要考虑commit order。当消费者进行异步提交时,如果发现当前batch提交失败,此时可能位于当前batch之后的batch已经处理完成并进行提交(commitAsync并不会等待当前batch提交成功之后再拉取下一批,而是直接拉取下一批继续处理,故而下一批batch可能提交早于当前batch)。故而,如果对当前异常的batch进行重试提交,可能会之后批次的commit offset被覆盖,从而造成消息的重复消费。 +> +> ##### commitAsync接口提交失败后并不会抛出异常,也不会重试 ##### rebalance rebalance通常有两个阶段,`revocation`和`assignment`,即撤销当前消费者被分配的分区和重新分配给消费者新的分区。revocation方法会在rebalance之前被调用,且revocation是rebalance之前消费者最后一次提交offset的机会,可以重写revocation方法,在rebalance发生之前对offset进行同步提交。 @@ -434,5 +483,31 @@ rebalance通常有两个阶段,`revocation`和`assignment`,即撤销当前 > #### 消费指定时间开始的消息 > 如果在消费消息时,想要消费从指定时刻之后的消息,可以通过`kafkaConsumer#offsetsForTimes`接口,能根据传入的分区和时间戳来得到该分区下指定时刻消息的起始offset,返回offset为时间戳大于或等于指定时间戳的第一条消息对应offset -##### exactly-once语义 -如果想要保证broker中消息不会被kafka漏消费或是重复消费,可以通过事务来保证每条消息只会被消费一次。 +## kafka exactly-once语义 +在一个基于发布/订阅的消息系统中,组成该消息系统的节点可能会发生故障。在kafka中,broker可能会宕机,在生产者向topic发送消息时也可能发生网络故障。基于生产者处理这些失败场景的方式,可能会有如下三种语义: +- At-least-once:如果生产者在向broker发送消息时,接收到broker返回的ack(调用同步接口发送消息),并且生产者配置`acks=all`(会等待消息写入到所有isr队列中),这代表该消息已经被写入到消息系统中。但如果生产者向broker发送消息后,消息已经被写入到broker集群中,但是broker集群尚未向生产者返回ack,此时broker leader宕机,那么生产者将不会收到ack。此时生产者会重试发送消息,这将会导致消息在broker集群中存在多条,同一消息也会被消费者消费多次。 +- At-most-once semantics:如果生产者在向broker发送消息时,若出现等待ack超时或发生异常情况后,不进行重试发送,那么消息将最多被发送给broker一次,此时不会出现消息被重复发送的情况。 +- exactly-once:即使生产者多次发送相同的消息到broker集群,也只会有一条消息被传递给消费者。exactly-once语义要求生产者、broker、消费者的协同才能实现。 + +### apache kafka idempotence +kafka生产者支持幂等性操作,即使相同的消息被多次发送给broker,broker也只会向分区中写入一条消息。除此之外,开启生产者的`enable.idempotence=true`属性,还代表生产者发送到同一分区的消息是顺序的。 + +生产者实现幂等性的原理和tcp类似,每次被批量发送给broker集群的消息都含有一个序列号,broker将会使用序列号进行去重操作。但是,和tcp不同的是,该序列号会持久化到replicate log中,故而即使broker leader宕机,重新成为leader的broker也能知道是否接受到的消息是重复发送。 + +并且,kafka生产者开启幂等性带来的开销很小,仅仅为批量提交的消息附加的序列号。 + +### 事务-在多个分区之间进行原子写入 +通过事务api,kafka支持在多个分区之间原子的进行写入。事务特性支持生产者在向broker集群多个分区批量发送消息时,要么发送的所有消息都能被消费者可见,要么发送的消息对消费者都不可见。 + +事务特性允许在事务内对消息进行消费、处理、提交消息处理生成的结果的同时,在同一事务中对消息消费的offset进行提交。通过事务特性,在消息需要消费->处理->提交生成结果的场景下,能够支持exactly-once。 + +#### transactional message & non-transactional message +分区中存储的消费,可以分为事务消息和非事务消息。事务消息,又可以分为进行中事务的消息,事务提交成功的消息,事务提交失败的消息。 + +消费者在消费消息时,可以设置`isolation.level`: +- read_committed:在设置该隔离级别后,消费者只能消费非事务消息或事务提交成功后的消息。`poll`接口在该隔离级别时,会返回LSO之前所有的消息(LSO值第一个活跃事务对应消息的offset - 1)。任何offset位于LSO之后的消息都将会被broker保留,直到活跃的事务被提交。故而,在活跃事务提交前,消费者无法读取到LSO之后的消息。 +- read_uncommitted:设置为该隔离级别之后,消费者可以读取到所有消息,即使该消息关联的事务尚未被提交,**或消息关联事务已经被废弃(aborted)**。 + +默认情况下,`isolation.level`的值为read_uncommitted。 + +