MapReduce之二次排序
二次排序:
mapreduce计算过程和输出,都是啊按照key自动排序的,要是想value也要排序输出,即key第一排序,value第二次排序的方式。
1 步骤图如下
2 主要代码实现:
以题为例,集群上某表结构为(学号,姓名,课程名称,成绩)四个属性。用MR框架实现学号第一次排序你相同学号的情况下再成绩倒叙排列。最后输出字段(学号,姓名,课程名称,成绩)。
原表如下图:代码:
public class SecondSort {
/**
* 自定义的newKey
*/
public static class KeyPairWritable implements
WritableComparable<KeyPairWritable> {
// 组合key,key1是分区key,key2是二次排序key
private String key1;
private int key2;
public KeyPairWritable() {
}
public KeyPairWritable(String key1, int key2) {
this.set(key1, key2);
}
// 一次性将两个key设置成完
public void set(String key1, int key2) {
this.key1 = key1;
this.key2 = key2;
}
// 当map端写出的时候的序列化方法,即map如何将对象写出去,保证与读取的顺序一致
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(key1);
arg0.writeInt(key2);
}
// 在reducer读取数据时候的反序列化方法,即reduce如何将对象读取出来,保证与写入的顺序一致
public void readFields(DataInput arg0) throws IOException {
this.key1 = arg0.readUTF();
this.key2 = arg0.readInt();
}
// 自定义比较器方法,先比较key1,确定分区号。在分区号相同的情况下,去比较key2
// 就不需要单独写一个Comparator了
public int compareTo(KeyPairWritable o) {
int compare = this.key1.compareTo(o.key1);
if (compare != 0) {
return compare;
} else {
// 降序排列,故将o放到前边即可
return o.key2-this.getkey2();
}
}
public int getkey2() {
return key2;
}
public void setkey2(int key2) {
this.key2 = key2;
}
public String getkey1() {
return key1;
}
public void setkey1(String key1) {
this.key1 = key1;
}
}
// map类,实现map函数
public static class LineProcessMapper extends
Mapper<Object, Text, KeyPairWritable, Text> {
// 暂存每个传过来的词的值,省掉重复申请空间
private KeyPairWritable outputKey = new KeyPairWritable();
private Text outputValue = new Text();
// 核心map方法的具体实现,逐个<key,value>对去处理
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 通过context对象,将map的输出逐个输出
String tempLine = value.toString();
if (tempLine != null && tempLine.trim().length() > 0) {
String[] columnArray = tempLine.split("\\s");
outputKey.set(columnArray[0], Integer.parseInt(columnArray[3]));
outputValue.set(columnArray[1]+"\t"+columnArray[2]+"\t"
+columnArray[3]);
context.write(outputKey, outputValue);
}
}
}
// 自定义分区类,包证同key的记录,如S1,S2等,能映射到相同的reduce端去处理
public static class SecondPartitioner extends
Partitioner<KeyPairWritable, IntWritable> {
// 采集默认的HashPartiton实现即可
@Override
public int getPartition(KeyPairWritable key, IntWritable value,
int numPartitions) {
return (key.getkey1().hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
/**
* 在shuffle阶段的sort全局排序完成后,如何对数据记录进行分组
*/
public static class SecondSortGroupComparator extends WritableComparator {
// 对象KeyPairWritable.class注册,让比较器知道该对象并能够初始化
protected SecondSortGroupComparator() {
super(KeyPairWritable.class, true);
}
@Override
public int compare(WritableComparable first, WritableComparable second) {
if (first == null || second == null) {
return 0;
}
KeyPairWritable newKey1 = (KeyPairWritable) first;
KeyPairWritable newKey2 = (KeyPairWritable) second;
// 自定义按原始数据中第一个key分组
return newKey1.getkey1().compareTo(newKey2.getkey1());
}
}
// reduce类,实现reduce函数
public static class SortReducer extends
Reducer<KeyPairWritable, Text, Text, Text> {
private Text outputKey = new Text();
// 核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理
public void reduce(KeyPairWritable keyPair,
Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 进来时已经排序完成
outputKey.set(keyPair.getkey1());
for (Text val : values) {
context.write(outputKey, val);
}
}
}
// 启动mr的driver方法
public static void main(String[] args) throws Exception {
// 得到集群配置参数
Configuration conf = new Configuration();
// 参数解析器
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
if ((remainingArgs.length != 2)) {
System.err .println("Usage: yarn jar jar_path main_class_path -D参数列表 <in> <out>");
System.exit(2);
}
// 设置到本次的job实例中
Job job = Job.getInstance(conf, "二次排序");
// 指定本次执行的主类是WordCount
job.setJarByClass(SecondSort.class);
// 指定map类
job.setMapperClass(LineProcessMapper.class);
// 指定partition类
job.setPartitionerClass(SecondPartitioner.class);
job.setGroupingComparatorClass(SecondSortGroupComparator.class);
// 指定reducer类
job.setReducerClass(SortReducer.class);
// 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
job.setMapOutputKeyClass(KeyPairWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定输入数据的路径
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
// 指定输出路径,并要求该输出路径一定是不存在的
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
// 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
通过hdfs环境验证:
com.gf.csdn.SecondSort \
-Dmapred.reduce.tasks=1 \
/tmp/table/student_score.txt /
tmp/tianliangedu/output/jingjingli_98
左后显示的结果: