手写WordCount
需求分析:
统计在文件中每一个字母出现的次数.
需要的jar包:
在安装包里面有对应的jar包:
https://pan.baidu.com/s/1eFpKbExrpT7AcgQvpjvETQ
hadoop-2.6.4\share\hadoop\hdfs\hadoop-hdfs-2.6.4.jar
hadoop-2.6.4\share\hadoop\hdfs\lib\所有jar包
hadoop-2.6.4\share\hadoop\common\hadoop-common-2.4.1.jar
hadoop-2.6.4\share\hadoop\common\lib\所有jar包
hadoop-2.6.4\share\hadoop\mapreduce\除hadoop-mapreduce-examples-2.6.4.jar之外的jar包
hadoop-2.6.4\share\hadoop\mapreduce\lib\所有jar包
编写业务代码:
Mapper:
package com.thp.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// map 方法的声明周期 : 框架每传递一行数据就被调用一次
// key : 这一行的起始点在文件中的偏移量
// value : 这一行的内容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拿到一行的数据转换为String
String line = value.toString();
// 将这一行切分出各个单词
String[] words = line.split("");
// 遍历数组,输出<字母,1>
for(String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
Reduce:
package com.thp.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* reduce 方法提供reduce task进程来调用
*
* reduce task会将shuffle阶段分发过来的大量k-v数据对进行聚合,聚合机制是相同的key的k-v对聚合为一组
* 然后 reduce task 对每一组聚合k-v调用一次我们自定义的reduce方法
*/
// 声明周期:框架每传递一个kv组,reduce方法就被调用一次
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 定义一个计数器
int count = 0;
// 遍历这一组kv所有的v,累加到count
for(IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
主类:
package com.thp.mapper;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountRunner {
// 把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放在哪里...)描述成一个job对象
// 把这个描述好的job提交给集群去运行
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
// 指定我这个job所在的jar包
wcjob.setJarByClass(WordCountRunner.class);
// 设置wcjob所用的mapper逻辑类为哪个类
wcjob.setMapperClass(WordCountMapper.class);
// 设置wcjob所用的reducer逻辑类为哪个类
wcjob.setReducerClass(WordCountReduce.class);
// 设置我们的业务逻辑Reducer类的输出key和value的数据类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
// JobConf是旧API使用的,而我们需要的是新API
// 指定要处理的数据所在的位置
// FileInputFormat.setInputPaths(wcjob, "hdfs://bd1:9000/wordcount/input");
FileInputFormat.setInputPaths(wcjob, "hdfs://bd1:9000/wordcount/input/big.txt");
// FileOutputFormat.setOutputPath(wcjob, "hdfs://bd1:9000/wordcount/output/");
FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://bd1:9000/wordcount/output/"));
// 向yarn集群提交这个job
boolean res = wcjob.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
将这个项目进行打包:
打包之前要先运行一下主类的main方法,这样就能使用eclipse进行打包.
下一步:
可以看一下这个wordcount.jar包里面的内容:
然后将这个wordcount.jar上传到hadoop服务器上,
big.txt:
Hello tom
Hello jim
Hello ketty
Hello world
Ketty tom
将这个 big.txt上传到hadoop的hdfs文件系统上:
然后直接运行这个wordcount.jar文件就行
运行的命令是:
hadoop jar wordcount.jar com.thp.mapper.WordCountRunner