From 46d3efb46d900050eeec8cafe0251f15d89fb317 Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Mon, 20 Feb 2023 18:47:15 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=85=B3=E4=BA=8EKafka?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E7=9A=84=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/kafka.md | 68 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/mq/kafka/kafka.md b/mq/kafka/kafka.md index cd5aeb6..e305af1 100644 --- a/mq/kafka/kafka.md +++ b/mq/kafka/kafka.md @@ -224,4 +224,70 @@ Kafka包含5个核心api: - Kafka的默认值 #### 动态更新Password Configs -动态更新的password config值在存储到zookeeper之前会被加密。 \ No newline at end of file +动态更新的password config值在存储到zookeeper之前会被加密。如果想要启用password config的动态更新,broker config中的`password.encoder.secret`条目必须要在server.properties中被配置。 +如果broker重启,那么用于password加密的old secret必须在静态broker config中的`password.encoder.old.secret`条目中被配置,而新的secret将会在`password.encoder.secret`条目配置。当容器启动时,当前zookeeper中存储的所有动态password config都会通过new secret重新加密。 + +#### 在启动broker之前更新zookeeper password config +kafka-config.sh支持动态broker config在启动broker之前通过zookeeper被更新。如果在alter命令行中包含了任何password config,那么broker config条目`password.encoder.secret`必须被指定。可以指定额外的加密参数,password encoder config将不会被存储在zookeeper中。 +示例如下所示: +```shell + > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config + 'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192' +``` +如上命令中,listener.name.internal.ssl.key.password将会使用指定的指定的encoder config被加密存储在zookeeper中,encoder.secret和encoder.iterations则不会被存储在zookeeper中。 + +## Design +### Motivation +Kafka可以具有如下用例: +- Kafka用于高吞吐量来应对大量事件流的场景 +- Kafka能够处理大量的数据积压,从而支持离线系统的周期性事件加载 +- Kafka也支持处理低延迟交付 +- Kafka支持分区和分布式存储 +### Persistence +#### filesystem store structure +Kafka非常依赖于文件系统来存储和缓存消息。通常观点中,磁盘效率被认为很慢。但是,磁盘的效率是依赖于其存储结构的,一个拥有良好结构设计的磁盘存储能够提供和网络一样快速的性能。 +对于磁盘性能,磁盘读写的吞吐量和磁盘查找所带来的延迟会存在极大差异,磁盘查找所带来的延迟会极大影响磁盘读写的性能。在磁盘阵列的测试中,线性写入的性能接近600MB/sec,但是随机写入的性能只在100k/sec左右,差距近乎6000倍。线性写入是所有模式中最可预测的,并且由操作系统进行了优化。一个现代的操作系统提供了read-ahead和write-behind技术,该技术可以以大块的形式预取数据,并且将较小的逻辑写入组合成大的物理写入来优化性能。 +为了弥补随机读写和线性读写的性能差异,现代操作系统在将内存作为磁盘缓存方面变得十分激进。现代操作系统会将当前所有的空闲内存转化为磁盘缓存,并且在回收内存时只会带来很小的性能开销。所有的磁盘读写操作都会经过内存,除非使用direct IO。 +更重要的是,Kafka是基于JVM构建的,JVM具有如下特性: +- java对象带来的额外内存开销十分大,通常是存储数据的两倍或更多 +- 随着堆内存中数据的增加,java GC变得越来越复杂和缓慢 + +由于JVM结构存在以上缺陷,故而使用filesystem并依赖页面缓存的方案优于维护一个内存缓存或其他结构: +> 通过将所有可获取的空闲内存作为缓存,至少将可获取缓存大小增加了一倍;而通过紧凑的字节结构而不是对象结构,又几乎将可获取的缓存大小增加了一倍 + +在32GB内存的机器中,缓存大小至多可至28-20GB,并且没有GC开销。甚至,在服务重启时,之前的cache仍然会有效。而对于进程内的内存缓存,在服务重启之后,缓存的rebuild会带来巨大的开销(10GB缓存可能带来10分钟的开销);如果重启之后不执行rebuild操作,那么重启之后的初始性能将会受到很大的影响。 +通过使用操作系统提供的page cache,关于维护缓存和文件系统一致性的逻辑全都包含在操作系统中,无需在代码中再次实现,故而这也大大简化了code的实现。 +#### Constant Time Suffices +在消息系统中,使用的持久化数据结构通常是对每个消费者队列关联一个BTree或其他通用的随机访问数据结构,用于维护消息的元数据。BTree通常会带来较高的开销。即使BTree操作的时间复杂度是O(logN),虽然O(logN)通常被认为是等于恒定事件,但是对于磁盘操作来说并不是这样。磁盘查找一次需耗费大概10ms,并且每个磁盘同一时刻只能够进行一个查找,故而并行性也会收到限制。故而,即使是少量的磁盘查找也会带来巨大的开销。当存储系统将非常快的缓存操作和非常慢的物理磁盘操作混合在一起时,即使数据增加固定量的缓存,树结构的性能开销通常也是超线性的:例如,将data增加两倍时,性能开销将增加不止两倍。 +持久队列可以建立在简单的文件读取和向文件末尾添加数据上,logging解决方案通常就是这样的。该结构具有如下优势:所有操作都是O(1)的,并且读操作不会阻塞写操作,读操作之间也不会相互阻塞。这也具有明显的性能优势,读写性能完全和数据量大小分离。 +具有几乎无限的磁盘空间,而不会带来任何性能损失,意味着可以提供一些其他消息系统没有的特性。例如,在Kafka中,即使消息已经被消费者消费,也不会像其他消息系统中一样马上将消息删除,而是可以选择将消息保存一段时间之后再删除(例如一周)。 +### 效率 +在消息系统中,低下的效率通常是由糟糕的磁盘访问模式造成的,通常这种效率低下有两种形式: +- 太多的small I/O操作 +- 过多的字节复制 +#### small I/O +small I/O操作通常发生在客户端和服务端之间或服务端自己的持久化操作之中。为了避免这些,Kafka的协议是基于“消息集合”的抽象构建的,“消息集合”会将消息组合在一起。“消息集合”允许网络针对消息集合进行请求,并且分担网络往返的开销,而不是一次仅仅发送一条消息。服务端会一次性将消息块添加到log中,消费者也会一次性获取大的线性块。 +该优化会将速度提高几个数量级,批量处理会导致更大的网络包,更大的连续磁盘操作,连续的内存块等,所有这些都将使Kafka将随机消息的突发流转化为流向消费者线性写入。 +#### byte copying +另一个造成效率低下的问题是过多的字节复制。为了避免过多的字节复制,Kafka采用了一个标准的二进制消息格式,该消息格式由生产者,broker,消费者进行共享,故而消息在生产者、broker、消费者之间进行传递时无需进行任何修改。 +由broker维护的消息日志其本身只是目录下的一些文件,每个文件中都填充了一些列的消息集合,消息写入磁盘的格式和生产者、消费者使用的消息格式完全相同。 +现代unix操作系统提供了一个高度优化的code path用于从页面缓存中传输数据到socket,在linux中其通过sendfile system call完成。 +将数据从文件中传输到socket的公共路径: +- 操作系统将数据从磁盘中读入到内核空间的页面缓存中 +- 应用将数据从内核空间读取到用户空间的缓冲区中 +- 应用将数据写回到内核空间的套接字缓冲区中 +- 操作系统将socket buffer中的数据复制到NIC缓冲区中,并发送给网络 + +上述通用路径显然是低效的,其中有四次复制和两次系统调用,通过sendfile,允许操作系统直接从页面缓存向网络直接发送数据,重复的复制操作可以被避免。故而,在优化后的路径中,只需要将数据复制到NIC buffer中。 +在一个Topic存在多个消费者的用例中,使用上述的优化,数据只需要被复制到页面缓存中一次,而不是被存储到内存中并且每次读取时都复制到用户空间中。 +### 端对端的批量压缩 +在一些用例场景下,瓶颈并不是cpu或disk而是网络带宽。用户可以通过压缩单条消息来节省带宽,但是这样可能会导致压缩率很低,因为重复总是在相同类型消息之间产生(例如多条JSON格式的消息具有相同的field name)。如果要提升压缩效率,需要将多条消息一起压缩,而不是压缩单条消息。 +Kafka支持批量的高效压缩,消息可以被批量压缩并且并且被发送给服务端。该批量消息可以以压缩的格式写入到磁盘中,并且只会被消费者解压缩。 +Kafka支持GZIP,LZ4,ZStandard压缩协议。 +### 生产者 +#### 负载均衡 +生产者会直接将数据发送给作为该分区leader的broker,并不会经过任何的中间路由层。为了帮助生产者实现这些,所有的Kafka节点都能响应对元数据的请求,元数据中包含哪些server还存活以及topic中分区对应的leader都在哪。返回的元数据可以正确的告知生产者应该将请求发送到哪个server。 +客户端会控制将消息发送到哪一个分区。这可以随机完成,实现一种随机负载均衡,或者通过语义分区函数完成。Kafka提供了一个接口用于语义分区,可以允许用户来提供一个key进行分区,并使用该key散列到一个分区(也可以覆盖分区函数)。 +#### 异步发送 +批量处理是提高性能的重要因素之一,为了支持批量处理,Kafka生产者会在内存中累计数据并在单个请求中发送更大的批数据。批处理可以被配置为累计不超过固定数量的消息或不超过特定界限的延迟(例如64K,10ms)。 +### 消费者