kafka概念及基本原理
kafka:
作用:
1.消息系统: 系统解耦,冗余存储, 流量削峰, 缓冲,异步通信,扩展性,可恢复性
2.存储系统 将消息持久化到磁盘
3.流式处理平台
高吞吐,低延时
kafka本质上每次写入操作其实都是把数据写入到操作系统的页缓存中, 然后操作系统将页缓存中的数据刷回到磁盘中
producer 生产者, broker 相当于kafka服务进程, consumer 消费者, zk集群
过程:
Producer -生产者发送消息到broker–> broker(将消息存储到磁盘) —> consumer(订阅,消费消息)
broker服务代理节点
主题topic/分区partion 主题可以有多个分区, 一个分区只属于单个主题
同一toptic下的分区,消息内容是不同的,分区在存储层上可以看做是一个追加的log文件。 会有一个特定的偏移量offset, offset在分区中是唯一的,不能跨分区使用,kafka能够保证在分区层面,消息的顺序性,不能保证toptic的顺序性。 消息被发送到broker之前,会根据分区规则选择存储到对应哪个分区
分区: 分区引入了多副本,提升容灾能力。 同一分区的不同副本保存的是同样的消息,副本是 一主多从的, leader负责读写操作,follower副本只负责与leader同步, 当leader出现故障的时候,follower会发起选举新的leader。可以实现故障自动转移,当某个broker失效的时候仍然能够提供服务
因为副本的follower同步leader,因此跟leader之间会存在一定程度的消息滞后,不同步。 保持一定程度同步的follower 组成了一个组ISR, 只有在ISR中才能参与leader的选举, 否则不能参与。 但是由于一定的消息同步延迟,因此在leader挂掉,消息没有被follower及时同步完成时,发起leader选举后的新leader可能存在一定的消息丢失。 OSR是滞后过多的副本, AR所有副本
在创建分区或者分区上线需执行leader选举。按照AR集合中的顺序查找,第一个存活的副本,并且这个副本在ISR中,一个愤怒的AR集合在分配的时候已经制定,不发生重分配,则副本顺序保持不变,分区的ISR顺序可能发生变化
如果由于网络波动等原因,一个节点时而同步,时而滞后,可能发生频繁的isr的变动
kafka使用ISR, 是因为, 如果同步复制,则对性能消耗过大,如果异步复制,则可能丢失消息,使用ISR,在异步情况下,保证消息的滞后性不超过一定数量,这样做一个平衡
生产者发送消息过程:
kafkaProducer 生产者 --> 拦截器,过滤 --> 序列化 Serializer --> 分区器 Partitioner --》 消息累加器(缓冲) RecordAccumulator 会有很多ProducerBatch 分批次发送—> 发送到broker
如果生产者需要向很多分区发送消息, 则可以将 buffer.memory参数调大,增加整体吞吐量
参数:
acks 确认,1表示响应成功(默认值), 0 便是不等待响应 , -1 / all所有成功进行响应
retries 重试次数
max.request.size 客户端发送的消息的最大值
send.buffer.bytes 发送消息缓冲区
receive.buffer.bytes 接收消息缓冲区
消费者:
每个消费者都有一个对应的消费者组, 当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个一个消费者
每一个分区只能被一个消费组中的一个消费者所消费。 例如: 一个toptic一共4个分区, 有两个消费者组A,B , A 有4个消费者, 则每个消费者分到一个分区, B 有两个消费者,则每个消费者分到两个分区, 如果有一个消费组有8个消费者,则有4个空闲,不会分到任何分区
因此可以看做是均匀得分到对应的消费者上
因为消息是持久化到磁盘的,因此可以指定位移进行消费
seek方法,从分区的末尾开始消费
再均衡:
再均衡是指分区的所属权从一个消费者转移到另外一个消费者的行为,使得消费者组具备高可用,可伸缩性, 可以添加删除消费者在消费者组中,在再均衡期间,消费者组中的消费者是无法读取消息的, 也就是不可用,一般情况下应该避免再均衡
出发再均衡操作的情况:
- 消费者宕机下线(下线,长时间GC, 网络延迟等)
- 消费者退出消费者组
- 主题或者主题的分区数量变化
- 新的消费者加入消费者组等
kafkaProducer 生产者是线程安全的, 然而kafkaConsumer是非线程安全的,acquire可以用来检测当前是否只有一个线程在操作
主题, 分区
主题作为消息归类,分区则是将消息分散,分区的划分不仅为kafka提供可伸缩性,水平扩展能力,还通过多个副本的机制来为kafka提供数据冗余的容灾能力
分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一到多个日志分段。 每个日志分段可以细分为: 索引文件,日志存储文件,快照文件等
每个副本(副本与日志一一对应)对应一个命名形式如: -的文件夹
同一分区的不同副本需要分布到不同的broker上,最好是不同的主机上,副本最终对应日志文件log
注意:
对于主题,目前可以新增分区,但是不能删除主题中的分区。
因为删除分区,需要考虑一系列问题: 例如消息如何处理, 可用性如何保证
主题toptic可以删除,删除会释放磁盘资源,文件句柄等等
删除操作是不可逆的
副本选举:
多副本提高可靠性, 但是只有leader提供读写服务,follower只同步消息, leader不可用,则该分区不可用,此时需要进行leader选举
优先副本: 是指在所有副本集合列表中的第一个副本, 优先副本是通过一定的方式促使优先副本选举为leader副本。
分区平衡,不仅是kafka集群的负载均衡也要考虑, 集群中分区是否均衡, 将leader尽可能均衡地分配到broker上
kafka提供分区自动平衡功能,在borker端的参数为: auto.leader.rebalance.enable默认为true开启,但是在生产环境不建议开启,因为在分区平衡的时候可能产生抖动,在高并发的情况下,容易出现系统问题, 因此不建议开启。
当集群中新增broker节点时, 只有新建的主题分区才可能被分配到新的节点上,以前的分区并不会自动分配到新节点, 则就可能带来资源浪费,负载不均衡
可以进行分区的重分配。 可以在集群扩容, broker节点失效的场景下对分区进行迁移
分区重平衡的原理: 想通过控制器为每个分区添加新的副本,新的副本将从分区的leader副本复制所有数据, 复制需要一定的时间,复制完成后,控制器将旧的副本从副本清单中移除。
分区重平衡对集群性能有很大影响, 需要占用网络,磁盘等资源
若果需要下线某个broker,在执行分区重分配动作前最好先关闭或者重启broker
重平衡的本质是复制,也提供复制限流功能,减小重分配的粒度,以小批次方式解决
kafka分区上线10000, 分区越多不一定性能越好,没有必然的联系,具体看应用情况使用
消息压缩: kafka将多条消息一起进行压缩,这样可以保证比较好的压缩效果, 一般情况下,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,在消费者消费的时候, 在消费者端进行解压
使用参数compression.type 配置, 默认为Producer 即是使用生产者使用的压缩方式, 还可以配置为gzip, lz4等压缩算法
日志索引: 每个日志分段对应了两个索引文件, 用来提高查找消息的效率
偏移量索引文件用来建立消息偏移量offset与物理地址之间的映射关系
kafka是以 稀疏索引 的方式构造消息的索引, 因此不保证每个消息都在索引文件中
日志清理:
broker端可以使用log.cleanup.policy来设置清理策略, 默认值为 delete 如果需要压缩,则使用compat
基于时间的删除,可以设置为基于一定时间删除,例如使用参数log.retention。hours 设置168 为7天删除
还有基于大小,log.segment.bytes
磁盘IO流程:
1.C标准库: 应用程序buffer -》c标准库 IObuffer -》 文件系统页缓存 -》 文件系统写入磁盘
- 调用文件IO: 应用程序buffer -》 文件系统页缓存 —》 文件系统写入磁盘
- 打开文件使用O_DIRECT 绕过页缓存直接读写磁盘
- 使用类似dd工具,并使用direct参数, 绕过系统cache与文件系统直接写盘
页缓存:
页缓存是操作系统实现的一种主要的磁盘缓存,以此来减少对磁盘IO的操作; 将磁盘数据缓存到内存中,当一个进程准备读取磁盘文件内容时候,操作系统会首先查看数据是否在页缓存中,如果存在即是命中,则返回数据,否则回盘进行IO操作
一个进程,一般会在进程内部进行缓存数据,除此之外,数据可能还会缓存在操作系统的页缓存中,除非使用Direct I/O方式,否则很难绕过页缓存
这样缓存了两份数据,因此可能内存使用量比实际使用的大,造成内存浪费,同时可能带来程序GC的性能问题,使用操作文件系统,页缓存,并使用字节码,会有效提高内存使用率, 同时程序重启,页缓存也可能保留,但是程序内的缓存则需要重建
零拷贝:
零拷贝就是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序, 零拷贝,减少了内核和用户模式之间的上下文切换,linux系统,零拷贝技术依赖底层的sendfile()方法
如果分区规则设置得合理, 那么所有的消息可以均匀地分布到不同的分区中, 这样可以水平扩展
在一个broker中, 一个分区对应一个日志文件,为了防止log过大,kafka引入了日志分段, 将log切分为多个logsegment, 便于维护清理,为了便于消息检索, 每个logsegment的文件.log文件都有对应两个索引文件:偏移量索引文件.index, 时间索引文件.timeindex
LogSegment 有一个基准偏移量baseoffset,用来便是第一条消息的offset, 偏移量是一个64位的长整数, 都是根据这个偏移量进行命名的
索引采用稀疏索引方式, 当写入量达到一定数量后进行偏移计入索引, 可以设置这个数量大小, 在broker端参数: log.index.interval.bytes, 稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引查询速度。
采用二分法查找消息位置
默认情况下日子分段文件logsegment大小1G, 也就是一个一个1G大小的日志
kafka存在大量延时操作: 延时生产, 拉取,删除。 kafka基于时间轮自定义实现了延时功能的定时器, 基于时间轮可以将插入删除操作的时间复杂度降为O(1), kafka的时间轮为一个存储定时任务的环形队列,底层使用数组实现
由于一个时间轮表示的时间有限,如果超出范围则无法表示,kafka引入层级时间轮,当超过当前时间轮时,则添加到上层时间轮,进位表示
控制器controller
在kafka集群会有多个broker, 其中一个会被选举为controller, controller负责管理集群中所有分区和副本的状态,leader故障时,负责为该分区选举leader, 分区ISR集合变化时,负责通知所有的broker更新元数据
职责总结如下:
-
更新集群元数据
-
创建删除toptic, toptic分区扩展
-
分区重分配
-
leader副本选举
-
broker加入集群, broker异常处理
-
controller选举等
kafka提供消费者客户端参数partition.assignment.strategy 设置消费者与订阅主题之间的分区分配策略。默认采用RangeAssignor分配策略,另外也还提供RoundBobinAssignor和StickyAssignor, 也可以配置多个分配策略, 使用逗号分隔
RangeAssignor是类似于除余的方式进行一一分配。
StickyAssignor: 分配要尽可能均匀, 分配尽可能与上次分配的保持相同。
消息传输:
-
至多一次 消息可能丢失,但是不会重复
-
最少一次 消息不丢失,但是可能重复
-
刚好一次 传输一次,不丢失
kafka使用最少一次
幂等性:
引入***来实现幂等只是针对每一对<pid, 分区>, 也就是kafka幂等性只能保证单个生产者会话中单分区的幂等。
过期时间TTL
可以通过消息timestamp, consumerInterceptor接口的onConsume()实现ttl
但是每个消息的过期时间都是一样的, 第二种可以使用消息中的headers字段, 将ttl值放入到header中,当消费者消费到这个消息的时候,通过拦截器来判断headers设置的超时时间
延时队列:
kafka没有延时队列的概念,或者说kafka没有队列的概念, 存放延时消息的主题。
重试:
当消费端消费消息失败,为了防止消息丢失,而重新建消息回滚到broker中,
幂等性:
消息重复,处理结果一样,或者不重复处理
事务:
为表示事务,kafka要求应用程序必须提供一个唯一的id表征事务, 事务id,TransactionaID