Traffic Statistics with MapReduce

Prerequisites

  1. A virtual machine with Hadoop installed
  2. A sample of internet traffic data

Start Hadoop

start-all.sh
Upload the flow.log file:
hadoop fs -put /flow.log /
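
You can check that the daemons are running with jps; in a typical pseudo-distributed setup it should list processes such as NameNode, DataNode, ResourceManager, and NodeManager:

jps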

POJO layer

import lombok.Data;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Writable lets Hadoop serialize and deserialize instances of this class
@Data
public class TelBean implements Writable {
    private String tel;
    private Long up;
    private Long down;
    private Long total;

    public TelBean() {
    }

    public TelBean(String tel, Long up, Long down) {
        this.tel = tel;
        this.up = up;
        this.down = down;
        this.total = up + down;
    }

    // serialization: fields must be written in the same order they are read back
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(total);
    }

    // deserialization: fields are read back in the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.total = in.readLong();
    }
}

Mapper layer

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class TCMapper extends Mapper<LongWritable, Text,Text,TelBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t"); // columns are tab-separated
        String tel = split[1]; // the phone number is the second column
        Long up = Long.parseLong(split[split.length - 3]); // upstream traffic
        Long down = Long.parseLong(split[split.length - 2]); // downstream traffic
        TelBean telBean = new TelBean(tel, up, down);
        context.write(new Text(tel),telBean);
    }
}
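
The parsing above assumes tab-separated records in which the phone number is the second column and the upstream and downstream byte counts are the third- and second-to-last columns. A hypothetical, abridged record consistent with that layout (middle columns elided):

1363157985066  13726230503  …  2481  24681  200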

Reducer layer

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class TCReducer extends Reducer<Text, TelBean,Text,TelBean> {
    @Override
    protected void reduce(Text key, Iterable<TelBean> values, Context context) throws IOException, InterruptedException {
        Long up = 0L;
        Long down = 0L;
        for (TelBean telBean : values) { // accumulate this number's up/down totals
            up = up + telBean.getUp();
            down = down + telBean.getDown();
        }
        TelBean telBean = new TelBean(key.toString(),up,down);
        context.write(key,telBean);
    }
}
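
One caveat: Hadoop reuses the same TelBean instance while iterating over values, so the loop must copy the fields out into local accumulators, as it does here with up and down, rather than store references to telBean.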

Partitioner layer

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// the type parameters match the mapper's output key/value types
public class MyPartition extends Partitioner<Text, TelBean> {
    @Override
    public int getPartition(Text text, TelBean telBean, int numPartitions) {
        String tel = text.toString();
        if(tel.startsWith("138")||tel.startsWith("136")){
            return 1;
        }else{
            return 0;
        }
    }
}
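
With the two reduce tasks configured in the driver below, numbers starting with 138 or 136 land in partition 1 (file part-r-00001) and all other numbers in partition 0 (file part-r-00000). Every value returned by getPartition must lie in the range [0, numPartitions).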

Job layer

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// plus imports for TCMapper, TCReducer, and MyPartition from their own packages

public class TCJob {
    public static void main(String[] args) throws Exception {
        // 1. create the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. specify the class that carries this job's jar
        job.setJarByClass(TCJob.class);
        // 3. set the mapper class and its output key/value types
        job.setMapperClass(TCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TelBean.class);
        // 4. set the reducer class and its output key/value types
        job.setReducerClass(TCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelBean.class);
        // set the partitioner; the reduce-task count must cover every partition index it returns
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);
        // 5. set the input path (passed as a command-line argument)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 6. set the output directory (must not already exist)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
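
If the output directory already exists, the job aborts with a FileAlreadyExistsException; remove a stale directory first with:

hadoop fs -rm -r /a3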

Package the project into a jar and upload it to the server.

hadoop jar hdfs1-1.0-SNAPSHOT.jar com.zhiyou100.job.TCJob /flow.log /a3
Inspect the contents of the two output files.
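
Each reducer writes its own file under the output directory (/a3 in the command above):

hadoop fs -cat /a3/part-r-00000
hadoop fs -cat /a3/part-r-00001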

Summary

Mapper layer: cleans the incoming data and converts it into the key-value format we need to process
Reducer layer: performs the actual aggregation of the data
Job layer: controls the execution order of the whole program
Note: both sorting and partitioning operate on the key

Sorting by Total Traffic

First run the job without partitioning to get each number's total upstream, downstream, and total traffic, then sort that output with a second job.

POJO layer

Modified as follows.
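In MapReduce, sorting always happens on the map output key, so TelBean must become comparable. A minimal sketch of the usual change, assuming a descending sort on total traffic and a tab-separated toString() so the sort job can parse the first job's output (members unchanged from the class above are omitted):

import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

@Data
public class TelBean implements WritableComparable<TelBean> {
    private String tel;
    private Long up;
    private Long down;
    private Long total;

    // constructors, write() and readFields() stay exactly as shown earlier

    @Override
    public int compareTo(TelBean o) {
        // compare the other bean's total against ours => descending order
        return o.getTotal().compareTo(this.getTotal());
    }

    @Override
    public String toString() {
        return up + "\t" + down + "\t" + total;
    }
}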

Mapper layer

Modified as follows.
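A sketch of the sort job's mapper (SortMapper is a hypothetical name), assuming it reads the first job's output, tab-separated as phone number, upstream, downstream, total, and emits the bean itself as the key so the framework sorts by compareTo():

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, TelBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t"); // tel, up, down, total
        TelBean bean = new TelBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2]));
        // the bean is the key, so the shuffle orders records by total traffic
        context.write(bean, NullWritable.get());
    }
}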

Reducer layer

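A matching sketch of the sort job's reducer (SortReducer is a hypothetical name); the keys arrive already sorted, so it only re-keys each record by phone number:

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SortReducer extends Reducer<TelBean, NullWritable, Text, TelBean> {
    @Override
    protected void reduce(TelBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // iterate the values so ties on total traffic are not collapsed into one record
        for (NullWritable v : values) {
            context.write(new Text(key.getTel()), key);
        }
    }
}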

Partitioner layer

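If the partitioner is kept for the sort job, its type parameters must change to the new map output types; a sketch under that assumption:

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<TelBean, NullWritable> {
    @Override
    public int getPartition(TelBean telBean, NullWritable value, int numPartitions) {
        String tel = telBean.getTel();
        // same rule as before, now reading the number from the key bean
        return (tel.startsWith("138") || tel.startsWith("136")) ? 1 : 0;
    }
}

Keep in mind that with two reducers each output file is sorted internally, but there is no single globally ordered file; a global order needs one reduce task.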

Job layer

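A sketch of the driver for the sort job (SortJob is a hypothetical name); its input path is the output directory of the first job:

import com.zhiyou100.pojo.TelBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortJob.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(TelBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelBean.class);
        // pass the first job's output directory (e.g. /a3) as args[0]
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}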
Run result:
[screenshot of the output sorted in descending order of total traffic]