云创 - data request pulls
云创 - data request pulls - geyuan
The data geyuan needs is the troublesome kind: you first have to export it from the MongoDB database before it can be pulled at all.
Let's start with the requirement itself. It's August now, so I'll walk through it using the June/July requests as the example.
Faced with a request like this you're bound to be puzzled at first. What it tells us is: the apk first-download/install list, for June.
Honestly, this whole business of hauling data from Mongo onto the Hive platform really should be turned into a proper ETL job; I just don't have the time right now, so I'll rework it later. It really is far too cumbersome.
OK, straight to the steps and the how-to. No dawdling, there are too many tasks waiting.
First, figure out which tables are missing, because we'll need to sync them.
There's actually quite a bit of preparation.
1. Confirm the setup:
From the information above, make sure the time ranges on these two tables are correct.
As you can see, this is the table we need to migrate. Also note that test_01.user_info_ft_month is currently empty. OK.
2. Run the migration program once.
(1) First, open the task-configuration database:
MongoDB on 192.168.0.142, database io_report, collection iotasklist.
Log in to the 192.168.0.142 server, enter the database, and query the collection:
cd /home/mongo/db/bin
./mongo --port 27010
use io_report
db.iotasklist.find()
Then find the record for the target table that needs to be migrated:
db.iotasklist.find({ "connTable" : "user_info_dt" })
This is the row we pull out for the new-user table.
{ "_id" : ObjectId("59b20896b536060595240d86"), "connDB" : "new_market_report", "connTable" : "user_info_dt", "dbConnUrl" : "mongodb://192.168.40.151:27010", "dbSeparator" : ",", "dbType" : "mongodb", "exportPath" : "/home/upload/sxm/ftp", "ftpHostName" : "211.151.182.188", "ftpPwd" : "hUpz8{YJ2ved", "ftpUser" : "upload", "query" : "{'dt_f':{'$gte':'2017-08-01','$lt':'2017-09-01'}}", "taskState" : "0", "columnNames" : "_id,ch,dt_f ", "hiveTable" : "test_01.user_info_ft_month_temp", "partitions" : "pt:2017-08", "overwrite" : "true", "importPath" : "/home/hive/sxm/ftp", "runningStartTime" : "2017-09-08 11:07:48", "ip" : "192.168.0.149", "last_operate" : "2017-09-07", "runningStopTime_2" : "2017-09-08 11:07:48", "runningStopTime_4" : "2017-09-08 11:08:33", "runningStopTime_5" : "2017-09-08 11:08:37", "runningStopTime_6" : "2017-09-08 11:08:40", "runningStopTime_0" : "2017-09-08 11:08:40" }
Below is the rewritten version; the important JSON fields are taken from the raw record above.
{
"connDB": "new_market_report",
"connTable": "user_info_dt",
"dbConnUrl": "mongodb://192.168.40.151:27010",
"dbSeparator": ",",
"dbType": "mongodb",
"exportPath": "/home/upload/sxm/ftp",
"ftpHostName": "211.151.182.188",
"ftpPwd": "hUpz8{YJ2ved",
"ftpUser": "upload",
"query": "{'dt_f':{'$gte':'2017-08-01','$lt':'2017-09-01'}}",
"taskState": "1",
"columnNames": "_id,ch,dt_f ",
"hiveTable": "test_01.user_info_ft_month",
"partitions": "pt:2017-09",
"overwrite": "true",
"importPath": "/home/hive/sxm/ftp",
"runningStartTime": "2017-09-08 15:07:48",
"ip": "192.168.0.149",
"last_operate": "2017-09-07",
"runningStopTime_2": "2017-09-08 11:07:48",
"runningStopTime_4": "2017-09-08 11:08:33",
"runningStopTime_5": "2017-09-08 11:08:37",
"runningStopTime_6": "2017-09-08 11:08:40",
"runningStopTime_0": "2017-09-08 11:08:40"
}
Set "taskState" : "1"; when the program runs, this task will be executed once.
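The manual rewrite above (drop the old _id, set taskState back to "1", and advance the query window and partition) can be scripted. A minimal sketch in Python; the field names match the record above, but the helper itself is hypothetical. Note that in the record above the partition month differs from the query month, so the sketch takes both as separate parameters rather than deriving one from the other:

```python
import copy

def month_bounds(ym):
    # "2017-08" -> ("2017-08-01", "2017-09-01")
    y, m = map(int, ym.split("-"))
    nxt = f"{y + m // 12}-{m % 12 + 1:02d}-01"
    return f"{ym}-01", nxt

def rewrite_task(old, data_month, partition_month):
    """Build a fresh iotasklist document from an existing one."""
    task = copy.deepcopy(old)
    task.pop("_id", None)              # let Mongo assign a new _id
    task["taskState"] = "1"            # "1" = execute once on the next run
    lo, hi = month_bounds(data_month)
    task["query"] = "{'dt_f':{'$gte':'%s','$lt':'%s'}}" % (lo, hi)
    task["partitions"] = "pt:" + partition_month
    return task
```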
(2) Insert the record we just prepared into MongoDB.
Make sure the format is valid. The full insert statement:
db.iotasklist.insert({
"connDB": "new_market_report",
"connTable": "user_info_dt",
"dbConnUrl": "mongodb://192.168.40.151:27010",
"dbSeparator": ",",
"dbType": "mongodb",
"exportPath": "/home/upload/sxm/ftp",
"ftpHostName": "211.151.182.188",
"ftpPwd": "hUpz8{YJ2ved",
"ftpUser": "upload",
"query": "{'dt_f':{'$gte':'2017-08-01','$lt':'2017-09-01'}}",
"taskState": "1",
"columnNames": "_id,ch,dt_f ",
"hiveTable": "test_01.user_info_ft_month",
"partitions": "pt:2017-09",
"overwrite": "true",
"importPath": "/home/hive/sxm/ftp",
"runningStartTime": "2017-09-08 15:07:48",
"ip": "192.168.0.149",
"last_operate": "2017-09-07",
"runningStopTime_2": "2017-09-08 11:07:48",
"runningStopTime_4": "2017-09-08 11:08:33",
"runningStopTime_5": "2017-09-08 11:08:37",
"runningStopTime_6": "2017-09-08 11:08:40",
"runningStopTime_0": "2017-09-08 11:08:40"
})
If the insert went wrong, look the record up and remove it:
db.iotasklist.find({ "connTable": "user_info_dt" });
db.iotasklist.remove({"_id" : ObjectId("59882e6eed68f95c99e57aee") })
To be a bit more rigorous, match on taskState as well so you only delete a task that isn't running:
db.iotasklist.remove({"_id" : ObjectId("595c4f1c9ee34a4bf20076b9"), "taskState" : "0" })
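To avoid having to delete a bad record at all, you can sanity-check the document before inserting it. A small sketch; the required-field list is my own assumption, based on the record shown above:

```python
REQUIRED = [
    "connDB", "connTable", "dbConnUrl", "dbType", "query",
    "taskState", "columnNames", "hiveTable", "partitions",
    "exportPath", "importPath",
]

def check_task(task):
    """Return a list of problems; an empty list means the task looks sane."""
    problems = [k for k in REQUIRED if not task.get(k)]
    if task.get("taskState") not in ("0", "1"):
        problems.append("taskState must be the string '0' or '1'")
    if not str(task.get("partitions", "")).startswith("pt:"):
        problems.append("partitions should look like 'pt:YYYY-MM'")
    return problems
```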
(3) Run once the configuration is done.
Log in to server 192.168.0.149 and execute the task once:
java -jar /home/hive/sxm/ftp/DataImport.jar /home/hive/sxm/ftp
(4) If you've reached this point and the data still hasn't been imported, it's most likely a user-permissions problem. Switch users first:
su hive
root won't work; it can't write the data in.
Remember: on 149, always use the hive user.
Now the final step: pulling the numbers on the Hive platform.
Then we run the SQL.
Paid-product first-download PV, distributed over the new-user month:
select tmp.month, count(tmp.imsi) as pv
from
(
select a.imsi, substring(f.dt_f,1,7) as month from tyd.tyd_user_log_apk_ft as a
join oz_market.oz_apk as t
on a.apk_id = t.apk_id
join test_01.user_info_ft_month as f
on a.imsi =f.imsi
where a.pt >='2017-07-01' and a.pt <'2017-08-01'
and f.pt >= '2017-01' and f.pt <= '2017-07'
and t.from_ = 'tyd000' and t.hot='1'
and a.ac_id ='3'
and a.t_from ='self'
and a.ch2_id not in ('8','12','17','18','10','22')
)as tmp
group by tmp.month
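If you want to sanity-check what the query counts before running it on the cluster, the core logic (inner join on imsi, bucket by the first seven characters of dt_f, count) is easy to replay on a few rows in Python. The sample data below is made up:

```python
from collections import Counter

def pv_by_new_user_month(downloads, new_users):
    """downloads: list of imsi values (one per download event);
    new_users: dict imsi -> dt_f (first-seen date).
    Mirrors the substring(f.dt_f,1,7) + group by month from the HiveQL above."""
    months = Counter()
    for imsi in downloads:
        dt_f = new_users.get(imsi)
        if dt_f:                       # inner join: drop imsis with no new-user row
            months[dt_f[:7]] += 1      # substring(f.dt_f, 1, 7)
    return dict(months)
```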
Query: July paid-product first-download PV, distributed over the new-user month (broken down by channel group):
select tmp.month, tmp.type, count(tmp.imsi) as pv
from
(
select a.imsi,dict.type, substring(f.dt_f,1,7) as month from tyd.tyd_user_log_apk_ft as a
join oz_market.oz_apk as t
on a.apk_id = t.apk_id
join test_01.user_info_ft_month as f
on a.imsi =f.imsi
join test_01.mst_dict_new as dict
on a.ch= dict.cd
where a.pt >='2017-07-01' and a.pt <'2017-08-01'
and f.pt >= '2017-01' and f.pt <= '2017-07'
and dict.type in('015','016','017','025')
and t.from_ = 'tyd000' and t.hot='1'
and a.ac_id ='3'
and a.t_from ='self'
and a.ch2_id not in ('8','12','17','18','10','22')
)as tmp
group by tmp.type,tmp.month
Pitfalls to avoid when importing/exporting:
At this point switch to the hive user and go to the /home/hive/ directory.
Export:
hive -e "select * from testing.result;" | tr '\t' ',' >result.log
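One caveat on the tr trick above: it blindly turns every tab into a comma and never quotes anything, so a field that itself contains a comma will break the resulting CSV. A safer sketch using Python's csv module (the file paths are just examples):

```python
import csv

def tsv_to_csv(src_path, dst_path):
    """Convert Hive's tab-separated output into properly quoted CSV."""
    with open(src_path, newline="") as src, \
         open(dst_path, "w", newline="") as dst:
        writer = csv.writer(dst)
        for row in csv.reader(src, delimiter="\t"):
            writer.writerow(row)       # csv module quotes fields containing commas
```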