Kafka 概述 和 kafka shell操作
Kafka官方文档:kafka.apache.org/intro
Kafka概述
根据官网解释:
1、发布和订阅消息的一个企业级系统
2、高效处理实时数据
3、安全的在一个分布式系统以容错的方式存储。
1、发布和订阅消息的一个企业级系统
2、以容错的方式存储流数据
3、实时处理流数据
消息中间件:生产者和消费者
举个栗子:
妈妈:生产者
你:消费者
馒头:数据流、消息
正常情况下: 生产一个 消费一个
其他情况:
一直生产,你吃到某一个馒头时,你卡主(机器故障), 馒头就丢失了
一直生产,做馒头速度快,你吃来不及,馒头也就丢失了
拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃
篮子/框: Kafka
当篮子满了,馒头就装不下了,咋办?
多准备几个篮子 === Kafka的扩容
Kafka架构
producer:生产者,就是生产馒头(老妈)
consumer:消费者,就是吃馒头的(你)
broker:篮子
topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃
kafka是作为集群来运行,以一个或者多个broke。
kafka可以分门别类的存储流数据
kafka每一条记录都包含一个key,一个value'和一个timestamp(时间戳)
Kafka Shell 以及集群部署
单节点单broker的部署及使用
kafka 版本:kafka_2.11-0.9.0.0
配置:
$KAFKA_HOME/config/server.properties
###每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0
## kafka 默认socket端口9092
listeners=PLAINTEXT://:9092
## 机器名
host.name=hadoop000
##broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
##broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
##socket的发送缓冲区
socket.send.buffer.bytes=102400
##socket的接受缓冲区
socket.receive.buffer.bytes=102400
##socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
###kafka数据的存放地址,多个地址的话用逗号分割/opt/tmp/kafka-logs,/opt/tmp/kafka-logs2
log.dirs=/opt/tmp/kafka-logs
##每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions=1
##在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,默认1
num.recovery.threads.per.data.dir=1
##log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.hours=168
##topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
##这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours=24*7
#文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
##是否开启日志压缩
log.cleaner.enable=false
##日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
##连接zookeeper集群
zookeeper.connect=hadoop000:2181
##连接zookeeper超时设定
zookeeper.connection.timeout.ms=6000
启动Kafka
kafka-server-start.sh
USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
kafka-server-start.sh -deamon $KAFKA_HOME/config/server.properties
其中-daemon表示后台运行
创建topic: zk
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic
需指定备份数和分区数
查看所有topic
kafka-topics.sh --list --zookeeper hadoop000:2181
发送消息: broker
kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic
消费消息: 0.9.0.0版本参数为--zookeeper 0.10.0.0参数为--bootstrap-server
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning
--from-beginning的使用:若含有此参数,则每次打开终端会消费该topic上所有消息
若不含此参数,则每次打开终端都会消费,打开之后收到的数据。
查看所有topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181
查看指定topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic
单节点多broker
配置(差异配置如下,其他配置相同)
server-1.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1
server-2.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2
server-3.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-3
listeners=PLAINTEXT://:9095
broker.id=3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &
创建topic: zk
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
发送消息: broker
kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic
消费消息: broker
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic