MapReduce: computing mobile users' upstream traffic, downstream traffic, and total traffic, with the output sorted in descending order (Part 1)
First, note that Hadoop's built-in LongWritable holds only a single long value, so it cannot carry the three figures we need: a user's upstream traffic, downstream traffic, and total traffic.
In that case, there is no way around it: you have to write your own data type that can hold all three fields.
Transmitting a custom data type in MapReduce (a bean with setters and getters)
(1) To be transmitted between Hadoop nodes, the type must support Hadoop's serialization mechanism: implement the Writable interface and override two methods:
write(DataOutput): serialize the bean into the output stream
readFields(DataInput): restore the bean from the input stream
(2) Hadoop's serialization mechanism differs from the native JDK's: it transmits only the bean's own data, not its inheritance-structure information, keeping just the data to reduce redundancy.
(3) During deserialization, the framework instantiates the bean via reflection, so a no-argument constructor is required.
(4) To support custom sorting, the type must implement the WritableComparable<T> interface. Note: do not substitute the Writable + Comparable<T> combination, or an initialization exception is thrown.
Here we define the bean as FlowBean and implement the WritableComparable interface:
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @Description flow bean
* @Date 10-17-2018
* @ClassName FlowBean
*/
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;// upstream traffic
private long downFlow;// downstream traffic
private long sumFlow;// total traffic
// deserialization instantiates the bean via reflection, so an empty no-arg constructor is required
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
/*
* Serialization: write the data we want to transmit into the byte stream.
*
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* Deserialization: restore each field from the byte stream.
*
* The read order must match the write order in write().
*/
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean fb) {
// Descending order: return -1 to put this bean first when its sumFlow is larger.
// Note this never returns 0, so beans with equal sumFlow remain distinct keys.
return this.sumFlow > fb.getSumFlow() ? -1 : 1;
}
}
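To make the write/readFields contract concrete, here is a minimal local round-trip sketch (FlowBeanRoundTrip is a hypothetical test class, not part of the job): the bean is serialized into a byte stream with write(), then a fresh instance is rebuilt from that stream with readFields(), which is essentially what Hadoop does when shipping the bean between nodes.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class FlowBeanRoundTrip {
public static void main(String[] args) throws IOException {
FlowBean original = new FlowBean(1024L, 2048L);
// write() pushes the three longs into the byte stream
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
original.write(new DataOutputStream(buffer));
// Hadoop creates the bean with the no-arg constructor (via reflection),
// then calls readFields() to fill in the fields
FlowBean restored = new FlowBean();
restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
System.out.println(restored);// prints: 1024	2048	3072
}
}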
Next comes the MapReduce code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @Description compute each user's upstream and downstream traffic plus the total, sorted by total traffic in descending order
* @Date 2018-10-17
* @ClassName Flow_log
*/
public class Flow_log {
// Map: emit FlowBean as the key so that the shuffle phase sorts by it
public static class myMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
@Override
public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {
// assumed tab-separated layout: phone number in column 2,
// up/down flow in the third- and second-to-last columns
String[] line = value.toString().split("\t");
String phone = line[1];
int size = line.length;
long upRate = Long.parseLong(line[size - 3]);
long downRate = Long.parseLong(line[size - 2]);
context.write(new FlowBean(upRate, downRate), new Text(phone));
}
}
//Reduce
/**
* Hadoop's default sort applies only to the key,
* so Text and FlowBean swap positions here:
* FlowBean becomes the map output key, letting the
* shuffle/sort phase order records by FlowBean,
* whose compareTo we overrode above.
*/
public static class myReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
public void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException,InterruptedException{
// swap key and value back so each output line reads: phone, then flow stats;
// iterate in case several phones share an identical FlowBean key
for (Text phone : values) {
context.write(phone, key);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Flow_log");
job.setJarByClass(Flow_log.class);
job.setMapperClass(myMapper.class);
job.setReducerClass(myReducer.class);
// key/value types of the mapper output
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// key/value types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
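For a quick sanity check: the mapper assumes tab-separated input with the phone number in the second column and the upstream/downstream flow in the third- and second-to-last columns. Given a hypothetical log line such as (sample values invented for illustration)
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
the job would output the phone number followed by FlowBean's toString():
13726230503	2481	24681	27162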
Second, keep in mind that MapReduce sorts only on the key. So after implementing the compareTo method of the WritableComparable interface, we still have to swap the mapper's output key and value so that the flow data sits in the key position; only then can the sort between map and reduce order it, and since we overrode compareTo, that order becomes descending.
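To see the descending order that compareTo produces without running a cluster, here is a small local sketch (SortDemo is a hypothetical class; Collections.sort relies on the same compareTo that the shuffle phase applies to map output keys):
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SortDemo {
public static void main(String[] args) {
List<FlowBean> beans = new ArrayList<>();
beans.add(new FlowBean(100L, 200L));// sumFlow = 300
beans.add(new FlowBean(500L, 700L));// sumFlow = 1200
beans.add(new FlowBean(50L, 60L));// sumFlow = 110
Collections.sort(beans);
for (FlowBean b : beans) {
System.out.println(b);
}
// printed order: 500 700 1200 / 100 200 300 / 50 60 110 — descending by sumFlow
}
}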