Kafka原理解析
【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>
Kafka是一款高性能的流式处理平台,其天然支持集群的水平扩展,并且以其独特的设计支持极高的消息处理效率。本文首先会对kafka进行简单的介绍,然后会对kafka的优点进行讲解,以协助读者更好的理解kafka的优势,最后分别对producer、broker和consumer端的设计方式进行讲解。
1. kafka简介
对于kafka,其是一款消息系统,而消息系统不可避免的会有producer、消息服务器和consumer这三种角色。在kafka中,我们称kafka集群中的每一台服务器都为broker。这三者的整体工作流程如下:
可以看到,这里producer生产消息,然后发送给broker,在broker存储之后,由消费者进行消息的消费。在kafka中,消息是分为三个层次的,即topic-partition-message。每个producer生成一个消息的时候,都需要指定将该消息发送到那个topic上,而consumer在消费消息时则需要指定其消费的是哪个topic的消息,可以将topic理解为某一类型的消息的标识,比如用户数据更新的消息可以用user-update-topic
,而用户访问的消息使用user-access-topic
,各个消费者会根据自身的业务需要订阅不同的消息,从而进行不同的处理。在topic之下是partition,也即分区的概念,这里存在分区的主要原因主要有两个:
- kafka会将每个分区平均的分配到多个不同的broker上,而每个topic都会将消息平均的分配到其所拥有的partition上,通过这种方式,就可以将topic下的消息均匀的负载到各个不同的机器上;
- 另外,如果消息的数量比较大,由于分区是分配在不同机器上的,而现有的分区无法承载这么大量的消息,此时就可以通过增加机器和分区的方式将消息分散到更多的分区上,以减小服务器的压力。
在partition下面则是一个一个的消息,每条消息在partition中都会有一个唯一的编号,并且这些编号是依次增加的,我们称之为offset。在consumer进行消息消费时,多个consumer是可以组成一个组的,kafka会保证每个topic的每条消息是只会被一个消费者组所消费一次。这里的消费者组的概念其实就是由多个消费者实例所组成的一个集合,通过这个集合来共同消费一个topic下的消息,以此实现将消息分散到各个不同的消费者上的目的,从而实现消息消费压力的分散。kafka增加消费者组的优点主要在于:
- 通过消费者组实现消息负载的均衡分配;
- 实现消费者的容灾处理,比如如果某个消费者宕机了,那么kafka就会将该消费者所消费的分区指派给同组之内的另外的消费者进行消费;
- 实现消费者的水平扩容,比如某个消息的量非常大,现有的消费者无法支撑这么多消息的消费,那么可以通过在消费者组中增加消费者的方式来使消息均匀的分散到更多的消费者上。
如下是kafka进行消息生产,存储和消费的一个架构示意图:
图中除了producer和consumer的工作方式以外,需要说明的主要有两点:
- 每个topic的消息是分散到多个分区上的,而kafka是会为每个分区都创建指定数量的副本的,比如图中有三个分区:partition-0、partition-1和partition-2。这三个分区的副本因子都是2,也就是包括leader副本总共有两个副本,可以看到,partition-0的leader分区在broker 1上,而follower副本分区则在broker 2上,其余的两个分区partition-1和partition-2也是类似的。这种方式的优点在于,如果某个broker宕机,那么kafka就可以将服务切换到对应的副本broker上,从而实现消息的容灾处理;
- broker和consumer都依赖的zookeeper集群,其主要是需要zookeeper进行controller或者leader副本的选举等功能,需要注意的是,producer是没有依赖zookeeper的。
2. kafka优点
除了前面讲解的partition分区和消费者组所提供的高扩展性以外,kafka实现高吞吐量与其本身的设计方式是息息相关的,如下是其设计上的主要优点:
- 支持消息的压缩,压缩的类型主要支持GZIP、Snappy和LZ4,其中针对于kafka,相对来说LZ4压缩效果最好。压缩的优点主要有两点:a. 减少producer和consumer与kafka服务器之间网络交互的IO流量;b. 减小kafka服务器上对存储消息所使用的空间。对于一款消息系统来说,网络IO消耗和磁盘消耗是非常巨大的,虽然压缩会消耗producer和consumer端的CPU时钟,但是在producer和consumer端CPU资源不是很紧张的情况下,启用压缩功能能极大的提升kafka的效率;
- producer端支持消息的批量发送。对于消息的批量发送,kafka为每一个topic的每一个分区都分配了一个发送缓冲区,只有在消息缓冲区满了或者延迟发送的时延到了之后kafka就会将一批数据发送给服务端进行处理。这种批量发送的优点主要在于能够减少与producer与broker的交互次数,从而提升发送效率;
- broker端使用零拷贝机制来存储producer端发送过来的消息。所谓的零拷贝机制,就是kafka将从网络IO对应的内核缓存中的数据直接写入到磁盘内核中,并且使用DMA技术写入数据,这中方式减少了两次的数据在内核态与用户态之间相互复制的消耗;
- broker为每个分区数据往磁盘写入或者读取数据的时候都是顺序进行的,这种方式的最大优点在于避免了磁头的随机选址,而我们知道,磁头的随机访问效率是非常低的。另外,有数据表明,磁盘的顺序访问效率比内存的随机访问还要高;
3. producer设计
对于producer端的设计,其相对于broker和consumer来说是比较简单的。producer的主要功能是实现消息的发送,相信读者朋友一定有一个疑惑,就是kafka既然实现了集群,那么producer是如何知晓将当前消息发送到哪一个broker上的呢。这主要是与producer的负载均衡机制有关,默认的负载均衡机制是轮询,也即producer在连接broker之后,会尝试将集群以及topic相关的元数据拉取下来进行缓存,producer在发送消息时,其会获取当前消息的topic有哪些分区,以及这些分区所在的broker机器地址,然后采用轮询的方式将消息依次发送到这些分区上,从而实现消息的负载均衡。如下是producer生产消息的一个示意图:
可以看到,producer在经过负载均衡策略之后,会依次将消息写入到各个分区,并且写入的方式是追加的。对于producer端的设计,其本身除了消息的负载均衡外,还提供了消息的序列化、拦截器、压缩以及批量发送的功能。如下是producer端发送消息的整体流程示意图:
从图中可以看出,在KafkaProducer
发送一个ProducerRecord
的时候,其首先会经过指定的序列化器序列化当前消息的key和value,然后通过当前消息的topic获取其元数据,比如partition信息,接着经过指定的分区器进行消息目标分区的选择,如果没有指定分区器,则会使用默认的轮询的方式选择目标分区,最后选择分区之后,会将该消息放到该topic下该分区的消息缓冲池中,在producer中,每个topic的每个分区都会有一个消息缓冲池,以方便将该缓冲池中的消息批量的发送到指定的分区。如此就是KafkaProducer.send()
方法在发送消息时所完成的工作,可以看到,对于主线程而言,其效率是非常高的,而最有可能产生损耗的也只有消息的序列化,但这个的影响不是特别大,也就是说,KafkaProducer
的发送效率是非常高的。
在KafkaProducer
将消息放到消息缓冲池之后,这里会有一个专门的线程进行消息的发送。在发送消息的时候主要有如下几个参数需要注意:
-
batch.size
:该参数指定的是每次批量发送的时候消息的大小,默认是16KB,这是一个比较保守的数字,读者朋友可以适当增加该数值; -
linger.ms
:该参数指定了当前的消息会等待多久才进行发送,增加这个参数的原因主要在于将消息在缓冲池中进行适当的等待,因为批量发送消息的效率始终要比单次发送的高。这个参数与batch.size
参数的主要区别在于,当缓冲池中的消息大小达到了batch.size
就会被发送,而如果消息在延迟linger.ms
时间段之后如果其还未达到batch.size
指定的大小,那么其还是会立即发送; -
max.in.flight.requests.per.connection
:这个参数主要指定的是,在每次发送消息的时候,在每个request连接上当前消息允许有多少个未被响应的消息。由于producer会与每个broker都建立连接,比如当前发送给broker有5条消息,而剩余的消息都在等待,直到这5条消息都发送响应,这个时候未被响应的消息数量就是5。指定这个参数的主要作用在于要保证消息的严格有序的时候,该参数必须指定为1,也就是说,在前一个消息还未发送响应之前,下一条消息是不能发送的。需要这么做的主要原因在于,如果进行批量发送,比如发送5条消息,如果其中第三条消息发送失败,而第1、2、4、5条消息都发送成功了,那么kafka就会对第3条消息进行重试,如果成功了,那么本应该在第三条的消息现在却变成了第五条,这在要求严格有序的topic中是不能被允许的。
从图中可以看出在broker发送完之后,如果当前消息发生了异常,那么其是会进行重试的,重试次数使用retires
参数进行指定,当然这里发生重试主要是由于可重试的异常才会进行重试,比如controller选举、leader选举和网络抖动异常等等,重试过程是producer自动进行的,如果发生的异常不是可重试异常,比如序列化失败和未知异常等,kafka就会直接将异常返回。
4. broker设计
对于broker端的设计,这里主要从controller设计、分区leader与follower选举与复制、log日志记录方式等方面进行介绍。
4.1 controller设计
关于controller,其主要是在kafka集群中用于管理broker、topic以及集群preferred leader副本选举等工作的。controller本质上是一个特殊的controller,前面我们讲到,kafka是依赖于zookeeper进行集群协调的,这里的controller就是依赖于zookeeper选举出来的,在每个broker启动之后,其都会尝试在zookeeper创建/chroot/controller
节点,当某个broker创建成功时,该broker就会被选举为controller,从而进行前述的相关管理工作。如果当前controller宕机,那么zookeeper就会通知其余的broker进行新一轮的选举。controller对各个broker的管理工作基本上都是通过zookeeper的事件监听机制来实现的,如下是其原理图:
关于topic的管理,其也是通过zookeeper的监听器实现的,在producer或者kafka脚本工具创建一个topic的时候,其都是往zookeeper的/chroot/topics
节点下写入当前创建的topic子节点,然后controller由于监听了/chroot/topics
节点下所有子节点的创建事件,因而其就会得到通知,就会进行topic创建相关的工作。
所谓preferred leader副本选举,这主要是每个partition都会有多个副本,而只有一个leader副本会接收producer发送的消息,其余的副本都是从leader副本上复制数据。这里的preferred leader就是指在进行leader选举时,优先选择哪一个副本作为leader副本。
4.2 leader与follower副本选举与复制
在kafka中,会为每个分区都创建多个副本,每个副本一般都会分布在不同的机器上,以此达到容灾的目的。前面我们讲到,在每个partition上,消息都是顺序写入的,并且会为每条消息都指定一个offset,我们称之为偏移量。需要注意的是,这个偏移量指的是当前分区最新写入的消息的偏移量。而在partition中还有一个与偏移量有关的概念是high watermark,也即日志高水印值,也称为水位。水位指的是各个处于ISR中的副本都处于同步状态的消息的位移,而ISR则表示当前与leader副本保持同步状态的副本。ISR全称为in-sync replica,这里存在这个概念的原因在于,一个partition存在多个副本的时候,有可能某些副本因为网络原因或者宕机而导致其数据与leader副本相差很大,这些副本就不是出于同步状态的。对于出于同步状态的副本,其与leader副本相互复制的进度就是水位。如下图所示:
可以看到,水位一定小于等于前分区的最新位移。上图中就是offset <= 7
的数据在所有的ISR副本中都存在的,而后面的8~11
号消息则还处于同步中。上面图中的12表示当前分区的LEO,即last end offset,其永远指向的是当前分区最新消息的下一个位移,也即该消息并不存在。如下图所示是各个副本之间的工作流程图:
可以看出,图中producer写入消息到一个分区之后,主要分为5个步骤:
- producer选择某个分区后,将3条消息发送到该分区的leader副本上;
- leader副本接收到消息之后,将其LEO由2更新到5;
- follower发送请求给leader副本,并从其上拉取消息数据;
- follower副本更新自己分区的LEO值,将其从1更新到4,并且在下次请求时将更新后的元数据信息发送给leader;
- leader接收到follower发送的元数据信息之后,对比各个follower的最新写入的消息偏移量,从而更新当前分区的HW值。
4.3 log日志处理方式
kafka的日志设计方式是其实现高吞吐量的重要原因之一。在存储形式上,kafka则采用了顺序写入的方式进行消息日志的记录,而在消息的读取方式上,kafka采用了零拷贝技术进行消息的读取,这极大的提升了数据的读取效率。这里我们主要从这两方面对kafka日志进行讨论。
4.3.1 零拷贝机制
在一般的数据读取机制中,如果要将数据从磁盘读取到网卡中,其主要步骤可以分为四步:
- CPU通过DMA技术将数据读取到内核缓冲区中;
- 用户将数据从内核缓冲区复制到应用程序缓冲区中;
- 应用程序将数据写入到socket相关的内核缓冲区中;
- socket内核将数据发送到网卡中进行数据的发送;
如下图所示为这个过程的一个示意图。可以看到,这整个过程数据的复制主要发生了两次,也即从OS的内核缓冲区读取数据到应用程序缓冲区和从应用程序缓冲区写入数据到socket内核缓冲区。
所谓的零拷贝机制就是指在数据通过DMA技术加载到OS内核缓冲区之后,直接将其发送到网卡中进行发送,从而减少其中的两次数据拷贝动作。本质上,零拷贝技术的实现是在内核层面进行的,主要是通过Linux的sendfile命令进行的。如下是零拷贝过程的一个示意图:
可以看到,数据在经过第一步加载到OS内核缓冲区之后,直接以该数据页的地址作为socket缓冲区的地址,然后在第二步中将其直接发送到网卡缓存中。另外,零拷贝技术还能够大量减少复制的动作,因为在磁盘数据读取时,是有多个consumer进行消费的,那么势必需要将一份数据发送到多个网卡中,如果采用之前的那种方式,将会有大量的数据复制,而对于零拷贝机制而言,其只需要在同一份socket数据页中读取数据并进行发送。
4.3.2 日志记录
对于日志的记录方式,前面已经进行了简单的讲解。其主要就是采用的追加的方式将数据写入到磁盘中。需要进行强调的是,kafka为每个topic的每个分区都建立了一个日志目录,当往某一个分区中写入消息日志时,就会往该分区的目录下的日志文件中进行日志的写入。如下是kafka日志存储目录的示意图:
图中的每一个目录都是一个分区,而前面的名字表示topic,后面的数字则是分区号。在每个分区日志目录下都有三个文件:xxx.log
、xxx.index
和xxx.timeindex
,如下图所示:
其中xxx.log
中存储的是当前分区消息的日志,而xxx.index
文件则存储的是xxx.log
中各个消息的偏移量索引,其可以简单的理解为一个offset和该offset对应的实际消息数据的物理地址的键值对,通过该文件我们可以非常快速的通过offset定位到任意一个历史的消息,并从该位置开始消费。xxx.timeindex
文件则是消息的时间索引,在kafka消息日志中,其为每一条消息都设置了一个创建时间,而xxx.timeindex
文件中就保存了这个时间与消息的对应关系。但是需要注意的是xxx.timeindex
中并不是保存的time和消息物理地址的键值对,而是保存的time和offset的一一对应关系,也就是说,如果我们需要通过时间来定位消息,那么就需要首先通过xxx.timeindex
获取到对应的offset,然后通过offset在xxx.index
文件中定位消息的具体物理地址。
关于kafka日志的管理,其有两个主要的参数:log.retention.{hours|minutes|ms}
和log.retention.bytes
,这两个参数指定了消息保存的最长时间和保存的最大字节数,当两个条件中的任意一个条件满足时,该消息就会被删除。kafka默认的消息保留时间是7天,而消息的默认保存字节数是-1,也即没有指定。另外需要说明的一点是,在kafka日志中,无论是xxx.log
文件,还是xxx.timeindex
和xxx.index
文件,都是分段的,默认以1G为一个日志段文件。比如某个分区的日志比较大,其总共有3.6G,那么每写入1G的日志的时候,该文件就会被关闭,然后新创建一个文件进行日志的写入。这里的1G主要是针对xxx.log
文件而言的,在分段之后,每个日志段的文件的文件名都会以该文件的最早的消息的偏移量来命名,比如上面图中的文件的偏移量就是0,表示是最新的一个日志段文件。对应的,xxx.timeindex
和xxx.index
文件名与xxx.log
文件是一样的,只是后缀不同。每个索引文件都对应了一个日志文件的分段。
5. Consumer设计
Consumer最主要的设计是其以组为单位的消费模式。如下图是consumer进行分组消费的一个示意图:
图中的kafka集群有两个broker实例,而topic则有四个分区,这四个分区平均的分布在了两台机器上,消费者则有两组:Consumer Group A和Consumer Group B。读者朋友一定要注意图中的箭头的指向方式,从图中可以看出,对于每一个分组而言,其所有的消费者一定会将所有的分区数据都消费到,只不过区别在于,如果当前分组中的消费者比较多,那么分区在这些消费者上分布就更均匀,每个消费者的压力就更小。这也是kafka发布订阅的一种语义,即只要不同的消费者组订阅了同一个topic,那么kafka会保证每条消息都会被发布到这些消费者组中,而在消费者组内部,一条消息一定只会被其中的某一个消费者消费。
6. 小结
本文首先对kafka的架构模式进行了简单的讲解,然后列举了kafka的优点,接着对kafka的三个主要角色:producer、broker和consumer的功能作用进行了详细的讲解。