【图文解析 】MapReduce 示例程序编写及编码规范

上一步,我们查看了 WordCount 这个 MapReduce 程序的源码编写,可以得出几点结论:

1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);

2、 该程序中的 TokenizerMapper 类继承了 Mapper 类

3、 该程序中的 IntSumReducer 类继承了 Reducer 类 
 
总结:MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分 编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类 
 
那么现在就来编写我们自己的 Wordcount 程序 按照上面的编码规范,主体结构应该是这样: 

【图文解析 】MapReduce 示例程序编写及编码规范
MapReduce 程序编写规范:

1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)

2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)

3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)

4、Mapper 中的业务逻辑写在 map()方法中

5、map()方法(maptask 进程)对每一个<K,V>调用一次

6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式

7、Reducer 的业务逻辑写在 reduce()方法中

8、Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法

9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类

10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象 
 
WordCount 的业务逻辑:

1、 maptask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总

2、 reducetask 阶段将接受 maptask 的结果,来做汇总计数 
 
下面是具体实现,首先看 Map: 
 【图文解析 】MapReduce 示例程序编写及编码规范

    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{          
    @Override  
    protected void map(LongWritable key, Text value,Context context)    
    throws IOException, InterruptedException {            

    // 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对   
    String[] words = value.toString().split(" ");   
    for(String word: words){    
    context.write(new Text(word), new IntWritable(1));   
        }  
    } 
} 


其次看 Reduce: 

 【图文解析 】MapReduce 示例程序编写及编码规范

static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 
    @Override  
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)            
        throws IOException, InterruptedException {   
        // 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计   
        int sum = 0;   
        for(IntWritable v: values){    
        sum += v.get();   
}   
        context.write(key, new IntWritable(sum));  
        } 
    }


 
在看 Job: 
 【图文解析 】MapReduce 示例程序编写及编码规范
 

public static void main(String[] args) throws Exception { 
    // 指定 hdfs 相关的参数  
    Configuration conf = new Configuration();  
    conf.set("fs.defaultFS", "hdfs://hadoop02:9000");          
    System.setProperty("HADOOP_USER_NAME", "hadoop"); 

    // 新建一个 job 任务  
    Job job = Job.getInstance(conf);    

    // 设置 jar 包所在路径  
    job.setJarByClass(WordCountMR.class);    

    // 指定 mapper 类和 reducer 类  
    job.setMapperClass(WordCountMapper.class);      
    job.setReducerClass(WordCountReducer.class);   
    // 指定 maptask 的输出类型  
    job.setMapOutputKeyClass(Text.class);  
    job.setMapOutputValueClass(IntWritable.class);    

    // 指定 reducetask 的输出类型  
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(IntWritable.class);    

    // 指定该 mapreduce 程序数据的输入和输出路径  
    Path inputPath = new Path("/wordcount/input");  
    Path outputPath = new Path("/wordcount/output");  
    FileInputFormat.setInputPaths(job, inputPath);  
    FileOutputFormat.setOutputPath(job, outputPath);    

    // 最后提交任务  
    boolean waitForCompletion = job.waitForCompletion(true);      
    System.exit(waitForCompletion?0:1); }