Kafka基础知识点
Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
1.基本概念
1.1 Kafka的特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
1.2 Kafka的使用场景:
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr, ES等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
1.3 Kafka的一些基本概念
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
- Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
- Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
- Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- Segment:partition物理上由多个segment组成,每个Segment存着message信息
- Producer : 生产message发送到topic
- Consumer : 订阅topic消费message, consumer作为一个线程来消费
- Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
简要执行流程:
- Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
- Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
- Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息
1.4 Kakfa的一些重要设计思想
- Consumergroup:各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。
- 消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。进入zkCli.sh查看offset
sh: get /kafka/consumers/consumer-group/offsets/my-topic/0
- 消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
- 消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
- 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
- Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
- 同步异步:Producer采用push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
- 分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
- 消息投递可靠性
kafka的消费模式总共有3种:最多一次,最少一次,正好一次。为什么会有这3种模式,是因为客户端处理消息,提交反馈(commit)这两个动作不是原子性。
① 最多一次:客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。
②最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。
③正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。
1.5 Kakfa配置
① Broker配置
②Consumer主要配置
③Producer主要配置
下表列举了部分重要的配置参数,更多配置请参考官网文档
broker配置参数
参数 | 默认值 | 描述 |
---|---|---|
broker.id | -1 | 每一个boker都有一个唯一的id作为它们的名字。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 |
port | 9092 | broker server服务端口 |
host.name | “” | broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK |
log.dirs | /tmp/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
message.max.bytes | 1000012 | 表示消息体的最大大小,单位是字节 |
num.network.threads | 3 | broker处理消息的最大线程数,一般情况下数量为cpu核数 |
num.io.threads | 8 | 处理IO的线程数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 168 (24*7) | 控制一个log保留多长个小时 |
log.retention.bytes | -1 | 控制log文件最大尺寸 |
log.cleaner.enable | false | 是否log cleaning |
log.cleanup.policy | delete | delete还是compat. |
log.segment.bytes | 1073741824 | 单一的log segment文件大小 |
log.roll.hours | 168 | 开始一个新的log文件片段的最大时间 |
background.threads | 10 | 后台线程序 |
num.partitions | 1 | 默认分区数 |
socket.send.buffer.bytes | 102400 | socket SO_SNDBUFF参数 |
socket.receive.buffer.bytes | 102400 | socket SO_RCVBUFF参数 |
zookeeper.connect | null | 指定zookeeper连接字符串, 格式如hostname:port/chroot。chroot是一个namespace |
zookeeper.connection.timeout.ms | 6000 | 指定客户端连接zookeeper的最大超时时间 |
zookeeper.session.timeout.ms | 6000 | 连接zk的session超时时间 |
zookeeper.sync.time.ms | 2000 | zk follower落后于zk leader的最长时间 |
high-level consumer的配置参数
参数 | 默认值 | 描述 |
---|---|---|
groupid groupid 一个字符串用来指示一组consumer所在的组 | ||
socket.timeout.ms 30000 socket超时时间 | ||
socket.buffersize 64*1024 socket receive buffer | ||
fetch.size 300 * 1024 控制在一个请求中获取的消息的字节数。 这个参数在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代 | ||
backoff.increment.ms 1000 这个参数避免在没有新数据的情况下重复频繁的拉数据。 如果拉到空数据,则多推后这个时间 | ||
queued.max.message.chunks 2 high level consumer内部缓存拉回来的消息到一个队列中。 这个值控制这个队列的大小 | ||
auto.commit.enable true 如果true,consumer定期地往zookeeper写入每个分区的offset | ||
auto.commit.interval.ms 10000 往zookeeper上写offset的频率 | ||
auto.offset.reset largest 如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常. | ||
consumer.timeout.ms -1 默认-1,consumer在没有新消息时无限期的block。如果设置一个正值, 一个超时异常会抛出 | ||
rebalance.retries.max 4 rebalance时的最大尝试次数 |
producer的配置参数
参数 | 默认值 | 描述 |
---|---|---|
producer.type sync | 指定消息发送是同步还是异步 | 异步asyc成批发送用kafka.producer.AyncProducer, 同步sync用kafka.producer.SyncProducer |
metadata.broker.list | boker list | 使用这个参数传入boker和分区的静态信息,如host1:port1,host2:port2, 这个可以是全部boker的一部分 |
compression.codec | NoCompressionCodec | 消息压缩,默认不压缩 |
compressed.topics | null | 在设置了压缩的情况下,可以指定特定的topic压缩,未指定则全部压缩 |
message.send.max.retries | 3 | 消息发送最大尝试次数 |
retry.backoff.ms | 300 | 每次尝试增加的额外的间隔时间 |
topic.metadata.refresh.interval.ms | 600000 | 定期的获取元数据的时间。当分区丢失,leader不可用时producer也会主动获取元数据,如果为0,则每次发送完消息就获取元数据,不推荐。如果为负值,则只有在失败的情况下获取元数据。 |
queue.buffering.max.ms | 5000 | 在producer queue的缓存的数据最大时间,仅仅for asyc |
queue.buffering.max.message | 10000 | producer 缓存的消息的最大数量,仅仅for asyc |
queue.enqueue.timeout.ms | -1 0 | 当queue满时丢掉,负值是queue满时block,正值是queue满时block相应的时间,仅仅for asyc |
batch.num.messages | 200 | 一批消息的数量 |
request.required.acks | 0 | 0表示producer无需等待leader的确认,1代表需要leader确认写入它的本地log并立即确认,-1代表所有的备份都完成后确认。 |
request.timeout.ms | 10000 | 确认超时时间 |
1.6 Kakfa常用命令
① 查看当前Kafka集群中Topic的情况
bin/kafka-topics.sh --list –zookeeper ip:2181
列出该zookeeper中记录在案的topic列表,只有名字
② 查看Topic的分区和副本情况
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test0
运行结果:
Topic: test0 PartitionCount:16 ReplicationFactor:3 Configs:
Topic: test0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
Topic: test0 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test0 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
Topic: test0 Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test0 Partition: 4 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
Topic: test0 Partition: 5 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test0 Partition: 6 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test0 Partition: 7 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
Topic: test0 Partition: 8 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: test0 Partition: 9 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: test0 Partition: 10 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test0 Partition: 11 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
Topic: test0 Partition: 12 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: test0 Partition: 13 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test0 Partition: 14 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
Topic: test0 Partition: 15 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
结果分析:
第一行显示partitions的概况,列出了Topic名字,partition总数,存储这些partition的broker数
以下每一行都是其中一个partition的详细信息:
- leader:是该partitons所在的所有broker中担任leader的broker id,每个broker都有可能成为leader
- replicas:显示该partiton所有副本所在的broker列表,包括leader,不管该broker是否是存活,不管是否和leader保持了同步。
- isr(in-sync replicas)的简写,表示存活且副本都已同步的的broker集合,是replicas的子集
举例:
比如上面结果的第一行:Topic: test0 Partition:0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
Partition: 0
该partition编号是0
Replicas: 0,2,1
代表partition0 在broker0,broker1,broker2上保存了副本
Isr: 1,0,2
代表broker0,broker1,broker2都存活而且目前都和leader保持同步
Leader: 0
代表保存在broker0,broker1,broker2上的这三个副本中,leader是broker0
leader负责读写,broker1、broker2负责从broker0同步信息,平时没他俩什么事
当producer发送一个消息时,producer自己会判断发送到哪个partiton上,如果发到了partition0上,消息会发到leader,也就是broker0上,broker0处理这个消息,broker1、broker2从broker0同步这个消息
如果这个broker0挂了,那么kafka会在Isr列表里剩下的broker1、broker2中选一个新的leader
③创建Topic
bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --partitions 5 --replication-factor 1
说明:
--topic后面的test0是topic的名称
--zookeeper应该和server.properties文件中的zookeeper.connect一样
--partitions指定topic的partition数量,如果不指定该数量,默认是server.properties文件中的num.partitions配置值
--replication-factor指定每个partition的副本个数,默认1个
④删除topic
- 删除kafka的topic
bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test0
如果server.properties中没有把delete.topic.enable设为true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
- 删除kafka中该topic相关的目录。
在server.properties中找到配置log.dirs,把该目录下test0相关的目录删掉 - 登录zookeeper client。
sh /bin/zkCli.sh
- 删除zookeeper中该topic相关的目录
rm -r /kafka/config/topics/test0
rm -r /kafka/brokers/topics/test0
rm -r /kafka/admin/delete_topics/test0 (topic被标记为marked for deletion时需要这个命令)
- 重启zookeeper和broker
sh bin/zkServer.sh restart
⑤查看topic消费到的offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1
或者可去zk上去查看offset值
⑥查看topic各个分区的消息的信息
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
运行结果:
GROUP | TOPIC | PID | OFFSE | LOGSIZE | LAG |
---|---|---|---|---|---|
消费者组 | topic名字 | partition id | 当前已消费的条数 | 总条数 | 未消费的条数 |
⑦修改topic的partition数量(只能增加不能减少)
bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2183 --partitions 10 --topic test0
⑧修改topic的副本数
- 自己写一个文件addReplicas.json,文件的内容是JSON格式的:
{ "version": 1, "partitions": [ { "topic": "test0", "partition": 0, "replicas": [ 1,2 ] }, { "topic": "test0", "partition": 1, "replicas": [ 1,2,3 ] }, { "topic": "test0", "partition": 2, "replicas": [ 1,2,3 ] } ] }
- 运行命令:
bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:9092 --reassignment-json-file addReplicas.json --execute
⑨kafka服务启动
bin/kafka-server-start.sh -daemon config/server.properties
⑩下线broker
如下线broker0:
bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker 0 --num.retries 3 --retry.interval.ms 60 shutdown broker
2. Kafka一些基本原理
2.1. producer发布消息
2.1.1 写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
2.1.2 消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其机制为:
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
- patition 和 key 都未指定,使用轮询选出一个 patition。
2.1.3 写入流程
producer 写入消息序列图如下所示:
流程说明:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK
2.2. broker保存消息
2.2.1. 存储方式
物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:
2.2.2 存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours
- 基于大小:log.retention.bytes
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关.
2.3 leader选举和副本策略
2.3.1 replication
同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。
Kafka 分配 Replica 的算法如下:
- 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
- 将第 i 个 partition 分配到第(i mod n)个 broker 上
- 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
2.3.2 leader
当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。
kafka 在 zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas),只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。
当所有 replica 都不工作时,
等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2.4 kafka拓扑结构
3.开发中遇到过问题:
3.1发送数据同步还是异步?
kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定
3.2 kafka重复消费和数据丢失?
3.2.1 Kafka重复消费原因
底层根本原因:已经消费了数据,但是offset没提交。
- 原因1:强行kill线程,导致消费后的数据,offset没有提交。
- 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如:
上面代码会导致部分offset没提交,下次启动时会重复消费。try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); } catch (Exception e) { }
- 原因3(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
3.2.2 Kafka Consumer丢失数据原因
- 猜测:
设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。 - 解决方案:
记录offset和恢复offset的方案,理论上记录offset,下一个group consumer可以接着记录的offset位置继续消费。 - offset记录方案:
每次消费时更新每个topic+partition位置的offset在内存中,Map<key, value>,key=topic+’-’+partition,value=offset,当调用关闭consumer线程时,把上面Map的offset数据记录到 文件中(分布式集群可能要记录到redis中)。下一次启动consumer,需要读取上一次的offset信息,方法是 以当前的topic+partition为key,从上次的Map中去寻找offset。然后使用consumer.seek()方法指定到上次的offset位置。
3.3 数据入相应的partition,指定机器消费指定partition
3.3.1 高级 API 的特点
- 优点
● 高级API写起来简单
● 不需要去自行去管理offset,系统通过zookeeper自行管理
● 不需要管理分区,副本等情况,系统自动管理
● 消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据(默认设置5s更新一下 zookeeper 中存的的offset),版本为0.10.2
● 可以使用group来区分对访问同一个topic的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
缺点
● 不能自行控制 offset(对于某些特殊需求来说)
● 不能细化控制如分区、副本、zk 等
3.3.2 低级 API 的特点
- 优点
● 能够开发者自己控制offset,想从哪里读取就从哪里读取。
● 自行控制连接分区,对分区自定义进行负载均衡
● 对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储offset 即可,比如存在文件或者内存中)
缺点
● 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等
3.4.partition数量配置
partition数量由topic的并发决定,并发少则1个分区就可以,并发越高,分区数越多,可以提高吞吐量。,但是我们必须意识到集群的partition总量多大或者单个broker节点partition过多,都会对系统的可用性和消息延迟带来潜在的影响.
创建topic时指定topic数量
bin/kafka-topics.sh --create --zookeeper 10.25.58.35:2181 --replication-factor 3 --partitions 3 --topic test8
3.5.日志保留策略设置
当kafka broker的被写入海量消息后,会生成很多数据文件,占用大量磁盘空间,kafka默认是保留7天,建议根据磁盘情况配置log.retention.hours,避免磁盘撑爆。段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快。
3.6.文件刷盘策略
为了大幅度提高producer写入吞吐量,需要定期批量写文件。建议配置:
每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000
3.7异步提交(kafka.javaapi.producer)
- 采用同步:某地区mac数据2000条/s
- 采用异步:1s异步写入,速度提升为1w条/s(ProducerConfig)
3.8 producer和consumer优化
-
buffer.memory:在Producer端用来存放尚未发送出去的Message的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由block.on.buffer.full的配置来决定。
-
compression.type:none:默认发送不进行压缩,可以配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
-
num.consumer.fetchers:启动Consumer的个数,适当增加可以提高并发度。