CDH版本Flume的数据采集Demo
DEMO:将mysql中的数据采集到HDFS上面
1、在CM(Cloudera Manager)界面中为Flume服务添加Agent角色。
2、增加配置
# Flume agent "agent": polls the MySQL table wlslog through the keedio
# SQLSource plugin and writes the rows to HDFS via an in-memory channel.

# --- Component declarations -------------------------------------------------
agent.sources = sql-source
agent.channels = ch1
agent.sinks = HDFS

# --- Channel ----------------------------------------------------------------
# Memory channel: events buffered here are not persisted to disk.
agent.channels.ch1.type = memory

# --- Source: MySQL via flume-ng-sql-source ----------------------------------
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.connection.url = jdbc:mysql://192.168.128.146:3306/zyl
agent.sources.sql-source.user = root
agent.sources.sql-source.password = hadoop
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
# Incremental read keyed on the id column, starting from value 0.
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
# Delay between polling queries (presumably milliseconds - confirm against the plugin docs).
agent.sources.sql-source.run.query.delay = 5000
# Location of the status file the source uses to keep its read position.
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status

# --- Sink: HDFS -------------------------------------------------------------
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.hdfs.path = hdfs://rhel6-146:8020/flume/mysql
# Write events as plain text rather than as a SequenceFile.
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
# Roll files by size only (268435456 bytes = 256 MiB); a value of 0 turns off
# the time-based and event-count-based roll triggers.
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
3、准备JAR包
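将 flume-ng-sql-source 插件的JAR包和MySQL JDBC驱动JAR包都复制到Flume库目录。注意:下面命令中的 /usr/hdp/current/flume-server/lib/ 是HDP环境的路径;在CDH(parcel方式安装)中,Flume库目录通常是 /opt/cloudera/parcels/CDH/lib/flume-ng/lib/,请按实际环境替换。
cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar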
4、数据源准备
-- MySQL source table for the demo. The Flume source configuration above uses
-- the id column as its incremental column.
CREATE TABLE wlslog (
    id          INT NOT NULL,
    time_stamp  VARCHAR(40),
    category    VARCHAR(40),
    type        VARCHAR(40),
    servername  VARCHAR(40),
    code        VARCHAR(40),
    msg         VARCHAR(40),
    PRIMARY KEY (id)
);
-- Seed rows 1-7: the initial data set present before the Flume agent starts.
INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (1, 'apr-8-2014-7:06:16-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000365', 'server state changed to standby');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (2, 'apr-8-2014-7:06:17-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000365', 'server state changed to starting');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (3, 'apr-8-2014-7:06:18-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000365', 'server state changed to admin');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (4, 'apr-8-2014-7:06:19-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000365', 'server state changed to resuming');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (5, 'apr-8-2014-7:06:20-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000361', 'started weblogic adminserver');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (6, 'apr-8-2014-7:06:21-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000365', 'server state changed to running');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (7, 'apr-8-2014-7:06:22-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000360', 'server started in running mode');

COMMIT;
5、建立外部表
进入hive,创建外部表
-- Hive external table over the directory the Flume HDFS sink writes to.
-- The files are parsed as comma-separated, double-quoted CSV.
-- NOTE: OpenCSVSerde exposes every column as STRING regardless of the types
-- declared below; CAST in queries (or define a view) if typed values are needed.
-- The trailing ';' is required: without it the Hive CLI / Beeline keeps
-- waiting for more input instead of executing the statement.
CREATE EXTERNAL TABLE ext_wlslog
(id int,
time_stamp varchar(40),
category varchar(40),
type varchar(40),
servername varchar(40),
code varchar(40),
msg varchar(40)
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '"')
LOCATION
'hdfs://rhel6-146:8020/flume/mysql';
6、重启flume服务
7、查看hdfs和hive外部表数据(注意:下面的输出共10行,其中第8~10行是执行第8步插入之后再次查询得到的;只有第4步的初始数据时,查询只会返回7行)
> select * from ext_wlslog;
OK
1 apr-8-2014-7:06:16-pm-pdt notice weblogicserver adminserver bea-000365 server state changed to standby
2 apr-8-2014-7:06:17-pm-pdt notice weblogicserver adminserver bea-000365 server state changed to starting
3 apr-8-2014-7:06:18-pm-pdt notice weblogicserver adminserver bea-000365 server state changed to admin
4 apr-8-2014-7:06:19-pm-pdt notice weblogicserver adminserver bea-000365 server state changed to resuming
5 apr-8-2014-7:06:20-pm-pdt notice weblogicserver adminserver bea-000361 started weblogic adminserver
6 apr-8-2014-7:06:21-pm-pdt notice weblogicserver adminserver bea-000365 server state changed to running
7 apr-8-2014-7:06:22-pm-pdt notice weblogicserver adminserver bea-000360 server started in running mode
8 apr-8-2014-7:06:22-pm-pdt notice weblogicserver adminserver bea-000360 server started in running mode
9 apr-8-2014-7:06:22-pm-pdt notice weblogicserver adminserver bea-000360 server started in running mode
10 apr-8-2014-7:06:22-pm-pdt notice weblogicserver adminserver bea-000360 server started in running mode
Time taken: 0.248 seconds, Fetched: 10 row(s)
8、实时增加mysql源表数据
-- Rows 8-10: added to the MySQL source table after the initial seed rows,
-- to exercise incremental collection.
INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (8, 'apr-8-2014-7:06:22-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000360', 'server started in running mode');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (9, 'apr-8-2014-7:06:22-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000360', 'server started in running mode');

INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg)
VALUES (10, 'apr-8-2014-7:06:22-pm-pdt', 'notice', 'weblogicserver', 'adminserver', 'bea-000360', 'server started in running mode');

COMMIT;