Hadoop(MapReduce工作机制)

MapReduce核心编程思想

Hadoop(MapReduce工作机制)
1)分布式的运算程序往往需要分成至少2个阶段
2)第一个阶段的MapTask并发实例,完全并行运行,互不相干
3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调
2)MapTask:负责Map阶段的整个数据处理流程
3)ReduceTask:负责Reduce阶段的整个数据处理流程

MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer、Driver

1.Mapper阶段
1)用户自定义的Mapper要继承自己的父类
2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
3)Mapper中的业务逻辑写在map()方法中
4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
5)map()方法(MapTask进程)对每一个<K,V>调用一次

2.Reducer阶段
1)用户自定义的Reducer要继承自己的父类
2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
3)Reducer的业务逻辑写在reduce()方法中
4)ReduceTask进程对每一组相同的k的<K,V>组调用一次reduce()方法

3.Driver阶段
相当于YARN集群的客户端,用于提交整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

MapReduce框架原理

1.InputFormat数据输入

Hadoop(MapReduce工作机制)
Hadoop(MapReduce工作机制)
FileInputFormat实现类

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等.
1)TextInputFormat
默认的FileInputFormat实现类;按行读取每条记录
2)KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割为key,value.可以通过在驱动类中设置confset(KeyValueLineRecord.KEY_VALUE_SEPERATOR,"\t");来设定分隔符.默认分隔符是tab(\t)
3)NLineInputFormat
如果使用NLineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按照NLineInputFormat指定的行数N来划分,即输入文件的总行数/N=切片数,如果不整除,切片数=商+1
4)自定义InputFormat
步骤如下:
a)自定义一个类继承FileInputFormat;
b)改写RecordReader,实现一次读取一个完整文件封装为KV;
c)设置Driver,在输出时使用SequenceFileOutPutFormat输出合并文件.

2.OutputFormat数据输出

OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,常见的接口实现类如下:
1)TextOutputFormat
是默认的输出格式;它把每条记录写为文本行
2)SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续MapReduce的输入,这是一种好的输出格式,紧凑,容易压缩
自定义OutputFormat
步骤如下:
a)自定义一个类继承FileOutputFormat
b)改写RecordWriter,具体改写输出数据的方法write()

MapReduce工作流程

Hadoop(MapReduce工作机制)
Hadoop(MapReduce工作机制)

MapTask工作机制

Hadoop(MapReduce工作机制)
1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value

2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value

3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中

4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作

溢写阶段详情:
a)利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序

b)按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作

c)将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中

5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index

在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销

ReduceTask工作机制

Hadoop(MapReduce工作机制)
1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中

2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多

3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可

4)Reduce阶段:reduce()函数将计算结果写到HDFS上

Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle.

Shuffle过程从第7步开始到第16步结束,具体Shuffle过程详解如下:
1)MapTask收集map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

Shuffle过程环形缓冲区的作用:
key,value从map()方法输出,被outputcollector收集通过getpartitioner()方法获取分区号,在进入环形缓冲区.默认情况下,环形缓冲区大小值为100MB。当map输入的数据进入环形缓冲区的量达到80MB以上时,那么开始执行溢写过程,溢写过程中如果有其他数据进入,那么由剩余的百分之二十反向写入.溢写过程会根据key,value先进行分区,后进行排序,最终maptask溢写文件经过归并排序后落入本地磁盘,reduceTask将多个mapTask下相同分区的数据copy到不同的reduceTask中进行归并排序后一次读取一组数据给reduce()函数

Partition分区

默认Partition分区是根据key的hashCode对ReduceTasks个数取模得到,用户没法控制哪个key存储到哪个分区.
自定义Partition分区步骤:
1)自定义类继承Partitioner,重写getPartition()方法
2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTask(Num);
分区总结
1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
4)分区号必须从零开始,注意累加.

WritableComparable排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阙值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序.

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阙值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阙值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阙值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上所有数据进行一次归并排序.

排序分类
1)部分排序
MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序.
2)全排序
最总输出结果只有一个文件,且文件内部有序.
3)辅助排序(GroupingComparator分组)
对Reduce阶段的数据根据某一个或几个字段进行分组;
分组排序步骤:
a)自定义类继承WritableComparator
b)重写compare()方法
c)创建一个构造将比较对象的类传给父类
4)二次排序

自定义排序WritableComparable
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序

Combiner合并

Combiner合并
1)Combiner是MR程序中Mapper和Reducer之外的一种组件
2)Combiner组件的父类就是Reducer
3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;Reducer是在接收全局所有Mapper的输出结果
4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该跟Reducer的输入KV类型对应起来
6)自定义Combiner实现步骤:
a)自定义一个Combiner继承Reducer,重写Reduce方法
b)在Job驱动类中设置job.setCombinerClass();