MapReduce
MapReduce编程模型
MapReduce是一种海量数据的并行编程模型和计算框架,它最早运行在Google的分布式系统GFS下,后面Hadoop也实现了MapReduce编程模型和计算框架。MapReduce的主要思想把大规模数据的操作分发一个主节点管理下的分节点共同完成,然后整合各分节点的结果,得到最终的结果。
并行编程会遇到各种复杂的问题:分布式存储、工作调度、负载均衡、容错处理、网络通信等。这些问题均由MapReduce处理。
MapReduce大致经历如下过程:输入——》Split——》Map——》Combine——》Shuffle——》Reduce。
Split:主要完成将输入分成split,然后每个split对应一个map。(默认一行一个key,value)
Map:按照用户编写map函数对split完成<key,value>的映射操作。
Combine: 将Map操作输出的<key,value>具有相同的key合在一起,主要是提高Reduce的效率。
Shuffle:将Combine的输出<key,vlalue>通过hash映射到相应的reducer。
Reduce:按照用户编写reduce函数整合最后的输出结果,并输出。
【注】上面各部分输出的<key,value>不一定是一样的。
【注】reducers的数目最好小于mapper的数目,mapred-site.xml的mpred.reduce.tasks属性存储了Reducers的数目,可以通过job.setNumReduceTasks()方法来重新设置该值。
MapReduce集群行为
(1)任务调度与执行
MapReduce任务由一个JobTracker和多个TaskTracker两类节点控制完成。JobTracker主要负责调度和管理TaskTracker,它通常运行Master节点(NameNode)上。JobTracker将Mappers和Reducers分配给空闲的TaskTracker,由TaskTracker负责这些任务的并行执行。TaskTracker运行在DataNode上,DataNode既是数据的存储节点,又是计算节点。JobTracker要监控TaskTracker任务运行的状况,如果某个TaskTracker运行出现状况,JobTracker要其任务分给其他节点。
【注】NameNode用心跳包用来监控DataNode。
(2)本地计算
把计算节点和数据节点放在同一台计算机上,这种方式有效的减少了数据在网络中的传输,降低了任务队网络带宽的要求,避免网络带宽成为瓶颈。所以Split不会跨域两台计算机的存储,即Split小于HDFS数据块的大小。
(3)Shuffle过程
MapReduce将Mapper的结果按照key值分为R份(R表示Reducers的个数),划分时常用哈希函数,如hash(key)%R。
(4)合并Mapper输出
正如前面所说,网络带宽很重要,所以在Shuffle之前加一个Combine过程,即中间结果具有相同的key进行合并,然后才Shuffle。
【注】Combine过程和Reduce过程相似。
(5)读取中间结果
在完成Combine和Shuffle过程后,Mapper的结果被直接写入本地磁盘,不是HDFS,因为Mapper输出的只是中间结果,当任务完成之后就可以直接删除了,如果存在HDFS,HDFS的备份机制会影响效率。存储好了通知JobTracker中间结果的位置,再由JobTracker告知Reducer到哪个DataNode去取中间结果。
MapReduce重要类
(1)InputForMat类
该类的作用是将输入分割成split,并将split进步拆分成<key1,value1>对作为map函数的输入。可以通过job.setInputFormatClass()方法进行设置,默认使用TextInputFormat类处理。TextInputFormat将输入的按照行分割成splits,并通过LineRecorderReader将其中的每一行解析成<key1,value1>对,key值对应第几行,value为行的内容。
(2)Mapper类
实现map函数,根据输入<key1,value1>生成中间结果<key2,value2>。可以通过job.setMapperClass()方法进行设置。MapReduce中有默认值。
(3)Combiner类
实现combine函数,根据输入<key2,value2>将相同key2的项目合并在一起。可以通过job.setCombinerClass()方法进行设置。
(4)Partitioner类
实现getPartion函数,用于在Shuffle过程按照key值将中间结果分成R份,每份有一个Reducer负责。可以通过job.setPartitionerClass()方法进行设置。
(5)Reducer类
实现Reduce函数,将中间结果合并,得到最终结果。可以通过job.seReducerClass()方法进行设置。MapReduce值有默认值。
(6)OutputFormat类
该类负责输出最终结果。可以通过通过job.setOutputFormatClass()方法进行设置。默认用TextOutputFormat类,TextOutputFormat将最终结果写成纯文本文件,每行一个<key,value>对。
(7)job.SetOutputKeyClass()和job.setOutputValueClass()用来设置最终结果的key和value类型。
(8)job.SetMapOutputKeyClass()和job.setMapOutputValueClass()用来设置Map输出的key和value类型。
MapReduce容错
Master周期性地给Worker发送ping命令,若没有应答,则认为Worker失效,终止其任务调度,把该任务调度到其他Worker上重新执行。
Master会周期性地设置检查点(checkpoint),并导出Master的数据。一旦某个任务失效,系统就从最近的一个检查点恢复并重新执行。
MapReduce的WordCount例子
代码参考:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/
(1)map过程
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="[^\\w]"; //正则表达式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().toLowerCase(); //全部转为小写字母 line = line.replaceAll(pattern, " "); //将非0-9, a-z, A-Z的字符替换为空格 StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } }(2)Reduce过程
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }(3)主函数
public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount");//名字 conf.setOutputKeyClass(Text.class);//设置输出key,value conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class);//注意这个设置了reduce conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0]));//设置输出路径 conf.setOutputPath(new Path(args[1])); JobClient.runJob(conf);//运行 return 0; } public static void main(String[] args) throws Exception { if(args.length != 2){ System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }