MapReduce对手机上网记录的简单分析和Partitioner分区

概览

1.MapReduce处理手机上网记录
2.Partitioner分区

上次说过了关于MapReduce的执行流程和原理,下面来说下分区和简单示例

1.MapReduce处理手机上网记录

首先我们需要先模拟一个通话记录文件

在Windows的桌面建个tel.log的文件,里面模拟一些通话记录信息

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

这些字段代表的是
MapReduce对手机上网记录的简单分析和Partitioner分区

首先我们需要将部分字段提取出来,以便之后进行分析

在主机master上启动hadoop集群,hadoop集群版的搭建可以参照简单的hadoop集群搭建

start-all.sh

验证是否启动成功

然后将tel.log文件利用Xftp传输到虚拟机中的/usr/tmp下cd /usr/tmp/

然后上传到hdfs

#上传
[[email protected] tmp]# hadoop fs -put tel.log /
#查看
[[email protected] tmp]# hadoop fs -ls /
Found 1 items
-rw-r--r--   1 root supergroup       2315 2018-10-19 19:33 /tel.log

然后在eclipse上新建java项目,并在项目下建个lib文件夹,然后将jar包放到lib中导入项目
MapReduce对手机上网记录的简单分析和Partitioner分区
MapReduce对手机上网记录的简单分析和Partitioner分区
MapReduce对手机上网记录的简单分析和Partitioner分区
然后创建包,创建一个telBean实体类,这次我们分析的是
手机号和其对应的上行流量,下行流量和总流量
所以将其封装成实体类

package com.hd.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TelBean implements Writable{
	
	private String tel;
	private long upPayLoad;
	private long downPayLoad;
	private long totalPayLoad;
	
	
	
	public String getTel() {
		return tel;
	}

	public void setTel(String tel) {
		this.tel = tel;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}

	public long getTotalPayLoad() {
		return totalPayLoad;
	}

	public void setTotalPayLoad(long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}

	public TelBean(String tel, long upPayLoad, long downPayLoad, long totalPayLoad) {
		super();
		this.tel = tel;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = totalPayLoad;
	}

	public TelBean() {
		super();
		// TODO Auto-generated constructor stub
	}

	@Override
	public String toString() {
		return  tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t"
				+ totalPayLoad ;
	}

	//反序列化的过程
	@Override
	public void readFields(DataInput in) throws IOException {
		this.tel = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}

	//序列化的过程
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(this.tel);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
		out.writeLong(this.totalPayLoad);
	}


}

在mr包下创建个TelMapper类继承Mapper
首先分析一下,我们要传入的第一个需要Map处理的<k1,v1>是long类型(电话号码)和String类型(与之对应的一行记录),而从Map处理过的<k2,v2>是String类型(电话号码)和TelBean对象(将我们需要的字段封装成对象)

package com.hd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.zy.hadoop.entity.TelBean;

//k1,v1 long string  k2,v2  string TelBean
public class TelMapper extends Mapper<LongWritable, Text, Text, TelBean> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TelBean>.Context context)
			throws IOException, InterruptedException {
		//value 对应 tel.log中的每一行数据,行中的数据以\t隔开的
		String line = value.toString();
		//对正航读取的数据进行拆分
		String[] res = line.split("\t");//0---res.length-1
		//取数组中的电话号码
		String tel = res[1];
		//取上行流量
		long upPayLoad = Long.parseLong(res[8]);
		//取下行流量
		long downPayLoad = Long.parseLong(res[9]);
		//创建telBean对象
		TelBean telBean = new TelBean(tel, upPayLoad, downPayLoad, 0);
		context.write(new Text(tel), telBean); 
	}

}

然后创建个TelReducer类继承Reducer
分析一下,这里传入的<k2,v2>是String(Text)类型和TelBean类型,而我们处理过输出的<k3,v3>也是相同类型,这里要记得将TelBean的toString方法重写,不然输出的是对象地址

package com.hd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.zy.hadoop.entity.TelBean;

public class TelReducer extends Reducer<Text, TelBean, Text, TelBean> {

	@Override
	protected void reduce(Text key, Iterable<TelBean> value, Reducer<Text, TelBean, Text, TelBean>.Context context)
			throws IOException, InterruptedException {
		//声明一个上行流量的变量
		long upPayLoad = 0;
		//声明一个下行流量
		long downPayLoad = 0;
		for (TelBean telBean : value) {
			//统计相同电话的上行流量的和,下行流量的和
			upPayLoad += telBean.getUpPayLoad();
			downPayLoad += telBean.getDownPayLoad();
		}
		
		//k3 v3
		TelBean telBean= new TelBean(key.toString(), upPayLoad, downPayLoad, upPayLoad+downPayLoad);
		
		context.write(key, telBean);
	}

}

最后我们创建个主方法TelCount类

package com.zy.hadoop.mr1;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zy.hadoop.entity.TelBean;


public class TelCount {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  {
		// 1.获取job
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// 2.指定job使用的类
		job.setJarByClass(TelCount.class);

		// 3.设置Mapper的属性
		job.setMapperClass(TelMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TelBean.class);

		// 4.设置输入文件 args[0]手动输入输入文件的位置
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 5.设置reducer的属性
		job.setReducerClass(TelReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TelBean.class);

		// 6.设置输出文件夹,查看结果保存到hdfs文件夹中的位置
		//args[1]手动输入输出文件的位置
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		
		// 7.提交 true 提交的时候打印日志信息
		job.waitForCompletion(true);

	}

}

最后将其打成Jar包,主方法选择TelCount,然后上传到虚拟机/usr/tmp下

然后执行jar包

hadoop jar tel_1.jar /tel.log /tel1

等待执行成功后查看结果文件

#结果如下
[[email protected] tmp]# hadoop fs -cat /tel1/part-r-00000
13480253104	13480253104	180	180	360
13502468823	13502468823	7335	110349	117684
13560436666	13560436666	2232	1908	4140
13560439658	13560439658	2034	5892	7926
13602846565	13602846565	1938	2910	4848
13660577991	13660577991	6960	690	7650
13719199419	13719199419	240	0	240
13726230503	13726230503	2481	24681	27162
13726238888	13726238888	2481	24681	27162
13760778710	13760778710	120	120	240
13826544101	13826544101	264	0	264
13922314466	13922314466	3008	3720	6728
13925057413	13925057413	11058	48243	59301
13926251106	13926251106	240	0	240
13926435656	13926435656	132	1512	1644
15013685858	15013685858	3659	3538	7197
15920133257	15920133257	3156	2936	6092
15989002119	15989002119	1938	180	2118
18211575961	18211575961	1527	2106	3633
18320173382	18320173382	9531	2412	11943
84138413	84138413	4116	1432	5548

这说明执行成功了

2.Partitioner分区

什么是Partitioner?

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。

我们还是处理手机的上网记录
在之前的mr包中见一个TCPartitioner类
我们将135和136开头的号码视为移动用户,处理结果放到一起(part-r-00000),另外的号码处理结果放到一起(part-r-00001)

package com.hd.hadoop.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import com.zy.hadoop.entity.TelBean;

public class TCPartitioner extends Partitioner<Text, TelBean> {

	@Override
	public int getPartition(Text text, TelBean telBean, int arg2) {
		// TODO Auto-generated method stub
		
		String tel = text.toString();
		String sub_tel = tel.substring(0, 3);//取手机号前三位进行分区

		//假设135 136的为移动的 放一个分区,其他的放一个分区
		if(sub_tel.equals("135")||sub_tel.equals("136")){
			//return的数对应着计算结果文件 part-r-00001
			return 1;
		}
		
		return 0;
	}

}

然后在TelCount添加几行代码

MapReduce对手机上网记录的简单分析和Partitioner分区

再将项目打成jar包放入到虚拟机/usr/tmp下,然后执行

hadoop jar tel_2.jar /tel.log /tel2

等待执行完毕后查看结果

#查看生成几个结果文件
[[email protected] tmp]# hadoop fs -ls /tel2
Found 3 items
-rw-r--r--   1 root supergroup          0 2018-10-19 20:10 /tel2/_SUCCESS
-rw-r--r--   1 root supergroup        603 2018-10-19 20:10 /tel2/part-r-00000
-rw-r--r--   1 root supergroup        198 2018-10-19 20:10 /tel2/part-r-00001
#查看135和136开头的手机号的结果
[[email protected] tmp]# hadoop fs -cat /tel2/part-r-00001
13502468823	13502468823	7335	110349	117684
13560436666	13560436666	2232	1908	4140
13560439658	13560439658	2034	5892	7926
13602846565	13602846565	1938	2910	4848
13660577991	13660577991	6960	690	7650

这就是简单的分区操作