Traffic Statistics with MapReduce
Prerequisites
- A virtual machine with Hadoop installed
- A sample of network traffic data
Start Hadoop
start-all.sh
Upload the flow.log file
hadoop fs -put /flow.log /
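You can confirm the file landed in the HDFS root with:
hadoop fs -ls /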
POJO layer
import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Writable lets Hadoop serialize and deserialize instances of this class
@Data
public class TelBean implements Writable {

    private String tel;
    private Long up;
    private Long down;
    private Long total;

    public TelBean() {
    }

    public TelBean(String tel, Long up, Long down) {
        this.tel = tel;
        this.up = up;
        this.down = down;
        this.total = up + down;
    }

    // serialization: write the fields in a fixed order
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(total);
    }

    // deserialization: read the fields back in the same order
    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.total = in.readLong();
    }
}
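write and readFields must handle the fields in exactly the same order. As a quick illustration of that contract, here is a stand-alone round trip through a byte stream (not part of the job code; the class name TelBeanRoundTrip and the field values are placeholders for this sketch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TelBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // placeholder values, only for demonstrating the serialization contract
        TelBean original = new TelBean("13812345678", 100L, 200L);

        // serialize: write the bean's fields into a byte stream
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // deserialize: read the same bytes back into a fresh bean
        TelBean copy = new TelBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy); // should print the same field values as the original
    }
}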
Mapper layer
import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TCMapper extends Mapper<LongWritable, Text, Text, TelBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t"); // the columns are tab-separated
        String tel = split[1];             // the phone number is in the second column
        // upstream and downstream traffic are the 3rd- and 2nd-to-last columns
        Long up = Long.parseLong(split[split.length - 3]);
        Long down = Long.parseLong(split[split.length - 2]);
        TelBean telBean = new TelBean(tel, up, down);
        context.write(new Text(tel), telBean);
    }
}
Reducer layer
import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TCReducer extends Reducer<Text, TelBean, Text, TelBean> {

    @Override
    protected void reduce(Text key, Iterable<TelBean> values, Context context) throws IOException, InterruptedException {
        // sum the upstream and downstream traffic for this phone number
        Long up = 0L;
        Long down = 0L;
        for (TelBean telBean : values) {
            up = up + telBean.getUp();
            down = down + telBean.getDown();
        }
        TelBean telBean = new TelBean(key.toString(), up, down);
        context.write(key, telBean);
    }
}
Partitioner layer
import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// the type parameters are the mapper's output key/value types (Text, TelBean)
public class MyPartition extends Partitioner<Text, TelBean> {

    @Override
    public int getPartition(Text text, TelBean telBean, int numPartitions) {
        // the return value must fall in [0, numPartitions)
        String tel = text.toString();
        if (tel.startsWith("138") || tel.startsWith("136")) {
            return 1;
        } else {
            return 0;
        }
    }
}
Job layer
import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TCJob {

    public static void main(String[] args) throws Exception {
        // 1. create the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. specify the class that carries the job
        job.setJarByClass(TCJob.class);

        // 3. set the mapper class and its output types
        job.setMapperClass(TCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TelBean.class);

        // 4. set the reducer class and its output types
        job.setReducerClass(TCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelBean.class);

        // set the partitioner and a matching number of reduce tasks
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);

        // 5. set the input file; the path is passed in at run time
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 6. set the output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. submit the job
        job.waitForCompletion(true);
    }
}
Package the project as a jar and upload it to the server
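Assuming a standard Maven build, packaging and copying the jar might look like this (the server address and target directory are placeholders):
mvn clean package
scp target/hdfs1-1.0-SNAPSHOT.jar root@hadoop-server:/root/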
hadoop jar hdfs1-1.0-SNAPSHOT.jar com.zhiyou100.job.TCJob /flow.log /a3
View the contents of the two output files
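Because the job runs two reduce tasks, the output directory /a3 contains two part files (standard part-r-NNNNN naming), which can be inspected with:
hadoop fs -cat /a3/part-r-00000
hadoop fs -cat /a3/part-r-00001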
Summary
Mapper layer: cleans the incoming data and converts it into the key/value format we need to process
Reducer layer: aggregates and processes the data
Job layer: wires the whole program together and controls the order in which it runs
Note: both sorting and partitioning operate on the key
Sorting by total traffic
First run the job without partitioning to get each number's total upstream, downstream, and total traffic, then sort that output.
POJO layer
Modify it as follows:
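A minimal sketch of the change, assuming the usual approach of making TelBean a WritableComparable and comparing on total traffic (descending) so the shuffle sorts records by it:

import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// sketch: same fields as before, but now comparable by total traffic
@Data
public class TelBean implements WritableComparable<TelBean> {

    private String tel;
    private Long up;
    private Long down;
    private Long total;

    public TelBean() {
    }

    public TelBean(String tel, Long up, Long down) {
        this.tel = tel;
        this.up = up;
        this.down = down;
        this.total = up + down;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(total);
    }

    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.total = in.readLong();
    }

    // sort by total traffic, largest first
    @Override
    public int compareTo(TelBean other) {
        return other.getTotal().compareTo(this.getTotal());
    }
}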
Mapper layer
Modify it as follows:
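A sketch of a matching mapper: it reads the first job's output and emits the bean itself as the key, so the framework sorts by compareTo. The class name SortMapper is an assumption, and the parsing assumes each input line is tab-separated as tel, up, down, total (i.e., the first job's value format was adjusted accordingly):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// sketch: the bean is the key, so the shuffle sorts records by total traffic
public class SortMapper extends Mapper<LongWritable, Text, TelBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t"); // assumed layout: tel, up, down, total
        String tel = split[0];
        Long up = Long.parseLong(split[1]);
        Long down = Long.parseLong(split[2]);
        context.write(new TelBean(tel, up, down), NullWritable.get());
    }
}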
Reducer layer
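A sketch of the matching reducer (SortReducer is an assumed name). Beans with the same total are grouped into one reduce call, and the key object is updated as the values are iterated, so the key is written once per value to keep every record:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<TelBean, NullWritable, Text, TelBean> {

    @Override
    protected void reduce(TelBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // keys arrive already sorted by total traffic; write one line per record
        for (NullWritable value : values) {
            context.write(new Text(key.getTel()), key);
        }
    }
}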
Partitioner layer
Job layer
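A sketch of the driver for the sort job, wired the same way as TCJob (the class names match the sketches above; the partitioner from the section above could be registered with setPartitionerClass just as before):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortJob.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(TelBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelBean.class);

        // args[0] is the first job's output directory, args[1] a new output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}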
Run result: