Flume1.6 安装及与Kafka对接
一、Flume1.6安装
1、下载:http://archive.apache.org/dist/flume/1.6.0/
2、解压到 /usr/local/src/目录下 :
>tar -zxvf flume1.6.0
3、把文件改为简单的名字:
>mv flume1.6.0 /usr/local/src/flume
4、配置环境变量:
>vi ~/.bash_profile
添加以下内容:
5、保存退出,source生效
>source ~/.bash_profile
6、配置文件
进入到conf目录中,复制一份flume-env.sh.template,并重命名为flume.env.sh
>cp flume-env.sh.template flume.env.sh
>vi flume.env.sh (将本机的jdk安装目录写入其中)
7、配置好后查看flume是否安装成功:
>./bin/flume-ng version (进入到bin目录中)
证明flume安装成功
参考链接:http://www..com/article/1313408151/
二、Kafka的安装教程见之前的博客
三、Flume与Kafka对接
1、启动zookeeper和kafka
2、在kafka中建立topic,本机中建立的是zlp-test
3、查看topic列表:
bin/kafka-topics.sh --list --zookeeper huaxia01:2181,huaxia02:2181,huaxia03:2181 (后面为zookeeper分布式集群的IP和端口号)
4、进行对接的第一步:
进入到flume的conf目录下,新建一个conf文件,flume-kafka-sink.conf
内容如下:
需提前创建一个被监控的log文件,用于存数据,本文件是/usr/local/src/test.log,自己创建,一旦文件内容有更改,则被采集到kafka中
a1.sources = r1 #对于source的配置描述 监听文件中的新增数据 exec
#这里设置所创建的topic #对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高 #通过channel c1将source r1和sink k1关联起来
|
5、创建test.sh脚本,注意要与test.log在同一个目录下,用于生成数据,例如:
arr=("hubei_wuhan" "hebei_shijiazhuang" "guangdong_guangzhou" "jiangsu_nanjing" "hunan_changsha") function rand(){ min=$1 max=$(($2-$min+1)) num=$(date +%s%N) echo $(($num%$max+$min)) } for((i=0;i<50;i++)); do rnd1=$(rand 0 4) currentTime=`date "+%Y-%m-%d %H:%M:%S"` timeStamp=`date -d "$currentTime" +%s` a=${arr[$rnd1]} province=`echo $a | cut -d \_ -f 1` city=`echo $a | cut -d \_ -f 2` rnd2=$(rand 0 10) userid=$rnd2 rnd3="10000"$(rand 0 3) advid=$rnd3 newStr=${timeStamp}","${province}","${city}","${userid}","${advid} echo $newStr >> test.log done |
6、创建kafka消费者监控zlp-test这个topic
>./bin/kafka-console-consumer.sh --bootstrap-server 192.168.120.202:9092 --topic zlp-test --from-beginning
7、启动flume,在flume安装目录下执行
>bin/flume-ng agent -n a1 -c conf -f /usr/local/src/flume/conf/flume-kafka-sink.conf -Dflume.root.logger=INFO.console
8、执行test.sh文件
>chmod u+x test.log (授予权限)、
>./test.log
9、观察test.log文件和消费者界面的现象即可