【图文解析 】MapReduce 示例程序编写及编码规范
上一步,我们查看了 WordCount 这个 MapReduce 程序的源码编写,可以得出几点结论:
1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);
2、 该程序中的 TokenizerMapper 类继承了 Mapper 类
3、 该程序中的 IntSumReducer 类继承了 Reducer 类
总结:MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分 编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类
那么现在就来编写我们自己的 Wordcount 程序 按照上面的编码规范,主体结构应该是这样:
MapReduce 程序编写规范:
1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
4、Mapper 中的业务逻辑写在 map()方法中
5、map()方法(maptask 进程)对每一个<K,V>调用一次
6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
7、Reducer 的业务逻辑写在 reduce()方法中
8、Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法
9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类
10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象
WordCount 的业务逻辑:
1、 maptask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总
2、 reducetask 阶段将接受 maptask 的结果,来做汇总计数
下面是具体实现,首先看 Map:
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对
String[] words = value.toString().split(" ");
for(String word: words){
context.write(new Text(word), new IntWritable(1));
}
}
}
其次看 Reduce:
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计
int sum = 0;
for(IntWritable v: values){
sum += v.get();
}
context.write(key, new IntWritable(sum));
}
}
在看 Job:
public static void main(String[] args) throws Exception {
// 指定 hdfs 相关的参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
// 新建一个 job 任务
Job job = Job.getInstance(conf);
// 设置 jar 包所在路径
job.setJarByClass(WordCountMR.class);
// 指定 mapper 类和 reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 指定 maptask 的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定 reducetask 的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定该 mapreduce 程序数据的输入和输出路径
Path inputPath = new Path("/wordcount/input");
Path outputPath = new Path("/wordcount/output");
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 最后提交任务
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:1); }