Kafka高可用、高吞吐背后的秘密(含问题)
简介
首先熟悉kafka,先读官方文档,链接送上kafka中文官方
kafka是一个分布式流处理平台
流处理平台有以下三种特性:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
Kafka适合什么样的场景?
它可以用于两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
在 kafka 中, topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个topic。
物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息;
partition分区是topic的进一步拆分,每个topic可以拆分为多个partition分区,类似于数据库中表的水平拆分,每条消息都会分到某一个分区当中,分区内部会给消息分配一个offset来表示消息的顺序所在。消息结构图大致如下
多个生产者可以向topic发送消息,消息可以按照一定规则均匀分布在各个partition里面,由于各个partition物理上也是隔离存储的,这点就类似于数据库对于表做水平拆分来提高性能
partition(分区)有以下几个用途。
第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。第二,可以作为并行的单元集
broker: 中间的kafka cluster,存储消息,是由多个server组成的集群。
topic: kafka给消息提供的分类方式,broker用来存储不同topic的消息数据。
Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题。
producer: 往broker中某个topic里面生产数据。
consumer:从broker中某个topic获取数据。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。
c1、c2在一个group A中消费不同的P,同样group B也是这样,保证消费中某个节点丢失可以正常消费
-
producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾。
-
consumer选择一个topic,通过offset指定从哪个位置开始消费消息。消费完成之后保留offset,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费
高可用机制
kafka副本
概念
分区中的所有副本统称为AR(Assigned Repllicas),所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集
1、leader副本:响应客户端的读写请求 2、follow副本:备份leader的数据,不进行读写操作 3、ISR副本:leader副本和所有能够与leader副本保持基本同步的follow副本,如 果follow副本和leader副本数据同步速度过慢,该follow将会被T出ISR副本
ISR集合中的副本必须满足的条件
-
副本所在的节点与zk相连
-
副本的最后一条消息和leader副本的最后一条消息的差值不能超过阈值replica.lag.time.max.ms:如果该follower在此时间间隔之内没有追上leader,则该follower将会被T出ISR
在kafka中,正常情况下所有node处于同步中状态,当某个node处于非同步中状态,也就意味着整个系统出问题,需要做容错处理
同步代表了:
-
该node与zookeeper能连通。
-
该node如果是follower,那么consumer position与leader不能差距太大(差额可配置)
某个分区内同步中的node组成一个集合,即该分区的ISR。
2.1kafka通过两个手段容错
-
数据备份:以partition为单位备份,副本数可设置。当副本数为N时,代表1个leader,N-1个followers,followers可以视为leader的consumer,拉取leader的消息,append到自己的系统中
-
failover:
-
当leader处于非同步中时,系统从followers中选举新leader
-
当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入 ISR
kafka有个保障:当producer生产消息时,只有当消息被所有ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费
假设N副本全挂了,node恢复后会面临同步数据的过程,这期间ISR中没有node,会导致该分区服务不可用。kafka采用一种降级措施来处理:选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举。
由于leader是主要提供服务的,kafka broker将多个partition的leader均分在不同的server上以均摊风险。每个parition都有leader,如果在每个partition内运行选主进程,那么会导致产生非常多选主进程。kakfa采用一种轻量级的方式:从broker集群中选出一个作为controller,这个controller监控挂掉的broker,为上面的分区批量选主
----------------好,这时候问题来了---------------
1、leader会维护一个与其基本保持同步的Replica列表,也就是ISR(In-Sync Replica)
2、一个follower落后leader太多会被踢出
3、ack机制
那么在如上三种情况下,出现下列问题怎么办?
1、一个producer 一个请求发送的消息过多,导致follower瞬间落后leader太多怎么办?
2、如果follower不停移出ISR会影响性能么?
3、如果broker挂掉怎么办?或者因为GC问题导致落后leader太多怎么处理?
解决
一个副本被踢出ISR集合有这几种可能:
1、IO性能跟不上或者CPU负载过高,导致broker在磁盘追加消息的速率低于接收leader消息的速率
2、broker在很长时间内没有像leader发送fetch请求,可能发生full GC或者挂掉
那么kafka在0.8版本之后,设置两个参数
replica.lag.max.messages 用来识别性能一直很慢的节点
replica.lag.time.max.ms 用来识别卡住的节点
当follower卡住或者性能慢的时候,通过follower落后leader多少来判断是否将其剔除ISR集合,就要对流量进行监控
kafka这样解决的:
在follower落后leader超过eplica.lag.max.messages条消息的时候,不会立马踢出ISR集合,而是持续落后过replica.lag.time.max.ms时间,才会被踢出,这样可以降低异常情况的问题
kafka高吞吐机制
1、零拷贝
通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据
具体参考零拷贝
2、分区
kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作
的能力
3、批量发送
kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka
-
等消息条数到固定条数
-
一段时间发送一次
4、数据压缩
Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩 压缩的好处就是减少传输的数据量,减轻对网络传输的压力
-
批量发送
和数据压缩
一起使用,单条做数据压缩的话,效果不明显
kafka防止消息丢失和重复
kafka支持3种消息投递语义:
-
At most once——最多一次,消息可能会丢失,但不会重复
-
At least once——最少一次,消息不会丢失,可能会重复
-
Exactly once——只且一次,消息不丢失不重复,只且消费一次。
数据重复的原因
1、acks = -1 的情况下,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。
比如 follower1 和 follower2 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,数据可能会重复
2、在消费端,offset采用自动提交的方式,假设1s提交一次offset的更新,当前offset=10,当消费者消费了0.5s的数据,offset移动15,由于提交时间间隔1s,因此不会提交,此时消费者挂掉,重启后,消费者去Zookeeper上重新读取offset=10,就会产生重复消费
kafka的ack机制
Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。
ack有3个可选值,分别是1,0,-1,ack的默认值就是1
ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。
ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功
ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
kafka集群中分区如何分配到broker上?
一个topic可以建立多个分区,当然在单机环境下分区都在一个broker上面,但是在集群环境下,分区是按照什么规则分布到集群中各台broker上面?使用的是最简单的取模算法。 例如:topic test有 p0,p1,p2,p3四个分区,有三台broker b1,b2,b3。那么分区分配到broker上面的策略就是 b1:p0,p3 b2:p1 b3:p2 简单来说就是topic排序后所在序号对broker 的size取模,结果就是所在broker。
kafka消息如何确定发送到哪个分区?
kafka的消息内容包含了key-value键值对,key的作用之一就是确定消息的分区所在。默认情况下, kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。
这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。简单来说就是:hash(key) % partitions.size。
持久化
基于以下几点事实,kafka重度依赖磁盘而非内存来存储消息
kafka则是把数据以追加日志的形式存在了磁盘上。类似redis的AOF
这样的优势就在于读操作不会阻塞写操作和其他操作(因为读和写都是追加的形式,都是顺序的,不会乱,所以不会发生阻塞),数据大小不对性能产生影响
-
硬盘便宜,内存贵
-
顺序读+预读取操作,能提高缓存命中率
-
操作系统利用富余的内存作为pagecache,配合预读取(read-ahead)+写回(write-back)技术,从cache读数据,写到cache就返回(操作系统后台flush),提高用户进程响应速度
-
java对象实际大小比理想大小要大,使得将消息存到内存成本很高
-
当堆内存占用不断增加时,gc抖动较大
-
基于文件顺序读写的设计思路,代码编写简单
-
在持久化数据结构的选择上,kafka采用了queue而不是Btree
-
kafka只有简单的根据offset读和append操作,所以基于queue操作的时间复杂度为O(1),而基于Btree操作的时间复杂度为O(logN)
-
在大量文件读写的时候,基于queue的read和append只需要一次磁盘寻址,而Btree则会涉及多次。磁盘寻址过程极大降低了读写性能
一致性
上面的方案保证了数据高可用,有时高可用是体现在对一致性的牺牲上。如果希望达到强一致性,可以采取如下措施:
-
禁用脏leader选举,ISR没有node时,宁可不提供服务也不要未完全同步的node。
-
设置最小ISR数量min_isr,保证消息至少要被min_isr个node确认才能提交