21Kafka大合集

kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。

一、kafka架构简介

21Kafka大合集
kafka架构的组成:
Kafka架构是由:

  • producer(消息生产者)
  • consumer(消息消费者)
  • borker(kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker)
  • topic(消息队列/分类相当于队列,里面有生产者和消费者模型)
  • zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成。

kafka里面的消息是由topic来组织的,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition内部消息强有序,相当于有序的队列,其中每个消息都有个序号offset,比如0到12,从前面读往后面写。一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,消息不经过内存缓冲,直接写入文件,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念
Kakfa提供了两种策略来删除数据。一是基于时间,二是基于partition文件大小

producer自己决定往哪个partition里面去写,这里有一些的策略,譬如轮询、hashconsumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型(各个consumer消费不同的partition,因此一个消息在group内只消费一次),group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息在被每个group消费一次。

二、kafka的特点

  • 系统的特点:生产者消费者模型,FIFO。
    Partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO。
  • 高性能:单节点支持上千个客户端,百MB/s吞吐,接近网卡的极限,零拷贝。
  • 持久性:消息直接持久化在普通磁盘上且性能好。
  • 分布式:数据副本冗余、流量负载均衡、可扩展。
  • 很灵活:消息长时间持久化+Client维护消费状态。

三、kafka为什么吞吐量大、速度快

  • 顺序读写Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。这种方法是没有办法删除数据的 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来。
  • Page Cache(操作系统内存)Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存,通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
  • 零拷贝:linux操作系统 “零拷贝” 机制使用了sendfile方法允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区(网卡缓冲区), 这样避免重新复制数据,相比较于普通的读取数据,省略了把数据从服务器加载到application应用,再用应用到目标端的过程,直接从服务器到目标端。**通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。**这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。
  • 分区分段+索引:Kafka的message是按topic分类存储的topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
  • 批量读写Kafka数据读写也是批量的而不是单条的。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。
  • 批量压缩Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议

四、kafka的broker的leader

简述:通过Zookeeper的临时节点来控制broker leader Controller的选举。
Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。

kafka broker controller的作用:

  • 主题管理 : Kafka Controller 可以帮助我们完成对 Kafka 主题创建、删除和增加分区的操作,简而言之就是对分区拥有最高行使权。
  • 分区重分配: 分区重分配主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。。
  • 在 broker 宕机后,控制器能够选举新的分区 Leader(针对partition)
  • Prefered 领导者选举 : Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案
  • 集群成员管理: 主要管理 新增 broker、broker 关闭、broker 宕机。
  • 数据服务: 控制器的最后一大类工作,就是向其他 broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据

Kafka判断一个节点是否活着有两个条件:

  • 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
  • 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
    符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。由参数replica.lag.max.messages和参数replica.lag.time.max.ms决定的。

五、kafka的consumer和consumer group

  • 同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不能够一个consumer group的多个consumer同时消费一个partition。
  • 一个consumer group下,无论有多少个consumer,这个consumer group一定回去把这个topic下所有的partition都消费了。如果consumer group里面的consumer数量等于这个topic下的partition数量的时候,效率是最高的。

Consumer Rebalance的触发条件:(1)Consumer增加或删除会触发 Consumer Group的Rebalance(2)Broker的增加或者减少都会触发 Consumer Rebalance。

六、kafka的producer

Kafka producer 发送message不用维护message的offsite信息,因为这个时候,offsite就相当于一个自增id,producer就尽管发送message就好了。
kafka producer的ack的三种方式:

  • ack=0 ,producer不等待broker同步完成的确认,继续发送下一条(批)信息
  • ack=1 ,producer要等待leader成功收到数据并得到确认,才发送下一条message。Partition的Leader死亡,follwer尚未复制,数据就会丢失。
  • ack=-1 ,producer得到所有配置的follwer确认,才发送下一条数据。
    21Kafka大合集

七、kafka的topic和partition

Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance
一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用
当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance

replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本

Partition leader与follower:partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。

七、kafka的partition和segment

producer发message到某个topic,message会被均匀的分布到多个partition上,kafka broker收到message往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件。
  • segment文件命名规则partion全局的第一个segment从0开始后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
    21Kafka大合集
    索引文件存储大量元数据,数据文件存储大量消息,**索引文件中元数据指向对应数据文件中message的物理偏移地址。**其中以索引文件中 元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移 地址为497
    segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

kafka的副本分配规则:

  • 当有4个broker,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:
    21Kafka大合集
  • 当集群中新增2节点,Partition增加到6个时分布情况如下:
    21Kafka大合集

八、kafka在什么情况下会丢失数据

  • 1、producer配置acks=0
  • 2、producer配置acks=1时,未同步副本leader宕机。
  • 3、NO_ENOUGH_REPLICATE
    produer配置acks=all的时候,也是有可能会丢数据的,当某个partition的ISR列表中的副本数,不满足min.inSync.replicate的时候,生产者发送消息就得不到ack确认,这时候生产者会进入重试,重试次数为配置的message.send.max.retries,如果在重试次数内ISR列表副本数仍然达不到最小同步副本数,那么,生产者会抛出NO_ENOUGH_REPLICATE的异常,如果没能正确处理这个异常,很可能这条数据就丢失了。
    那么什么情况下ISR列表的副本数不足最小副本数呢
    1、follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC
    2、follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如IO开销过大
  • 4、NOT_LEADER_FOR_PARTITION
    1、其中一台Broker会出现与zk的sessionTime out 连接超时,接着导致Controller重新选举,导致producer元数据不正确,此时写入该Broker,会抛出NOT_LEADER_FOR_PARTITION的警告,此时可能会发生数据丢失。
    2、auto.leader.rebalance.enable=true 也会进行重新选举leader的操作,导致写入原leader,抛出NOT_LEADER_FOR_PARTITION。
  • 5、磁盘故障
    kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等PageCache上的数据就丢失了
    可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔
  • 6、Producer生产数据过长
    单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常
    生产者生产的数据,大于消费者配置的能拉取的最大消息大小,这条大数据将会消费失败。
  • 7、无重发重试
    网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。没有做限速处理,超出了网络带宽限速。kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。
  • 8、消费者崩溃
    如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。
  • 9、消费者异常没正确处理
    (1)Consumer消费者自动提交offset,在消费者消费数据异常时,没有将异常数据处理妥当,导致业务异常数据丢失。
    (2)Consumer手动批量提交offset,在批量位点中某个位点数据异常时,没有正确处理异常,而是将批量位点的最后一个offset提交,导致异常数据丢失。