359 lines
33 KiB
Markdown
359 lines
33 KiB
Markdown
# Apache Kafka
|
||
## Introduction
|
||
Apache Kafka是一个分布式的stream平台,具有如下三个核心功能:
|
||
- 发布和订阅record stream,类似于消息队列或企业消息系统
|
||
- 通过一种容错的持久化方式对record stream进行存储
|
||
- 当record stream产生时对其进行处理
|
||
|
||
Kafka通常应用于如下两类应用:
|
||
- 构建实时的流数据管道,应用或系统之间传递的数据是可靠传输的
|
||
- 构建实时stream应用,应用会传输流数据并且对流数据进行响应
|
||
|
||
Kafka核心观念:
|
||
- Kafka可以作为集群在一台或多台服务器上运行,服务器可以跨多个数据中心
|
||
- kafka在`topic`的分类中存储stream record
|
||
- 每条record都含有一个key、一个value和一个时间戳
|
||
|
||
Kafka具有四类核心API:
|
||
- Producer API:允许应用发布records stream到一个或多个kafka topic中
|
||
- Consumer API:允许应用订阅一个或多个topic,并且处理被订阅topic的record流
|
||
- Stream API:Stream API允许应用像stream processor一样,从一个或多个topic消费input stream并向一个或多个output topic产生output stream,能够有效的将输入流转化为输出流
|
||
- Connector API:可以用于构建和运行一个可重用的生产者或消费者,生产者或消费者会将Kafka topic现存的应用或data system。**例如,一个关系型数据库的connector可以捕获对数据库表中的所有变动
|
||
|
||
在client和server之间的交流通过一个简单、高性能、语言无关的tcp协议进行。该tcp协议拥有版本,并且新的版本会兼容旧的版本。
|
||
### Topic and Logs
|
||
Topic是被发布record的类别或是源名称。在Kafka中,一个Topic可以有0个、1个或多个消费者订阅。
|
||
对于每个topic,kafka集群都会维护一个分区日志,每个分区日志都是一个有序的、不可变的record序列,record序列被不断添加到分区的尾部。分区中的每条record都分配了一个序列号`offset`,序列号将唯一标识分区中的每条record。
|
||
Kafka集群将会把所有已发布的record进行持久化,无论其是否已经被消费,record的保留期是可配置的。**例如,如果将保留策略设置为2天,在消息被发布的两天之内,record都是可消费的,但是两天之后record将会被丢弃并且释放空间。**
|
||
Kafka即使数据大小不同,性能也是恒定的,存储大量数据并不会影响Kafka的性能,故而长时间存储数据并不会带来性能的问题。
|
||
实际上,每个消费者保存的元数据是Kafka log的偏移量,偏移量由消费者控制。正常情况下,消费者会在读取记录后线性推进offset,但是offset完全由消费者控制,故而消费者可以按其想要的任何顺序对消息进行消费。**例如,消费者可以将offset设置为旧的位置并且对已经被消费的record进行重复消费。**
|
||
|
||
日志中分区的目的如下:
|
||
- 允许日志扩展到适合单个server的大小。每个独立的分区必须适合分区位于的服务器,但是一个topic可以含有多个分区,故而topic可以处理任意数量的数据
|
||
- 日志分区某种意义上充当了并行的单元
|
||
### Distribution
|
||
log分区分布在Kafka集群的服务器上,每个服务器处理对共享分区的数据和请求。每个分区都跨可配置数量的服务器进行复制,从而增加容错性。
|
||
每个分区都有一台server作为“主机”,并由0或多台其他server作为“从机”。“主机”会处理所有对该分区的读写请求,而“从机”只是会被动的复制主机。如果“主机”宕机,那么“从机”会自动成为新的“主机”。每台服务器都充当了部分分区的“主机”和其他分区的“从机”,从而负载在集群中间是均衡的。
|
||
> 传统的主从复制,例如redis主从复制,读操作分布在主机和从机之间,而写操作只针对主机,针对多读少写的场景能在集群之间分担读压力
|
||
> 而对于kafka集群,通过分区来实现负载均衡,每个分区都分布在多台server上,而读写操作全部发生在主机上,从机只作为主机宕机后的备份。而一个Topic可分为多个分区,故而通过分区实现了负载在集群服务器之间的均衡。
|
||
### Producers
|
||
生产者发送record数据到topic。生产者可以选择将record发送到topic中的哪个分区。可以通过轮询的方式来将record分配到不同的分区。
|
||
### Consumers
|
||
消费者通过***consumer group name***的属性来标记自己。对于每条发布到topic中的记录,都会被发布到订阅了该topic的所有consumer group中,对于每个consumer group,record都会被发送给consumer group中的一个消费者实例。消费者实例可以在不同的进程上或不同的机器上。
|
||
如果所有的消费者实例都位于一个consumer gorup中,那么record会在所有的消费者实例之间进行负载均衡。
|
||
如果所有的消费者实例位于不同的consumer group中,那么record会被广播给所有的consumer group。
|
||
> 通常的,topic含有较少数量的consumer group,每个consumer group都是一个“逻辑订阅者”。每个consumer group都有许多的消费者实例组成,从而实现容错性和可拓展性。
|
||
> 上述consumer group整体作为一个订阅者,**在此语义中订阅者并非是单个进程,而是一个由消费者实例组成的集群**
|
||
|
||
在Kafka中,“消费”操作的实现方式是通过将分区根据消费者实例进行划分,故而每个实例在任何时间点对于其分配到的分区都是唯一的消费者。
|
||
> 一个消费者实例可能会被分配到多个分区,但是任何一个分区,其对应的消费者实例只能有一个
|
||
维护consumer group中成员的过程是由Kafka协议动态处理的,如果新的消费者实例加入到consumer group中,它们会接管从其他group成员中获取到的分区;如果现有消费者实例宕机,那么宕机实例的分区将会被分配给其他的消费者实例。
|
||
### multi-tenancy(多租户)
|
||
可以将部署Kafka作为多租户的解决方案。可以通过配置哪些topic可以产生和消费数据来启用多租户。这些操作也支持配额。
|
||
### Guarantees
|
||
在高层,Kafka可以提供如下保证:
|
||
- 一个生产者实例发送给特定topic分区的消息会按消息发送的顺序添加到topic分区中
|
||
- 一个消费者实例将会按照消息存储在分区log中的顺序消费消息
|
||
- 对于一个复制因子为N的主题,其最多可以容忍N-1台服务器宕机而不会丢失提交给日志的任何记录
|
||
### Kafka Storage System
|
||
写入到Kafka的数据将会被持久化到磁盘中,并被复制到从机以解决容错。Kafka允许生产者等待ack返回,在数据被完全的被复制并且持久化之前,生产者向Kafka的写入操作都不会被认为完成。
|
||
Kafka使用的磁盘结构具有很好的拓展性,不管有50K或是50T的数据写入到服务器,Kafka的性能都相同。
|
||
由于严格的存储策略和允许客户端控制其读写位置,可以将Kafka看作一种特殊目的的分布式文件系统,该文件系统具有高性能、低延时,并且容错性高,主从复制。
|
||
## Quick Start
|
||
### Start the server
|
||
Kafka使用了Zookeeper,在启动Kafka之前必须启动一个zookeeper实例。
|
||
启动zookeeper命令:
|
||
```shell
|
||
> bin/zookeeper-server-start.sh config/zookeeper.properties
|
||
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
|
||
...
|
||
```
|
||
启动zookeeper之后可以启动kafka实例:
|
||
```shell
|
||
> bin/kafka-server-start.sh config/server.properties
|
||
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
|
||
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
|
||
...
|
||
```
|
||
### Create a topic
|
||
如下命令会创建一个名为”test“的topic,该topic含有一个分区和一个复制:
|
||
```shell
|
||
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
|
||
```
|
||
可以通过如下命令列出现存的topic:
|
||
```shell
|
||
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
|
||
test
|
||
```
|
||
除了手动创建topic,还可以将broker配置为如果record发布到一个不存在topic,该不存在topic会自动被创建。
|
||
### Send Some Message
|
||
可以通过命令行将文件或是标准输入中的内容发送到Kafka集群,默认情况下,每一行都会作为一个独立的消息被发送:
|
||
```shell
|
||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
||
This is a message
|
||
This is another message
|
||
```
|
||
### Start a consumer
|
||
Kafka同样有一个命令行消费者,可以将消息内容输出到标准输出:
|
||
```shell
|
||
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
|
||
This is a message
|
||
This is another message
|
||
```
|
||
### Setting multi-broker cluster
|
||
为每个broker创建一个config文件:
|
||
```shell
|
||
> cp config/server.properties config/server-1.properties
|
||
> cp config/server.properties config/server-2.properties
|
||
```
|
||
为每个broker单独设置配置文件
|
||
```vim
|
||
config/server-1.properties:
|
||
broker.id=1
|
||
listeners=PLAINTEXT://:9093
|
||
log.dirs=/tmp/kafka-logs-1
|
||
|
||
config/server-2.properties:
|
||
broker.id=2
|
||
listeners=PLAINTEXT://:9094
|
||
log.dirs=/tmp/kafka-logs-2
|
||
```
|
||
`broker.id`属性对集群中的每个节点都是唯一且永久的名称。
|
||
通过如下命令可以再开启两个kafka节点:
|
||
```shell
|
||
> bin/kafka-server-start.sh config/server-1.properties &
|
||
...
|
||
> bin/kafka-server-start.sh config/server-2.properties &
|
||
...
|
||
```
|
||
通过如下方式可以创建一个复制因子为3的新topic:
|
||
```shell
|
||
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
||
```
|
||
通过如下命令行,可以获知集群和各个节点的信息:
|
||
```shell
|
||
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
|
||
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
|
||
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
|
||
```
|
||
对于每个分区,leader都是随机选择的,并且,在leader宕机之后会有一台从机自动的成为leader
|
||
|
||
## APIS
|
||
Kafka包含5个核心api:
|
||
1. Producer API:允许应用向Kafka集群的Topic发送stream数据
|
||
2. Consumer API:允许应用丛Kafka集群的Topic中读取数据
|
||
3. Streams API:允许在input topic和output topic之间传递stream数据
|
||
4. Connect API:允许实现连接器,连接器会不断从源系统或应用拉拉取数据到Kafka或从Kafka推送数据到系统或者应用
|
||
5. Admin API:允许管理和查看topic,broker和其他Kafka对象
|
||
|
||
### Producer API
|
||
要使用Producer API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Consumer API
|
||
要使用Consumer API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Stream API
|
||
要使用Stream API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-streams</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Admin API
|
||
要使用Admin API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
|
||
## Configuration
|
||
### Broker Configs
|
||
基础的配置如下:
|
||
- broker.id
|
||
- log.dirs
|
||
- zookeeper
|
||
> zookeeper.connect
|
||
> 通过`hostname:port`以字符串的形式来指定zookeeper连接,hostname和port为zookeeper server对应的域名和端口。如果要指定zookeeper集群,可以通过`hostname1:port1,hostname2:port2,hostname3:port3`的形式来指定多个hosts。
|
||
|
||
#### Broker Configs 动态更新
|
||
从Kafka版本1.1往后,一些broker config可以被更新而不需要重启broker。可以在`Broker Config`条目的`Dynamic Update Mode`栏查看是否该config栏是否允许动态更新:
|
||
- read-only:如果要更新该条目,需要重新启动broker
|
||
- per-broker:可以对每个broker进行动态更新
|
||
- cluster-wide:可以在集群的范围内进行动态更新
|
||
|
||
如下命令会修改broker 0的配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
|
||
```
|
||
如下命令会返回broker 0的动态配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
|
||
```
|
||
如果要删除一个覆盖的配置并且将配置值回滚为静态配置或者默认值,可以使用如下命令:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
|
||
```
|
||
一些配置可以设置为集群默认的配置,在整个集群中都维护为一致的值。集群中所有的broker都会处理cluster default update,如下命令会更新集群中所有broker的log cleaner threads:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
|
||
```
|
||
通过如下命令可以输出集群范围内的默认配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
|
||
```
|
||
所有可以在集群层面配置的条目都可以针对单个broker配置。如果一个条目在多个条目都有配置,那么会按照如下优先级:
|
||
- 存储在zookeeper中的针对单个broker的配置
|
||
- 存储在zookeeper中的针对集群的配置
|
||
- server.properties中静态配置的条目
|
||
- Kafka的默认值
|
||
|
||
#### 动态更新Password Configs
|
||
动态更新的password config值在存储到zookeeper之前会被加密。如果想要启用password config的动态更新,broker config中的`password.encoder.secret`条目必须要在server.properties中被配置。
|
||
如果broker重启,那么用于password加密的old secret必须在静态broker config中的`password.encoder.old.secret`条目中被配置,而新的secret将会在`password.encoder.secret`条目配置。当容器启动时,当前zookeeper中存储的所有动态password config都会通过new secret重新加密。
|
||
|
||
#### 在启动broker之前更新zookeeper password config
|
||
kafka-config.sh支持动态broker config在启动broker之前通过zookeeper被更新。如果在alter命令行中包含了任何password config,那么broker config条目`password.encoder.secret`必须被指定。可以指定额外的加密参数,password encoder config将不会被存储在zookeeper中。
|
||
示例如下所示:
|
||
```shell
|
||
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config
|
||
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
|
||
```
|
||
如上命令中,listener.name.internal.ssl.key.password将会使用指定的指定的encoder config被加密存储在zookeeper中,encoder.secret和encoder.iterations则不会被存储在zookeeper中。
|
||
|
||
## Design
|
||
### Motivation
|
||
Kafka可以具有如下用例:
|
||
- Kafka用于高吞吐量来应对大量事件流的场景
|
||
- Kafka能够处理大量的数据积压,从而支持离线系统的周期性事件加载
|
||
- Kafka也支持处理低延迟交付
|
||
- Kafka支持分区和分布式存储
|
||
### Persistence
|
||
#### filesystem store structure
|
||
Kafka非常依赖于文件系统来存储和缓存消息。通常观点中,磁盘效率被认为很慢。但是,磁盘的效率是依赖于其存储结构的,一个拥有良好结构设计的磁盘存储能够提供和网络一样快速的性能。
|
||
对于磁盘性能,磁盘读写的吞吐量和磁盘查找所带来的延迟会存在极大差异,磁盘查找所带来的延迟会极大影响磁盘读写的性能。在磁盘阵列的测试中,线性写入的性能接近600MB/sec,但是随机写入的性能只在100k/sec左右,差距近乎6000倍。线性写入是所有模式中最可预测的,并且由操作系统进行了优化。一个现代的操作系统提供了read-ahead和write-behind技术,该技术可以以大块的形式预取数据,并且将较小的逻辑写入组合成大的物理写入来优化性能。
|
||
为了弥补随机读写和线性读写的性能差异,现代操作系统在将内存作为磁盘缓存方面变得十分激进。现代操作系统会将当前所有的空闲内存转化为磁盘缓存,并且在回收内存时只会带来很小的性能开销。所有的磁盘读写操作都会经过内存,除非使用direct IO。
|
||
更重要的是,Kafka是基于JVM构建的,JVM具有如下特性:
|
||
- java对象带来的额外内存开销十分大,通常是存储数据的两倍或更多
|
||
- 随着堆内存中数据的增加,java GC变得越来越复杂和缓慢
|
||
|
||
由于JVM结构存在以上缺陷,故而使用filesystem并依赖页面缓存的方案优于维护一个内存缓存或其他结构:
|
||
> 通过将所有可获取的空闲内存作为缓存,至少将可获取缓存大小增加了一倍;而通过紧凑的字节结构而不是对象结构,又几乎将可获取的缓存大小增加了一倍
|
||
|
||
在32GB内存的机器中,缓存大小至多可至28-20GB,并且没有GC开销。甚至,在服务重启时,之前的cache仍然会有效。而对于进程内的内存缓存,在服务重启之后,缓存的rebuild会带来巨大的开销(10GB缓存可能带来10分钟的开销);如果重启之后不执行rebuild操作,那么重启之后的初始性能将会受到很大的影响。
|
||
通过使用操作系统提供的page cache,关于维护缓存和文件系统一致性的逻辑全都包含在操作系统中,无需在代码中再次实现,故而这也大大简化了code的实现。
|
||
#### Constant Time Suffices
|
||
在消息系统中,使用的持久化数据结构通常是对每个消费者队列关联一个BTree或其他通用的随机访问数据结构,用于维护消息的元数据。BTree通常会带来较高的开销。即使BTree操作的时间复杂度是O(logN),虽然O(logN)通常被认为是等于恒定事件,但是对于磁盘操作来说并不是这样。磁盘查找一次需耗费大概10ms,并且每个磁盘同一时刻只能够进行一个查找,故而并行性也会收到限制。故而,即使是少量的磁盘查找也会带来巨大的开销。当存储系统将非常快的缓存操作和非常慢的物理磁盘操作混合在一起时,即使数据增加固定量的缓存,树结构的性能开销通常也是超线性的:例如,将data增加两倍时,性能开销将增加不止两倍。
|
||
持久队列可以建立在简单的文件读取和向文件末尾添加数据上,logging解决方案通常就是这样的。该结构具有如下优势:所有操作都是O(1)的,并且读操作不会阻塞写操作,读操作之间也不会相互阻塞。这也具有明显的性能优势,读写性能完全和数据量大小分离。
|
||
具有几乎无限的磁盘空间,而不会带来任何性能损失,意味着可以提供一些其他消息系统没有的特性。例如,在Kafka中,即使消息已经被消费者消费,也不会像其他消息系统中一样马上将消息删除,而是可以选择将消息保存一段时间之后再删除(例如一周)。
|
||
### 效率
|
||
在消息系统中,低下的效率通常是由糟糕的磁盘访问模式造成的,通常这种效率低下有两种形式:
|
||
- 太多的small I/O操作
|
||
- 过多的字节复制
|
||
#### small I/O
|
||
small I/O操作通常发生在客户端和服务端之间或服务端自己的持久化操作之中。为了避免这些,Kafka的协议是基于“消息集合”的抽象构建的,“消息集合”会将消息组合在一起。“消息集合”允许网络针对消息集合进行请求,并且分担网络往返的开销,而不是一次仅仅发送一条消息。服务端会一次性将消息块添加到log中,消费者也会一次性获取大的线性块。
|
||
该优化会将速度提高几个数量级,批量处理会导致更大的网络包,更大的连续磁盘操作,连续的内存块等,所有这些都将使Kafka将随机消息的突发流转化为流向消费者线性写入。
|
||
#### byte copying
|
||
另一个造成效率低下的问题是过多的字节复制。为了避免过多的字节复制,Kafka采用了一个标准的二进制消息格式,该消息格式由生产者,broker,消费者进行共享,故而消息在生产者、broker、消费者之间进行传递时无需进行任何修改。
|
||
由broker维护的消息日志其本身只是目录下的一些文件,每个文件中都填充了一些列的消息集合,消息写入磁盘的格式和生产者、消费者使用的消息格式完全相同。
|
||
现代unix操作系统提供了一个高度优化的code path用于从页面缓存中传输数据到socket,在linux中其通过sendfile system call完成。
|
||
将数据从文件中传输到socket的公共路径:
|
||
- 操作系统将数据从磁盘中读入到内核空间的页面缓存中
|
||
- 应用将数据从内核空间读取到用户空间的缓冲区中
|
||
- 应用将数据写回到内核空间的套接字缓冲区中
|
||
- 操作系统将socket buffer中的数据复制到NIC缓冲区中,并发送给网络
|
||
|
||
上述通用路径显然是低效的,其中有四次复制和两次系统调用,通过sendfile,允许操作系统直接从页面缓存向网络直接发送数据,重复的复制操作可以被避免。故而,在优化后的路径中,只需要将数据复制到NIC buffer中。
|
||
在一个Topic存在多个消费者的用例中,使用上述的优化,数据只需要被复制到页面缓存中一次,而不是被存储到内存中并且每次读取时都复制到用户空间中。
|
||
### 端对端的批量压缩
|
||
在一些用例场景下,瓶颈并不是cpu或disk而是网络带宽。用户可以通过压缩单条消息来节省带宽,但是这样可能会导致压缩率很低,因为重复总是在相同类型消息之间产生(例如多条JSON格式的消息具有相同的field name)。如果要提升压缩效率,需要将多条消息一起压缩,而不是压缩单条消息。
|
||
Kafka支持批量的高效压缩,消息可以被批量压缩并且并且被发送给服务端。该批量消息可以以压缩的格式写入到磁盘中,并且只会被消费者解压缩。
|
||
Kafka支持GZIP,LZ4,ZStandard压缩协议。
|
||
### 生产者
|
||
#### 负载均衡
|
||
生产者会直接将数据发送给作为该分区leader的broker,并不会经过任何的中间路由层。为了帮助生产者实现这些,所有的Kafka节点都能响应对元数据的请求,元数据中包含哪些server还存活以及topic中分区对应的leader都在哪。返回的元数据可以正确的告知生产者应该将请求发送到哪个server。
|
||
客户端会控制将消息发送到哪一个分区。这可以随机完成,实现一种随机负载均衡,或者通过语义分区函数完成。Kafka提供了一个接口用于语义分区,可以允许用户来提供一个key进行分区,并使用该key散列到一个分区(也可以覆盖分区函数)。
|
||
#### 异步发送
|
||
批量处理是提高性能的重要因素之一,为了支持批量处理,Kafka生产者会在内存中累计数据并在单个请求中发送更大的批数据。批处理可以被配置为累计不超过固定数量的消息或不超过特定界限的延迟(例如64K,10ms)。
|
||
### 消费者
|
||
Kafka消费者会向其消费分区的leader broker发送fetch请求。消费者在每次请求中指定log offset并从该位置接收一块log。consumer对于位置具有绝对的控制权,故而在需要时可以执行rewind操作对数据进行重新消费。
|
||
#### push vs pull
|
||
在Kafka的实现中,和大多数消息系统一样,data从生产者push到broker,消费者从broker中pull data。
|
||
#### consumer position
|
||
其他消息系统消费者通常使用ack来告知broker该消息已经被正确处理,但是这样会存在如下问题:如果被consumer拉取的消息被处理,但是在发送ack之前消费者失败,那么该消息会被重复消费。
|
||
相比于通过ack来保证消息被消费者正确处理,kafka通过其他方式来对其进行处理。kafka topic由一系列分区组成,在一个时间点,每个分区都只会由一个consumer group中的一个消费者实例消费。每个消费者实例只需要维护一个integer,作为ack确认的等价物,维护position的开销相当小。
|
||
#### 静态成员
|
||
静态成员用于提升基于group rebalance协议应用的性能。rebalance协议依赖于组协调器,组协调器用于为组成员分配实体id。组协调器产生的实体id是短暂的,并且当成员重启并重新加入到组之后,实体id会发生变化。对于基于consumer的app,上述“动态成员”可能会造成在应用周期性重新启动时大量任务会被新分配给其他消费者实例。
|
||
由于上述缺点,Kafka group管理协议允许组成员提供持久的实体id,基于这些id,组成员身份保持不变,故而rebalance不会被触发。
|
||
如果想要使用静态成员:
|
||
- 将`ConsumerConfig#GROUP_INSTANCE_ID_CONFIG`设置为一个该组中唯一的值,不与同组其他消费者实例重复
|
||
#### 消息传递语义
|
||
Kafka在生产者和消费者之间提供了如下语义保证:
|
||
- at most once:消息可能丢失,但是消息永远不会被重复传递
|
||
- at least once:消息永远不会丢失,但是消息可能被重复传递
|
||
- Exaclty once:每条被传输一次并且只能被传输一次
|
||
|
||
可以将上述问题拆分为两个问题:发送消息的持久性保证和消费消息的保证。
|
||
Kafka的语义是直截了当的,当消息被发布时,有一个消息被提交到log。只要消息被提交,且有一个包含消息所写入分区的broker存活,那么消息就不会被丢失。
|
||
> 如果一个生产者尝试发布一条消息,并且遭遇到了网络错误,那么生产者无法确定网络错误发生在消息提交之前或之后。
|
||
|
||
在0.11.0.0之前,如果一个生产者没有接收到消息已经成功被提交的相应,那么生产者会重新发送该消息。这种实现提供了at-least-once的语义,如果前一次消息已经被写入到log中,那么重新发送的重复消息仍会被重复写入到log中。
|
||
自从0.11.0.0开始,kafka生产者还支持幂等传输选项,幂等传输会保证消息的重复发送并不会导致log中存在重复条目。为了实现幂等传输,broker会分配给每个生产者一个id,并且通过和每条消息一起发送的序列号来判断去除重复消息。**同时,从0.11.0.0开始,生产者支持以类似事务的方式向多个topic分区发送多条消息:要么所有的消息被成功写入,要么所有的消息都未被写入。**
|
||
但是,并非是所有的用例都需要如此严格的保证。对于延迟敏感的场景,生产者可以指定其期望持久性级别。如果生产者指定其需要等待消息被提交,那么可能会需要10ms;但是生产者也可以指定异步发送消息或只需要等待到leader broker收到该信息(followers不用收到)。
|
||
在消费者的视角看来,所有的复制分区中,log和offset都完全相同,消费者控制log的位置。如果消费者永远不会宕机,那么可以消费者可以将position存储在内存中;但是如果消费者失败,若想让该topic分区被一个新的消费者实例接管,新的消费者实例需要选定一个position开始处理接管分区。
|
||
在消费者处理消息并更新position时,有如下选择:
|
||
- `at-most-once`:读取消息,然后将position保存,最后处理消息。这样,如果消费者在保存完position之后崩溃,但此时消息尚未处理完成,那么其他消费者实例接管分区时会从被保存的position开始,即使之前的消息没有被正确处理。
|
||
- `at-least-once`:读取消息,处理消息,然后将position保存。这样,如果消费者在处理完消息但是尚未保存position时崩溃,那么接管该分区的消费者实例可能重复消费先前的数据。这就是`at-least-once`语义。在很多场景下,更新操作是幂等的,故而消息
|
||
- `exactly-once`:当写入到外部系统时,限制是需要协调消费者位置和实际存储内容。通常情况下,实现该功能需要在消费者position的存储系统和消费者输出的存储系统之间引入两阶段提交。但是,可以通过将position和consumer output存储在一个位置来简化该过程。因为任何想要写入的系统可能并不支持两阶段提交。
|
||
> Kafka在Kafka Streams中支持`exactly-once`传输,并且在处理topics之间数据时通过使用transactional producer/consumer来提供`exactly-once`功能。
|
||
> 其他情况下,kafka默认保证at-least-once传输,但是也允许用户实现at-most-once传输,通过关闭生产者的重试功能并且在处理数据之前提交position变动。
|
||
|
||
### 复制
|
||
#### 分区备份
|
||
kafka会将topic分区复制到多台mq server上,mq server的数量可配置。(可以针对单个topic来设置replication factor的数量)。在其他server上保存副本允许当某台server发生故障宕机后,仍然可以从其他server上存储的副本中获取信息。
|
||
|
||
复制针对的是topic分区。在kafka中,每个分区都有一个leader和零或多个follower。leader加上follower的数量构成了replication factor。所有的写操作都会针对leader分区,而读操作则是可以针对leader或follower分区。通常情况下,分区比broker要多,leader分区分布在broker中。follower分区上的日志和leader分区上的日志都相同,offset和日志内容都相同(在特定时间点,leader尾部可能有一些尚未被复制到follower的消息)。
|
||
|
||
follower从leader分区消息消息,就像一个普通的消费者一样,并且将从leader处消费的消息追加到自己分区的尾部。
|
||
|
||
#### broker节点的活跃状态定义
|
||
就像大多数的分布式系统一样,自动的故障容灾需要精确定义一个节点的“活跃”状态。在kafka中,存在一个节点单独来负责管理集群中broker的注册,该节点被称之为controller。broker的活跃状态需要满足如下要求:
|
||
- broker需要和controller维持一个active session来接收常规的元数据更新
|
||
- 作为follower的broker需要从leader处同步leader的写操作变动,从而保持分区数据和leader处相同
|
||
|
||
对于使用zookeeper的集群,在broker节点初始化与zookeeper的连接会话时,会在zookeeper中创建一个节点,如果broker没能在`zookeeper.session.timeout.ms`时间内向zookeeper发送心跳包,那么broker就会丢失和zookeeper的会话,zookeeper中对应的节点也会被删除。controller会根据zookeeper watch注意到节点的删除,并且将该节点对应的broker标记为离线。
|
||
|
||
#### ISR
|
||
满足上述“活跃”要求的节点会被成为"in sync"状态。而leader会跟踪那些处于“in sync”状态的副本,处于“in sync”状态的副本集合被称之为ISR(in sync replias,ISR中包含leader和follower).
|
||
|
||
如果ISR中的节点不再满足”活跃“对应的两条要求,那么该节点将会从ISR中被移除。例如,如果一个follower宕机,那么controller将会注意到zookeeper中节点的删除,并且将该broker中ISR中移除。
|
||
|
||
另外,**如果一个节点仍然处于活跃状态,但是离同步leader的数据有很长的延迟,那么leader将会将该节点从ISR中移除**。延迟的最长时间通过`replica.lag.time.max.ms`来配置。如果在`replica.lag.time.max.ms`时间内副本没能通过leader到日志结尾的数据,那么副本节点将会从ISR中被移除。
|
||
|
||
#### 消息提交的定义
|
||
在某分区对应ISR中所有的副本都将消息追加到它们的log后,则可以认为该消息被提交。故而,消费者并无需担心分区的leader宕机后消息会丢失的问题,因为消息在提交前已经持久化到其分区副本中。
|
||
|
||
作为生产者,可以在发送消息时决定是否等待消息被提交,这需要在等待提交所带来的延迟和不等待提交所带来的消息丢失风险中进行权衡。是否等待提交取决于生产者的ack设置。
|
||
#### 最小写入副本数
|
||
topic可以设置一个最小写入的副本数,通过配置`min.insync.replicas`,可以对最小写入副本数进行配置。即使消息已经同步到所有ISR副本后,如果同步数目小于该值(同步数目包含leader),消息也无法被视为提交
|
||
|
||
> kafka保证一条消息只要被提交,只要有一个in-sync-replica处于活跃状态,那么消息就不会被丢失
|
||
|
||
|