Hadoop Getting Started Series (3): Building a Simple Inverted Index on Hadoop
Within the Hadoop framework, a beginner only needs to implement two classes, a Mapper and a Reducer, to build an inverted index. As an exercise you can download the 20 Newsgroups dataset: http://qwone.com/~jason/20Newsgroups/.
The articles come as thousands of small scattered files, which is a poor fit for Hadoop; it is better to merge them into one or a few large files, or to pull out a small subset for testing.
From my own test: running all 19,997 articles on Hadoop without merging them nearly exhausted 16 GB of RAM...
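If you do want to merge the files first, here is a minimal local-filesystem sketch (not part of the original post); the paths "20_newsgroups" and "merged.txt" are just placeholders for wherever you unpacked the dataset.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: concatenate every 20 Newsgroups article into one large text file so
// Hadoop reads a few big splits instead of ~20,000 tiny files.
public class MergeNewsgroups {
    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "20_newsgroups"); // placeholder path
        Path out = Paths.get(args.length > 1 ? args[1] : "merged.txt");     // placeholder path

        List<Path> articles;
        try (Stream<Path> walk = Files.walk(root)) {
            articles = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }

        try (BufferedWriter writer = Files.newBufferedWriter(out, StandardCharsets.ISO_8859_1)) {
            for (Path article : articles) {
                // The dataset is plain ASCII/Latin-1 text, one article per file.
                writer.write(new String(Files.readAllBytes(article), StandardCharsets.ISO_8859_1));
                writer.newLine();
            }
        }
    }
}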
Since this is only a test, I put the Mapper, the Reducer, and main all into a single WordCount class to keep things simple.
1> WordCount is the main driver program.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path path = new Path(args[0]);
//    FileSystem fs = FileSystem.get(conf);     // fully distributed
    FileSystem fs = path.getFileSystem(conf);   // pseudo-distributed
    if (fs.exists(path)) { // walk the files under the directory; there is one more level of subdirectories here, and args[0] is the directory path
        FileStatus[] fileStatus = fs.listStatus(path);
        for (FileStatus fileStatus1 : fileStatus) {
            FileStatus[] fileStatus2 = fs.listStatus(fileStatus1.getPath());
            for (FileStatus fileStatu : fileStatus2) {
//                System.out.println(fileStatu.getPath().toString());
                FileInputFormat.addInputPath(job, fileStatu.getPath());
            }
        }
    }
    fs.close();
//    FileInputFormat.addInputPath(job, new Path(args[0])); // run on a single file; args[0] is the file path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Note how the FileSystem is instantiated: the fully distributed and pseudo-distributed setups differ, and using the fully distributed style on a pseudo-distributed cluster throws an error:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:9000/user/hadoop/input3, expected: file:///
Related link: https://blog.****.net/huangjing_whlg/article/details/39341643
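An alternative not used in the original code is to tell the Configuration explicitly which filesystem is the default (or to put core-site.xml on the classpath), after which FileSystem.get(conf) also works against HDFS; a rough sketch, reusing the hdfs://localhost:9000 address from the exception above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Sketch: set fs.defaultFS so FileSystem.get(conf) resolves hdfs:// paths
// instead of falling back to the local file:/// filesystem.
public class HdfsFileSystemDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // or put core-site.xml on the classpath
        FileSystem fs = FileSystem.get(conf);              // now returns a DistributedFileSystem
        System.out.println(fs.getUri());                   // prints hdfs://localhost:9000
        fs.close();
    }
}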
2> Mapper: I use Lucene for tokenization, with StopAnalyzer + PorterStemFilter doing tokenization plus stemming. The required packages are declared as dependencies in pom.xml. The commented-out code is the plain tokenizer; try both and compare the results.
public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private FileSplit split;

    public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {
//        StringTokenizer itr = new StringTokenizer(value.toString());
//        while (itr.hasMoreTokens()) {
//            word.set(itr.nextToken());
//            context.write(word, one);
//        }
        try {
            Analyzer analyzer = new StopAnalyzer();
            TokenStream stream = analyzer.tokenStream("", new StringReader(value.toString()));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream = new PorterStemFilter(stream);
            stream.reset();
            split = (FileSplit) context.getInputSplit();
            while (stream.incrementToken()) {
                word.set(cta.toString());
                context.write(word, one);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
I have already compared the results for you: plain tokenizer on the left, tokenization plus stemming on the right, as shown in the figure below.
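If you want to reproduce that comparison without running a full MapReduce job, here is a small standalone sketch (not from the original post) that runs the same StopAnalyzer + PorterStemFilter pipeline over a made-up sample sentence:

import java.io.IOException;
import java.io.StringReader;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.StopAnalyzer;
import org.apache.lucene.analysis.en.PorterStemFilter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

// Sketch: print the tokens produced by StopAnalyzer + PorterStemFilter so they
// can be compared against a plain whitespace tokenization of the same text.
public class TokenizeDemo {
    public static void main(String[] args) throws IOException {
        String text = "The quick brown foxes were jumping over the lazy dogs"; // made-up sample
        try (Analyzer analyzer = new StopAnalyzer()) {
            TokenStream stream = new PorterStemFilter(
                    analyzer.tokenStream("", new StringReader(text)));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                System.out.println(cta.toString()); // stop words dropped, e.g. "jumping" -> "jump"
            }
            stream.end();
            stream.close();
        }
    }
}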
3> Reducer: the standard IntSumReducer, which sums the counts for each key.
public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
    ) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
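Because the full mapper (see the complete source below) emits "term filename" as the key, the output ends up as one line per (term, document) pair. If you would rather get one line per term with a full postings list, one common variation (not in the original post) is to have the mapper emit the term as the key and "filename:1" as a Text value, drop or rewrite the combiner, and merge in the reducer. A rough sketch, assuming that mapper change and job.setOutputValueClass(Text.class) in the driver:

// Requires: import java.util.HashMap; import java.util.Map;
// Assumes the mapper emits (term, new Text(filename + ":1")) and that the
// newsgroup filenames contain no ':' character (they are numeric IDs).
public static class InvertedIndexReducer
        extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the term frequency per document.
        Map<String, Integer> postings = new HashMap<>();
        for (Text val : values) {
            String[] parts = val.toString().split(":"); // "filename:count"
            postings.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
        }
        // Join into "doc1:tf1; doc2:tf2; ..."
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : postings.entrySet()) {
            if (sb.length() > 0) sb.append("; ");
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        context.write(key, new Text(sb.toString()));
    }
}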
4> Setting the run parameters:
Here I run on a subset of the 20 Newsgroups data. Below are the full source code and pom.xml.
WordCount:
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.StopAnalyzer;
import org.apache.lucene.analysis.en.PorterStemFilter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private FileSplit split;

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
//            StringTokenizer itr = new StringTokenizer(value.toString());
//            while (itr.hasMoreTokens()) {
//                word.set(itr.nextToken());
//                context.write(word, one);
//            }
            try {
                Analyzer analyzer = new StopAnalyzer();
                TokenStream stream = analyzer.tokenStream("", new StringReader(value.toString()));
                CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
                stream = new PorterStemFilter(stream);
                stream.reset();
                split = (FileSplit) context.getInputSplit();
                while (stream.incrementToken()) {
                    word.set(cta.toString() + " " + split.getPath().getName());
//                    word.set(cta.toString());
                    context.write(word, one);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path path = new Path(args[0]);
//        FileSystem fs = FileSystem.get(conf);     // fully distributed
        FileSystem fs = path.getFileSystem(conf);   // pseudo-distributed
        if (fs.exists(path)) { // walk the files under the directory (one more level of subdirectories); args[0] is the directory path
            FileStatus[] fileStatus = fs.listStatus(path);
            for (FileStatus fileStatus1 : fileStatus) {
                FileStatus[] fileStatus2 = fs.listStatus(fileStatus1.getPath());
                for (FileStatus fileStatu : fileStatus2) {
//                    System.out.println(fileStatu.getPath().toString());
                    FileInputFormat.addInputPath(job, fileStatu.getPath());
                }
            }
        }
        fs.close();
//        FileInputFormat.addInputPath(job, new Path(args[0])); // run on a single file; args[0] is the file path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test.hadoop2</groupId>
    <artifactId>HadoopTest2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>nexus-aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-analyzers-common</artifactId>
            <version>7.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-core</artifactId>
            <version>7.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-analyzers-icu</artifactId>
            <version>7.3.0</version>
        </dependency>
        <dependency>
            <groupId>jfree</groupId>
            <artifactId>jfreechart</artifactId>
            <version>1.0.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <excludeTransitive>false</excludeTransitive>
                    <stripVersion>true</stripVersion>
                    <outputDirectory>./lib</outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Result: word + docid + tf, i.e. each output line is a term, the document it appears in, and its term frequency in that document.
That wraps up this getting-started series. I'm only just getting started myself, haha.