Executing MapReduce: Source Code Analysis
Job submission (a minimal driver sketch follows the submit() code below):
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
connect();
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}
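For context, here is a minimal driver sketch (class name and paths are hypothetical, and no mapper/reducer classes are set, so the identity implementations would be used). It shows where submit() sits in user code: waitForCompletion(true) calls the submit() shown above and then polls the job until it finishes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExampleDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "example job");   // the Job whose submit() is shown above
    job.setJarByClass(ExampleDriver.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion(true) calls submit() internally, then reports progress
    // until the job finishes.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}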
// Create a client connection
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
// The Cluster constructor initializes the client
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf);
}
// Create the client; this is where local mode vs. RPC mode is decided (a configuration sketch follows this method)
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  // (abridged) iterate over the ClientProtocolProviders loaded by ServiceLoader
  for (ClientProtocolProvider provider : frameworkLoader) {
    ClientProtocol clientProtocol = null;
    if (jobTrackAddr == null) {
      clientProtocol = provider.create(conf);
    } else {
      clientProtocol = provider.create(jobTrackAddr, conf);
    }
    if (clientProtocol != null) {
      clientProtocolProvider = provider;
      client = clientProtocol;
      LOG.debug("Picked " + provider.getClass().getName()
          + " as the ClientProtocolProvider");
      break;
    }
  }
}
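A side note, hedged: the provider-based path above is the one found in Hadoop 0.23/2.x; there the winning ClientProtocolProvider is selected by the mapreduce.framework.name property (in classic MRv1 with JobClient, the analogous switch is mapred.job.tracker=local). A rough sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;

public class ClusterModeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "local" -> LocalClientProtocolProvider creates a LocalJobRunner (no RPC)
    // "yarn"  -> YarnClientProtocolProvider creates an RPC client to the cluster
    conf.set("mapreduce.framework.name", "local");
    Cluster cluster = new Cluster(conf);
    System.out.println("Default FS: " + cluster.getFileSystem().getUri());
    cluster.close();
  }
}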
Job submission: jobClient.submitJobInternal(conf)
1. Obtain the staging directory for the job's temporary files. It is created on HDFS, and some runtime information is written under it later. The default is /tmp/hadoop/mapred/staging; in practice the value of mapreduce.jobtracker.staging.root.dir is used (a configuration sketch follows the code below). The source:
// LocalJobRunner implements ClientProtocol
// In JobClient, RunningJob submitJobInternal(final JobConf job) starts with:
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
    jobCopy);
// which ends up in LocalJobRunner.getStagingAreaDir():
/**
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
Path stagingRootDir =
new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
"/tmp/hadoop/mapred/staging"));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user;
randid = rand.nextInt(Integer.MAX_VALUE);
if (ugi != null) {
user = ugi.getShortUserName() + randid;
} else {
user = "dummy" + randid;
}
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}
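A small sketch of the configuration knob read above; the property name comes straight from the code, only the directory value is hypothetical:

import org.apache.hadoop.conf.Configuration;

public class StagingDirConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical location; the default is /tmp/hadoop/mapred/staging.
    conf.set("mapreduce.jobtracker.staging.root.dir", "/user/hadoop/staging");
    // The per-user "<user><randid>/.staging" suffix is appended automatically.
    System.out.println(conf.get("mapreduce.jobtracker.staging.root.dir"));
  }
}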
2. Obtain a new job ID. In local mode it is assembled from the literal "local", the random number, and an incrementing counter (a small illustration follows the code):
public synchronized JobID getNewJobId() {
return new JobID("local" + randid, ++jobid);
}
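A quick illustration with hypothetical numbers: with randid = 12345 and the counter at 1, the resulting ID renders as something like job_local12345_0001 ("local" + randid becomes the jtIdentifier and the numeric part is zero-padded).

import org.apache.hadoop.mapreduce.JobID;

public class JobIdExample {
  public static void main(String[] args) {
    // Prints the string form of the ID, e.g. job_local12345_0001.
    System.out.println(new JobID("local" + 12345, 1));
  }
}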
3. Build submitJobDir by appending the job ID to the staging directory from step 1, and record it in the configuration as the job's run directory:
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
JobStatus status = null;
4. Add authentication and secret information. Token information is read from the configuration files; if there is none, only the secret information is added. At this point the tokens and secrets are initialized into the JobConf (a sketch of attaching credentials programmatically follows the code):
populateTokenCache(jobCopy, jobCopy.getCredentials());
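A hedged sketch (alias and secret bytes are hypothetical): credentials can also be attached programmatically before submission; populateTokenCache() merges what it reads from the configured token files with whatever already sits in the job's Credentials object.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class CredentialsExample {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "credentials-demo");
    // Store a named secret in the job's Credentials; tasks can read it back later.
    job.getCredentials().addSecretKey(new Text("my.webservice.password"),
        "s3cret".getBytes("UTF-8"));
  }
}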
5. Copy the client-side files to HDFS. The resources needed to run the job (the job JAR, the configuration files, and the computed input splits) are copied into a directory named after the job ID on the JobTracker's filesystem. This covers three kinds of files: -libjars, -files, and -archives (a driver sketch showing how these options get into the configuration follows the code). The replication factor defaults to 10 and is configurable:
copyAndConfigureFiles(jobCopy, submitJobDir);
* configure the jobconf of the user with the command line options of
* -libjars, -files, -archives
private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir)
throws IOException, InterruptedException {
short replication = (short)job.getInt("mapred.submit.replication", 10);
copyAndConfigureFiles(job, jobSubmitDir, replication);
// Set the working directory
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(fs.getWorkingDirectory());
}
}
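A sketch (driver class name is hypothetical) of how -libjars/-files/-archives reach the JobConf in the first place: ToolRunner runs GenericOptionsParser over the command line, which records those options in the Configuration (e.g. as tmpjars/tmpfiles); copyAndConfigureFiles() then uploads them to the staging directory with the replication factor from mapred.submit.replication.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class OptionsAwareDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();   // already contains the parsed generic options
    System.out.println("tmpjars  = " + conf.get("tmpjars"));  // filled by -libjars
    System.out.println("tmpfiles = " + conf.get("tmpfiles")); // filled by -files
    return 0;
  }
  public static void main(String[] args) throws Exception {
    // e.g. hadoop jar app.jar OptionsAwareDriver -libjars extra.jar -files lookup.txt
    System.exit(ToolRunner.run(new Configuration(), new OptionsAwareDriver(), args));
  }
}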
6. Obtain delegation tokens via the NameNode:
TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
new Path [] {submitJobDir},
jobCopy);
7. Initialize the path and host information the job needs at runtime and store it in the configuration:
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
int reduces = jobCopy.getNumReduceTasks();
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
job.setJobSubmitHostAddress(ip.getHostAddress());
job.setJobSubmitHostName(ip.getHostName());
}
JobContext context = new JobContext(jobCopy, jobId);
8. Check the output specification. Here we can see that if the output directory is not set, or if it already exists, an exception is thrown and the submission aborts (a sketch of setting up the output path follows the code):
// Check the output specification
if (reduces == 0 ? jobCopy.getUseNewMapper() :
jobCopy.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(),
jobCopy);
output.checkOutputSpecs(context);
} else {
jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] {outDir},
job.getConfiguration());
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
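A sketch (paths are hypothetical) of what satisfies checkOutputSpecs(): an output path must be set, and it must not exist yet; a common pattern in examples is to delete a stale output directory before submitting.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputSpecExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "output-spec-demo");
    Path out = new Path("/tmp/wordcount/output");   // hypothetical path
    FileSystem fs = out.getFileSystem(conf);
    if (fs.exists(out)) {
      fs.delete(out, true);   // avoid the FileAlreadyExistsException thrown above
    }
    FileOutputFormat.setOutputPath(job, out);       // avoid InvalidJobConfException
  }
}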
9. Split the input files.
A note here: the main job of writeNewSplits is the call List<InputSplit> splits = input.getSplits(job); this is exactly where getSplits() of FileInputFormat, which we saw in the WordCount example, gets invoked. As you can see, the number of map tasks is determined by the number of splits (a sketch of how to influence that number follows the code below). For the details of the splitting itself, see:
http://younglibin.iteye.com/blog/1929255
http://younglibin.iteye.com/blog/1929278
// Create the splits for the job
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
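Since the number of map tasks equals the number of splits returned by getSplits(), it can only be influenced indirectly, for example through the split-size bounds of the new-API FileInputFormat. A sketch (the sizes are arbitrary examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "split-size-demo");
    // Raise the minimum split size to 128 MB: fewer, larger splits, so fewer maps.
    FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
    // Cap the maximum split size at 256 MB: an upper bound on each map's input.
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
  }
}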
10. Record the administrators of the queue the job is submitted to (the queue ACLs) in the job configuration (a queue-name sketch follows the code):
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
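The queue name consulted above comes straight from the job configuration. A sketch (the queue name is hypothetical); the resolved ACL property then looks like mapred.queue.<queue>.acl-administer-jobs:

import org.apache.hadoop.mapred.JobConf;

public class QueueExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setQueueName("analytics");   // defaults to "default" when unset
    System.out.println(conf.getQueueName());
  }
}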
11. Write the job file to the JobTracker's filesystem; during execution the job reads its configuration back from this file:
// Write job file to JobTracker's fs
FSDataOutputStream out =
FileSystem.create(fs, submitJobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
12. Dump the configuration into that file. In the job's staging directory you can find a job.xml file that holds every configuration property of this job; it can also be viewed through the web UI on port 50030 (a small sketch for inspecting job.xml offline follows):
jobCopy.writeXml(out);
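A small sketch (the path is hypothetical): job.xml is an ordinary Hadoop configuration file, so it can also be inspected offline by loading it into a fresh Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class JobXmlInspect {
  public static void main(String[] args) {
    Configuration jobConf = new Configuration(false);   // don't load the defaults
    jobConf.addResource(new Path("file:///tmp/staging/job_local12345_0001/job.xml"));
    System.out.println("mapred.job.name = " + jobConf.get("mapred.job.name"));
  }
}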
Job initialization is now complete; what remains is the job's execution.
13. The job is finally submitted:
status = jobSubmitClient.submitJob(
jobId, submitJobDir.toString(), jobCopy.getCredentials());
public JobStatus submitJob(JobID jobid, String jobSubmitDir,
Credentials credentials)
throws IOException {
Job job = new Job(jobid, jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
}
The implementation above is the local one: Job here is an inner class of LocalJobRunner that extends Thread, so the job runs in its own thread (a note on the map thread pool follows the code):
private class Job extends Thread implements TaskUmbilicalProtocol {
public Job(JobID jobid, String jobSubmitDir) throws IOException {
profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
"http://localhost:8080/", job.getJobName());
status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
profile.getUser(), profile.getJobName(), profile.getJobFile(),
profile.getURL().toString());
jobs.put(id, this);
this.start();
}
@Override
public void run() {
...............
List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
jobId, mapOutputFiles);
ExecutorService mapService = createMapExecutor(taskRunnables.size());
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : taskRunnables) {
mapService.submit(r);
}
.........................
reduce.setJobFile(localJobFile.toString());
localConf.setUser(reduce.getUser());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
myMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, this);
myMetrics.completeReduce(reduce.getTaskID());
reduce_tasks -= 1;
}
}
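One note on the executor created by createMapExecutor() above: the size of the local map thread pool is capped by a configuration property, usually mapreduce.local.map.tasks.maximum; treat the exact key as an assumption, since it differs across versions. A sketch:

import org.apache.hadoop.conf.Configuration;

public class LocalParallelMaps {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed key for the multithreaded LocalJobRunner: run up to 4 maps at once.
    conf.setInt("mapreduce.local.map.tasks.maximum", 4);
  }
}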
How the Job invokes map and reduce
map:
In the Job thread we saw mapService.submit(r), where r is a MapTaskRunnable object, so this is where the map tasks are actually submitted for execution:
protected class MapTaskRunnable implements Runnable {
public void run() {
map_tasks.getAndIncrement();
myMetrics.launchMap(mapId);
map.run(localConf, Job.this);
myMetrics.completeMap(mapId);
}
The method above calls run() of the MapTask class:
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
..............................................
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
// make a mapper
// make the input format
// rebuild the input split
// get an output object
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
output.close(mapperContext);
}
input.initialize(split, mapperContext);
What gets called here is LineRecordReader: the FileInputFormat subclass used by default is TextInputFormat, and in TextInputFormat the reader is constructed as return new LineRecordReader(recordDelimiterBytes); so LineRecordReader is what actually reads the data.
The code above also contains mapper.run(mapperContext); here we finally see who calls this run() method. At this point a locally running MapReduce job can be traced end to end (a WordCount-style Mapper sketch follows the run() code below):
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
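A WordCount-style Mapper sketch (the class name is hypothetical) to make the call sequence above concrete: run() invokes setup() once, then map() once per key/value pair delivered by LineRecordReader (key = byte offset, value = the line), then cleanup().

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, ONE);   // emit (word, 1) for every token in the line
    }
  }
}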
reduce:
reduce.run(localConf, this);
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
reducer.run(reducerContext);
trackedRW.close(reducerContext);
}
Above we also saw where the reduce side calls reducer.run(); the whole flow is finally tied together (a WordCount-style Reducer sketch follows for completeness).
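For completeness, a WordCount-style Reducer sketch (the class name is hypothetical): Reducer.run() mirrors Mapper.run(), calling reduce() once per distinct key with an Iterable over that key's values.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();             // add up the 1s emitted by the mapper
    }
    result.set(sum);
    context.write(key, result);   // (word, total count)
  }
}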