227 lines
14 KiB
Markdown
227 lines
14 KiB
Markdown
# Apache Kafka
|
||
## Introduction
|
||
Apache Kafka是一个分布式的stream平台,具有如下三个核心功能:
|
||
- 发布和订阅record stream,类似于消息队列或企业消息系统
|
||
- 通过一种容错的持久化方式对record stream进行存储
|
||
- 当record stream产生时对其进行处理
|
||
|
||
Kafka通常应用于如下两类应用:
|
||
- 构建实时的流数据管道,应用或系统之间传递的数据是可靠传输的
|
||
- 构建实时stream应用,应用会传输流数据并且对流数据进行响应
|
||
|
||
Kafka核心观念:
|
||
- Kafka可以作为集群在一台或多台服务器上运行,服务器可以跨多个数据中心
|
||
- kafka在`topic`的分类中存储stream record
|
||
- 每条record都含有一个key、一个value和一个时间戳
|
||
|
||
Kafka具有四类核心API:
|
||
- Producer API:允许应用发布records stream到一个或多个kafka topic中
|
||
- Consumer API:允许应用订阅一个或多个topic,并且处理被订阅topic的record流
|
||
- Stream API:Stream API允许应用像stream processor一样,从一个或多个topic消费input stream并向一个或多个output topic产生output stream,能够有效的将输入流转化为输出流
|
||
- Connector API:可以用于构建和运行一个可重用的生产者或消费者,生产者或消费者会将Kafka topic现存的应用或data system。**例如,一个关系型数据库的connector可以捕获对数据库表中的所有变动
|
||
|
||
在client和server之间的交流通过一个简单、高性能、语言无关的tcp协议进行。该tcp协议拥有版本,并且新的版本会兼容旧的版本。
|
||
### Topic and Logs
|
||
Topic是被发布record的类别或是源名称。在Kafka中,一个Topic可以有0个、1个或多个消费者订阅。
|
||
对于每个topic,kafka集群都会维护一个分区日志,每个分区日志都是一个有序的、不可变的record序列,record序列被不断添加到分区的尾部。分区中的每条record都分配了一个序列号`offset`,序列号将唯一标识分区中的每条record。
|
||
Kafka集群将会把所有已发布的record进行持久化,无论其是否已经被消费,record的保留期是可配置的。**例如,如果将保留策略设置为2天,在消息被发布的两天之内,record都是可消费的,但是两天之后record将会被丢弃并且释放空间。**
|
||
Kafka即使数据大小不同,性能也是恒定的,存储大量数据并不会影响Kafka的性能,故而长时间存储数据并不会带来性能的问题。
|
||
实际上,每个消费者保存的元数据是Kafka log的偏移量,偏移量由消费者控制。正常情况下,消费者会在读取记录后线性推进offset,但是offset完全由消费者控制,故而消费者可以按其想要的任何顺序对消息进行消费。**例如,消费者可以将offset设置为旧的位置并且对已经被消费的record进行重复消费。**
|
||
|
||
日志中分区的目的如下:
|
||
- 允许日志扩展到适合单个server的大小。每个独立的分区必须适合分区位于的服务器,但是一个topic可以含有多个分区,故而topic可以处理任意数量的数据
|
||
- 日志分区某种意义上充当了并行的单元
|
||
### Distribution
|
||
log分区分布在Kafka集群的服务器上,每个服务器处理对共享分区的数据和请求。每个分区都跨可配置数量的服务器进行复制,从而增加容错性。
|
||
每个分区都有一台server作为“主机”,并由0或多台其他server作为“从机”。“主机”会处理所有对该分区的读写请求,而“从机”只是会被动的复制主机。如果“主机”宕机,那么“从机”会自动成为新的“主机”。每台服务器都充当了部分分区的“主机”和其他分区的“从机”,从而负载在集群中间是均衡的。
|
||
> 传统的主从复制,例如redis主从复制,读操作分布在主机和从机之间,而写操作只针对主机,针对多读少写的场景能在集群之间分担读压力
|
||
> 而对于kafka集群,通过分区来实现负载均衡,每个分区都分布在多台server上,而读写操作全部发生在主机上,从机只作为主机宕机后的备份。而一个Topic可分为多个分区,故而通过分区实现了负载在集群服务器之间的均衡。
|
||
### Producers
|
||
生产者发送record数据到topic。生产者可以选择将record发送到topic中的哪个分区。可以通过轮询的方式来将record分配到不同的分区。
|
||
### Consumers
|
||
消费者通过***consumer group name***的属性来标记自己。对于每条发布到topic中的记录,都会被发布到订阅了该topic的所有consumer group中,对于每个consumer group,record都会被发送给consumer group中的一个消费者实例。消费者实例可以在不同的进程上或不同的机器上。
|
||
如果所有的消费者实例都位于一个consumer gorup中,那么record会在所有的消费者实例之间进行负载均衡。
|
||
如果所有的消费者实例位于不同的consumer group中,那么record会被广播给所有的consumer group。
|
||
> 通常的,topic含有较少数量的consumer group,每个consumer group都是一个“逻辑订阅者”。每个consumer group都有许多的消费者实例组成,从而实现容错性和可拓展性。
|
||
> 上述consumer group整体作为一个订阅者,**在此语义中订阅者并非是单个进程,而是一个由消费者实例组成的集群**
|
||
|
||
在Kafka中,“消费”操作的实现方式是通过将分区日志根据消费者实例进行划分,故而每个实例在任何时间点对于其分配到的期间都是唯一的消费者。
|
||
维护consumer group中成员的过程是由Kafka协议动态处理的,如果新的消费者实例加入到consumer group中,它们会接管从其他group成员中获取到的分区区间;如果现有消费者实例宕机,那么宕机实例的分区区间将会被分配给其他的消费者实例。
|
||
### multi-tenancy(多租户)
|
||
可以将部署Kafka作为多租户的解决方案。可以通过配置哪些topic可以产生和消费数据来启用多租户。这些操作也支持配额。
|
||
### Guarantees
|
||
在高层,Kafka可以提供如下保证:
|
||
- 一个生产者实例发送给特定topic分区的消息会按消息发送的顺序添加到topic分区中
|
||
- 一个消费者实例将会按照消息存储在分区log中的顺序消费消息
|
||
- 对于一个复制因子为N的主题,其最多可以容忍N-1台服务器宕机而不会丢失提交给日志的任何记录
|
||
### Kafka Storage System
|
||
写入到Kafka的数据将会被持久化到磁盘中,并被复制到从机以解决容错。Kafka允许生产者等待ack返回,在数据被完全的被复制并且持久化之前,生产者向Kafka的写入操作都不会被认为完成。
|
||
Kafka使用的磁盘结构具有很好的拓展性,不管有50K或是50T的数据写入到服务器,Kafka的性能都相同。
|
||
由于严格的存储策略和允许客户端控制其读写位置,可以将Kafka看作一种特殊目的的分布式文件系统,该文件系统具有高性能、低延时,并且容错性高,主从复制。
|
||
## Quick Start
|
||
### Start the server
|
||
Kafka使用了Zookeeper,在启动Kafka之前必须启动一个zookeeper实例。
|
||
启动zookeeper命令:
|
||
```shell
|
||
> bin/zookeeper-server-start.sh config/zookeeper.properties
|
||
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
|
||
...
|
||
```
|
||
启动zookeeper之后可以启动kafka实例:
|
||
```shell
|
||
> bin/kafka-server-start.sh config/server.properties
|
||
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
|
||
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
|
||
...
|
||
```
|
||
### Create a topic
|
||
如下命令会创建一个名为”test“的topic,该topic含有一个分区和一个复制:
|
||
```shell
|
||
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
|
||
```
|
||
可以通过如下命令列出现存的topic:
|
||
```shell
|
||
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
|
||
test
|
||
```
|
||
除了手动创建topic,还可以将broker配置为如果record发布到一个不存在topic,该不存在topic会自动被创建。
|
||
### Send Some Message
|
||
可以通过命令行将文件或是标准输入中的内容发送到Kafka集群,默认情况下,每一行都会作为一个独立的消息被发送:
|
||
```shell
|
||
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
||
This is a message
|
||
This is another message
|
||
```
|
||
### Start a consumer
|
||
Kafka同样有一个命令行消费者,可以将消息内容输出到标准输出:
|
||
```shell
|
||
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
|
||
This is a message
|
||
This is another message
|
||
```
|
||
### Setting multi-broker cluster
|
||
为每个broker创建一个config文件:
|
||
```shell
|
||
> cp config/server.properties config/server-1.properties
|
||
> cp config/server.properties config/server-2.properties
|
||
```
|
||
为每个broker单独设置配置文件
|
||
```vim
|
||
config/server-1.properties:
|
||
broker.id=1
|
||
listeners=PLAINTEXT://:9093
|
||
log.dirs=/tmp/kafka-logs-1
|
||
|
||
config/server-2.properties:
|
||
broker.id=2
|
||
listeners=PLAINTEXT://:9094
|
||
log.dirs=/tmp/kafka-logs-2
|
||
```
|
||
`broker.id`属性对集群中的每个节点都是唯一且永久的名称。
|
||
通过如下命令可以再开启两个kafka节点:
|
||
```shell
|
||
> bin/kafka-server-start.sh config/server-1.properties &
|
||
...
|
||
> bin/kafka-server-start.sh config/server-2.properties &
|
||
...
|
||
```
|
||
通过如下方式可以创建一个复制因子为3的新topic:
|
||
```shell
|
||
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
||
```
|
||
通过如下命令行,可以获知集群和各个节点的信息:
|
||
```shell
|
||
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
|
||
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
|
||
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
|
||
```
|
||
对于每个分区,leader都是随机选择的,并且,在leader宕机之后会有一台从机自动的成为leader
|
||
|
||
## APIS
|
||
Kafka包含5个核心api:
|
||
1. Producer API:允许应用向Kafka集群的Topic发送stream数据
|
||
2. Consumer API:允许应用丛Kafka集群的Topic中读取数据
|
||
3. Streams API:允许在input topic和output topic之间传递stream数据
|
||
4. Connect API:允许实现连接器,连接器会不断从源系统或应用拉拉取数据到Kafka或从Kafka推送数据到系统或者应用
|
||
5. Admin API:允许管理和查看topic,broker和其他Kafka对象
|
||
|
||
### Producer API
|
||
要使用Producer API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Consumer API
|
||
要使用Consumer API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Stream API
|
||
要使用Stream API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-streams</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
### Admin API
|
||
要使用Admin API,需要添加如下maven依赖:
|
||
```xml
|
||
<dependency>
|
||
<groupId>org.apache.kafka</groupId>
|
||
<artifactId>kafka-clients</artifactId>
|
||
<version>2.4.1</version>
|
||
</dependency>
|
||
```
|
||
|
||
## Configuration
|
||
### Broker Configs
|
||
基础的配置如下:
|
||
- broker.id
|
||
- log.dirs
|
||
- zookeeper
|
||
> zookeeper.connect
|
||
> 通过`hostname:port`以字符串的形式来指定zookeeper连接,hostname和port为zookeeper server对应的域名和端口。如果要指定zookeeper集群,可以通过`hostname1:port1,hostname2:port2,hostname3:port3`的形式来指定多个hosts。
|
||
|
||
#### Broker Configs 动态更新
|
||
从Kafka版本1.1往后,一些broker config可以被更新而不需要重启broker。可以在`Broker Config`条目的`Dynamic Update Mode`栏查看是否该config栏是否允许动态更新:
|
||
- read-only:如果要更新该条目,需要重新启动broker
|
||
- per-broker:可以对每个broker进行动态更新
|
||
- cluster-wide:可以在集群的范围内进行动态更新
|
||
|
||
如下命令会修改broker 0的配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
|
||
```
|
||
如下命令会返回broker 0的动态配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
|
||
```
|
||
如果要删除一个覆盖的配置并且将配置值回滚为静态配置或者默认值,可以使用如下命令:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
|
||
```
|
||
一些配置可以设置为集群默认的配置,在整个集群中都维护为一致的值。集群中所有的broker都会处理cluster default update,如下命令会更新集群中所有broker的log cleaner threads:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
|
||
```
|
||
通过如下命令可以输出集群范围内的默认配置:
|
||
```shell
|
||
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
|
||
```
|
||
所有可以在集群层面配置的条目都可以针对单个broker配置。如果一个条目在多个条目都有配置,那么会按照如下优先级:
|
||
- 存储在zookeeper中的针对单个broker的配置
|
||
- 存储在zookeeper中的针对集群的配置
|
||
- server.properties中静态配置的条目
|
||
- Kafka的默认值
|
||
|
||
#### 动态更新Password Configs
|
||
动态更新的password config值在存储到zookeeper之前会被加密。 |