用Hadoop的MapReduce处理员工信息Join操作
在进行处理员工信息Join编程时,遇到了一点问题,我想先不纠结这个问题了,到后面有时间在慢慢来看这个问题吧
输入的数据格式
worker.txt
empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark managers 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-20 1250 1400 30 boston
7900 james clerk 7698 1981-01-09 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20
department.txt
deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork
在编程时,由于数据不规则,导致bug不断,弄了一早上还是有问题
问题一:数组越界,数据长度都大于等于3,不知道为什么在department.setDepartmentName(data[1])会越界,可能是数据格式有问题
问题二:还是数组越界,惊不惊喜,绝不绝望,相应的代码是work.setDepartmentNo(data[7]),在分析数据的时候有的地方有两个制表符,真有两个制表符,但是不知道为什么还是会越界;
问题三:又来一个输入格式问题:为什么其他数据都没有这个问题,而这条数据却。。。。。,索性直接删除这条数据
问题四:然而删除这条数据并没有用,map 100% reduce 100% ,却还是有问题,已经绝望
还是数据的问题,我拒绝在弄下去:
代码示例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* FileName: JoinWorkersInformation
* Author: hadoop
* Email: [email protected]
* Date: 18-10-7 上午8:06
* Description:
*/
public class JoinWorkersInformation {
/**
* 使用Mapper将数据文件中的数据本身作为Mapper输出的key直接输出
*/
public static class JoinWorkersInformationMapper extends Mapper<LongWritable, Text, LongWritable, WorkersInformation> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String inputData = value.toString();
String[] data = inputData.split("\t");
if (data.length <=3){
WorkersInformation department = new WorkersInformation();
department.setDepartmentNo(data[0]);
department.setDepartmentName(data[1]);
department.setFlag(0);
context.write(new LongWritable(Long.valueOf(department.getDepartmentNo())),department);
}else if (data.length ==8 ){
WorkersInformation work = new WorkersInformation();
work.setWorkerNo(data[0]);
work.setWorkerName(data[1]);
work.setDepartmentNo(data[7]);
work.setFlag(1);
context.write(new LongWritable(Long.valueOf(work.getDepartmentNo())),work);
}else if (data.length == 7){
WorkersInformation work = new WorkersInformation();
work.setWorkerNo(data[0]);
work.setWorkerName(data[1]);
work.setDepartmentNo(data[6]);
work.setFlag(1);
context.write(new LongWritable(Long.valueOf(work.getDepartmentNo())),work);
}
}
}
/**
* 使用Reducer将输入的key本身作为key直接输出
*/
public static class JoinWorkersInformationReducer extends Reducer<LongWritable, WorkersInformation, LongWritable,Text> {
@Override
protected void reduce(LongWritable key, Iterable<WorkersInformation> values, Context context) throws IOException, InterruptedException {
WorkersInformation department = null;
LongWritable resultKey = new LongWritable(0);
List<WorkersInformation> workerList = new ArrayList<WorkersInformation>();
for(WorkersInformation item : values){
if (0 == item.getFlag()){
department = new WorkersInformation(item);
}else {
workerList.add(new WorkersInformation(item));
}
}
for (WorkersInformation worker : workerList){
worker.setDepartmentNo(department.getDepartmentNo());
worker.setDepartmentName(department.getDepartmentName());
context.write(resultKey,new Text(worker.toString()));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//设置MapReduce的配置
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length < 2){
System.out.println("Usage: JoinWorkersInformation <in> [<in>...] <out>");
System.exit(2);
}
//设置作业
//Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(JoinWorkersInformation.class);
job.setJobName("JoinWorkersInformation");
//设置处理map,reduce的类
job.setMapperClass(JoinWorkersInformationMapper.class);
job.setReducerClass(JoinWorkersInformationReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(WorkersInformation.class);
//设置输入输出格式的处理
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
//设定输入输出路径
for (int i = 0; i < otherArgs.length-1;++i){
FileInputFormat.addInputPath(job,new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
class WorkersInformation implements WritableComparable{
private String workerNo = "";
private String workerName = "";
private String departmentNo = "";
private String departmentName = "";
private int flag = 0; //0代表department,1代表Worker
public WorkersInformation(){
}
public WorkersInformation(String workerNo, String workerName, String departmentNo, String departmentName, int flag) {
this.workerNo = workerNo;
this.workerName = workerName;
this.departmentNo = departmentNo;
this.departmentName = departmentName;
this.flag = flag;
}
public WorkersInformation(WorkersInformation information){
this.workerNo = information.workerNo;
this.workerName = information.workerName;
this.departmentNo = information.departmentNo;
this.departmentName = information.departmentName;
this.flag = information.flag;
}
public String getWorkerNo() {
return workerNo;
}
public void setWorkerNo(String workerNo) {
this.workerNo = workerNo;
}
public String getWorkerName() {
return workerName;
}
public void setWorkerName(String workerName) {
this.workerName = workerName;
}
public String getDepartmentNo() {
return departmentNo;
}
public void setDepartmentNo(String departmentNo) {
this.departmentNo = departmentNo;
}
public String getDepartmentName() {
return departmentName;
}
public void setDepartmentName(String departmentName) {
this.departmentName = departmentName;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
@Override
public int compareTo(Object o) {
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.workerNo);
dataOutput.writeUTF(this.workerName);
dataOutput.writeUTF(this.departmentNo);
dataOutput.writeUTF(this.departmentName);
dataOutput.writeInt(this.flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.workerNo = dataInput.readUTF();
this.workerName = dataInput.readUTF();
this.departmentNo = dataInput.readUTF();
this.departmentName = dataInput.readUTF();
this.flag = dataInput.readInt();
}
@Override
public String toString() {
return this.workerNo + " "+ this.workerName + " "+ this.departmentNo+ " "+ this.departmentName;
}
}