flum执行原理及测试案例&&串联(负载均衡)
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采
集、聚合和传输的软件
数据采集
- 数据从无到有的过程
- 数据搬运传输的过程
flume是一个日志采集、聚合、汇总传输的软件。数据搬运的过程。
flume具有三个核心组件
source:对接各个不同种类的数据源
sink:对接各个不同数据存放的目的地(下沉地)
channel:中间用于临时缓存数据的
以上三个组件加起来就构成了flume一个进程----agent.
flume作为工具没有主从主备的概念存在。它可以一个agent干活 也可以多个agent串联干活。
在flume 数据是以event形式存在的 event是flume最小 的数据单元。
在Flume中使用Event对象来作为传递数据的格式。
Sources端在flume-ng-core子项目中的org.apache.flume.serialization包下,有一个名为LineDeserializer的类,这个类负责把数据按行来读取,每一行封装成一个Event(实现方式:按字节读取,当遇到"\n"时封装成Event返回,下一次获取Event时继续获取下一字节并判断)。然后按用户设置的批量传输的值来封装List。
Event: { headers:{} body: 6E 69 68 61 6F 0D nihao. }
-
如何下载cdh版本软件包
cloudera archive
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.14.0.tar.gz
-
flume启动命令
bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console 全写版本bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 简写版本 --conf(-c) 指定默认配置文件所在的目录 --conf-file(-f) 指定采集方案所在的路径 --name(-n) flume agent 名字 -Dflume.root.logger=INFO,console 开启日志功能 可以查看执行的过程
-
hdfs sink
控制文件以何种形式进行滚动
a1.sinks.k1.hdfs.rollInterval = 3 时间间隔
a1.sinks.k1.hdfs.rollSize = 20 文件大小
a1.sinks.k1.hdfs.rollCount = 5 evnet个数
如果把某个属性设置为0 意味着该属性不生效
如果都设置了 谁先满足触发条件 谁控制滚动是否开启时间上的舍弃。 文件夹如何滚动 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute
spooldir source
注意:其采集的监控的目录不能有同名的文件产生 如果有 第一报错 然后罢工。
因此在企业实际中,通常把时间追加到文件名的尾部 避免同名现象的产生。 -
如何模拟生成数据
while true; do date >> /root/allen.txt ;sleep 0.5;done
exec source :执行一个linux命令
- 负载均衡:解决的是一个程序或者进程无法处理所有请求 然后部署多个程序或者进程一台处理的过程
负载均衡的算法:轮询(round_robin) 随机(random) 权重 - 如果涉及flume直接多级之间的串联
使用avro sink、 avro source - 针对多级之间的flume架构 通常从远离数据源的那一级开始启动
- 容错 指的容忍错误的发生 通常用于单点故障问题 给容易出故障设置备份 常见的是一个备份
同一时间只能有一个工作 当工作的出现问题之后 备份才会切换顶替继续工作
- 静态拦截器的使用
没有使用静态拦截器
Event: { headers:{} body: 6 Tue Feb 19 18:16 }
使用静态拦截器:
Event: { headers:{type=access} body: 6 Tue Feb 19 18:16 }
Event: { headers:{type=web} body: 6 Tue Feb 19 18:16 }
Event: { headers:{type=nginx} body: 6 Tue Feb 19 18:16 }
如何使用:
%{type} 即可取出对应的value值
while true; do echo “access,access…” >> /root/logs1/access.log;sleep 0.5;done
while true; do echo “nginx,nginx…” >> /root/logs1/nginx.log;sleep 0.5;done
while true; do echo “web,web…” >> /root/logs1/web.log;sleep 0.5;done
安装:下载对应hadoop版本的flum安装包,
Flume 的安装非常简单
上传安装包到数据源所在节点上
然后解压 tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_HOME
根据数据采集需求配置采集方案,描述在配置文件中(文件名可任意自定义)
指定采集方案配置文件,在相应的节点上启动 flume agent
先用一个最简单的例子来测试一下程序环境是否正常
1、先在 flume 的 conf 目录下新建一个文件
vi netcat-logger.conf
定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
克隆会话
用telnet 建立连接
yum install telnet
telnet localhost 44444
就可以演示你想要的输出的数据了~
. 采集目录到 HDFS
采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,
就需要把文件采集到 HDFS 中去
根据需求,首先定义以下 3 大要素
采集源,即 source——监控文件目录 : spooldir
下沉目标,即 sink——HDFS 文件系统 : hdfs sink
source 和 sink 之间的传递通道——channel,可用 file channel 也可以用
内存 channel
监视文件夹
启动命令:
bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
测试: 往/home/hadoop/flumeSpool放文件(mv ././xxxFile /home/hadoop/flumeSpool),但是不要在里面生成文件
##############
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs2
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意:采集之后的文件会增加后缀(自动)再次采集就会报错,天生傲娇的特性:不能有同名文件,如果有会报错,罢工!
采集需求:比如业务系统使用 log4j 生成的日志,日志内容不断增加,需要把追
加到日志文件中的数据实时采集到 hdfs
根据需求,首先定义以下 3 大要素
采集源,即 source——监控文件内容更新 : exec ‘tail -F file’
下沉目标,即 sink——HDFS 文件系统 : hdfs sink
Source 和 sink 之间的传递通道——channel,可用 file channel 也可以用
内存 channel
使用脚本创建死循环来获取实时日志数据:
while true; do date >> /root/logs/test.log;sleep 2;done
tail-hdfs.conf
用tail命令获取数据,下沉到hdfs
启动命令:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
########
Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1
Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H-%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
负载均衡:
- 负载均衡:解决的是一个程序或者进程无法处理所有请求 然后部署多个程序或者进程一台处理的过程
负载均衡的算法:轮询(round_robin) 随机(random) 权重 - 如果涉及flume直接多级之间的串联
使用avro sink、 avro source - 针对多级之间的flume架构 通常从远离数据源的那一级开始启动
- 容错 指的容忍错误的发生 通常用于单点故障问题 给容易出故障设置备份 常见的是一个备份
同一时间只能有一个工作 当工作的出现问题之后 备份才会切换顶替继续工作
负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种
算法。Load balancing Sink Processor 能够实现 load balance 功能,如下图
Agent1 是一个路由节点,负责将 Channel 暂存的 Event 均衡到对应的多个 Sink
组件上,而每个 Sink 组件分别连接到一个独立的 Agent 上
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true #如果开启,则将失败的 sink 放入黑名单 a1.sinkgroups.g1.processor.selector = round_robin # 另外还支持 random a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结 束时,若仍然无法接收,则超时时间呈指数增长
在第一台机器上配置:exec-avro.conf文件
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/logs/123.log
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node-2
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node-3
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
启动命令bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console
在第二台机器上配置:avro-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-2
a1.sources.r1.port = 52020
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在第三台配置:avro-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-3
a1.sources.r1.port = 52020
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动命令第二台和第三台可以通用:bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
最后模拟一个实时文件就可以了,同上面案例用时间死循环建立实时文件
容错机制:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 #优先级值, 绝对值越大表示优先级越高
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000 #失败的 Sink 的最大回退期(millis)