Hadoop 实战
一,WordCount开发(Map-Reduce开发模板)
//Map阶段
需要继承Mapper,并重写map方法
public static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
注释:Map输入数据键值对类型(LongWritable,Text, )(该类型一般不需要修改)
-
LongWritable:输入数据的偏移量
-
Text:输入数据类型,类似java中的String类型
Map输出数据键值对类型(Text, IntWritable)(该类可以根据需要开发需要的类)
-
Text:输出数据key的类型,比如把单词作为key,
-
IntWritable:输出数据value的类型,比如把单词出现的次数作为value。
对于Map和Reduce阶段的输出与输入数据,如果要完成在HDFS内的传输,这个些数据类型必须是可以序列化,才能实现通过网络的传输或者本地的拷贝。
重写方法:右击Mapper
开始重写Map的方法:
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //重写map方法
String line = value.toString(); //将map读取的数据转换成String类
String[] words = line.split("\t"); //将数据按照制表格(Tab)进行分隔
for(String word : words){
//Map输出数据key类型 word
//Map输出数据Value类型 1
context.write(new Text(word),new IntWritable(1));
}
}
//Reduce阶段
需要继承Reducer,并重写reduce方法
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
注释:Reduce输入数据键值对类型(Text, IntWritable) (该类必须与Map的输出类型相同)
-
Text:输入数据的key类型
-
IntWritable:输入数据的key类型
Reduce输出数据键值对类型(Text, IntWritable)(该类可以根据需要开发需要的类)
-
Text:输出数据的key类型
-
IntWritable:输出数据的key类型
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// word {1,1,1,...}
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
context.write(key,new IntWritable(sum));
}
//主函数配置
//1.配置job
Configuration conf = new Configuration();
Job job = null;
//2.创建job
try {
job = Job.getInstance(conf);
} catch (IOException e) {
e.printStackTrace();
}
job.setJarByClass(WordCountMRJob.class);
//3.给job添加执行流程
//3.1 HDFS中需要处理的文件路径
// Path path = new Path(args[0]);
Path path = new Path("hdfs://ns//input/wc/");
try {
//job添加输入路径
FileInputFormat.addInputPath(job,path);
} catch (IOException e) {
e.printStackTrace();
}
//3.2设置map执行阶段
job.setMapperClass(WordCountMapper.class); //设置Mapper的实现类,每次开发新需求,都需要修改
job.setMapOutputKeyClass(Text.class); //map输出key类型,需要与该类定义的key类型相同
job.setMapOutputValueClass(IntWritable.class); //map输出value类型,需要与该类定义的Value类型相同
//3.3设置reduce执行阶段
job.setReducerClass(WordCountReducer.class); //设置Reduce的实现类,每次开发新需求,都需要修改
job.setOutputKeyClass(Text.class); //reduce输出key类型,需要与该类定义的key类型相同
job.setOutputValueClass(IntWritable.class);//reduce输出value类型,需要与该类定义的Value类型相同
//3.4设置job计算结果输出路径
// Path output = new Path(args[1]);
Path output = new Path("/out/wc3/");
FileOutputFormat.setOutputPath(job,output);
//4. 提交job,并等待job执行完成
try {
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
二,网站日志分析项目
需求1:学习目标:Map阶段--Apple广告的明细数据
需求1:查询7号广告主是apple的明细数据
需求分析:
-
查询明细数据没有统计需求,只需要Map阶段就可以实现。
-
只查询广告主时apple公司广告投放日志,需要再Mapper中针对苹果公司过滤出符合要求的日志明细数据。
首先拷贝WordCount开发出来的Map-Reduce开发模板,这个需要只有Map阶段,所以reduce阶段不需要开发。
//Map阶段代码开发
public static class SelectWhereMapper extends Mapper<LongWritable,Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //重写map方法
String line = value.toString(); //将输入的数据值类型转换成string类,
String[] fields = line.split("\t");//将每行数据通过制表符(Tab)进行分隔,
if(fields != null && fields.length == 9){
if(fields[1].equals("apple")){
context.write(new Text(line), NullWritable.get());
}
}
}
}
需求2:学习目标:自定义输出Value类(AdMetricWritable)
需求3:学习目标:自定义键值类(PVKeyWritable)
需求3:将1日到7日日报表的数据按照曝光量倒序排列
分析:
-
需求2的输出结果是4列数据,全局降序排列可以使用一个Reduce Task,利用Reducer shuffle过程,按照key排序的特点实现
-
但是shuffle对key排序默认是升序排序,所以需要实现降序排序,需要对key做处理,如需要自定义key类型,重写compare To方法
继承WritableComparable,可以实现排序功能的key类:
自动引用未完成的方法:
为pv变量自动产生get和set方法:
pv自动产生的get和set方法:一般cut到最下面
定义一个无参的构造方法,
public PVKeyWritable(){}, 该构造方法是再反序列化时用到的
序列化:
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(this.pv);
}
反序列化:
@Override
public void readFields(DataInput dataInput) throws IOException {
this.pv = dataInput.readLong();
}
重写toString方法,将pv的值,从long类型转化为string类型
@Override
public String toString() {
return String.valueOf(this.pv);
}
重写compareTo方法:
@Override
public int compareTo(PVKeyWritable o) {
//升序:当 a > b,返回一个正数,一般是1
//降序:当 a > b,返回一个负数,一般是-1
return this.pv > o.pv ? -1 : 1;
}
需求4:学习目标:自定义Partitioner类
需求5:学习目标:实现Reducer端Join操作
需求6:学习目标:Map端Join与分布式缓存及其压缩文件
需求7:学习目标:先整理个人的思路。
先按照曝光量降序排列,曝光量相同,再按照点击量降序排列