用Hadoop的MapReduce处理员工信息Join操作

在编写员工信息Join程序时遇到了一些问题。我先不纠结了,等以后有时间再慢慢研究吧。

输入的数据格式

worker.txt

    empno   ename   job         mgr hiredate    sal comm    deptno  loc
    7499    allen   salesman    7698    1981-02-20  1600    300 30  
    7782    clark   managers    7639    1981-06-09  2450        10  
    7654    martin  salesman    7698    1981-03-20  1250    1400    30  boston
    7900    james   clerk   7698    1981-01-09  950     30  
    7788    scott   analyst 7566    1981-09-01  3000    100 20 

department.txt

    deptno  dname   loc
    30  sales   chicago
    20  research    dallas
    10  accounting  newyork
编程时由于数据格式不规则,bug不断,弄了一上午还是有问题。

问题一:数组越界。数据每行的字段数都大于等于3,不知道为什么在department.setDepartmentName(data[1])处会越界,可能是数据格式有问题。(一个可能的原因:空行,或者用空格而不是制表符分隔的行,经过split("\t")后只得到长度为1的数组,同样满足data.length <= 3的条件,于是访问data[1]时越界。)

用Hadoop的MapReduce处理员工信息Join操作

问题二:还是数组越界,惊不惊喜,绝不绝望?对应的代码是work.setDepartmentNo(data[7])。分析数据时发现有的地方确实有两个连续的制表符,但不知道为什么还是会越界。

用Hadoop的MapReduce处理员工信息Join操作

问题三:又是一个输入格式问题。为什么其他数据都没有问题,偏偏这条数据出错……索性直接删除了这条数据。

用Hadoop的MapReduce处理员工信息Join操作

问题四:然而删除这条数据并没有用,map 100%、reduce 100%,却还是报错,我已经绝望了。

用Hadoop的MapReduce处理员工信息Join操作

用Hadoop的MapReduce处理员工信息Join操作

还是数据的问题,我不想再弄下去了:

代码示例

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * FileName: JoinWorkersInformation
 * Author:   hadoop
 * Email:    [email protected]
 * Date:     18-10-7 08:06 AM
 * Description: reduce-side join of worker rows with department rows on the department number.
 *
 * <p>Input lines are tab-separated and classified by their field count: 2-3 fields form a
 * department row ({@code deptno, dname[, loc]}); 7 or more fields form a worker row. Blank lines
 * are ignored. Header lines and other malformed lines are skipped and counted in the
 * {@code JoinWorkersInformation} counter group instead of failing the task.
 */
public class JoinWorkersInformation {
    /** Counter group under which skipped input lines and unmatched workers are reported. */
    private static final String COUNTER_GROUP = "JoinWorkersInformation";

    /**
     * Emits every department or worker row keyed by its department number, so that a department
     * and all of its workers arrive in the same reduce call.
     */
    public static class JoinWorkersInformationMapper extends Mapper<LongWritable, Text, LongWritable, WorkersInformation> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // "".split("\t") yields a single empty field; indexing data[1] on it used to crash.
                return;
            }
            WorkersInformation record = parse(line);
            if (record == null) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            long departmentNo;
            try {
                departmentNo = Long.parseLong(record.getDepartmentNo());
            } catch (NumberFormatException e) {
                // Header rows ("deptno", ...) and rows whose columns are shifted end up here.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            context.write(new LongWritable(departmentNo), record);
        }

        /**
         * Builds a department record (flag 0) or a worker record (flag 1) from one input line.
         *
         * @param line a non-empty, already trimmed, tab-separated input line
         * @return the parsed record, or {@code null} when the field count fits neither layout
         */
        private static WorkersInformation parse(String line) {
            String[] data = line.split("\t");
            for (int i = 0; i < data.length; i++) {
                // Stray spaces such as "20 " would otherwise make Long.parseLong fail later.
                data[i] = data[i].trim();
            }
            WorkersInformation record = new WorkersInformation();
            if (data.length == 2 || data.length == 3) {
                record.setDepartmentNo(data[0]);
                record.setDepartmentName(data[1]);
                record.setFlag(0);
            } else if (data.length >= 7) {
                record.setWorkerNo(data[0]);
                record.setWorkerName(data[1]);
                // 8+ fields: deptno is column 7 (an optional trailing loc column may follow).
                // Exactly 7 fields: the empty comm column was lost, so deptno moved to column 6.
                record.setDepartmentNo(data.length == 7 ? data[6] : data[7]);
                record.setFlag(1);
            } else {
                // A single field (e.g. a space-separated line) or 4-6 fields: not a known layout.
                return null;
            }
            return record;
        }
    }

    /**
     * Joins the department row with the worker rows that share its department number and writes
     * one line per worker. Every output line uses the constant key 0; the department number is
     * part of the value text.
     */
    public static class JoinWorkersInformationReducer extends Reducer<LongWritable, WorkersInformation, LongWritable, Text> {
        private final LongWritable resultKey = new LongWritable(0);

        @Override
        protected void reduce(LongWritable key, Iterable<WorkersInformation> values, Context context) throws IOException, InterruptedException {
            WorkersInformation department = null;
            List<WorkersInformation> workerList = new ArrayList<WorkersInformation>();
            for (WorkersInformation item : values) {
                // The framework may reuse the same value object between iterations, so keep copies.
                if (0 == item.getFlag()) {
                    department = new WorkersInformation(item);
                } else {
                    workerList.add(new WorkersInformation(item));
                }
            }
            if (department == null) {
                // No department row for this key: drop the workers (inner join) and count them,
                // instead of throwing a NullPointerException in the reduce task.
                context.getCounter(COUNTER_GROUP, "UNMATCHED_WORKERS").increment(workerList.size());
                return;
            }
            for (WorkersInformation worker : workerList) {
                worker.setDepartmentNo(department.getDepartmentNo());
                worker.setDepartmentName(department.getDepartmentName());
                context.write(resultKey, new Text(worker.toString()));
            }
        }
    }

    /**
     * Configures and submits the join job.
     *
     * @param args generic Hadoop options, followed by one or more input paths and the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: JoinWorkersInformation <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinWorkersInformation.class);
        job.setJobName("JoinWorkersInformation");
        job.setMapperClass(JoinWorkersInformationMapper.class);
        job.setReducerClass(JoinWorkersInformationReducer.class);

        // The map output value type differs from the final output value type, so declare both.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(WorkersInformation.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Every argument except the last one is an input path; the last is the output directory.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

/**
 * Intermediate record passed from the mapper to the reducer. One type carries both department
 * rows ({@code flag == 0}) and worker rows ({@code flag == 1}); fields that do not apply to a row
 * kind stay as empty strings.
 *
 * <p>String fields are never {@code null}: {@link DataOutput#writeUTF(String)} throws a
 * {@link NullPointerException} for {@code null}, so setters and constructors map {@code null}
 * to {@code ""}.
 */
class WorkersInformation implements WritableComparable<WorkersInformation> {
    private String workerNo = "";
    private String workerName = "";
    private String departmentNo = "";
    private String departmentName = "";
    /** Record kind: 0 = department row, 1 = worker row. */
    private int flag = 0;

    /** Creates an empty record (Writable implementations need a no-arg constructor for deserialization). */
    public WorkersInformation() {
    }

    /**
     * Creates a fully populated record; {@code null} strings are stored as {@code ""}.
     *
     * @param flag 0 for a department row, 1 for a worker row
     */
    public WorkersInformation(String workerNo, String workerName, String departmentNo, String departmentName, int flag) {
        this.workerNo = nullToEmpty(workerNo);
        this.workerName = nullToEmpty(workerName);
        this.departmentNo = nullToEmpty(departmentNo);
        this.departmentName = nullToEmpty(departmentName);
        this.flag = flag;
    }

    /** Copy constructor; used to detach values from objects the framework may reuse. */
    public WorkersInformation(WorkersInformation information) {
        this(information.workerNo, information.workerName, information.departmentNo,
                information.departmentName, information.flag);
    }

    public String getWorkerNo() {
        return workerNo;
    }

    public void setWorkerNo(String workerNo) {
        this.workerNo = nullToEmpty(workerNo);
    }

    public String getWorkerName() {
        return workerName;
    }

    public void setWorkerName(String workerName) {
        this.workerName = nullToEmpty(workerName);
    }

    public String getDepartmentNo() {
        return departmentNo;
    }

    public void setDepartmentNo(String departmentNo) {
        this.departmentNo = nullToEmpty(departmentNo);
    }

    public String getDepartmentName() {
        return departmentName;
    }

    public void setDepartmentName(String departmentName) {
        this.departmentName = nullToEmpty(departmentName);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /**
     * Orders by department number, then record kind (departments before workers), then the
     * remaining fields, so that the ordering is total and consistent with {@link #equals}.
     */
    @Override
    public int compareTo(WorkersInformation o) {
        int c = departmentNo.compareTo(o.departmentNo);
        if (c != 0) {
            return c;
        }
        c = Integer.compare(flag, o.flag);
        if (c != 0) {
            return c;
        }
        c = workerNo.compareTo(o.workerNo);
        if (c != 0) {
            return c;
        }
        c = workerName.compareTo(o.workerName);
        if (c != 0) {
            return c;
        }
        return departmentName.compareTo(o.departmentName);
    }

    /** Serializes the fields in a fixed order; {@link #readFields} must read the same order. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.workerNo);
        dataOutput.writeUTF(this.workerName);
        dataOutput.writeUTF(this.departmentNo);
        dataOutput.writeUTF(this.departmentName);
        dataOutput.writeInt(this.flag);
    }

    /** Deserializes the fields in the order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.workerNo = dataInput.readUTF();
        this.workerName = dataInput.readUTF();
        this.departmentNo = dataInput.readUTF();
        this.departmentName = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WorkersInformation)) {
            return false;
        }
        WorkersInformation other = (WorkersInformation) o;
        return flag == other.flag
                && workerNo.equals(other.workerNo)
                && workerName.equals(other.workerName)
                && departmentNo.equals(other.departmentNo)
                && departmentName.equals(other.departmentName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(workerNo, workerName, departmentNo, departmentName, flag);
    }

    /** Space-separated {@code workerNo workerName departmentNo departmentName}; the flag is omitted. */
    @Override
    public String toString() {
        return this.workerNo + " " + this.workerName + " " + this.departmentNo + " " + this.departmentName;
    }

    /** Maps {@code null} to the empty string so that {@link #write} never sees {@code null}. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}