Hadoop之MapReduce

(一) 什么是MapReduce?

 (1).概念

官网上原话翻译成中文这么说的:Hadoop MapReduce是一个用于轻松编写应用程序的软件框架,它以可靠的容错方式在大型群集(数千个节点)的商品硬件上并行处理海量数据(多TB数据集).

  关键词:软件框架 可靠性 高容错 处理海量数据

说白了,其实mapreduce就是把大量数据通过分而治之的核心思想来实现分布式计算框架,核心架构map和reduce任务。

(2) Map任务

 Hadoop之MapReduce

2.1) 读取文档里面的数据内容,inputSplit将数据内容解析成k-v键值对,每对键值对调用一次map函数;

2.2) 实现自己的map函数,k-v键值对处理并输出到中间结果;

2.3) 对k-v键值对进行分区(partitioner)

2.4) 按照分区对k-v进行处理(排序和合并),

a.对相同key的value值放入一个list集合里面;

b.将不同分区的key相同的键值对放入同一个reduce处理。

2.5) 对mapper输出的结果进行归约并传入reduce(mapper任务的输出就是reduce任务的输入)

(3) Reduce任务

3.1)存在多个reduce的情况下,多mapper任务输出的结果按照不同分区复制到不同的reduce下,单个更简单;

3.2)reduce对mapper的输入结果进行合并、排序,然后在调用自己实现的reduce()方法来对k-v进行处理,得到新的

k-v键值对输出;

3.3)把通过自定义的reduce()处理后的结果存入文档;

(二) MapReduce的执行原理

 MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出;

Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中,如下图

Hadoop之MapReduce

2.1) Mapper任务执行流程

每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出

整个Mapper 任务的处理过程又可以分为以下几个阶段,如图

Hadoop之MapReduce

  ①:把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的;

如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB;么小的文件是一个输入片,大文件会分为两个数据块,

那么是两个输入片。一共产生三个输入片;每一个输入片由一个Mapper 进程处理。(ps:Hadoop1.0数据块block默认64M,Hadoop2.0以后默认都是128M)

②:对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对“键”是每一行的起始位置(单位是字节),“值”本行的文本内容。

③:每一对键值对都会调用一次map()来对输出数据做一些处理,并且每一次调用map()都会产生0到多对键值对| ps :这里的map()就是我们自己覆盖实现的。

④:对键值对按照一定算法进行分区,分区是指划分key的空间,分区的总数对应job的reduce数,也就是说分区和reduce是一一对应的,

默认只有一个分区,分区名为HashPartitioner。(ps:分区的实质其实就是数据归类,为了减轻reduce的压力)

⑤:对④步骤中各个分区内部根据key来进行排序,对key-value键值对进行分组。

⑥:对分区中的数据进行规约。(ps:减少到reduce的键值对数据量,作业时间变短,效率提高)

2.2) Reduce任务执行流程

 Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,步骤图所示:

 Hadoop之MapReduce

①:将各个分区的mapper输出的键值对传入到reduce里面;

②:将reduce里面的k-v键值对进行合并,接下来对其合并结果进行排序;

③:对执行好①、②操作后的键值对结果输出到指定位置。(ps:如HDFS文件系统)

2.3) 一个mapreduce的job简单原理图

Hadoop之MapReduce

ps:<key1,value1>{input} ------>map------> <key2, value2>{combine} ------> <key2,value2>----->reduce-----><key3,value3>{output}

(三) 单词计数的例子展示

3.1)单词计数结构原理模拟图

Combiner 节点负责完成上面提到的将同一个map中相同的key进行合并,避免重复传输,从而减少传输中的通信开销

Partitioner节点负责将map产生的中间结果进行划分,确保相同的key到达同一个reduce节点.

Hadoop之MapReduce

3.2)代码实例展示

ps:博主使用的开发工具是IDE,开发语言是java,当然有兴趣也使用其他语言去实现!

①:先来张wordcount类简略图

Hadoop之MapReduce

 ②:来分析下每个方法,这里就偷懒,就直接贴代码

public class WordCount {
    private final static String INPUT_PATH = "hdfs://master:9000/input"; //输入路径
    private final static String OUTPUT_PATH = "hdfs://master:9000/output"; //输出路径,必须是不存在的

    private static Configuration getConfiguration() {
        Configuration conf = new Configuration();
        return conf;
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);//表示单词出现次数
        private Text word = new Text();//某个单词

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //表示每次处理一行数据,并且按空格分割
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, one);//写入上下文中,用于后续传入reduce
            }
        }
    }

    static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable(0); //某单词出现的总次数

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.创建Configuration
        Configuration cf = new Configuration();
        //2.准备环境
        Path outputPath = new Path(OUTPUT_PATH);
        Path inputPath = new Path(INPUT_PATH);
        FileSystem fs = FileSystem.get(getConfiguration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        //3.创建job
        Job job = new Job(cf, WordCount.class.getSimpleName());
        job.setJarByClass(WordCount.class);//打成jar
        FileInputFormat.setInputPaths(job, inputPath);//读取HDFS文件路径

        //设置map相关
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reduce相关
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置job的输出环境
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

第一步:打包我们上面的类大jar(ps:是jar包不是war包)

第二步:启动hadoop集群(ps:这里怎么配置集群我就不多说了,不懂的问度娘,网上一大堆)Hadoop之MapReduce

  第三步:新建输入路径并上传资源文件到HDFS,与java代码里面的input输出路径一一对应;

Hadoop之MapReduce

Hadoop之MapReduce

第四步:运行jar,获得结果

Hadoop之MapReduce

PS:MapReduce大概就这样了,等有机会更深入的了解,再来继续更新......