Learning Hadoop, Lesson 34 (Automated Collection Tool: Flume)
In the previous lesson we learned about Hive user-defined functions (UDFs). In this lesson we will learn about Flume, an automated collection tool.
Let's start with a diagram. The boxes at the top represent three machines (there could of course be many more). Each machine produces logs as it runs, and those logs contain information we need. We cannot collect them by hand, machine by machine; that would waste far too much manpower, so we need an automated collection tool. Flume, the subject of this lesson, is a representative of such tools: it automatically collects logs from the machines and uploads them to HDFS. The logs on HDFS are then filtered and cleaned, and to make real-time queries convenient for the business modules, the filtered data is exported through Sqoop into a relational database, where each business module can read it and present it to users.
In real projects, however, the client may not allow you to install software on their servers. For example, when working with a bank, security policy forbids installing arbitrary software. How do we solve that? Look at the next diagram. The upper part shows the bank's cluster: the two servers in Active and Standby state are load-balancing servers, and the three servers below them are WebLogic servers. To obtain the logs from the bank's machines, we can install Flume on machines outside the bank's network. The bank's cluster normally has an interface to the outside network, so we can have the bank push its logs to our servers. To prevent the logs from being intercepted in transit, we agree on an encryption and decryption scheme with the bank: the bank encrypts the logs before sending them, our Flume agent receives and decrypts them, and then writes them to HDFS. From there we can filter the data, import it into a relational database, and let the business modules query it, just as in the first diagram.
Flume is an automated collection framework. Since it is a framework, much of the work is already done for us; we can achieve our goal without writing any Java code at all. That is the benefit of a framework!
Now look at another diagram. An Agent, which is what a Flume process is, consists of three parts. The Source collects data; the Channel buffers it temporarily; and the Sink writes it to some medium, such as HDFS, a database, or a file. Why split an Agent into three components? The design lets us mix and match them to meet specific needs. Each component has several implementations. Take the Source: we can listen on a port, watch a directory, or follow a log file in real time with a tail -F command, collecting data as soon as it arrives; each of these is a different implementation. The Channel, as a temporary store, also has several implementations, for example the Memory Channel, which is fast but can lose data, or the File Channel, which is safer. The Sink can write to different media; HDFS is just one of them, and we can also write to HBase, Oracle, and so on. We combine them as needed simply by editing a configuration file, without writing code, following the minimal wiring pattern sketched below.
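To make the mix-and-match idea concrete, here is a minimal sketch of that wiring pattern (the agent name agent1 and the netcat source listening on a port are placeholders for illustration only, not the configuration we will build later in this post):
# name the source, channel, and sink of an agent called agent1
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1
# pick an implementation for each component
agent1.sources.s1.type = netcat
agent1.sources.s1.bind = localhost
agent1.sources.s1.port = 44444
agent1.channels.c1.type = memory
agent1.sinks.k1.type = logger
# wire source -> channel -> sink
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1
Swapping in a different source, channel, or sink is just a matter of changing the type line and its related properties; the wiring stays the same.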
The implementation classes available for Source, Sink, and Channel are listed in the figure below.
That is enough theory; let's actually use Flume.
Step 1: Download the Flume distribution
The latest official release at the time of writing is apache-flume-1.7.0-bin.tar.gz. You can download it from the official website, or from https://download.****.net/download/anaitudou/10534674.
Step 2: Upload it to the server
We upload the downloaded package to the server. Here I use a virtual machine to stand in for the server, and I upload the package to the server's /root directory, as shown below.
Step 3: Unpack it
I unpack the package into the /itcast directory. That directory does not exist yet, so we create it first, as shown below.
[[email protected] ~]# mkdir /itcast
Once /itcast is created, we unpack the package into it, as shown below.
[[email protected] ~]# tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /itcast/
Step 4: Install the JDK
Flume is written in Java, so it needs a JDK to run. For JDK installation, refer to https://blog.****.net/anaitudou/article/details/80378312. A quick check of the install is shown below.
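Before moving on, it is worth confirming that the JDK is installed where flume-env.sh will expect it later. This assumes the install path used throughout this post, /usr/java/jdk1.7.0_80:
/usr/java/jdk1.7.0_80/bin/java -version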
Step 5: Edit the configuration files
First let's see which configuration files are in Flume's conf directory, as shown below.
[[email protected] ~]# cd /itcast/apache-flume-1.7.0-bin/
[[email protected] apache-flume-1.7.0-bin]# ls
bin CHANGELOG conf DEVNOTES doap_Flume.rdf docs lib LICENSE NOTICE README.md RELEASE-NOTES tools
[[email protected] apache-flume-1.7.0-bin]# cd conf/
[[email protected] conf]# ls
flume-conf.properties.template flume-env.ps1.template flume-env.sh.template log4j.properties
[[email protected] conf]#
The file we need to modify is flume-env.sh, but the conf directory only contains flume-env.sh.template, so we simply remove the .template suffix, as shown below.
[[email protected] conf]# mv flume-env.sh.template flume-env.sh
[[email protected] conf]# ls
flume-conf.properties.template flume-env.ps1.template flume-env.sh log4j.properties
[[email protected] conf]#
Now we open flume-env.sh and edit it. The setting we need to change is JAVA_HOME: it is commented out by default, so we remove the leading "#" and set JAVA_HOME to the path where we installed the JDK, as shown below.
[[email protected] conf]# vim flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Enviroment variables can be set here.
export JAVA_HOME=/usr/java/jdk1.7.0_80
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# Let Flume write raw event data and configuration information to its log files for debugging
# purposes. Enabling these flags is not recommended in production,
# as it may result in logging sensitive user information or encryption secrets.
# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
Next we configure the Agent's three components. For that we need a configuration file, so we create one by hand and name it a4.conf (it could just as well be a5.conf or any other name). Its contents are listed below (everything from the first comment line down to the two wiring lines is the content of a4.conf). The comments in the file may not be detailed enough, so let me elaborate. First we name the source, channel, and sink. Then we configure the source: spooldir watches a directory, here /root/logs, and any data dropped into that directory is picked up. Next we configure the channel: we use a memory channel, which is fast but can lose data; its capacity is 10000 events (the maximum number of events it can hold at one time), and its transaction capacity is 100 events per transaction. Then comes the interceptor. Interceptors can filter data and also transform it; the one configured below adds a timestamp to each event, and that timestamp lets us write the data into the directory for the matching date, which effectively gives us partitioning. Now the sink: its implementation is HDFS, and it writes the data taken from the channel to a directory on HDFS named flume/<year><month><day>, with files prefixed events- and stored as plain text (compressed formats are also possible). In the sink configuration, rollCount is the number of events after which a new file is rolled; setting it to 0 means we do not roll by event count. Instead, a new file is rolled when the current one reaches 128 MB, or when 60 seconds have passed. Finally we wire the source, channel, and sink together.
# name the agent's source, channel, and sink
a4.sources = r1
a4.channels = c1
a4.sinks = k1
# configure the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/logs
# configure the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
# define an interceptor that adds a timestamp to each event
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# configure the sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
# do not roll files by event count
a4.sinks.k1.hdfs.rollCount = 0
# roll a new file on HDFS when the current one reaches 128 MB
a4.sinks.k1.hdfs.rollSize = 134217728
# roll a new file on HDFS every 60 seconds
a4.sinks.k1.hdfs.rollInterval = 60
# wire the source, channel, and sink together
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Step 6: Upload a4.conf to the server and put it into Flume's conf directory
We first upload a4.conf to the /root directory on the server, then copy it into Flume's conf directory.
[[email protected] ~]# cp a4.conf /itcast/apache-flume-1.7.0-bin/conf/
[[email protected] ~]# cd /itcast/apache-flume-1.7.0-bin/conf/
[[email protected] conf]# ls
a4.conf flume-conf.properties.template flume-env.ps1.template flume-env.sh log4j.properties
[[email protected] conf]#
Step 7: Prepare the jar files
Before running Flume we need to prepare some jar files. The first is hadoop-common-2.2.0.jar; I have hadoop-2.2.0 installed on the itcast06 machine, so I copy the jar over from itcast06.
[[email protected] ~]# cd /itcast/hadoop-2.2.0/
[[email protected] hadoop-2.2.0]# ls
bin etc include journal lib libexec logs sbin share tmp
[[email protected] hadoop-2.2.0]# cd share/
[[email protected] share]# ls
hadoop
[[email protected] share]# cd hadoop/
[[email protected] hadoop]# ls
common hdfs httpfs mapreduce tools yarn
[[email protected] hadoop]# cd common/
[[email protected] common]# ls
hadoop-common-2.2.0.jar hadoop-common-2.2.0-tests.jar hadoop-nfs-2.2.0.jar jdiff lib sources templates
[[email protected] common]# scp hadoop-common-2.2.0.jar 169.254.254.100:/itcast/apache-flume-1.7.0-bin/lib
The authenticity of host '169.254.254.100 (169.254.254.100)' can't be established.
RSA key fingerprint is 73:1d:15:b0:32:e6:39:18:f5:5a:76:95:25:0c:eb:cb.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '169.254.254.100' (RSA) to the list of known hosts.
[email protected]'s password:
hadoop-common-2.2.0.jar 100% 2612KB 2.6MB/s 00:01
[[email protected] common]#
Next we copy commons-configuration-1.6.jar and hadoop-auth-2.2.0.jar, as shown below.
[[email protected] common]# cd lib
[[email protected] lib]# ls
activation-1.1.jar commons-compress-1.4.1.jar commons-math-2.1.jar jackson-xc-1.8.8.jar jets3t-0.6.1.jar log4j-1.2.17.jar snappy-java-1.0.4.1.jar
asm-3.2.jar commons-configuration-1.6.jar commons-net-3.1.jar jasper-compiler-5.5.23.jar jettison-1.1.jar mockito-all-1.8.5.jar stax-api-1.0.1.jar
avro-1.7.4.jar commons-digester-1.8.jar guava-11.0.2.jar jasper-runtime-5.5.23.jar jetty-6.1.26.jar netty-3.6.2.Final.jar xmlenc-0.52.jar
commons-beanutils-1.7.0.jar commons-el-1.0.jar hadoop-annotations-2.2.0.jar jaxb-api-2.2.2.jar jetty-util-6.1.26.jar paranamer-2.3.jar xz-1.0.jar
commons-beanutils-core-1.8.0.jar commons-httpclient-3.1.jar hadoop-auth-2.2.0.jar jaxb-impl-2.2.3-1.jar jsch-0.1.42.jar protobuf-java-2.5.0.jar zookeeper-3.4.5.jar
commons-cli-1.2.jar commons-io-2.1.jar jackson-core-asl-1.8.8.jar jersey-core-1.9.jar jsp-api-2.1.jar servlet-api-2.5.jar
commons-codec-1.4.jar commons-lang-2.5.jar jackson-jaxrs-1.8.8.jar jersey-json-1.9.jar jsr305-1.3.9.jar slf4j-api-1.7.5.jar
commons-collections-3.2.1.jar commons-logging-1.1.1.jar jackson-mapper-asl-1.8.8.jar jersey-server-1.9.jar junit-4.8.2.jar slf4j-log4j12-1.7.5.jar
[[email protected] lib]# scp commons-configuration-1.6.jar 169.254.254.100:/itcast/apache-flume-1.7.0-bin/lib
[email protected]'s password:
commons-configuration-1.6.jar 100% 292KB 291.8KB/s 00:00
[[email protected] lib]# scp hadoop-auth-2.2.0.jar 169.254.254.100:/itcast/apache-flume-1.7.0-bin/lib
[email protected]'s password:
hadoop-auth-2.2.0.jar 100% 49KB 48.6KB/s 00:00
[[email protected] lib]#
Finally we copy hadoop-hdfs-2.2.0.jar, as shown below.
[[email protected] common]# ls
hadoop-common-2.2.0.jar hadoop-common-2.2.0-tests.jar hadoop-nfs-2.2.0.jar jdiff lib sources templates
[[email protected] common]# cd ..
[[email protected] hadoop]# ls
common hdfs httpfs mapreduce tools yarn
[[email protected] hadoop]# cd hdfs
[[email protected] hdfs]# ls
hadoop-hdfs-2.2.0.jar hadoop-hdfs-2.2.0-tests.jar hadoop-hdfs-nfs-2.2.0.jar jdiff lib sources templates webapps
[[email protected] hdfs]# scp hadoop-hdfs-2.2.0.jar 169.254.254.100:/itcast/apache-flume-1.7.0-bin/lib
[email protected]'s password:
hadoop-hdfs-2.2.0.jar 100% 5108KB 5.0MB/s 00:00
[[email protected] hdfs]#
Step 8: Copy the Hadoop configuration files
We copy these configuration files because our Flume configuration refers to hdfs://ns1, which is an abstraction for the HDFS NameNode (an HA nameservice). How does Flume find out which machines the NameNodes and DataNodes actually run on? Through core-site.xml and hdfs-site.xml; a rough idea of what they contain is sketched after the copy below.
[[email protected] lib]# scp /itcast/hadoop-2.2.0/etc/hadoop/{core-site.xml,hdfs-site.xml} 169.254.254.100:/itcast/apache-flume-1.7.0-bin/conf
[email protected]'s password:
core-site.xml 100% 1180 1.2KB/s 00:00
hdfs-site.xml 100% 2901 2.8KB/s 00:00
[[email protected] lib]#
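For reference, here is a rough sketch of the kind of entries these two files contain for an HA nameservice called ns1. This is only an illustration; the exact values, NameNode ids, and ports depend on your cluster and are not taken from the files copied above:
<!-- core-site.xml: the default file system points at the nameservice -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://ns1</value>
</property>
<!-- hdfs-site.xml: ns1 is resolved to the actual NameNode hosts -->
<property>
  <name>dfs.nameservices</name>
  <value>ns1</value>
</property>
<property>
  <name>dfs.ha.namenodes.ns1</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns1.nn1</name>
  <value>itcast01:9000</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns1.nn2</name>
  <value>itcast02:9000</value>
</property>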
Step 9: Modify the hosts file
We need to modify the hosts file because hdfs-site.xml refers to hostnames such as itcast01 and itcast02, and our current server does not know how to resolve them by default. To fix that, we add the last two lines shown below to /etc/hosts, then save and exit.
[[email protected] lib]# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
169.254.254.10 itcast01
169.254.254.20 itcast02
Step 10: Create the monitored directory /root/logs
The /root/logs directory does not exist by default, so we need to create it.
[[email protected] ~]# mkdir /root/logs
[[email protected] ~]# cd /root/
[[email protected] ~]# ls
a4.conf anaconda-ks.cfg apache-flume-1.7.0-bin.tar.gz Desktop Documents Downloads install.log install.log.syslog jdk-7u80-linux-x64.gz logs Music Pictures Public Templates Videos
[[email protected] ~]#
Step 11: Start Flume
After all that preparation, we can finally start Flume, as shown below. The command's arguments mean the following: agent says we are starting an agent; -n is the agent's name; -c is the conf directory; -f is the file with the concrete configuration of the agent's three components, here conf/a4.conf; and -Dflume.root.logger=INFO,console sets the log level to INFO and sends the log output to the console.
[[email protected] apache-flume-1.7.0-bin]# bin/flume-ng agent -n a4 -c conf -f conf/a4.conf -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /itcast/apache-flume-1.7.0-bin/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/jdk1.7.0_80/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/itcast/apache-flume-1.7.0-bin/conf:/itcast/apache-flume-1.7.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -n a4 -f conf/a4.conf
2016-11-09 17:22:22,613 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2016-11-09 17:22:22,625 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/a4.conf
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,634 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,635 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a4
2016-11-09 17:22:22,635 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,635 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-09 17:22:22,678 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a4]
2016-11-09 17:22:22,678 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2016-11-09 17:22:22,700 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2016-11-09 17:22:22,711 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2016-11-09 17:22:22,718 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type spooldir
Once Flume is running, we open another connection to 169.254.254.100 and put a log file into the /root/logs directory, as shown below.
[[email protected] ~]# ls
a4.conf anaconda-ks.cfg Desktop Downloads install.log.syslog logs Pictures Templates
access_2013_05_30.log apache-flume-1.7.0-bin.tar.gz Documents install.log jdk-7u80-linux-x64.gz Music Public Videos
[[email protected] ~]# cp access_2013_05_30.log /root/logs/
[[email protected] ~]#
After placing the log file in /root/logs, we switch back to the terminal running Flume and look at the console output. The messages below indicate that all of the content has been written out.
2016-11-09 17:22:22,867 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2016-11-09 17:23:33,930 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2016-11-09 17:23:34,439 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)]Creating hdfs://ns1/flume/20161109/events-.1478741013931.tmp
2016-11-09 17:23:34,653 (hdfs-k1-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-11-09 17:24:07,517 (pool-5-thread-1) [WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
2016-11-09 17:24:07,540 (pool-5-thread-1) [WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
2016-11-09 17:24:14,478 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2016-11-09 17:24:14,479 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)]Preparing to move file /root/logs/access_2013_05_30.log to /root/logs/access_2013_05_30.log.COMPLETED
2016-11-09 17:24:35,638 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)]Closing hdfs://ns1/flume/20161109/events-.1478741013931.tmp
2016-11-09 17:24:35,731 (hdfs-k1-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming hdfs://ns1/flume/20161109/events-.1478741013931.tmp to hdfs://ns1/flume/20161109/events-.1478741013931
2016-11-09 17:24:35,761 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
Since the data has been written to HDFS, let's take a look there. As the figure below shows, a flume directory was indeed created, containing a directory named 20161109, which in turn contains a file whose name starts with events-; its content is the content of our log file. Our automated Flume collection pipeline works, success! (You can also verify this from the command line, as sketched below.)
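If you prefer the command line to the HDFS web UI, a quick check like the following (run on a machine whose Hadoop client is configured for ns1; the timestamp in the file name comes from the log output above and will differ on your system) shows the result:
hadoop fs -ls /flume/20161109
hadoop fs -cat /flume/20161109/events-.1478741013931 | head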
Now let's try another Agent. Its configuration file is named a2.conf, with the contents shown below. This time the source is an exec source that runs a command: it follows the /root/log file in real time with tail -F, so whenever the file's content changes, the new lines are written out immediately. The channel is again a memory channel, and the sink is the simplest one, logger.
# name the agent's source, channel, and sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1
# configure the source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /root/log
# configure the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# configure the sink
a2.sinks.k1.type = logger
# wire the source, channel, and sink together
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Next we put a2.conf into /itcast/apache-flume-1.7.0-bin/conf, then create a file named log under /root and write a single line, 111111, into it, as shown below.
[[email protected] ~]# ls
a2.conf access_2013_05_30.log apache-flume-1.7.0-bin.tar.gz Documents install.log jdk-7u80-linux-x64.gz Music Public Videos
a4.conf anaconda-ks.cfg Desktop Downloads install.log.syslog logs Pictures Templates
[[email protected] ~]# cp a2.conf /itcast/apache-flume-1.7.0-bin/conf/
[[email protected] ~]# vim log
111111
Now we start Flume, as shown below; you can see that it prints out the 111111 from our /root/log file.
[[email protected] apache-flume-1.7.0-bin]# bin/flume-ng agent -n a2 -c conf/ -f conf/a2.conf -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /itcast/apache-flume-1.7.0-bin/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/jdk1.7.0_80/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/itcast/apache-flume-1.7.0-bin/conf:/itcast/apache-flume-1.7.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -n a2 -f conf/a2.conf
2016-11-10 15:37:50,088 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2016-11-10 15:37:50,094 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/a2.conf
2016-11-10 15:37:50,100 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-10 15:37:50,101 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a2
2016-11-10 15:37:50,102 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2016-11-10 15:37:50,147 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a2]
2016-11-10 15:37:50,147 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2016-11-10 15:37:50,168 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2016-11-10 15:37:50,172 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2016-11-10 15:37:50,173 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type exec
2016-11-10 15:37:50,189 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2016-11-10 15:37:50,206 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1]
2016-11-10 15:37:50,213 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2016-11-10 15:37:50,223 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2016-11-10 15:37:50,267 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2016-11-10 15:37:50,267 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2016-11-10 15:37:50,269 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2016-11-10 15:37:50,271 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2016-11-10 15:37:50,272 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:168)] Exec source starting with command:tail -F /root/log
2016-11-10 15:37:50,275 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-11-10 15:37:50,275 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2016-11-10 15:37:54,324 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 31 31 31 31 31 111111 }
Next we open another connection to the server and append content to /root/log; as the figure shows, we append two more lines (something along the lines of the commands sketched below).
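The figure is not reproduced here, but the appends need nothing more than something like this in the second terminal (the text matches the event bodies in the log output that follows):
echo "22222" >> /root/log
echo "3333" >> /root/log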
After appending, we go back to the Flume console to see whether the lines we just appended were printed, and indeed they were, which shows this configuration works as well.
2016-11-10 15:39:12,340 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 32 32 32 32 22222 }
2016-11-10 15:39:27,346 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 33 33 33 33 3333 }
All right, that is as far as we go in this lesson.