SQOOP源码分析1----ToolRunner实现Windows平台向Hadoop集群提交任务

SQOOP源码系列文章的目的是把SQOOP源码详细而又简明地介绍给大家。本系列文章分为3个部分:一是通过ToolRunner从Windows本地向Hadoop集群提交MapReduce任务,二是Sqoop从读取配置文件到提交任务的过程分析,三是Sqoop中Map切割表数据到导入表的过程。

导读:SQOOP通过生成的MapReduce向hadoop集群提交任务,然而这个过程是怎样的呢,我们通过Hadoop提供的WordCount来模拟实现这一个过程,在往后的文章会详细地分析这些过程。

1.WordCount实现

package master.hadooptool;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		
		private final IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				this.word.set(itr.nextToken());
				context.write(this.word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		
		private IntWritable result = new IntWritable();
		
		public void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			this.result.set(sum);
			context.write(key, this.result);
		}
	}
}

WordCount写好后,把其打包成wordcount.jar,放到本地

2.运行Tool实现
SqoopTool继承了Configured类并且实现了Tool接口,因此我们的运行工具类也需要继承Configured并实现Tool接口。该类实现以后,通过ToolRunner运行时,就会把本地的Jar包上传到HDFS上,不需要我们手动操作。

package master.hadooptool;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;

public class WordCountTool extends Configured implements Tool {

	public static void main(String[] args) throws Exception {

		PropertyConfigurator.configure("log4j.properties");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		//加载Hadoop配置文件到Configuration中
		String HADOOP_CONFS[] = { "core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml",
				"hive2-site.xml", "hbase-site.xml" };
		Configuration conf = new Configuration();
		for (String name : HADOOP_CONFS) {
			File file = new File(name);
			if (!file.exists()) {
				continue;
			}
			FileInputStream in = null;
			try {
				in = new FileInputStream(file);
			} catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			conf.addResource(in);
		}

		WordCountTool tool = new WordCountTool();
		tool.setConf(conf);

		//调用ToolRunner来运行文件
		ToolRunner.run(tool.getConf(), tool, new String[] {});
	}


	public int run(String[] args) throws Exception {
	
		Configuration conf=super.getConf();
		
		conf.setBoolean("mapreduce.app-submission.cross-platform", true);//设置跨平台提交
		conf.set("tmpjars", "file:/E:/hadoop/wordcount.jar"); // 加载wordcontjar文件,注意路径格式
		
		// job.getConfiguration().set("mapred.jar","E:/hadoop/wordcount.jar");
		//mapred.jar是MapReduce所在的文件,tmpjars是MapReduce依赖库,我们没有依赖库,选择其中一个就可以,注意路径格式
		
		//构建Job
		Job job = Job.getInstance(conf, "wordcount");
		job.setJarByClass(WordCountTool.class);
		job.setMapperClass(WordCount.TokenizerMapper.class);
		job.setCombinerClass(WordCount.IntSumReducer.class);
		job.setReducerClass(WordCount.IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//设置输入输出路径,需要准备一个文本格式的文件放到HDFS中
		FileInputFormat.addInputPath(job, new Path("/wordcount/wordcount.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordcount/result"));

		int n=job.waitForCompletion(true)?0:-1;
		
		return n;
	}

}

3.依赖库和配置文件
Maven项目pom.xml中的依赖配置

<dependencies>
		<!-- Unit-test library, only on the test classpath. -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<!-- Hadoop artifacts; all four use the same version (2.8.1). -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.8.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-common</artifactId>
			<version>2.8.1</version>
		</dependency>
		<!-- NOTE(review): the next two dependencies use "provided" scope, so they are
		     not packaged with the application; confirm that your IDE/launcher puts
		     provided-scope jars on the runtime classpath when running main() locally. -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
			<version>2.8.1</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs-client</artifactId>
			<version>2.8.1</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>

core-site.xml

<configuration>
	<!-- Default filesystem URI; replace "hadoopmaster" with the address of your own cluster. -->
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://hadoopmaster:9000</value>
	</property>
	<!-- Implementation class used for hdfs: URIs. -->
	<property>
		<name>fs.hdfs.impl</name>
		<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
		<description>The FileSystem for hdfs: uris.</description>
	</property>
	<!-- Base directory for Hadoop temporary data. -->
	<property>
		<name>hadoop.tmp.dir</name>
		<value>/soft/hadoop_data/tmp</value>
	</property>
	<!-- Proxy-user settings for the "root" and "hadoop" users; "*" places no
	     restriction on hosts or groups. -->
	<property>
		<name>hadoop.proxyuser.root.hosts</name>
		<value>*</value>
	</property>
	<property>
		<name>hadoop.proxyuser.root.groups</name>
		<value>*</value>
	</property>
	<property>	    
		<name>hadoop.proxyuser.hadoop.hosts</name>
		<value>*</value>
	</property>
	<property>
		<name>hadoop.proxyuser.hadoop.groups</name>
		<value>*</value>
	</property>
</configuration>

mapred-site.xml

<configuration> 
  <!-- Run MapReduce jobs on YARN. -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml

<configuration>
 <!-- ResourceManager host; replace "hadoopmaster" with the address of your own cluster. -->
 <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hadoopmaster</value>
  </property>
  <!-- Auxiliary NodeManager service "mapreduce_shuffle" and the class that implements it. -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <!-- Virtual-memory limit enforcement for containers is switched off here. -->
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
  </property>
  <!-- NOTE(review): with the vmem check disabled above, this ratio presumably has no
       effect; confirm before relying on it. -->
  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
  </property>
</configuration>

在以上的配置文件中,需要把hadoopmaster改成自己Hadoop集群的主机名或IP地址。

4.运行结果
SQOOP源码分析1----ToolRunner实现Window平台向Hadoop集群提交任务

5.总结
通过Hadoop提供的ToolRunner可以远程向Hadoop集群提交MapReduce任务:在Job中配置好tmpjars或mapred.jar后,提交任务时客户端会把Jar上传到HDFS,然后由集群运行任务。
在现实的操作中还会遇到很多问题,需要我们冷静地分析,然后解决问题,不断学习、不断进步。
最后,有什么问题可以给我留言。。。