《Kafka权威指南》读书笔记——生产者、消费者
一、Kafka基本概念
Kafka是分布式日志系统、分布式流平台,按序持久化,消费者按需读取。Kafka 的数据单元被称为消息,可以理解为数据库的一行,为了提高效率,消息被分批次写入Kafka ,批次就是一组消息(通常我们所说的batch_size就是批次大小),需要在时间延迟和吞吐量之间做权衡,选择合适的batch_size。消息写入分区时用到键,一般取键的hash,对分区数取模,保证相同键的消息落在同一分区上。
Kafka的主题包含若干个分区,消息以追加的方式写入分区,只能在分区维度保证消息有序性。Kafka通过分区来实现数据冗余和伸缩性,即一个主题可以横跨多个服务器。
生产者一般并不关心消息写入哪个分区,会均衡的将消息分布在主题的各个分区上,也可以通过键和分区器将消息指定到某一分区。
消费者通过偏移量消费消息,偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把它添加到消息里,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper(旧版)或Kafka(新版)上,旧版的并不具备再均衡和提交offset的可控性。如果悄费者关闭或重启,它的读取状态不会丢失。消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。消费组保证每个分区只能被一个消费者使用(如果不这么做,需要额外的同步机制。如果需要发布订阅模式,应该为每个消费者建立各自的消费组),一条消息只会被一个消费组消费一次(不考虑宕机)。消费者可以选择提交offset的时间间隔,1 min比较合理,在 1 min内如果消息丢失可以重复读取。
每个集群都有一个broker 同时充当了集群控制器的角色,控制器负责管理工作,包括将分区分配给broker 和监控broker。一个分区可以分配给多个broker,有一个broker作为分区的首领,这种分区复制提供了消息冗余,同时主分区应均为分布在各个broker上。Kafka可以保留消息,指定保留的时间长度,或者消息大小的阈值。
旧版的kafka将broker和主题的元数据、偏移量都保存在ZK上,新版的将偏移量保存在Kafka内部的_consume_offset分区。broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
下面介绍一些主题的配置,根据经验,把分区的大小限制在25GB 以内可以得到比较理想的效果,数据默认保存一周。如果同时指定了log.retention.bytes 和log.retention.ms,只要任意一个条件得到满足,消息就会被删除。log.segment.bytes和log.segment.ms可以指定日志片段的大小和时间,达到阈值时关闭当前片段,打开新的片段进行写入。只有当日志片段关闭时,才会开始计算过期时间,所以最初到达的消息的存活时间可能会比设置的阈值还要大(过期应该按照批次的维度,否则过于麻烦)。message.max.bytes限制单个消息的大小。
二、生产者
(1)ProducerRecord包含目标主题和要发送的内容,还可以指定键或分区,再进行序列化。无键时轮询选择分区,有键hash。为了节省网络开销,消息被添加到批次里,批次按照(主题、分区)的维度来创建,被发送到相同的主题和分区上。服务器返回RecordMetaData,包含主题、分区、偏移量信息,或者返回写入失败,生产者重试。生产者按分区组装消息,按节点发送消息,最小化网络开销。
(2)生产者有三个必填属性,broker和KV的序列化器。有三种消息发送方式,种方式。发送并忘记(fire- and-forget)
发送完井不关心消息是否正常到达,大多数情况下,消息会正常到达,因为Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息;同步发送,使用send()方法发送消息,返回Future对象,get()判断消息是否发送成功;异步发送,调用send(),指定一个回调函数, 服务器在返回响应时调用该函数。
如下是最简单的fire-and-forget,指定主题和KV。
同步发送消息。
异步发送。
(3)生产者配置
acks,acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。一般设置为0,1(只需主节点ack),all。
buffer.memory指定内存缓冲区大小,compression.type指定压缩算法。
retries指定最大重试次数,batch.size指定了批次大小,通常配合linger.ms(发送等待时间)使用。
max.in.flight.requests.per.connection指定了滑动窗口大小,此外还有获取元数据、发送消息的超时时间,单个消息的大小阈值,TCP缓冲区大小等参数。
重试时,消息会乱序,如果要求严格有序,可以把滑动窗口大小设为1。
(4)序列化
介绍一下Avro 序列化,利用schema指定序列化策略,schema保存在注册表里。
(5)键可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区,如果键值为null , 井且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)将消息均衡地分布到各个分区上。指定键的话则hash,只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。
三、消费者
(1)Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。一个分区只会被消费组里的一个消费者消费。如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。向消费组增加消费者是横向伸缩的方式,不同的消费组或不影响。
可以通过consumer.assign(partitions)手动分配分区,如果分区变化了cosumer不知情。
(2)再均衡
在增减消费组、分区时,会进行分区的再均衡。再均衡为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者)。但是再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态(offset)会丢失,还需要刷新缓存。消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系,停止心跳时间达到阈值则认为消费者死亡,进行再均衡。新版本的Kafka引入了心跳线程,将轮询和发送心跳独立开来,防止轮询时间过长,被判断为死亡。
当消费者要加入群组时,它会向群组协调器发送一个Join Group请求。第一个加入群组的消费者将成为“群主”。创建消费者时,需要指定broker,KV序列化方式和消费组(非必须,可以不指定,不过不常见)。subscribe()订阅主题,也可以传入正则表达式。
(3)轮询
如下是消费者轮询,poll()指定超时时间,返回一个记录列表。每条记录都包含了记录的主题、分区、偏移量,和键值对。轮询不只是获取数据那么简单。在第一次调用新消费者的poll(),它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询尽快完成。在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。
(4)消费者配置
fetch.min.bytes指定了消费者获取记录的最小字节数,可用数据达到该值时才会发送给消费者,fetch.max.wait.ms指定等待时间,和fetch.min.bytes配合使用,看哪个优先满足。
max. parition.fetch.bytes指定了服务器从每个分区里返回给消费者的最大字节数,默认1 MB。
session.timeout.ms指定心跳超时时间,默认3s 。heartbeat.interval.ms指定poll()向协调器发送心跳的频率,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。
auto.offset.reset指定了消费者在读取一个没有偏移量的分区或者偏移量时如何处理。它的默认值是latest ,从最新的记录开始读取数据。另一个值是earliest ,从起始位置读取分区的记录。
enable.auto.commit指定了消费者是否自动提交偏移量,默认true。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置auto.commit.interval.ms来控制提交的频率。
partition.assignment.strategy指定分区策略,Range分配连续分区,RoundRobin轮询分配,默认Range。
max.poll.records控制单次调用poll() 能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
此外,还可以标志消息,指定TCP缓冲区大小。
(5)提交偏移量
消费者追踪消息在分区里的位置,并向主题_consumer_offset提交偏移量(或者ZK),再均衡之后,消费者可能分配到新的分区,拉取偏移量继续处理。偏移量的提交要在效率和消息丢失之间做权衡。如果enable.auto.commit = true,消费者会自动把从poll()接收到的最大偏移量提交上去,提交时间间隔由auto.commit.interval.ms控制,默认值是5s。自动提交虽然方便, 不过并没有为开发者留有余地来避免重复处理消息。
enable.auto.commit = false时,可以commitSync()或者commitAsync()手动提交。可以组合使用同步和异步提交,在try块里异步提交,在finally块里同步提交,应对异常情况。消费者API允许在commitSync()或者commitAsync()里传进去希望提交的分区和偏移量的map,指定需要提交的偏移量。
可以使用seekToBeginning和seekToEnd方法从指定的偏移量读取消息。如果应用程序从Kafka读取数据、处理、入库,不想重复入库,可以将偏移量和处理过的记录原子入库,下次查找偏移量,seek()读取数据。
(6)再均衡监听器
在调用subscribe() 时传入ConsumeRebalancelistener,监听再均衡。
(7)如果想要退出轮询,需要通过另一个线程调用consumer.wakeup,是消费者唯一一个可以从其他线程里安全调用的方法,可以退出poll(),并抛出wakeupException。