MapReduce程序数据清洗
一、首先准备好需要的清洗的数据
二、将数据导入项目中,在项目下新建input(存放原数据)和output(存放清洗过后的数据)两个目录
三、导入所需要的jar包:
hadoop-2.8.5\share\hadoop\common\*.jar
hadoop-2.8.5\share\hadoop\common\lib\*.jar
hadoop-2.8.5\share\hadoop\hdfs\*.jar
hadoop-2.8.5\share\hadoop\hdfs\lib\*.jar
hadoop-2.8.5\share\hadoop\mapreduce\*.jar
hadoop-2.8.5\share\hadoop\mapreduce\lib\*.jar
hadoop-2.8.5\share\hadoop\yarn\*.jar
hadoop-2.8.5\share\hadoop\yarn\lib\*.jar
四、代码如下:
清洗类:
package com.stu.mr06;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author xiaowei
* @date 2019/3/26 -10:29
* 数据清洗
*/
public class GMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Cleans one raw input line and emits it as a single '/'-joined record.
     *
     * <p>Cleaning rules applied to each whitespace-separated token:
     * skip the header row (lines starting with "STN"), drop empty tokens
     * produced by runs of spaces, strip '*' characters, strip a trailing
     * A-I flag letter, and replace the missing-value sentinels
     * "9999.9"/"999.9"/"99.9" with "0.0".
     *
     * @param k1      byte offset of the line in the input split (unused)
     * @param v1      the raw input line
     * @param context Hadoop context; receives (cleanedLine, NullWritable)
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String data = v1.toString();
        // Skip the column-header row.
        if (data.startsWith("STN")) {
            return;
        }
        String[] tokens = data.split(" ");
        // StringBuilder avoids O(n^2) string concatenation in the loop.
        StringBuilder cleaned = new StringBuilder();
        for (String token : tokens) {
            // Drop empty tokens left over from consecutive spaces.
            if (token.isEmpty()) {
                continue;
            }
            // Remove '*' quality markers.
            if (token.endsWith("*")) {
                token = token.replace("*", "");
            }
            // Remove a trailing A-I flag letter.
            if (token.matches(".*[A-I]")) {
                token = token.substring(0, token.length() - 1);
            }
            // Replace missing-value sentinels with 0.0.
            if (token.equals("9999.9") || token.equals("999.9") || token.equals("99.9")) {
                token = "0.0";
            }
            // BUG FIX: the original checked (i == str.length), which is never
            // true inside a loop bounded by i < str.length, so EVERY token got
            // a trailing '/', leaving a dangling separator on each record.
            // Joining with a separator only between tokens fixes that.
            if (cleaned.length() > 0) {
                cleaned.append('/');
            }
            cleaned.append(token);
        }
        // NullWritable has no public constructor; NullWritable.get() returns
        // the singleton instance.
        context.write(new Text(cleaned.toString()), NullWritable.get());
    }
}
输出类:
package com.stu.mr06;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author xiaowei
* @date 2019/3/26 -16:18
* 主程序
*/
public class GMain {

    /**
     * Driver for the data-cleaning job: configures the map-only pipeline,
     * sets input/output paths, and runs it to completion.
     *
     * @param args command-line arguments (unused; paths are hard-coded)
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the job from a fresh Hadoop configuration.
        Job job = Job.getInstance(new Configuration());
        // Jar containing the job classes, located via this driver class.
        job.setJarByClass(GMain.class);
        // Mapper and its output key/value types (no reducer is configured).
        job.setMapperClass(GMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Input directory and output directory (output must not yet exist).
        FileInputFormat.setInputPaths(job, new Path("Hadoop_API/input"));
        FileOutputFormat.setOutputPath(job, new Path("Hadoop_API/output/3265"));
        // BUG FIX: the original discarded waitForCompletion's boolean result,
        // so a failed job still exited with status 0. Propagate success or
        // failure to the shell via the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}