MapReduce
MapReduce简介
- MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
- MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
MapReduce分为Mapper和Reducer两个阶段
(1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
(2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml(hadoop配置)配置文件里设置参数mapred.reduce.tasks的值,默认值为1
MapReduce处理流程图:
MapReduce的执行步骤:
1、Map任务处理
1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数.
<0,hello you> <10,hello me>
1.2 重写map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。
<hello,1> <you,1> <hello,1> <me,1>
1.3 对1.2输出的<k,v>进行分区。默认分为一个区。详见《Partitioner》
1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。
排序后:<hello,1> <hello,1> <me,1> <you,1>
分组后:<hello,{1,1}><me,{1}><you,{1}>
1.5 (可选)对分组后的数据进行归约。详见《Combiner》
2、Reduce任务处理
2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle)详见《shuffle过程分析》
2.2 对多个map的输出进行合并、排序。重写reduce函数,接收的是分组后的数据,实现自己的业务逻辑,
<hello,2> <me,1> <you,1>
处理后,产生新的<k,v>输出。
2.3 对reduce输出的<k,v>写到HDFS中。
代码实现:
Mapper
package com.sy.hadoop.mr.wordcount;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper类的四个泛型,
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
* KEYIN, VALUEIN 前两个框架输入的数据类型代表读取文件的
* key表示每一行的起始位置(偏移量offset)-value一行内容
* KEYOUT, VALUEOUT 后两个是mapper输出到Reduces的数据类型
* (Reduces和mapper的数据输入输出都是以key-value对的形式封装的)
*
*
* 默认情况下框架传递给我们的mapper的输入数据中,KEYIN是要处理的文本中一行的起始偏移量(Long),VALUEIN是这一行的内容(String)
*
* 由于输入输出都是走的网络,输入输出的数据类型序列化默认使用的是jdk自己的,但是jdk在序列化时会添加很多附加信息,
* 如果在高并发情况下这些附加信息会对整体的网络传递的数据量大大增加,所以hadoop中自己封装了一套数据类型减少了序列化的附加信息
* 例如: java中的Long 对应 hadoop中的 LongWritable
*/
public class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
/**
* MapReduce框架每拿一行数据调用一次此方法
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//编写具体的业务逻辑,我们业务要处理的数据在参数中
String line = value.toString();
String[] words = StringUtils.split(line," ");
//遍历单词数组 输出位key-val形式到Reducer
for (String str : words){
context.write(new Text(str),new LongWritable(1));
}
}
}
Reducer:
package com.sy.hadoop.mr.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* Reducer类的四个泛型,
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
* KEYIN, VALUEIN 前两个是mapper输出并进行排序和分组后的数据类型
* KEYOUT, VALUEOUT 后两个是Reduces输出的数据类型
* (Reduces和mapper的数据输入输出都是以key-value对的形式封装的)
*
*/
public class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
* MapReduce框架在map处理完成后,
* 将所有key-value缓存起来,进行排序(key字典顺序排序)和分组(一个key和值堆为一组)key-values,
* 然后传递一个组,调用一次reduce方法
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
Iterator<LongWritable> iterator = values.iterator();
Long count = 0L;
while (iterator.hasNext()){
LongWritable it = iterator.next();
count += it.get();
}
//将内容输出
context.write(new Text(key.toString()),new LongWritable(count));
}
}
编写jar入口类(用于hadoop集群运行) 描述作业:
package com.sy.hadoop.mr.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 描述一个特定的作业
* 例如:该作业使用那个类作为逻辑处理中的map,那个作为reduce
*
* 还可以指定该作业要处理的数据所在路径
* 还可以指定该作业输出的结果该放到那个路径
*/
public class WCRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置整个job使用的类在那些jar包
job.setJarByClass(WCRunner.class);
//指定此作业使用的mapper和reduce类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//指定reduce的输出key-val类型 如果map和reduce输出类型一样话默认是对两个都起作用
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定map的输出key-val类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定map要处理的输入数据存放路径
FileInputFormat.setInputPaths(job,new Path("/wc/srcdata/"));
//指定reduce处理的结果数据存放位置
FileOutputFormat.setOutputPath(job,new Path("/wc/reduce"));
//将job提交给集群,参数为true表示是否打印运行进度
job.waitForCompletion(true);
}
}
执行命令:
将jar包分发到集群运行
hadoop jar <jar在linux的路径> <main方法所在的类的全类名> <参数>
本地直接运行main方法不过这样效率会低很多因为直接本地一个map和一个reduce运行并且都是通过网络传输数据