Flume Architecture and Core Components

1) Source: collects the data
2) Channel: aggregates (buffers) the data
3) Sink: writes the data out
A Channel is essentially a conduit, something like a buffer pool that gives data a temporary place to stay. At the operating-system level, data written to disk is first written to memory, and only once memory fills up is it flushed to disk. (The benefit is fewer direct interactions with the disk, which improves performance considerably.)
Flume works the same way: data collected by the Source is first written to the Channel, and once the Channel reaches the configured size the Sink writes it out to the destination.
The Sink's job is to read data from the Channel and push it to the destination.
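As a minimal sketch of this buffering behavior (assuming the agent/channel names a1 and c1 used later in this post; the numbers are illustrative, not required values), a memory channel's buffer can be sized explicitly:

# Memory channel that buffers events in RAM
a1.channels.c1.type = memory
# Maximum number of events held in the channel at once
a1.channels.c1.capacity = 1000
# Maximum number of events put into or taken from the channel per transaction
a1.channels.c1.transactionCapacity = 100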
Setting multi-agent flow
In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.
Consolidation
A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.
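A rough sketch of this fan-in topology (the agent names, hostname, and port below are hypothetical): each web-server agent points an avro sink at the single avro source of a consolidator agent, which then writes onward, e.g. to HDFS:

# On each web-server agent (agent1 ... agentN): ship events to the consolidator
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = consolidator-host
agent1.sinks.k1.port = 4545

# On the consolidator agent: one avro source receives from all web-server agents,
# and an hdfs sink (not shown in full) writes to the HDFS cluster
consolidator.sources.r1.type = avro
consolidator.sources.r1.bind = 0.0.0.0
consolidator.sources.r1.port = 4545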

Multiplexing the flow
Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.
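A minimal sketch of a multiplexing channel selector, assuming an agent a1 whose source r1 fans out to channels c1 and c2 based on an event header named state (all names here are illustrative):

a1.sources = r1
a1.channels = c1 c2
# Route each event to a channel based on the value of its "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
# Events whose header matches nothing above go to the default channel
a1.sources.r1.selector.default = c1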
Flume installation prerequisites
1. Java Runtime Environment - Java 1.8 or later
2. Memory - Sufficient memory for configurations used by sources, channels or sinks
3. Disk Space - Sufficient disk space for configurations used by channels or sinks
4. Directory Permissions - Read/Write permissions for directories used by agent

Installing the JDK
Download and unpack it.
Add Java to the environment variables in ~/.bash_profile:
export JAVA_HOME=/home/hadoop/app/jdk   # adjust to your own Java install location
export PATH=$JAVA_HOME/bin:$PATH
Run source ~/.bash_profile so the configuration takes effect.
Verify with: java -version

Installing Flume
In flume-env.sh, export JAVA_HOME.
Verify the version with: flume-ng version
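A rough sketch of the install steps, assuming Flume 1.9.0, the Apache archive mirror, and an ~/app install directory (adjust all of these to your own environment):

# Download and unpack the Flume binary distribution (version/path assumed)
wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ~/app/

# Put Flume's bin/ on the PATH (in ~/.bash_profile)
export FLUME_HOME=~/app/apache-flume-1.9.0-bin
export PATH=$FLUME_HOME/bin:$PATH

# Copy the env template and export JAVA_HOME inside it
cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
echo "export JAVA_HOME=/home/hadoop/app/jdk" >> $FLUME_HOME/conf/flume-env.sh

# Verify
flume-ng version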

Flume in practice
Requirement 1: collect data from a specified network port and output it to the console

example.conf: A single-node Flume configuration
The key to using Flume is writing the configuration file:
#A) Configure the Source
#B) Configure the Channel
#C) Configure the Sink
#D) Wire the three components together

#a1: name of the agent
#r1: name of the source
#k1: name of the sink
#c1: name of the channel

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#A netcat-like source that listens on a given port and turns each line of text into an event
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
#logger: logs events at INFO level
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Using Flume is mostly a matter of writing the agent configuration file:
cd flume
Create the file: vi example.conf
Start the agent (as shown in the official docs):

bin/flume-ng agent
-n $agent_name
-c conf -f conf/flume-conf.properties.template
-Dflume.root.logger=INFO,console

Option reference:
-n (--name): the name of this agent (required)
-c (--conf): use configs in the given directory (the configuration directory)
-f (--conf-file): specify a config file, i.e. the configuration file created above
Command for requirement 1:

$ bin/flume-ng agent
-n a1
-c conf -f conf/example.conf
-Dflume.root.logger=INFO,console

Test it with telnet:
e.g. telnet hadoop000 44444
Whatever you type is then output to the console.
Console output (note the format):
Event: { headers:{} body: 68 65 6c 6c 6f 0d    hello. }
An Event is the basic unit of data transfer in Flume:
event = optional headers + byte array (body)

Requirement 2: monitor a file, collect newly appended data in real time, and output it to the console
Agent selection (see the official docs): exec source + memory channel + logger sink

cd flume
Create the configuration file: vi exec-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Then start the agent:

$ bin/flume-ng agent
-n a1
-c conf -f conf/exec-memory-logger.conf
-Dflume.root.logger=INFO,console

You can then append to the file with echo hello >> data.log.
This monitors the file in real time and outputs any new content to the console.

Requirement 3: collect logs on server A and ship them to server B in real time (the logs sit on a web server; how do we get them onto HDFS or a Hadoop server?)
Technology selection:
Server A: exec source + memory channel + avro sink
Server B: avro source + memory channel + logger sink
Make sure you master this requirement and are familiar with the flow model diagram in the official docs.
First configuration file:
exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

Second configuration file: avro-memory-logger.conf

avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

Start avro-memory-logger first, because it has to be listening on port 44444 before the other agent connects:

bin/flume-ng agent
-n avro-memory-logger
-c conf -f conf/avro-memory-logger.conf
-Dflume.root.logger=INFO,console

Then start exec-memory-avro:

bin/flume-ng agent
-n exec-memory-avro
-c conf -f conf/exec-memory-avro.conf
-Dflume.root.logger=INFO,console
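
To verify the whole pipeline (the file path matches the exec source's command above; the message text is arbitrary), append a line on server A and watch it appear on server B's console:

# On server A: append to the monitored file
echo "hello from A" >> /home/hadoop/data/data.log
# On server B: the logger sink should print the event, e.g.
# Event: { headers:{} body: 68 65 6c 6c 6f ...  hello from A }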

For learning big data, my personal recommendation for the most important study method is to learn straight from the official documentation!
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#data-flow-model