Flume

                                         Flume

1.Flume架构解释

  架构设计要点

 老师总结:

Source是数据源的总称,我们往往设定好源后,数据将源源不断的被抓取或者被推送。常见的数据源有:

ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。

Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;

Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。

Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Flume的架构主要有一下几个核心概念:

·        Event:一个数据单元,带有一个可选的消息头

·        Flow:Event从源点到达目的点的迁移的抽象

·        Client:操作位于源点处的Event,将其发送到Flume Agent

·        Agent:一个独立的Flume进程,包含组件Source、Channel、Sink

·        Source:用来消费传递到该组件的Event

·        Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event

·        Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

FlumeNG架构

Flume

外部系统产生的日志,直接通过Flume的Agent中的Source组件,Source组件将事件(日志)发送给Channel组件来临时存储,Channel组件再将事件发送给Sink组件中,最后Sink组件直接将数据放到HDFS集群上。

多个Agent顺序连接

Flume

和前面的架构过程一样,只是将多个Agent顺序连接起来,这是一种不同应用场景的需求方式,。要控制其数量。

多个Agent数据汇聚到同一个Agent:

Flume

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

多路Agent

Flume

这种模式分两种方式,一种复制(Replication),一种分流(Multiplexing)。复制可将最前端数据源复制多份,分别传递多个channel中,每个channel接收到的数据都是相同的。

loadbalance功能:

Flume

将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上

基本功能

·        FlumeSource

Source类型

说明

Avro Source

支持Avro协议(实际上是Avro RPC),内置支持

Thrift Source

支持Thrift协议,内置支持

Exec Source

基于Unix的command在标准输出上生产数据

JMS Source

从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过

Spooling Directory Source

监控指定目录内数据变更

Twitter 1% firehose Source

通过API持续下载Twitter数据,试验性质

Netcat Source

监控某个端口,将流经端口的每一个文本行数据作为Event输入

Sequence Generator Source

序列生成器数据源,生产序列数据

Syslog Sources

读取syslog数据,产生Event,支持UDP和TCP两种协议

HTTP Source

基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式

Legacy Sources

兼容老的Flume OG中Source(0.9.x版本)

·        FlumeChannel

Channel类型

说明

Memory Channel

Event数据存储在内存中

JDBC Channel

Event数据存储在持久化存储中,当前Flume Channel内置支持Derby

File Channel

Event数据存储在磁盘文件中

Spillable Memory Channel

Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)

Pseudo Transaction Channel

测试用途

Custom Channel

自定义Channel实现

·        FlumeSink

Sink类型

说明

HDFS Sink

数据写入HDFS

Logger Sink

数据写入日志文件

Avro Sink

数据被转换成Avro Event,然后发送到配置的RPC端口上

Thrift Sink

数据被转换成Thrift Event,然后发送到配置的RPC端口上

IRC Sink

数据在IRC上进行回放

File Roll Sink

存储数据到本地文件系统

Null Sink

丢弃到所有数据

HBase Sink

数据写入HBase数据库

Morphline Solr Sink

数据发送到Solr搜索服务器(集群)

ElasticSearch Sink

数据发送到Elastic Search搜索服务器(集群)

Kite Dataset Sink

写数据到Kite Dataset,试验性质的

Custom Sink

自定义Sink实现


2.
Flume监控某个目录,把日志写入HDFS

新建自己的。.Conf文件,并编辑如下:

 Flume

--单节点 Flume 直接写入 HDFS

# Define a memory channel called ch1 onagent1

agent1.channels.ch1.type = memory  //channel有三种模式,这里用memory

agent1.channels.ch1.capacity = 100000000

agent1.channels.ch1.transactionCapacity =100000000

agent1.channels.ch1.keep-alive = 30

 

#define source monitor a file

agent1.sources.s1.type = spooldir

agent1.sources.s1.spoolDir = /tmp/gaohq/log//root下自己的文件夹,作为本次测试的监控目录

agent1.sources.s1.fileHeader = true

agent1.sources.s1.deletePolicy = never

agent1.sources.s1.batchSize =100000000

agent1.sources.s1.channels =ch1

agent1.sources.s1.deserializer.maxLineLength=1048576

 

# Define a logger sink that simply logs allevents it receives

# and connect it to the other end of thesame channel.

agent1.sinks.log-sink1.channel = ch1

agent1.sinks.log-sink1.type = hdfs

agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.107.238/tmp/gaohq/test_flume//hdfs主路经

agent1.sinks.log-sink1.hdfs.writeFormat =Text

agent1.sinks.log-sink1.hdfs.fileType =DataStream

agent1.sinks.log-sink1.hdfs.rollInterval =0

agent1.sinks.log-sink1.hdfs.rollSize =1000000

agent1.sinks.log-sink1.hdfs.rollCount = 0

agent1.sinks.log-sink1.hdfs.batchSize =1000

agent1.sinks.log-sink1.hdfs.txnEventMax =1000

agent1.sinks.log-sink1.hdfs.callTimeout =60000

agent1.sinks.log-sink1.hdfs.appendTimeout =60000

 

# Finally, now that we've defined all ofour components, tell

# agent1 which ones we want to activate.

//指定channelssourcessinks并起个别名,在前面引用。同样agent1也是

agent1.channels = ch1

agent1.sources = s1

agent1.sinks = log-sink1

启动:

bin/flume-ng agent --conf conf/ -f conf/hdfs.conf -n agent1-Dflume.root.logger=INFO,console

到bin前目录启动:

Flume

正在运行:

Flume

给其中用到的各个文件赋予权限:(如果都在flume下就不用)

Flume

下表展现自己新建的.conf文件和将要修改的flume-env.sh.template:

Flume

把系统日志导入自己要使用的文件里:(看最后那两行权限)

Flume

日志没有删除,而是保存到如下:

Flume

报错处理:

Flume

java.lang.OutOfMemoryError: GCoverhead limit exceeded

在Flume conf目录下找到flume-env.sh.template文件

cp flume-env.sh.templateflume-env.sh

vim flume-env.sh

把下面这句配置的注释删掉就可了

# exportJAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"