Integrating Kafka with Flume for Real-Time Data Processing
Environment:
- Kafka 2.2.0
- Flume 1.9.0
- netcat
Demo architecture (the data flow is sketched below):
- netcat: Flume source
- memory: Flume channel
- KafkaSink: Flume sink, which acts as the Kafka producer
- kafka-console-consumer: Kafka consumer
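End to end, an event travels through the pipeline like this:

nc client -> netcat source (r1) -> memory channel (c1) -> KafkaSink (k1) -> Kafka topic "test" -> kafka-console-consumer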
1. Create a Kafka test topic named test
bin/kafka-topics.sh --zookeeper s1:2181,s2:2181,s3:2181 --create --topic test --partitions 3 --replication-factor 3
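To confirm the topic was created with the expected partition and replica layout, describe it against the same ZooKeeper quorum (note that Kafka 2.2+ also accepts --bootstrap-server here in place of --zookeeper):
bin/kafka-topics.sh --zookeeper s1:2181,s2:2181,s3:2181 --describe --topic test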
2. Prepare the Flume configuration in a file named nc_memory_kafka.conf
agent.sources = r1
agent.channels = c1
agent.sinks = k1
agent.sources.r1.type=netcat
agent.sources.r1.bind=localhost
agent.sources.r1.port=9999
agent.channels.c1.type=memory
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Topic to write to
agent.sinks.k1.kafka.topic=test
# Kafka broker list
agent.sinks.k1.kafka.bootstrap.servers=s1:9092,s2:9092,s3:9092
# Max number of events per batch sent to Kafka
agent.sinks.k1.kafka.flumeBatchSize=20
# Wait for the partition leader's ack before treating a send as successful
agent.sinks.k1.kafka.producer.acks=1
agent.sources.r1.channels=c1
agent.sinks.k1.channel=c1
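The memory channel above runs with Flume's defaults (capacity and transactionCapacity of 100 events each in Flume 1.9). For anything beyond a toy demo you may want to size it explicitly; capacity and transactionCapacity are standard MemoryChannel properties, and the values below are only illustrative. These lines can go anywhere in the same file:
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=100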
3. Start the Flume agent
bin/flume-ng agent -f conf/nc_memory_kafka.conf -n agent
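For a first run it helps to also pass the configuration directory and log to the console, using standard flume-ng options:
bin/flume-ng agent -c conf -f conf/nc_memory_kafka.conf -n agent -Dflume.root.logger=INFO,console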
4. Start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server s1:9092,s2:9092,s3:9092 --topic test
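If the consumer is started after data has already been sent, add --from-beginning to replay the topic from the earliest offset:
bin/kafka-console-consumer.sh --bootstrap-server s1:9092,s2:9092,s3:9092 --topic test --from-beginning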
5. Use nc to connect to the netcat server exposed by the Flume source
nc localhost 9999
Once connected, type a line and press Enter to send it to Flume. The Flume sink then forwards the event to the Kafka brokers, and the Kafka consumer pulls it from the topic and prints it to the console.
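For a scripted, one-shot send you can also pipe into nc. Depending on your nc variant, you may need a flag to make it exit after stdin EOF (-q 1 for traditional netcat, -N for OpenBSD nc):
echo "hello kafka" | nc -q 1 localhost 9999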