Hadoop MapReduce 过程详解

MapReduce是一种用于数据处理的编程模型。 该模型很简单,但不易于表达有用的程序.Hadoop可以运行用各种语言编写的MapReduce程序; 在本章中,我们将看一下用Java程序。 最重要的是,MapReduce程序本质上是并行的,因此将大规模的数据分析交给任何拥有足够机器的人。 MapReduce对于大型数据集来说是独一无二的,所以让我们先看一下。

 

使用Hadoop分析数据

要利用Hadoop提供的并行处理,我们需要将查询表达为MapReduce作业。经过一些本地的小规模测试后,我们将能够在一组机器上运行它。

Map和Reduce MapReduce的工作原理是将处理分为两个阶段:Map阶段和Reduce阶段。每个阶段都有键值对作为输入和输出,其类型可由程序员选择。程序员还指定了两个函数:map函数和reduce函数。

Example 2-3. Mapper for the maximum temperature example

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper

extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999;

@Override

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature;

if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs

airTemperature = Integer.parseInt(line.substring(88, 92));

} else {

airTemperature = Integer.parseInt(line.substring(87, 92));

}

String quality = line.substring(92, 93);

if (airTemperature != MISSING && quality.matches("[01459]")) {

context.write(new Text(year), new IntWritable(airTemperature));

}

}

}

 

Mapper类是泛型类型,具有四个形式类型参数,用于指定map函数的输入键,输入值,输出键和输出值类型。对于本示例,输入键是长整数偏移,输入值是文本行,输出键是年,输出值是空气温度(整数)。

Hadoop不是使用内置的Java类型,而是提供自己的一组基本类型,这些类型已针对网络序列化进行了优化。这些可以在org.apache.hadoop.io包中找到。这里我们使用LongWritable,它对应于Java Long,Text(如Java String)和IntWritable(如Java Integer)。

map()方法传递一个键和一个值。我们将包含输入行的Text值转换为Java String,然后使用其substring()方法提取我们感兴趣的列。

map()方法还提供了一个Context实例来写输出。在这种情况下,我们将年份写为Text对象(因为我们只是将它用作键),并且温度包含在IntWritable中。只有在温度存在且质量代码表明温度读数正常时,我们才会写出输出记录。

使用Reducer类似地定义reduce函数,如例2-4所示。

 

 

Example 2-4. Reducer for the maximum temperature example

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer

extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override

public void reduce(Text key, Iterable<IntWritable> values, Context context)

throws IOException, InterruptedException {

int maxValue = Integer.MIN_VALUE;

for (IntWritable value : values) {

maxValue = Math.max(maxValue, value.get());

}

context.write(key, new IntWritable(maxValue));

}

}

 

同样,四个正式类型参数用于指定输入和输出类型,这次是reduce函数。 reduce函数的输入类型必须与map函数的输出类型匹配:Text和IntWritable。 在这种情况下,reduce函数的输出类型是Text和IntWritable,一年及其最高温度,我们通过迭代温度并将每个温度与迄今为止发现的最高记录进行比较来找到。

 

第三段代码运行MapReduce作业(参见例2-5)。

例2-5。 用于查找天气数据集中的最高温度的应用程序

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

public static void main(String[] args) throws Exception {

if (args.length != 2) {

System.err.println("Usage: MaxTemperature <input path> <output path>");

System.exit(-1);

}

Job job = new Job();

job.setJarByClass(MaxTemperature.class);

job.setJobName("Max temperature");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);

job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

Job对象形成作业的规范,并使您可以控制作业的运行方式。 当我们在Hadoop集群上运行此作业时,我们会将代码打包到JAR文件中(Hadoop将在集群中分发)。 我们可以在Job的setJarByClass()方法中传递一个类,而不是显式指定JAR文件的名称,Hadoop将通过查找包含此类的JAR文件来查找相关的JAR文件。

Job对象形成作业的规范,并使您可以控制作业的运行方式。当我们在Hadoop集群上运行此作业时,我们会将代码打包到JAR文件中(Hadoop将在集群中分发)。我们可以在Job的setJarByClass()方法中传递一个类,而不是显式指定JAR文件的名称,Hadoop将通过查找包含此类的JAR文件来查找相关的JAR文件。

 构建了Job对象后,我们指定了输入和输出路径。通过在FileInputFormat上调用静态addInputPath()方法来指定输入路径,它可以是单个文件,目录(在这种情况下,输入形成该目录中的所有文件)或文件模式。顾名思义,可以多次调addInputPath()来使用多个路径的输入。

输出路径(其中只有一个)由FileOutputFormat上的静态setOutput Path()方法指定。它指定写入reduce函数的输出文件的目录。在运行作业之前,该目录不应该存在,因为Hadoop会抱怨并且不会运行该作业。这种预防措施是为了防止数据丢失(意外地用另一个工作的输出覆盖长工作的输出可能非常烦人)。

接下来,我们通过setMapperClass()和setReducerClass()方法指定要使用的map和reduce类型。

setOutputKeyClass()和setOutputValueClass()方法控制reduce函数的输出类型,并且必须与Reduce类生成的类型相匹配。映射输出类型默认为相同的类型,因此如果映射器生成与reducer相同的类型(在我们的示例中),则不需要设置它们。但是,如果它们不同,则必须使用setMapOutputKeyClass()和setMapOutputValueClass()方法设置Map输出类型。

 

输入类型是通过输入格式控制的,我们没有明确设置,因为我们使用的是默认的TextInputFormat。

在设置定义map和reduce函数的类之后,我们就可以运行该作业了。 Job上的waitForCompletion()方法提交作业并等待它完成。该方法的单个参数是一个标志,指示是否生成详细输出。如果为true,则作业会将有关其进度的信息写入控制台。

waitForCompletion()方法的返回值是一个布尔值,表示成功(true)或失败(false),我们将其转换为程序的退出代码0或1。

本节和整本书中使用的Java MapReduce API称为“新API”;它取代了旧的,功能相当的API。附录D中介绍了两种API之间的差异,以及如何在两种API之间进行转换的提示。您还可以在那里找到与最高温度应用相当的旧API。

 

测试运行在编写MapReduce作业之后,在一个小数据集上尝试它以清除代码中的任何直接问题是正常的。 首先,以独立模式安装Hadoop(有关如何在附录A中执行此操作的说明)。 这是Hadoop使用本地文件系统与本地作业运行器一起运行的模式。 然后,使用本书网站上的说明安装和编译示例。

让我们在前面讨论的五行示例中对它进行测试(输出已经稍微重新设置以适应页面,并且已经删除了一些行)

 

 

% export HADOOP_CLASSPATH=hadoop-examples.jar

% hadoop MaxTemperature input/ncdc/sample.txt output

 

当使用类名作为第一个参数调用hadoop命令时,它会启动Java虚拟机(JVM)来运行该类。 hadoop命令将Hadoop库(及其依赖项)添加到类路径中,并获取Hadoop配置。为了将应用程序类添加到类路径中,我们已经定义了一个名为HADOOP_CLASSPATH的环境变量,hadoop脚本会选择该变量。

使用Hadoop分析数据| 29在本地(独立)模式下运行时,本书中的程序都假设您已经以这种方式设置了HADOOP_CLASSPATH。应该从安装示例代码的目录运行命令。

运行作业的输出提供了一些有用的信息。例如,我们可以看到作业被赋予了job_local26392882_0001的ID,并且它运行了一个map任务和一个reduce任务(具有以下ID:attempt_lo cal26392882_0001_m_000000_0 attempt_local26392882_0001_r_000000_0)。

调试MapReduce作业时,了解作业和任务ID非常有用。

标题为“计数器”的输出的最后一部分显示了Hadoop为其运行的每个作业生成的统计信息。这些对于检查处理的数据量是否符合预期非常有用。例如,我们可以跟踪通过系统的记录数量:五个Map输入记录产生五个Map输出记录(因为映射器为每个有效输入记录发出一个输出记录),然后五个减少两个组中的输入记录(一个对于每个唯一键)产生两个减少输出记录。

 

