MapReduce之Shuffle详解
Shuffle机制
概念:主要是Map阶段之后,Reduce阶段之前对数据的分区、排序、合并、分组过程
分区(Partition)
-
概述:为了将不同类型的内容输出到不同文件中,进行分类存储。
-
默认分区:
- HashPartitioner根据key的hashCode对ReduceTasks个数取模得到的进行分区,用户不能进行设置。
- 底层原理 : (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
-
自定义Partitioer步骤:
- 自定义类继承Partitioner重写getPartition方法(主要是控制分区代码逻辑)
- 在job驱动中,设置自定义Partitioner,
- job.setPartitionerClass(CustomPartitioner.class);
- 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
- job.setNumReduceTasks(5);
-
小知识点:
- ReduceTask数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
- 1<ReduceTask数量<getPartition的结果数,则有一部分分区数据无处安放,会抛出Exception;
- ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
- 分区号必须从零开始,逐一累加。
排序(WritableComparable)
- 概述:MapTask与ReduceTask均默认会对数据按照key进行排序(字典排序,实现的方法是快速排序)。任何数据均会被排序,而不管逻辑上是否需要。
- MapTask会将数据放在环形缓冲区中,当环形缓冲区使用率达到一定阈值时,再对缓冲区中的数据进行一次快速排序并溢写到磁盘上,当数据处理完毕后,会对磁盘上所有文件进行归并排序。
- ReduceTask从每个MapTask上远程拷贝数据文件到内存中,如果文件大小超过一定阈值,就溢写到磁盘上。如果磁盘文件数目到达一定能阈值,就进行一次归并排序汇总成一个文件,当数据拷贝完毕后,统一对内存磁盘上的所有数据进行一次归并排序
- 排序分类
- 部分排序
- MapReduce根据输入记录的键对数据排序,保证输出的每一个文件内部有序
- 全排序
- 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但是失去了MapReduce所提供的并行架构
- 辅助排序
- Reduce阶段对key分组。在接受key为bean对象是,想让一个或几个字段相同的key进入到同一个reduce中。
- 二次排序
- 在自定义排序过程中,如果自定义bean实现了compareTo逻辑即为二次排序
- 部分排序
- 自定义排序
- bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
合并(Combiner)
- 概述
- MR程序中Mapper和Reduce之外的一种组件
- 父类是的Reduce
- Combiner在每一个MapTask所在的节点运行,Reduce是接收全局所有Mapper的输出架构
- 对每一个MapTask输出进行局部汇总减少网络传输量
- 使用的前提不能影响最终业务逻辑,要跟Reduce的输入KV对应
- 自定义Combiner实现步骤
- 自定义一个Combiner继承Reducer,重写Reduce方法
- 在Job驱动类中设置
- job.setCombinerClass(WordcountCombiner.class);
分组(GroupingComparator)
- 概述
- 对Reduce阶段的数据根据某一个或几个字段进行分组。
- 自定义分组排序
- 自定义类继承WritableComparator
- 重写compare()方法
- 创建一个构造将比较对象的类传给父类