MapReduce框架学习(1)——输入、输出格式
- 参考:JeffreyZhou的博客园
- 《Hadoop权威指南》第四版
在前面的学习中,完成了几件事:
- 搭建并测试Hadoop完全分布式环境;
- 在master节点上配置Hadoop的Eclipse开发环境
上一篇博文,Eclipse的开发环境搭建中,博文最后终于揭开了WordCount的源代码程序,这是一个小程序,但其中也包括了Map/Reduce的大体框架,这个系列博文就来捋一捋整个Map/Reduce的流程及其作用。
一个MR作业,包括三点:
- 输入数据
- MR程序
- 配置信息
0 Map/Reduce大致流程
- 输入(input): 将输入数据分成一个个split,并将spilt进一步拆成<key,value>形式;
- 映射(map):根据输入的<key,value>进行处理,输出list<key,value>;
- 合并(combiner):合并(单个节点上)中间相同的key值;
- 分区(partition):将<key,value>分成N分,分别送到下一环节;
- 化简(reduce):将中间结果合并,得到最终结果;
- 输出(output):指定输出最终结果格式。
接下来我们对各个环节进行理解和应用,还是以煮烂了的栗子(WordCount)开刀:
1 .1 输入分片与记录
- 输入格式(InputFormat)用于描述整个MapReduce作业的数据输入规范。
- 先对输入的文件进行格式规范检查,如输入路径,后缀等检查;
- 然后对数据文件进行输入分块(split),一个分片(split)就是一个由单个map操作来处理的输入块,每个Map操作只处理一个split;每个split被划分若干个记录,每个记录就是一个<key,value>对,map一个接一个的处理记录。
- 分片和记录都是逻辑概念,不必对应到文件,尽管其常见形式都是文件。
从一般的文本文件到数据库,Hadoop可以处理很多不同类型的数据格式。一图以蔽之(来源:《Hadoop权威指南》):
图 InputFormat类的层次结构
1.2 FileInputFormat类
FIleInputFormat类是所有使用文件作为其数据源的InputFormat实现的基类,它提供两个功能:
- 指出作业的输入文件位置(选择作为输入的文件或对象);
- 为输入文件生成分片的代码实现(定义把文件划分到任务的InputSplits)。
- 把分片分割成记录的作业则由其具体的子类来完成(为RecordReader读取文件提供了一个工程方法)。
1.3 FIleInputFormat的输入路径
- 提供四种静态方法来设定job的输入路径:
// 单个路径
public static void addInputPath(Job job, Path path)
// 注意,下面三个函数名多了 s,用于多个路径
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)
默认存在一个过滤器,排除隐藏文件(名称中以“.”和"_"开头的文件),也可以使用setInputPathFilter()
方法设置一个过滤器。默认的过滤器只能看到非隐藏文件。
1.4 常用输入格式
当然,最常用的还是两种:
- TextInputFormat:系统默认的数据输入格式。将文件分块,并逐行读入,没一行记录成为一对<key,value>,其中,key为当前行在整个文件中的字节偏移量,LongWritable类型,value为这一行的文本内容,不包括任何行终止符(换行和回车符),它被打包成Text对象。
- KeyValueTextInputFormat:通常情况下,文件中的键值对形式并非以字节偏移量表示(用处不大),一般是Text形式的key,使用某个分界符进行分隔,例如Hadoop默认的OutputFormat产生的TextOutputFormat就是这种形式,此时用KeyValueTextInputFormat处理比较合适。
输入格式 | 描述 | 键 | 值 |
---|---|---|---|
TextInputFormat | 默认格式,读取文件的行 | 行的字节偏移量 | 行的内容 |
KeyValueInputFormat | 把行解析为键值对 | 第一个tab字符前的所有字符 | 行剩下内容 |
还是煮个栗子来的比较实在,如下两个文件:
其中第二个文件,以制表符分割。
使用TextInputFormat处理第一个文件,得到以下3条记录:
<0, hello world, i am xiaozhou>
<27, stay hungey, stay foolish>
<54, bye game, bye boring>
看吧,在实际应用中,字节偏移量作为key可能真的没啥卵用。。。
使用KeyValueTextInputFormat处理第二个文件,得到以下3条记录:
<one, hello world, i am xiaozhou>
<two, stay hungry, stay foolish>
<three, bye game, bye boring>
以上,就是常用的两个inputFormat的区别。
1.5 设置输入格式
那我们怎么选择使用那种输入格式呢?很简单,面向对象的思想,你只要调用你所使用的输入格式封装好的对象就行了。只要在job函数中调用
job.setInputFormatclass(MyInputFormat.class)
至于怎么去创建自己的MyInputFormat,参照上面InputFormat类的层次结构,进行继承和复写就行了:
- 如果数据来源是文件,则可以继承FIleInputFormat:
public class MyInputFormat extends FileInputFormat<Text,Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
- 如果数据来源是非文件,如关系数据,则继承:
public class MyInputFormat extends InputFormat<Text,Text> {
// 将spilt输出成<key,value>
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
TaskAttemptContext arg1) throws IOException, InterruptedException {
// TODO Auto-generated method stub
return null;
}
// 拆分为spilt
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
1.6 输出格式
数据输出格式(OutputFormat)用于描述MR作业的数据输出规范,Hadoop提供了丰富的内置数据输出格式。最常的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,将结果以"key+\t+value"的形式逐行输出到文本文件中。还有其它的,如来源:《Hadoop权威指南》:
1.7 设置输出格式
默认的输出格式是TextOutputFormat,把每条记录写为文本行,键值可以是任意类型,因为TextOutputFormat会调用toString()方法把它们转换为字符串,每个键值对由制表符(tab)进行分隔(当然也可以设定分隔符),与其对应的输入格式是KeyValueOutputFormat。
若要自定义输出格式,如下:
public class MyOutputFormat extends OutputFormat<Text,Text> {
@Override
public void checkOutputSpecs(JobContext arg0)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
return null;
}
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
1.8 复合键
从前面的整个过程中可以看到,都是采用key-value的方式进行传入传出,而这些key或者value类型大多是单一的字符串或者整型,也就是基本数据类型。如果我的key中需要包含多个信息怎么办?用字符串直接拼接么? 太不方便了,最好能够自己定义一个类,作为这个key,这样就方便了。
要自定义一个类作为key或value的类型,就要实现WriableComparable类,复写其中三个函数如下:
public class MyType implements WritableComparable<MyType> {
private float x,y;
public float GetX(){return x;}
public float GetY(){return y;}
// 读缓冲
@Override
public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
}
// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
}
// 比较器
@Override
public int compareTo(MyType arg0) {
//输入:-1(小于) 0(等于) 1(大于)
return 0;
}
}
关于复合键,在本节内容学习的最后,会写一个倒排索引的程序例子,就会使用复合键。
注:关于writable还有很多细节上的知识,《Hadoop权威指南》上也没系统性的讲解,待后面遇到了实际问题再解决吧。
1.x 后记
目前能用到的关于文件格式的知识大概也就这些了,看《Hadoop权威指南》上还有很多细节上的东西,等以后用到了再回来查吧,不然学了也记不住。