缩小您已经看到MapReduce如何适用于小输入;现在是时候对系统进行鸟瞰,并查看大输入的数据流。为简单起见,到目前为止的示例都使用了本地文件系统上的文件。但是,要扩展,我们需要将数据存储在分布式文件系统中(通常是HDFS,您将在下一章中了解到)。这允许Hadoop使用Hadoop的资源管理系统YARN将MapReduce计算移动到托管部分数据的每台机器上(参见第4章)。让我们看看它是如何工作的。

数据流首先,一些术语。 MapReduce作业是客户端希望执行的工作单元:它由输入数据,MapReduce程序和配置信息组成。 Hadoop通过将其分成任务来运行工作,其中有两种类型:映射任务和减少任务。任务使用YARN进行调度,并在群集中的节点上运行。如果任务失败,它将自动重新安排在另一个节点上运行。

Hadoop将MapReduce作业的输入划分为称为输入拆分的固定大小的块,或者只是拆分。 Hadoop为每个拆分创建一个映射任务,该任务为拆分中的每个记录运行用户定义的映射函数。

具有许多分割意味着与处理整个输入的时间相比,处理每个分割所花费的时间很小。因此,如果我们并行处理拆分,则当拆分较小时,处理更好地实现负载平衡,因为更快的机器将能够在作业过程中比较慢的机器按比例处理更多的拆分。即使机器相同,失败的进程或同时运行的其他作业也可以实现负载平衡,并且随着分割变得更细粒度,负载平衡的质量也会提高。

另一方面,如果拆分太小,管理拆分和映射任务创建的开销将开始占据总的作业执行时间。对于大多数作业,良好的分割大小往往是HDFS块的大小,默认情况下为128 MB,尽管可以为群集(对于所有新创建的文件)更改或在创建每个文件时指定。

Hadoop尽力在输入数据驻留在HDFS中的节点上运行map任务,因为它不使用宝贵的集群带宽。这称为数据局部优化。但是,有时,承载地图任务输入拆分的HDFS块复制品的所有节点都在运行其他映射任务,因此作业调度程序将在与其中一个块相同的机架中的节点上查找空闲映射槽。偶尔甚至这是不可能的,因此使用机架外节点,这导致机架间网络传输。这两种可能性如图2-2所示。

 

现在应该清楚为什么最佳分割大小与块大小相同:它是可以保证存储在单个节点上的最大输入大小。 如果拆分跨越两个块,则任何HDFS节点都不可能存储两个块,因此一些拆分必须通过网络传输到运行map任务的节点,这显然不如运行整个映射有效。 任务使用本地数据。

映射任务将其输出写入本地磁盘,而不是HDFS。 为什么是这样? 映射输出是中间输出:它由reduce任务处理以产生最终输出,一旦作业完成,映射输出就可以丢弃。 因此,将其存储在HDFS中并进行复制将是一种过度的做法。 如果运行map任务的节点在reduce任务使用了map输出之前失败,那么Hadoop将自动重新运行另一个节点上的map任务以重新创建map输出。

Hadoop MapReduce 过程详解

Hadoop MapReduce 过程详解

 

减少任务没有数据局部性的优势;单个reduce任务的输入通常是所有映射器的输出。在本示例中,我们有一个由所有map任务提供的reduce任务。因此,有序映射输出必须通过网络传输到正在运行reduce任务的节点,在那里它们被合并,然后传递给用户定义的reduce函数。 reduce的输出通常存储在HDFS中以确保可靠性。如第3章所述,对于reduce输出的每个HDFS块,第一个副本存储在本地节点上,其他副本存储在机架外节点上以确保可靠性。因此,写入reduce输出确实消耗了网络带宽,但只消耗了普通HDFS写入管道所消耗的数量。

具有单个reduce任务的整个数据流如图2-3所示。虚线框表示节点,虚线箭头表示节点上的数据传输,实线箭头表示节点之间的数据传输。

