flum执行原理及测试案例&&串联(负载均衡)

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采
集、聚合和传输的软件

数据采集

  • 数据从无到有的过程
  • 数据搬运传输的过程

flume是一个日志采集、聚合、汇总传输的软件。数据搬运的过程。

flum执行原理及测试案例&&串联(负载均衡)
flume具有三个核心组件

source:对接各个不同种类的数据源
sink:对接各个不同数据存放的目的地(下沉地)
channel:中间用于临时缓存数据的

以上三个组件加起来就构成了flume一个进程----agent.

flume作为工具没有主从主备的概念存在。它可以一个agent干活 也可以多个agent串联干活。

在flume 数据是以event形式存在的 event是flume最小 的数据单元。
在Flume中使用Event对象来作为传递数据的格式。

Sources端在flume-ng-core子项目中的org.apache.flume.serialization包下,有一个名为LineDeserializer的类,这个类负责把数据按行来读取,每一行封装成一个Event(实现方式:按字节读取,当遇到"\n"时封装成Event返回,下一次获取Event时继续获取下一字节并判断)。然后按用户设置的批量传输的值来封装List。

Event: { headers:{} body: 6E 69 68 61 6F 0D                               nihao. }
  • 如何下载cdh版本软件包
    cloudera archive
    http://archive.cloudera.com/cdh5/cdh/5/

    flume-ng-1.6.0-cdh5.14.0.tar.gz
    
  • flume启动命令
    bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console 全写版本

    bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console  简写版本
    
    --conf(-c)       指定默认配置文件所在的目录
    --conf-file(-f)  指定采集方案所在的路径
    --name(-n)       flume agent 名字
    -Dflume.root.logger=INFO,console  开启日志功能 可以查看执行的过程
    
  • hdfs sink
    控制文件以何种形式进行滚动
    a1.sinks.k1.hdfs.rollInterval = 3 时间间隔
    a1.sinks.k1.hdfs.rollSize = 20 文件大小
    a1.sinks.k1.hdfs.rollCount = 5 evnet个数
    如果把某个属性设置为0 意味着该属性不生效
    如果都设置了 谁先满足触发条件 谁控制滚动

    是否开启时间上的舍弃。  文件夹如何滚动
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    

    spooldir source
    注意:其采集的监控的目录不能有同名的文件产生 如果有 第一报错 然后罢工。
    因此在企业实际中,通常把时间追加到文件名的尾部 避免同名现象的产生。

  • 如何模拟生成数据
    while true; do date >> /root/allen.txt ;sleep 0.5;done
    exec source :执行一个linux命令


  • 负载均衡:解决的是一个程序或者进程无法处理所有请求 然后部署多个程序或者进程一台处理的过程
    负载均衡的算法:轮询(round_robin) 随机(random) 权重
  • 如果涉及flume直接多级之间的串联
    使用avro sink、 avro source
  • 针对多级之间的flume架构 通常从远离数据源的那一级开始启动
  • 容错 指的容忍错误的发生 通常用于单点故障问题 给容易出故障设置备份 常见的是一个备份
    同一时间只能有一个工作 当工作的出现问题之后 备份才会切换顶替继续工作

  • 静态拦截器的使用
    没有使用静态拦截器
    Event: { headers:{} body: 6 Tue Feb 19 18:16 }
    使用静态拦截器:
    Event: { headers:{type=access} body: 6 Tue Feb 19 18:16 }
    Event: { headers:{type=web} body: 6 Tue Feb 19 18:16 }
    Event: { headers:{type=nginx} body: 6 Tue Feb 19 18:16 }
    如何使用:
    %{type} 即可取出对应的value值
    while true; do echo “access,access…” >> /root/logs1/access.log;sleep 0.5;done
    while true; do echo “nginx,nginx…” >> /root/logs1/nginx.log;sleep 0.5;done
    while true; do echo “web,web…” >> /root/logs1/web.log;sleep 0.5;done

安装:下载对应hadoop版本的flum安装包,

Flume 的安装非常简单
上传安装包到数据源所在节点上
然后解压 tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_HOME
 根据数据采集需求配置采集方案,描述在配置文件中(文件名可任意自定义)
 指定采集方案配置文件,在相应的节点上启动 flume agent

先用一个最简单的例子来测试一下程序环境是否正常
1、先在 flume 的 conf 目录下新建一个文件
vi netcat-logger.conf

定义这个 agent 中各组件的名字

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

克隆会话
用telnet 建立连接
yum install telnet
telnet localhost 44444
就可以演示你想要的输出的数据了~

. 采集目录到 HDFS

采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,
就需要把文件采集到 HDFS 中去
根据需求,首先定义以下 3 大要素
 采集源,即 source——监控文件目录 : spooldir
 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
 source 和 sink 之间的传递通道——channel,可用 file channel 也可以用
内存 channel

监视文件夹
启动命令:  
bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

测试: 往/home/hadoop/flumeSpool放文件(mv ././xxxFile /home/hadoop/flumeSpool),但是不要在里面生成文件


##############


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs2
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意:采集之后的文件会增加后缀(自动)再次采集就会报错,天生傲娇的特性:不能有同名文件,如果有会报错,罢工!

采集需求:比如业务系统使用 log4j 生成的日志,日志内容不断增加,需要把追
加到日志文件中的数据实时采集到 hdfs

根据需求,首先定义以下 3 大要素
 采集源,即 source——监控文件内容更新 : exec ‘tail -F file’
 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
 Source 和 sink 之间的传递通道——channel,可用 file channel 也可以用
内存 channel

使用脚本创建死循环来获取实时日志数据:
while true; do date >> /root/logs/test.log;sleep 2;done

tail-hdfs.conf

用tail命令获取数据,下沉到hdfs
启动命令:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
########

Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

Describe the sink

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H-%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


负载均衡:

  • 负载均衡:解决的是一个程序或者进程无法处理所有请求 然后部署多个程序或者进程一台处理的过程
    负载均衡的算法:轮询(round_robin) 随机(random) 权重
  • 如果涉及flume直接多级之间的串联
    使用avro sink、 avro source
  • 针对多级之间的flume架构 通常从远离数据源的那一级开始启动
  • 容错 指的容忍错误的发生 通常用于单点故障问题 给容易出故障设置备份 常见的是一个备份
    同一时间只能有一个工作 当工作的出现问题之后 备份才会切换顶替继续工作

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种
算法。Load balancing Sink Processor 能够实现 load balance 功能,如下图
Agent1 是一个路由节点,负责将 Channel 暂存的 Event 均衡到对应的多个 Sink
组件上,而每个 Sink 组件分别连接到一个独立的 Agent 上

a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 k3 
a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true  #如果开启,则将失败的 sink 放入黑名单 a1.sinkgroups.g1.processor.selector = round_robin  # 另外还支持 random a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结 束时,若仍然无法接收,则超时时间呈指数增长 

在第一台机器上配置:exec-avro.conf文件

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set gruop
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/logs/123.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node-2
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node-3
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000




启动命令bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console

在第二台机器上配置:avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-2
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在第三台配置:avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-3
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令第二台和第三台可以通用:bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

最后模拟一个实时文件就可以了,同上面案例用时间死循环建立实时文件


容错机制:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 #优先级值, 绝对值越大表示优先级越高
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000 #失败的 Sink 的最大回退期(millis)