Hadoop 实战

一,WordCount开发(Map-Reduce开发模板)

//Map阶段

需要继承Mapper,并重写map方法

public static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable>{

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

注释:Map输入数据键值对类型(LongWritable,Text, )(该类型一般不需要修改

  • LongWritable:输入数据的偏移量

  • Text:输入数据类型,类似java中的String类型

          Map输出数据键值对类型(Text, IntWritable)(该类可以根据需要开发需要的类)

  • Text:输出数据key的类型,比如把单词作为key,

  •  IntWritable:输出数据value的类型,比如把单词出现的次数作为value。

         对于Map和Reduce阶段的输出与输入数据,如果要完成在HDFS内的传输,这个些数据类型必须是可以序列化,才能实现通过网络的传输或者本地的拷贝。

重写方法:右击Mapper

Hadoop 实战

Hadoop 实战

Hadoop 实战

开始重写Map的方法:

@Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  //重写map方法

           String line = value.toString(); //将map读取的数据转换成String类

           String[] words = line.split("\t"); //将数据按照制表格(Tab)进行分隔

           for(String word : words){          

               //Map输出数据key类型 word

               //Map输出数据Value类型 1                         

               context.write(new Text(word),new IntWritable(1));  

           }

        }

 

//Reduce阶段

需要继承Reducer,并重写reduce方法

public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

        @Override

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

注释:Reduce输入数据键值对类型(Text, IntWritable) (该类必须与Map的输出类型相同)

  • Text:输入数据的key类型

  • IntWritable:输入数据的key类型

           Reduce输出数据键值对类型Text, IntWritable(该类可以根据需要开发需要的类)

  • Text:输出数据的key类型

  • IntWritable:输出数据的key类型

@Override

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            // word {1,1,1,...}

            int sum = 0;

            for(IntWritable value : values){

                sum += value.get();

            }

            context.write(key,new IntWritable(sum));

        }

 

//主函数配置

//1.配置job

        Configuration conf = new Configuration();

        Job job = null;

 //2.创建job

        try {

            job = Job.getInstance(conf);

        } catch (IOException e) {

            e.printStackTrace();

        }

        job.setJarByClass(WordCountMRJob.class);

//3.给job添加执行流程

        //3.1 HDFS中需要处理的文件路径

//        Path path = new Path(args[0]);

        Path path = new Path("hdfs://ns//input/wc/");

        try {

            //job添加输入路径

            FileInputFormat.addInputPath(job,path);

        } catch (IOException e) {

            e.printStackTrace();

        }

        //3.2设置map执行阶段

        job.setMapperClass(WordCountMapper.class);   //设置Mapper的实现类,每次开发新需求,都需要修改

        job.setMapOutputKeyClass(Text.class);               //map输出key类型,需要与该类定义的key类型相同

        job.setMapOutputValueClass(IntWritable.class); //map输出value类型,需要与该类定义的Value类型相同

        //3.3设置reduce执行阶段

        job.setReducerClass(WordCountReducer.class); //设置Reduce的实现类,每次开发新需求,都需要修改

        job.setOutputKeyClass(Text.class);              //reduce输出key类型,需要与该类定义的key类型相同

        job.setOutputValueClass(IntWritable.class);//reduce输出value类型,需要与该类定义的Value类型相同

        //3.4设置job计算结果输出路径

//        Path output = new Path(args[1]);

        Path output = new Path("/out/wc3/");

        FileOutputFormat.setOutputPath(job,output);

  //4. 提交job,并等待job执行完成

        try {

            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);

        } catch (IOException e) {

            e.printStackTrace();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

        }

 

二,网站日志分析项目

需求1:学习目标:Map阶段--Apple广告的明细数据

需求1:查询7号广告主是apple的明细数据

需求分析:

  • 查询明细数据没有统计需求,只需要Map阶段就可以实现。

  • 只查询广告主时apple公司广告投放日志,需要再Mapper中针对苹果公司过滤出符合要求的日志明细数据。

首先拷贝WordCount开发出来的Map-Reduce开发模板,这个需要只有Map阶段,所以reduce阶段不需要开发。

//Map阶段代码开发

public static class SelectWhereMapper extends Mapper<LongWritable,Text, Text, NullWritable>{

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  //重写map方法

           String line = value.toString();  //将输入的数据值类型转换成string类,

           String[] fields = line.split("\t");//将每行数据通过制表符(Tab)进行分隔,

           if(fields != null && fields.length == 9){

               if(fields[1].equals("apple")){

                   context.write(new Text(line), NullWritable.get());

               }

           }

        }

    }

需求2:学习目标:自定义输出Value类(AdMetricWritable

 

需求3:学习目标:自定义键值类(PVKeyWritable

需求3:将1日到7日日报表的数据按照曝光量倒序排列

分析:

  • 需求2的输出结果是4列数据,全局降序排列可以使用一个Reduce Task,利用Reducer shuffle过程,按照key排序的特点实现

  • 但是shuffle对key排序默认是升序排序,所以需要实现降序排序,需要对key做处理,如需要自定义key类型,重写compare To方法

继承WritableComparable,可以实现排序功能的key类:

Hadoop 实战

自动引用未完成的方法:

Hadoop 实战

 

Hadoop 实战

 

为pv变量自动产生get和set方法:

Hadoop 实战

 

Hadoop 实战

Hadoop 实战

pv自动产生的get和set方法:一般cut到最下面

Hadoop 实战

定义一个无参的构造方法

public PVKeyWritable(){}, 该构造方法是再反序列化时用到的

序列化

@Override

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeLong(this.pv);

    }

反序列化:

@Override

    public void readFields(DataInput dataInput) throws IOException {

        this.pv = dataInput.readLong();

    }

重写toString方法,将pv的值,从long类型转化为string类型

@Override

    public String toString() {

        return String.valueOf(this.pv);

    }

重写compareTo方法:

@Override

    public int compareTo(PVKeyWritable o) {

        //升序:当 a > b,返回一个正数,一般是1

        //降序:当 a > b,返回一个负数,一般是-1

        return this.pv > o.pv ? -1 : 1;

    }

需求4:学习目标:自定义Partitioner类

需求5:学习目标:实现Reducer端Join操作

需求6:学习目标:Map端Join与分布式缓存及其压缩文件

需求7:学习目标:先整理个人的思路。

先按照曝光量降序排列,曝光量相同,再按照点击量降序排列