Flume
Flume
1.Flume架构解释
架构设计要点
老师总结:
Source是数据源的总称,我们往往设定好源后,数据将源源不断的被抓取或者被推送。常见的数据源有:
ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。
Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;
Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。
Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
Flume的架构主要有一下几个核心概念:
· Event:一个数据单元,带有一个可选的消息头
· Flow:Event从源点到达目的点的迁移的抽象
· Client:操作位于源点处的Event,将其发送到Flume Agent
· Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
· Source:用来消费传递到该组件的Event
· Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
· Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)
FlumeNG架构:
外部系统产生的日志,直接通过Flume的Agent中的Source组件,Source组件将事件(日志)发送给Channel组件来临时存储,Channel组件再将事件发送给Sink组件中,最后Sink组件直接将数据放到HDFS集群上。
多个Agent顺序连接:
和前面的架构过程一样,只是将多个Agent顺序连接起来,这是一种不同应用场景的需求方式,。要控制其数量。
多个Agent数据汇聚到同一个Agent:
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
多路Agent:
这种模式分两种方式,一种复制(Replication),一种分流(Multiplexing)。复制可将最前端数据源复制多份,分别传递多个channel中,每个channel接收到的数据都是相同的。
loadbalance功能:
将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上
基本功能:
· FlumeSource
Source类型 | 说明 |
Avro Source | 支持Avro协议(实际上是Avro RPC),内置支持 |
Thrift Source | 支持Thrift协议,内置支持 |
Exec Source | 基于Unix的command在标准输出上生产数据 |
JMS Source | 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 |
Spooling Directory Source | 监控指定目录内数据变更 |
Twitter 1% firehose Source | 通过API持续下载Twitter数据,试验性质 |
Netcat Source | 监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
Sequence Generator Source | 序列生成器数据源,生产序列数据 |
Syslog Sources | 读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
· FlumeChannel
Channel类型 | 说明 |
Memory Channel | Event数据存储在内存中 |
JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel | Event数据存储在磁盘文件中 |
Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel | 测试用途 |
Custom Channel | 自定义Channel实现 |
· FlumeSink
Sink类型 | 说明 |
HDFS Sink | 数据写入HDFS |
Logger Sink | 数据写入日志文件 |
Avro Sink | 数据被转换成Avro Event,然后发送到配置的RPC端口上 |
Thrift Sink | 数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
IRC Sink | 数据在IRC上进行回放 |
File Roll Sink | 存储数据到本地文件系统 |
Null Sink | 丢弃到所有数据 |
HBase Sink | 数据写入HBase数据库 |
Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
Custom Sink | 自定义Sink实现 |
2.Flume监控某个目录,把日志写入HDFS
新建自己的。.Conf文件,并编辑如下:
--单节点 Flume 直接写入 HDFS
# Define a memory channel called ch1 onagent1
agent1.channels.ch1.type = memory //channel有三种模式,这里用memory
agent1.channels.ch1.capacity = 100000000
agent1.channels.ch1.transactionCapacity =100000000
agent1.channels.ch1.keep-alive = 30
#define source monitor a file
agent1.sources.s1.type = spooldir
agent1.sources.s1.spoolDir = /tmp/gaohq/log//root下自己的文件夹,作为本次测试的监控目录
agent1.sources.s1.fileHeader = true
agent1.sources.s1.deletePolicy = never
agent1.sources.s1.batchSize =100000000
agent1.sources.s1.channels =ch1
agent1.sources.s1.deserializer.maxLineLength=1048576
# Define a logger sink that simply logs allevents it receives
# and connect it to the other end of thesame channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.107.238/tmp/gaohq/test_flume//hdfs主路经
agent1.sinks.log-sink1.hdfs.writeFormat =Text
agent1.sinks.log-sink1.hdfs.fileType =DataStream
agent1.sinks.log-sink1.hdfs.rollInterval =0
agent1.sinks.log-sink1.hdfs.rollSize =1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize =1000
agent1.sinks.log-sink1.hdfs.txnEventMax =1000
agent1.sinks.log-sink1.hdfs.callTimeout =60000
agent1.sinks.log-sink1.hdfs.appendTimeout =60000
# Finally, now that we've defined all ofour components, tell
# agent1 which ones we want to activate.
//指定channels、sources、sinks并起个别名,在前面引用。同样agent1也是
agent1.channels = ch1
agent1.sources = s1
agent1.sinks = log-sink1
启动:
bin/flume-ng agent --conf conf/ -f conf/hdfs.conf -n agent1-Dflume.root.logger=INFO,console
到bin前目录启动:
正在运行:
给其中用到的各个文件赋予权限:(如果都在flume下就不用)
下表展现自己新建的.conf文件和将要修改的flume-env.sh.template:
把系统日志导入自己要使用的文件里:(看最后那两行权限)
日志没有删除,而是保存到如下:
报错处理:
java.lang.OutOfMemoryError: GCoverhead limit exceeded
在Flume conf目录下找到flume-env.sh.template文件
cp flume-env.sh.templateflume-env.sh
vim flume-env.sh
把下面这句配置的注释删掉就可了
# exportJAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"