mapreduce当中的GroupingComparator分组详解
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑
1、分组排序步骤:
(1)自定义类继承WritableComparator;
(2)重写compare()方法;
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
(3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
分组如图所示:
2、分组求top1案例
需求:现在有订单数据如下,要求取每个订单当中金额最大的商品?
订单id |
商品id |
成交金额 |
Order_0000001 |
Pdt_01 |
222.8 |
Order_0000001 |
Pdt_05 |
25.8 |
Order_0000002 |
Pdt_03 |
522.8 |
Order_0000002 |
Pdt_04 |
122.4 |
Order_0000002 |
Pdt_05 |
722.4 |
Order_0000003 |
Pdt_01 |
222.8 |
(1)自定义求top1分析图:
自定义求top1思路:
三个订单,以订单号分区,以金额分组。
- 第一步:读取订单文件,解析成k1,v1;k1类型为LongWritable,v1类型为Text;
- 第二步:mapTask接收k1,v1,转换成k2,v2;而要使得相同的订单号的数据,必须去到同一reduceTask中去,则必须以订单号为分区依据;故k2类型设置为自定义的OrderBean类型,OrderBean类定义两个变量,分别是String类型的orderId和Double类型的price;
- 第三步:以订单号orderId分区,相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合(分组);
- 第四步:排序,以金额为依据,对每个订单号里的数据进行排序(不同的订单号没有可比性,不用排序);
- 第五步:combiner,略;
- 第六步:分组,相同key的数据,其value为金额,以金额为分组依据;
- 第七步:reduceTask对分组后的数据进行处理,订单1输出数据为Order_0000001[222.8,25.8],订单2的数据Order_0000002[822.4,522.4,322.8]
- 将上述数据保存。
(2)自定义OrderBean对象
public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private Double price; @Override public int compareTo(OrderBean o) { //注意:如果是不同的订单之间,金额不需要排序,没有可比性 int orderIdCompare = this.orderId.compareTo(o.orderId); if (orderIdCompare == 0){ //比较金额,按照金额进行排序 int priceCompare = this.price.compareTo(o.price); return -priceCompare; }else { //如果订单号不同,没有可比性,直接返回订单号的排序即可 return orderIdCompare; } } /** * 序列化方法 * @param dataOutput * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(orderId); dataOutput.writeDouble(price); } /** * 反序列化方法 * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { this.orderId = dataInput.readUTF(); this.price = dataInput.readDouble(); } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "OrderBean{" + "orderId='" + orderId + '\'' + ", price=" + price + '}'; } }
(3)自定义mapper类
public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
/**
* Order_0000005 Pdt_01 222.8
Order_0000005 Pdt_05 25.8
Order_0000002 Pdt_03 322.8
Order_0000002 Pdt_04 522.4
Order_0000002 Pdt_05 822.4
Order_0000003 Pdt_01 222.8
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
//输出orderBean
context.write(orderBean, NullWritable.get());
}
}
(4)自定义分区类partition
public class GroupPartition extends Partitioner<OrderBean,NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
//(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//注意这里:使用orderId作为分区的条件,来进行判断,保证相同的orderId进入到同一个reduceTask里面去
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
(5)自定义分组类group
/**
* 第六步:自定义分组逻辑
*/
public class MyGroup extends WritableComparator {
/**
* 覆写默认构造器,通过反射,构造OrderBean对象
* 通过反射来构造OrderBean对象
* 接受到的key2 是orderBean类型,我们就需要告诉分组,以orderBean接受我们的参数
*/
public MyGroup(){
super(OrderBean.class,true);
}
/**
* compare方法接受到两个参数,这两个参数其实就是我们前面传过来的OrderBean
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
//以orderId作为比较条件,判断哪些orderid相同作为同一组
return first.getOrderId().compareTo(second.getOrderId());
}
}
(6)自定义reduce类
public class GroupReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
(7)定义程序入口类main
public class GroupMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取job对象
Job job = Job.getInstance(super.getConf(), "group");
//第一步:读取文件,解析成为key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///E:\\MapReduce\\mapreduce当中的分组求topN\\input
"));
//第二步:自定义map逻辑
job.setMapperClass(GroupMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步:分区
job.setPartitionerClass(GroupPartition.class);
//第四步:排序 已经做了
//第五步:规约 combiner 省掉
//第六步:分组 自定义分组逻辑
job.setGroupingComparatorClass(MyGroup.class);
//第七步:设置reduce逻辑
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//第八步:设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\MapReduce\\mapreduce当中的分组求topN\\output"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
System.exit(run);
}
}
如何求每个组当中的top2的订单金额数据(即输出前面价格最大的)???
自定义求top2与top1的区别:
区别就是输出不同,top1中要求最大金额,故其以订单号为分区依据后,订单内部对金额进行了排序,在输出结果时,默认只输出最大值一条数据,故其OrderMapper类中只需要输出orderBean就可,其k2的类型为OrderBean,v2为NullWritable。
而top2要求输出金额的排序最大的两个值,即OrderMapper的输出不仅是orderBean,还需要price,这里就需要将v2的类型改为price的类型(即DoubleWritable)。
代码略。