MapReduce之Shuffle详解

Shuffle机制

概念:主要是Map阶段之后,Reduce阶段之前对数据的分区排序合并分组过程

MapReduce之Shuffle详解

分区(Partition)

  1. 概述:为了将不同类型的内容输出到不同文件中,进行分类存储。

  2. 默认分区:

    1. HashPartitioner根据key的hashCode对ReduceTasks个数取模得到的进行分区,用户不能进行设置。
    2. 底层原理 : (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
  3. 自定义Partitioer步骤:

    1. 自定义类继承Partitioner重写getPartition方法(主要是控制分区代码逻辑)
    2. 在job驱动中,设置自定义Partitioner,
      • job.setPartitionerClass(CustomPartitioner.class);
    3. ​ 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
      • job.setNumReduceTasks(5);
  4. 小知识点:

    1. ReduceTask数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
    2. 1<ReduceTask数量<getPartition的结果数,则有一部分分区数据无处安放,会抛出Exception;
    3. ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
    4. 分区号必须从零开始,逐一累加。

排序(WritableComparable)

  1. 概述:MapTask与ReduceTask均默认会对数据按照key进行排序(字典排序,实现的方法是快速排序)。任何数据均会被排序,而不管逻辑上是否需要。
    1. MapTask会将数据放在环形缓冲区中,当环形缓冲区使用率达到一定阈值时,再对缓冲区中的数据进行一次快速排序溢写到磁盘上,当数据处理完毕后,会对磁盘上所有文件进行归并排序
    2. ReduceTask从每个MapTask上远程拷贝数据文件到内存中,如果文件大小超过一定阈值,就溢写到磁盘上。如果磁盘文件数目到达一定能阈值,就进行一次归并排序汇总成一个文件,当数据拷贝完毕后,统一对内存磁盘上的所有数据进行一次归并排序
  2. 排序分类
    1. 部分排序
      • MapReduce根据输入记录的键对数据排序,保证输出的每一个文件内部有序
    2. 全排序
      • 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但是失去了MapReduce所提供的并行架构
    3. 辅助排序
      • Reduce阶段对key分组。在接受key为bean对象是,想让一个或几个字段相同的key进入到同一个reduce中。
    4. 二次排序
      • 在自定义排序过程中,如果自定义bean实现了compareTo逻辑即为二次排序
  3. 自定义排序
    • bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

合并(Combiner)

  1. 概述
    1. MR程序中Mapper和Reduce之外的一种组件
    2. 父类是的Reduce
    3. Combiner在每一个MapTask所在的节点运行,Reduce是接收全局所有Mapper的输出架构
    4. 对每一个MapTask输出进行局部汇总减少网络传输量
    5. 使用的前提不能影响最终业务逻辑,要跟Reduce的输入KV对应
  2. 自定义Combiner实现步骤
    1. 自定义一个Combiner继承Reducer,重写Reduce方法
    2. 在Job驱动类中设置
      • job.setCombinerClass(WordcountCombiner.class);

分组(GroupingComparator)

  1. 概述
    • 对Reduce阶段的数据根据某一个或几个字段进行分组。
  2. 自定义分组排序
    1. 自定义类继承WritableComparator
    2. 重写compare()方法
    3. 创建一个构造将比较对象的类传给父类