Hadoop MapReduce 过程详解

 

Hadoop MapReduce 过程详解

reduce任务的数量不受输入大小的控制,而是独立指定。

当存在多个reducer时,map任务会对其输出进行分区,每个reduce会为每个reduce任务创建一个分区。每个分区中可以有许多键(及其关联值),但任何给定键的记录都在一个分区中。分区可以通过用户定义的分区功能来控制,但通常默认的分区器 - 使用散列函数来分组** - 非常有效。

图2-4显示了多个reduce任务的一般情况下的数据流。

此图清楚地说明了为什么map和reduce任务之间的数据流通常被称为“shuffle”,因为每个reduce任务都由许多map任务提供。 shuffle比这个图表建议的更复杂,调整它会对作业执行时间产生很大影响,正如您将在第197页的“随机排序”中看到的那样。

Hadoop MapReduce 过程详解

Hadoop MapReduce 过程详解

 

最后,还可以实现零减少任务。 当您不需要shuffle时,这可能是合适的,因为处理可以完全并行执行(一些示例在“NLineInputFormat”在234页中讨论)。 在这种情况下,唯一的节点外数据传输是映射任务写入HDFS时(参见图2-5)。

组合器功能许多MapReduce作业受到群集上可用带宽的限制,因此最大限度地减少map和reduce任务之间传输的数据是值得的。 Hadoop允许用户指定要在地图输出上运行的组合器函数,组合器函数的输出形成reduce函数的输入。 由于组合器功能是一种优化,因此Hadoop无法保证为特定的地图输出记录调用它的次数(如果有的话)。 换句话说,调用组合器函数零,一次或多次应该从reducer产生相同的输出。

Hadoop MapReduce 过程详解

组合器函数的契约约束了可以使用的函数类型。

用一个例子可以很好地说明这一点。 假设对于最高温度示例,1950年的读数由两个地图处理(因为它们处于不同的分裂中)。 想象一下第一张地图产生了输出:

(1950, 0)

(1950, 20)

(1950, 10)

第二个产生:

(1950, 25)

(1950, 15)

将使用所有值的列表调用reduce函数:

(1950, [0, 20, 10, 25, 15])

输出:

(1950, 25)

 

因为25是列表中的最大值。 我们可以使用一个组合函数,就像reduce函数一样,可以找到每个map输出的最高温度。 然后调用reduce函数:

(1950,[20,25])

并将产生与以前相同的输出。 更简洁的是,我们可以在这种情况下表示函数调用温度值,如下所示:

 

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

 

 

Not all functions possess this property.1 For example, if we were calculating mean tem‐

peratures, we couldn’t use the mean as our combiner function, because:

mean(0, 20, 10, 25, 15) = 14

but:

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15

 

组合器功能不替换reduce功能。 (怎么可能呢?还需要使用reduce函数来处理来自不同映射的相同键的记录。)但是它可以帮助减少映射器和reducer之间混乱的数据量,仅此因此它总是值得的 考虑是否可以在MapReduce作业中使用组合器功能。

指定组合器函数回到Java MapReduce程序,组合器函数是使用Reducer类定义的,对于此应用程序,它与MaxTemperatureReducer中的reduce函数相同。 我们需要做的唯一改变是在Job上设置组合器类(参见例2-6)。

 

Example 2-6. Application to find the maximum temperature, using a combiner func‐

tion for efficiency

public class MaxTemperatureWithCombiner {

public static void main(String[] args) throws Exception {

if (args.length != 2) {

System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +

"<output path>");

System.exit(-1);

}

Job job = new Job();

job.setJarByClass(MaxTemperatureWithCombiner.class);

job.setJobName("Max temperature");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);

job.setCombinerClass(MaxTemperatureReducer.class);

job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);

 

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

运行分布式MapReduce作业

相同的程序将在完整数据集上运行,无需更改。 这就是重点

MapReduce:它可以扩展到您的数据大小和硬件大小。