Files
rikako-note/mq/kafka/kafka.md

141 lines
10 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Apache Kafka
## Introduction
Apache Kafka是一个分布式的stream平台具有如下三个核心功能
- 发布和订阅record stream类似于消息队列或企业消息系统
- 通过一种容错的持久化方式对record stream进行存储
- 当record stream产生时对其进行处理
Kafka通常应用于如下两类应用
- 构建实时的流数据管道,应用或系统之间传递的数据是可靠传输的
- 构建实时stream应用应用会传输流数据并且对流数据进行响应
Kafka核心观念
- Kafka可以作为集群在一台或多台服务器上运行服务器可以跨多个数据中心
- kafka在`topic`的分类中存储stream record
- 每条record都含有一个key、一个value和一个时间戳
Kafka具有四类核心API
- Producer API:允许应用发布records stream到一个或多个kafka topic中
- Consumer API允许应用订阅一个或多个topic并且处理被订阅topic的record流
- Stream APIStream API允许应用像stream processor一样从一个或多个topic消费input stream并向一个或多个output topic产生output stream能够有效的将输入流转化为输出流
- Connector API可以用于构建和运行一个可重用的生产者或消费者生产者或消费者会将Kafka topic现存的应用或data system。**例如一个关系型数据库的connector可以捕获对数据库表中的所有变动
在client和server之间的交流通过一个简单、高性能、语言无关的tcp协议进行。该tcp协议拥有版本并且新的版本会兼容旧的版本。
### Topic and Logs
Topic是被发布record的类别或是源名称。在Kafka中一个Topic可以有0个、1个或多个消费者订阅。
对于每个topickafka集群都会维护一个分区日志每个分区日志都是一个有序的、不可变的record序列record序列被不断添加到分区的尾部。分区中的每条record都分配了一个序列号`offset`序列号将唯一标识分区中的每条record。
Kafka集群将会把所有已发布的record进行持久化无论其是否已经被消费record的保留期是可配置的。**例如如果将保留策略设置为2天在消息被发布的两天之内record都是可消费的但是两天之后record将会被丢弃并且释放空间。**
Kafka即使数据大小不同性能也是恒定的存储大量数据并不会影响Kafka的性能故而长时间存储数据并不会带来性能的问题。
实际上每个消费者保存的元数据是Kafka log的偏移量偏移量由消费者控制。正常情况下消费者会在读取记录后线性推进offset但是offset完全由消费者控制故而消费者可以按其想要的任何顺序对消息进行消费。**例如消费者可以将offset设置为旧的位置并且对已经被消费的record进行重复消费。**
日志中分区的目的如下:
- 允许日志扩展到适合单个server的大小。每个独立的分区必须适合分区位于的服务器但是一个topic可以含有多个分区故而topic可以处理任意数量的数据
- 日志分区某种意义上充当了并行的单元
### Distribution
log分区分布在Kafka集群的服务器上每个服务器处理对共享分区的数据和请求。每个分区都跨可配置数量的服务器进行复制从而增加容错性。
每个分区都有一台server作为“主机”并由0或多台其他server作为“从机”。“主机”会处理所有对该分区的读写请求而“从机”只是会被动的复制主机。如果“主机”宕机那么“从机”会自动成为新的“主机”。每台服务器都充当了部分分区的“主机”和其他分区的“从机”从而负载在集群中间是均衡的。
> 传统的主从复制例如redis主从复制读操作分布在主机和从机之间而写操作只针对主机针对多读少写的场景能在集群之间分担读压力
> 而对于kafka集群通过分区来实现负载均衡每个分区都分布在多台server上而读写操作全部发生在主机上从机只作为主机宕机后的备份。而一个Topic可分为多个分区故而通过分区实现了负载在集群服务器之间的均衡。
### Producers
生产者发送record数据到topic。生产者可以选择将record发送到topic中的哪个分区。可以通过轮询的方式来将record分配到不同的分区。
### Consumers
消费者通过***consumer group name***的属性来标记自己。对于每条发布到topic中的记录都会被发布到订阅了该topic的所有consumer group中对于每个consumer grouprecord都会被发送给consumer group中的一个消费者实例。消费者实例可以在不同的进程上或不同的机器上。
如果所有的消费者实例都位于一个consumer gorup中那么record会在所有的消费者实例之间进行负载均衡。
如果所有的消费者实例位于不同的consumer group中那么record会被广播给所有的consumer group。
> 通常的topic含有较少数量的consumer group每个consumer group都是一个“逻辑订阅者”。每个consumer group都有许多的消费者实例组成从而实现容错性和可拓展性。
> 上述consumer group整体作为一个订阅者**在此语义中订阅者并非是单个进程,而是一个由消费者实例组成的集群**
在Kafka中“消费”操作的实现方式是通过将分区日志根据消费者实例进行划分故而每个实例在任何时间点对于其分配到的期间都是唯一的消费者。
维护consumer group中成员的过程是由Kafka协议动态处理的如果新的消费者实例加入到consumer group中它们会接管从其他group成员中获取到的分区区间如果现有消费者实例宕机那么宕机实例的分区区间将会被分配给其他的消费者实例。
### multi-tenancy多租户
可以将部署Kafka作为多租户的解决方案。可以通过配置哪些topic可以产生和消费数据来启用多租户。这些操作也支持配额。
### Guarantees
在高层Kafka可以提供如下保证
- 一个生产者实例发送给特定topic分区的消息会按消息发送的顺序添加到topic分区中
- 一个消费者实例将会按照消息存储在分区log中的顺序消费消息
- 对于一个复制因子为N的主题其最多可以容忍N-1台服务器宕机而不会丢失提交给日志的任何记录
### Kafka Storage System
写入到Kafka的数据将会被持久化到磁盘中并被复制到从机以解决容错。Kafka允许生产者等待ack返回在数据被完全的被复制并且持久化之前生产者向Kafka的写入操作都不会被认为完成。
Kafka使用的磁盘结构具有很好的拓展性不管有50K或是50T的数据写入到服务器Kafka的性能都相同。
由于严格的存储策略和允许客户端控制其读写位置可以将Kafka看作一种特殊目的的分布式文件系统该文件系统具有高性能、低延时并且容错性高主从复制。
## Quick Start
### Start the server
Kafka使用了Zookeeper在启动Kafka之前必须启动一个zookeeper实例。
启动zookeeper命令
```shell
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
```
启动zookeeper之后可以启动kafka实例
```shell
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
```
### Create a topic
如下命令会创建一个名为”test“的topic该topic含有一个分区和一个复制
```shell
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
```
可以通过如下命令列出现存的topic
```shell
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
```
除了手动创建topic还可以将broker配置为如果record发布到一个不存在topic该不存在topic会自动被创建。
### Send Some Message
可以通过命令行将文件或是标准输入中的内容发送到Kafka集群默认情况下每一行都会作为一个独立的消息被发送
```shell
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
```
### Start a consumer
Kafka同样有一个命令行消费者可以将消息内容输出到标准输出
```shell
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
```
### Setting multi-broker cluster
为每个broker创建一个config文件
```shell
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
```
为每个broker单独设置配置文件
```vim
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
```
`broker.id`属性对集群中的每个节点都是唯一且永久的名称。
通过如下命令可以再开启两个kafka节点
```shell
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
```
通过如下方式可以创建一个复制因子为3的新topic
```shell
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
```
通过如下命令行,可以获知集群和各个节点的信息:
```shell
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
```
对于每个分区leader都是随机选择的并且在leader宕机之后会有一台从机自动的成为leader