mapreduce当中的GroupingComparator分组详解

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑

1、分组排序步骤:

(1)自定义类继承WritableComparator;

(2)重写compare()方法;

@Override

public int compare(WritableComparable a, WritableComparable b) {

        // 比较的业务逻辑

        return result;

}

 (3)创建一个构造将比较对象的类传给父类

protected OrderGroupingComparator() {

        super(OrderBean.classtrue);

}

分组如图所示:

mapreduce当中的GroupingComparator分组详解

2、分组求top1案例

需求:现在有订单数据如下要求取每个订单当中金额最大的商品

订单id

商品id

成交金额

Order_0000001

Pdt_01

222.8

Order_0000001

Pdt_05

25.8

Order_0000002

Pdt_03

522.8

Order_0000002

Pdt_04

122.4

Order_0000002

Pdt_05

722.4

Order_0000003

Pdt_01

222.8

(1)自定义求top1分析图:

mapreduce当中的GroupingComparator分组详解

自定义求top1思路:

三个订单,以订单号分区,以金额分组。

  1. 第一步:读取订单文件,解析成k1,v1;k1类型为LongWritable,v1类型为Text;
  2. 第二步:mapTask接收k1,v1,转换成k2,v2;而要使得相同的订单号的数据,必须去到同一reduceTask中去,则必须以订单号为分区依据;故k2类型设置为自定义的OrderBean类型,OrderBean类定义两个变量,分别是String类型的orderId和Double类型的price;
  3. 第三步:以订单号orderId分区,相同key的数据发送到同一个reduce里面去,相同key合并value形成一个集合(分组)
  4. 第四步:排序,以金额为依据,对每个订单号里的数据进行排序(不同的订单号没有可比性,不用排序);
  5. 第五步:combiner,略;
  6. 第六步:分组,相同key的数据,其value为金额,以金额为分组依据;
  7. 第七步:reduceTask对分组后的数据进行处理,订单1输出数据为Order_0000001[222.8,25.8],订单2的数据Order_0000002[822.4,522.4,322.8]
  8. 将上述数据保存。

(2)自定义OrderBean对象

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    @Override
    public int compareTo(OrderBean o) {
        //注意:如果是不同的订单之间,金额不需要排序,没有可比性
        int orderIdCompare = this.orderId.compareTo(o.orderId);
        if (orderIdCompare == 0){
            //比较金额,按照金额进行排序
            int priceCompare = this.price.compareTo(o.price);
            return -priceCompare;
        }else {
            //如果订单号不同,没有可比性,直接返回订单号的排序即可
            return orderIdCompare;
        }
    }

    /**
     * 序列化方法
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    /**
     * 反序列化方法
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "orderId='" + orderId + '\'' +
                ", price=" + price +
                '}';
    }
}

(3)自定义mapper类

public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {

    /**

     * Order_0000005    Pdt_01  222.8

     Order_0000005  Pdt_05  25.8

     Order_0000002  Pdt_03  322.8

     Order_0000002  Pdt_04  522.4

     Order_0000002  Pdt_05  822.4

     Order_0000003  Pdt_01  222.8

     * @param key

     * @param value

     * @param context

     * @throws IOException

     * @throws InterruptedException

     */

    @Override

    protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {

        String[] split = value.toString().split("\t");

        OrderBean orderBean = new OrderBean();

        orderBean.setOrderId(split[0]);

        orderBean.setPrice(Double.valueOf(split[2]));

        //输出orderBean

        context.write(orderBean, NullWritable.get());

    }

}

 

(4)自定义分区类partition

public class GroupPartition extends Partitioner<OrderBean,NullWritable> {

    @Override

    public int getPartition(OrderBean orderBeanNullWritable nullWritableint numReduceTasks) {

        //(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

        //注意这里:使用orderId作为分区的条件,来进行判断,保证相同的orderId进入到同一个reduceTask里面去

        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;

    }

}

(5)自定义分组类group

/**

 * 第六步:自定义分组逻辑

 */

public class MyGroup extends WritableComparator {

    /**

     * 覆写默认构造器,通过反射,构造OrderBean对象

     * 通过反射来构造OrderBean对象

     * 接受到的key2  是orderBean类型,我们就需要告诉分组,以orderBean接受我们的参数

     */

    public MyGroup(){

        super(OrderBean.class,true);

    }

    /**

     * compare方法接受到两个参数,这两个参数其实就是我们前面传过来的OrderBean

     * @param a

     * @param b

     * @return

     */

    @Override

    public int compare(WritableComparable aWritableComparable b) {

        OrderBean first = (OrderBean) a;

        OrderBean second = (OrderBean) b;

        //以orderId作为比较条件,判断哪些orderid相同作为同一组

        return first.getOrderId().compareTo(second.getOrderId());

    }

}

(6)自定义reduce类

public class GroupReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {

    @Override

    protected void reduce(OrderBean keyIterable<NullWritablevaluesContext contextthrows IOExceptionInterruptedException {

        context.write(key, NullWritable.get());

    }

}

(7)定义程序入口类main

public class GroupMain extends Configured implements Tool {

    @Override

    public int run(String[] argsthrows Exception {

        //获取job对象

        Job job = Job.getInstance(super.getConf(), "group");

        //第一步:读取文件,解析成为key,value对

        job.setInputFormatClass(TextInputFormat.class);

        TextInputFormat.addInputPath(job,new Path("file:///E:\\MapReduce\\mapreduce当中的分组求topN\\input

"));

        //第二步:自定义map逻辑

        job.setMapperClass(GroupMapper.class);

        job.setMapOutputKeyClass(OrderBean.class);

        job.setMapOutputValueClass(NullWritable.class);

        //第三步:分区

        job.setPartitionerClass(GroupPartition.class);

        //第四步:排序  已经做了

        //第五步:规约  combiner  省掉

        //第六步:分组   自定义分组逻辑

        job.setGroupingComparatorClass(MyGroup.class);

        //第七步:设置reduce逻辑

        job.setReducerClass(GroupReducer.class);

        job.setOutputKeyClass(OrderBean.class);

        job.setOutputValueClass(NullWritable.class);

        //第八步:设置输出路径

        job.setOutputFormatClass(TextOutputFormat.class);

        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\MapReduce\\mapreduce当中的分组求topN\\output"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;

    }

    public static void main(String[] argsthrows Exception {

        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);

        System.exit(run);

    }

}

 

如何求每个组当中的top2的订单金额数据(即输出前面价格最大的)???

自定义求top2与top1的区别:

区别就是输出不同,top1中要求最大金额,故其以订单号为分区依据后,订单内部对金额进行了排序,在输出结果时,默认只输出最大值一条数据,故其OrderMapper类中只需要输出orderBean就可,其k2的类型为OrderBean,v2为NullWritable。

而top2要求输出金额的排序最大的两个值,即OrderMapper的输出不仅是orderBean,还需要price,这里就需要将v2的类型改为price的类型(即DoubleWritable)。

代码略。