Files
rikako-note/mq/kafka/kafka.md

24 KiB
Raw Blame History

Apache Kafka

Introduction

Apache Kafka是一个分布式的stream平台具有如下三个核心功能

  • 发布和订阅record stream类似于消息队列或企业消息系统
  • 通过一种容错的持久化方式对record stream进行存储
  • 当record stream产生时对其进行处理

Kafka通常应用于如下两类应用

  • 构建实时的流数据管道,应用或系统之间传递的数据是可靠传输的
  • 构建实时stream应用应用会传输流数据并且对流数据进行响应

Kafka核心观念

  • Kafka可以作为集群在一台或多台服务器上运行服务器可以跨多个数据中心
  • kafka在topic的分类中存储stream record
  • 每条record都含有一个key、一个value和一个时间戳

Kafka具有四类核心API

  • Producer API:允许应用发布records stream到一个或多个kafka topic中
  • Consumer API允许应用订阅一个或多个topic并且处理被订阅topic的record流
  • Stream APIStream API允许应用像stream processor一样从一个或多个topic消费input stream并向一个或多个output topic产生output stream能够有效的将输入流转化为输出流
  • Connector API可以用于构建和运行一个可重用的生产者或消费者生产者或消费者会将Kafka topic现存的应用或data system。**例如一个关系型数据库的connector可以捕获对数据库表中的所有变动

在client和server之间的交流通过一个简单、高性能、语言无关的tcp协议进行。该tcp协议拥有版本并且新的版本会兼容旧的版本。

Topic and Logs

Topic是被发布record的类别或是源名称。在Kafka中一个Topic可以有0个、1个或多个消费者订阅。
对于每个topickafka集群都会维护一个分区日志每个分区日志都是一个有序的、不可变的record序列record序列被不断添加到分区的尾部。分区中的每条record都分配了一个序列号offset序列号将唯一标识分区中的每条record。
Kafka集群将会把所有已发布的record进行持久化无论其是否已经被消费record的保留期是可配置的。例如如果将保留策略设置为2天在消息被发布的两天之内record都是可消费的但是两天之后record将会被丢弃并且释放空间。
Kafka即使数据大小不同性能也是恒定的存储大量数据并不会影响Kafka的性能故而长时间存储数据并不会带来性能的问题。
实际上每个消费者保存的元数据是Kafka log的偏移量偏移量由消费者控制。正常情况下消费者会在读取记录后线性推进offset但是offset完全由消费者控制故而消费者可以按其想要的任何顺序对消息进行消费。例如消费者可以将offset设置为旧的位置并且对已经被消费的record进行重复消费。

日志中分区的目的如下:

  • 允许日志扩展到适合单个server的大小。每个独立的分区必须适合分区位于的服务器但是一个topic可以含有多个分区故而topic可以处理任意数量的数据
  • 日志分区某种意义上充当了并行的单元

Distribution

log分区分布在Kafka集群的服务器上每个服务器处理对共享分区的数据和请求。每个分区都跨可配置数量的服务器进行复制从而增加容错性。
每个分区都有一台server作为“主机”并由0或多台其他server作为“从机”。“主机”会处理所有对该分区的读写请求而“从机”只是会被动的复制主机。如果“主机”宕机那么“从机”会自动成为新的“主机”。每台服务器都充当了部分分区的“主机”和其他分区的“从机”从而负载在集群中间是均衡的。

传统的主从复制例如redis主从复制读操作分布在主机和从机之间而写操作只针对主机针对多读少写的场景能在集群之间分担读压力
而对于kafka集群通过分区来实现负载均衡每个分区都分布在多台server上而读写操作全部发生在主机上从机只作为主机宕机后的备份。而一个Topic可分为多个分区故而通过分区实现了负载在集群服务器之间的均衡。

Producers

生产者发送record数据到topic。生产者可以选择将record发送到topic中的哪个分区。可以通过轮询的方式来将record分配到不同的分区。

Consumers

消费者通过consumer group name的属性来标记自己。对于每条发布到topic中的记录都会被发布到订阅了该topic的所有consumer group中对于每个consumer grouprecord都会被发送给consumer group中的一个消费者实例。消费者实例可以在不同的进程上或不同的机器上。
如果所有的消费者实例都位于一个consumer gorup中那么record会在所有的消费者实例之间进行负载均衡。
如果所有的消费者实例位于不同的consumer group中那么record会被广播给所有的consumer group。

通常的topic含有较少数量的consumer group每个consumer group都是一个“逻辑订阅者”。每个consumer group都有许多的消费者实例组成从而实现容错性和可拓展性。
上述consumer group整体作为一个订阅者在此语义中订阅者并非是单个进程,而是一个由消费者实例组成的集群

在Kafka中“消费”操作的实现方式是通过将分区日志根据消费者实例进行划分故而每个实例在任何时间点对于其分配到的期间都是唯一的消费者。
维护consumer group中成员的过程是由Kafka协议动态处理的如果新的消费者实例加入到consumer group中它们会接管从其他group成员中获取到的分区区间如果现有消费者实例宕机那么宕机实例的分区区间将会被分配给其他的消费者实例。

multi-tenancy多租户

可以将部署Kafka作为多租户的解决方案。可以通过配置哪些topic可以产生和消费数据来启用多租户。这些操作也支持配额。

Guarantees

在高层Kafka可以提供如下保证

  • 一个生产者实例发送给特定topic分区的消息会按消息发送的顺序添加到topic分区中
  • 一个消费者实例将会按照消息存储在分区log中的顺序消费消息
  • 对于一个复制因子为N的主题其最多可以容忍N-1台服务器宕机而不会丢失提交给日志的任何记录

Kafka Storage System

写入到Kafka的数据将会被持久化到磁盘中并被复制到从机以解决容错。Kafka允许生产者等待ack返回在数据被完全的被复制并且持久化之前生产者向Kafka的写入操作都不会被认为完成。
Kafka使用的磁盘结构具有很好的拓展性不管有50K或是50T的数据写入到服务器Kafka的性能都相同。
由于严格的存储策略和允许客户端控制其读写位置可以将Kafka看作一种特殊目的的分布式文件系统该文件系统具有高性能、低延时并且容错性高主从复制。

Quick Start

Start the server

Kafka使用了Zookeeper在启动Kafka之前必须启动一个zookeeper实例。
启动zookeeper命令

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动zookeeper之后可以启动kafka实例

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Create a topic

如下命令会创建一个名为”test“的topic该topic含有一个分区和一个复制

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

可以通过如下命令列出现存的topic

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

除了手动创建topic还可以将broker配置为如果record发布到一个不存在topic该不存在topic会自动被创建。

Send Some Message

可以通过命令行将文件或是标准输入中的内容发送到Kafka集群默认情况下每一行都会作为一个独立的消息被发送

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Start a consumer

Kafka同样有一个命令行消费者可以将消息内容输出到标准输出

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

Setting multi-broker cluster

为每个broker创建一个config文件

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

为每个broker单独设置配置文件

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id属性对集群中的每个节点都是唯一且永久的名称。
通过如下命令可以再开启两个kafka节点

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

通过如下方式可以创建一个复制因子为3的新topic

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

通过如下命令行,可以获知集群和各个节点的信息:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

对于每个分区leader都是随机选择的并且在leader宕机之后会有一台从机自动的成为leader

APIS

Kafka包含5个核心api

  1. Producer API允许应用向Kafka集群的Topic发送stream数据
  2. Consumer API允许应用丛Kafka集群的Topic中读取数据
  3. Streams API允许在input topic和output topic之间传递stream数据
  4. Connect API允许实现连接器连接器会不断从源系统或应用拉拉取数据到Kafka或从Kafka推送数据到系统或者应用
  5. Admin API允许管理和查看topicbroker和其他Kafka对象

Producer API

要使用Producer API需要添加如下maven依赖

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.4.1</version>
</dependency>

Consumer API

要使用Consumer API需要添加如下maven依赖

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.4.1</version>
</dependency>

Stream API

要使用Stream API需要添加如下maven依赖

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>2.4.1</version>
</dependency>

Admin API

要使用Admin API需要添加如下maven依赖

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.4.1</version>
</dependency>

Configuration

Broker Configs

基础的配置如下:

  • broker.id
  • log.dirs
  • zookeeper

zookeeper.connect 通过hostname:port以字符串的形式来指定zookeeper连接hostname和port为zookeeper server对应的域名和端口。如果要指定zookeeper集群可以通过hostname1:port1,hostname2:port2,hostname3:port3的形式来指定多个hosts。

Broker Configs 动态更新

从Kafka版本1.1往后一些broker config可以被更新而不需要重启broker。可以在Broker Config条目的Dynamic Update Mode栏查看是否该config栏是否允许动态更新

  • read-only如果要更新该条目需要重新启动broker
  • per-broker可以对每个broker进行动态更新
  • cluster-wide可以在集群的范围内进行动态更新

如下命令会修改broker 0的配置

  > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

如下命令会返回broker 0的动态配置

 > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe

如果要删除一个覆盖的配置并且将配置值回滚为静态配置或者默认值,可以使用如下命令:

  > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads

一些配置可以设置为集群默认的配置在整个集群中都维护为一致的值。集群中所有的broker都会处理cluster default update如下命令会更新集群中所有broker的log cleaner threads

 > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

通过如下命令可以输出集群范围内的默认配置:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

所有可以在集群层面配置的条目都可以针对单个broker配置。如果一个条目在多个条目都有配置那么会按照如下优先级

  • 存储在zookeeper中的针对单个broker的配置
  • 存储在zookeeper中的针对集群的配置
  • server.properties中静态配置的条目
  • Kafka的默认值

动态更新Password Configs

动态更新的password config值在存储到zookeeper之前会被加密。如果想要启用password config的动态更新broker config中的password.encoder.secret条目必须要在server.properties中被配置。
如果broker重启那么用于password加密的old secret必须在静态broker config中的password.encoder.old.secret条目中被配置而新的secret将会在password.encoder.secret条目配置。当容器启动时当前zookeeper中存储的所有动态password config都会通过new secret重新加密。

在启动broker之前更新zookeeper password config

kafka-config.sh支持动态broker config在启动broker之前通过zookeeper被更新。如果在alter命令行中包含了任何password config那么broker config条目password.encoder.secret必须被指定。可以指定额外的加密参数password encoder config将不会被存储在zookeeper中。
示例如下所示:

  > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config
    'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'

如上命令中listener.name.internal.ssl.key.password将会使用指定的指定的encoder config被加密存储在zookeeper中encoder.secret和encoder.iterations则不会被存储在zookeeper中。

Design

Motivation

Kafka可以具有如下用例

  • Kafka用于高吞吐量来应对大量事件流的场景
  • Kafka能够处理大量的数据积压从而支持离线系统的周期性事件加载
  • Kafka也支持处理低延迟交付
  • Kafka支持分区和分布式存储

Persistence

filesystem store structure

Kafka非常依赖于文件系统来存储和缓存消息。通常观点中磁盘效率被认为很慢。但是磁盘的效率是依赖于其存储结构的一个拥有良好结构设计的磁盘存储能够提供和网络一样快速的性能。
对于磁盘性能磁盘读写的吞吐量和磁盘查找所带来的延迟会存在极大差异磁盘查找所带来的延迟会极大影响磁盘读写的性能。在磁盘阵列的测试中线性写入的性能接近600MB/sec但是随机写入的性能只在100k/sec左右差距近乎6000倍。线性写入是所有模式中最可预测的并且由操作系统进行了优化。一个现代的操作系统提供了read-ahead和write-behind技术该技术可以以大块的形式预取数据并且将较小的逻辑写入组合成大的物理写入来优化性能。
为了弥补随机读写和线性读写的性能差异现代操作系统在将内存作为磁盘缓存方面变得十分激进。现代操作系统会将当前所有的空闲内存转化为磁盘缓存并且在回收内存时只会带来很小的性能开销。所有的磁盘读写操作都会经过内存除非使用direct IO。
更重要的是Kafka是基于JVM构建的JVM具有如下特性

  • java对象带来的额外内存开销十分大通常是存储数据的两倍或更多
  • 随着堆内存中数据的增加java GC变得越来越复杂和缓慢

由于JVM结构存在以上缺陷故而使用filesystem并依赖页面缓存的方案优于维护一个内存缓存或其他结构

通过将所有可获取的空闲内存作为缓存,至少将可获取缓存大小增加了一倍;而通过紧凑的字节结构而不是对象结构,又几乎将可获取的缓存大小增加了一倍

在32GB内存的机器中缓存大小至多可至28-20GB并且没有GC开销。甚至在服务重启时之前的cache仍然会有效。而对于进程内的内存缓存在服务重启之后缓存的rebuild会带来巨大的开销10GB缓存可能带来10分钟的开销如果重启之后不执行rebuild操作那么重启之后的初始性能将会受到很大的影响。
通过使用操作系统提供的page cache关于维护缓存和文件系统一致性的逻辑全都包含在操作系统中无需在代码中再次实现故而这也大大简化了code的实现。

Constant Time Suffices

在消息系统中使用的持久化数据结构通常是对每个消费者队列关联一个BTree或其他通用的随机访问数据结构用于维护消息的元数据。BTree通常会带来较高的开销。即使BTree操作的时间复杂度是O(logN)虽然O(logN)通常被认为是等于恒定事件但是对于磁盘操作来说并不是这样。磁盘查找一次需耗费大概10ms并且每个磁盘同一时刻只能够进行一个查找故而并行性也会收到限制。故而即使是少量的磁盘查找也会带来巨大的开销。当存储系统将非常快的缓存操作和非常慢的物理磁盘操作混合在一起时即使数据增加固定量的缓存树结构的性能开销通常也是超线性的例如将data增加两倍时性能开销将增加不止两倍。
持久队列可以建立在简单的文件读取和向文件末尾添加数据上logging解决方案通常就是这样的。该结构具有如下优势所有操作都是O(1)的,并且读操作不会阻塞写操作,读操作之间也不会相互阻塞。这也具有明显的性能优势,读写性能完全和数据量大小分离。
具有几乎无限的磁盘空间而不会带来任何性能损失意味着可以提供一些其他消息系统没有的特性。例如在Kafka中即使消息已经被消费者消费也不会像其他消息系统中一样马上将消息删除而是可以选择将消息保存一段时间之后再删除例如一周

效率

在消息系统中,低下的效率通常是由糟糕的磁盘访问模式造成的,通常这种效率低下有两种形式:

  • 太多的small I/O操作
  • 过多的字节复制

small I/O

small I/O操作通常发生在客户端和服务端之间或服务端自己的持久化操作之中。为了避免这些Kafka的协议是基于“消息集合”的抽象构建的“消息集合”会将消息组合在一起。“消息集合”允许网络针对消息集合进行请求并且分担网络往返的开销而不是一次仅仅发送一条消息。服务端会一次性将消息块添加到log中消费者也会一次性获取大的线性块。
该优化会将速度提高几个数量级批量处理会导致更大的网络包更大的连续磁盘操作连续的内存块等所有这些都将使Kafka将随机消息的突发流转化为流向消费者线性写入。

byte copying

另一个造成效率低下的问题是过多的字节复制。为了避免过多的字节复制Kafka采用了一个标准的二进制消息格式该消息格式由生产者broker消费者进行共享故而消息在生产者、broker、消费者之间进行传递时无需进行任何修改。
由broker维护的消息日志其本身只是目录下的一些文件每个文件中都填充了一些列的消息集合消息写入磁盘的格式和生产者、消费者使用的消息格式完全相同。
现代unix操作系统提供了一个高度优化的code path用于从页面缓存中传输数据到socket在linux中其通过sendfile system call完成。
将数据从文件中传输到socket的公共路径

  • 操作系统将数据从磁盘中读入到内核空间的页面缓存中
  • 应用将数据从内核空间读取到用户空间的缓冲区中
  • 应用将数据写回到内核空间的套接字缓冲区中
  • 操作系统将socket buffer中的数据复制到NIC缓冲区中并发送给网络

上述通用路径显然是低效的其中有四次复制和两次系统调用通过sendfile允许操作系统直接从页面缓存向网络直接发送数据重复的复制操作可以被避免。故而在优化后的路径中只需要将数据复制到NIC buffer中。
在一个Topic存在多个消费者的用例中使用上述的优化数据只需要被复制到页面缓存中一次而不是被存储到内存中并且每次读取时都复制到用户空间中。

端对端的批量压缩

在一些用例场景下瓶颈并不是cpu或disk而是网络带宽。用户可以通过压缩单条消息来节省带宽但是这样可能会导致压缩率很低因为重复总是在相同类型消息之间产生例如多条JSON格式的消息具有相同的field name。如果要提升压缩效率需要将多条消息一起压缩而不是压缩单条消息。
Kafka支持批量的高效压缩消息可以被批量压缩并且并且被发送给服务端。该批量消息可以以压缩的格式写入到磁盘中并且只会被消费者解压缩。
Kafka支持GZIPLZ4ZStandard压缩协议。

生产者

负载均衡

生产者会直接将数据发送给作为该分区leader的broker并不会经过任何的中间路由层。为了帮助生产者实现这些所有的Kafka节点都能响应对元数据的请求元数据中包含哪些server还存活以及topic中分区对应的leader都在哪。返回的元数据可以正确的告知生产者应该将请求发送到哪个server。
客户端会控制将消息发送到哪一个分区。这可以随机完成实现一种随机负载均衡或者通过语义分区函数完成。Kafka提供了一个接口用于语义分区可以允许用户来提供一个key进行分区并使用该key散列到一个分区也可以覆盖分区函数

异步发送

批量处理是提高性能的重要因素之一为了支持批量处理Kafka生产者会在内存中累计数据并在单个请求中发送更大的批数据。批处理可以被配置为累计不超过固定数量的消息或不超过特定界限的延迟例如64K10ms

消费者