分布式计算框架MapReduce入门 06
1. 理解MapReduce思想
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
2. MapReduce编程规范及步骤
2.1 天龙八部
mapReduce一共有八个步骤
- map阶段两个步骤
第一步:读取文件,解析成key,value对 k1 v1
第二步:自定义map逻辑,接收k1 v1 转换成新的 k2 v2 输出 - shuffle阶段四个步骤
第三步:分区。相同key的数据,发送到同一个reduce里面去,key合并,value形成一个集合
第四步:排序
第五步:规约
第六步:分组 - reduce阶段的两个步骤
第七步:自定义reduce逻辑,接收k2 v2,转换成新的k3 v3 输出
第八步:将我们k3 v3 写出到目的地
3. WordCount示例编写
3.1 需求
在一堆给定的文本文件中统计输出每一个单词出现的总次数
3.2 数据格式准备如下
cd /export/servers
vim wordcount.txt
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
3.3 代码编写
3.3.1 mapper类
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
for (String word : split) {
context.write(new Text(word),new LongWritable(1));
}
}
}
3.3.2 定义一个reducer类
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
* 自定义我们的reduce逻辑
* 所有的key都是我们的单词,所有的values都是我们单词出现的次数
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key,new LongWritable(count));
}
}
3.3.3 定义一个主类,用来描述job并提交job
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
//打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数
job.setJarByClass(JobMain.class);
//第一步:读取输入文件解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.100:8020/wordcount"));
//第二步:设置我们的mapper类
job.setMapperClass(WordCountMapper.class);
//设置我们map阶段完成之后的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//第三步,第四步,第五步,第六步,省略
//第七步:设置我们的reduce类
job.setReducerClass(WordCountReducer.class);
//设置我们reduce阶段完成之后的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//第八步:设置输出类以及输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.100:8020/wordcount_out"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
/**
* 程序main函数的入口类
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Tool tool = new JobMain();
int run = ToolRunner.run(configuration, tool, args);
System.exit(run);
}
}
3.3.4 bug解决
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
- 解决方法
直接将hdfs-site.xml当中的权限关闭即可
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
然后重启hdfs集群
4. MapReduce程序运行模式
4.1 本地运行
- mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行
- 而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
- 怎样实现本地运行?写一个程序,不要带集群的配置文件
本质是程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local参数 - 本地模式非常便于进行业务逻辑的debug,只要在eclipse中打断点即可
- 本地模式代码设置
configuration.set("mapreduce.framework.name","local");
configuration.set("yarn.resourcemanager.hostname","local");
TextInputFormat.addInputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\3、大数据离线第三天\\wordcount\\input"));
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\3、大数据离线第三天\\wordcount\\output"));
4.2 集群运行模式
- 将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行
- 处理的数据和输出结果应该位于hdfs文件系统
- 提交集群的实现步骤:
- 代码设置
//打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数 job.setJarByClass(JobMain.class);
- 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain