Hadoop小文件处理
导读
HDFS作为Hadoop生态系统的分布式文件系统,设计是用来存储海量数据,特别适合存储TB、PB量级别的数据。但是随着时间的推移或者处理程序的问题,HDFS上可能会存在大量的小文件,进而消耗NameNode大量的内存,并且延长程序的运行时间。下面我就把对小文件的处理经验总结一下,供大家参考。
引言
先来了解一下Hadoop中何为小文件:小文件指的是那些文件大小要比HDFS的块大小(在Hadoop1.x的时候默认块大小64MB,可以通过dfs.blocksize来设置;但是到了Hadoop 2.x的时候默认块大小为128MB了,可以通过dfs.block.size设置)小的多的文件。而HDFS的问题在于无法很有效的处理大量小文件。在HDFS中,任何一个文件、目录和block,在HDFS中都会被表示为一个object存储在Namenode的内存中,每一个object占用150 bytes的内存空间。所以,如果有10million个文件,每一个文件对应一个block,那么就将要消耗Namenode 3G的内存来保存这些block的信息。如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限。不仅如此,HDFS并不是为了有效的处理大量小文件而存在的。它主要是为了流式的访问大文件而设计的。对小文件的读取通常会造成大量从Datanode到Datanode的seeks和hopping来retrieve文件,而这样是非常的低效的一种访问方式。
一、概述
HDFS存储特点:
(1)流式读取方式,主要是针对一次写入,多次读出的使用模式。写入的过程使用的是append的方式。
(2)设计目的是为了存储超大文件,主要是针对几百MB,GB,TB,甚至PB的文件。
(3)该分布式系统构建在普通PC机组成的集群上,大大降低了构建成本,并屏蔽了系统故障,使得用户可以专注于自身的操作运算。
(4)HDFS适用于高吞吐量,而不适合低时间延迟的访问。如果同时存入1million的files,那么HDFS 将花费几个小时的时间。
(5)流式读取的方式,不适合多用户写入,以及任意位置写入。如果访问小文件,则必须从一个Datanode跳转到另外一个Datanode,这样大大降低了读取性能。
二、HDFS文件操作流程
HDFS体系结构
HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。Namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。
HDFS文件的读取
(1)client端发送读文件请求给Namenode,如果文件不存在,返回错误信息,否则,将该文件对应的block及其所在Datanode位置发送给client
(2)client收到文件位置信息后,与不同Datanode建立socket连接并行获取数据。
HDFS文件的写入
(1) client端发送写文件请求,Namenode检查文件是否存在,如果已存在,直接返回错误信息,否则,发送给client一些可用Datanode节点
(2)client将文件分块,并行存储到不同节点上Datanode上,发送完成后,client同时发送信息给Namenode和Datanode
(3)Namenode收到的client信息后,发送确信信息给Datanode
(4)Datanode同时收到Namenode和Datanode的确认信息后,提交写操作。
三、HDFS小文件解决方案
1、 HDFS上的小文件问题
现象:在现在的集群上已经存在了大量的小文件和目录。
方案:文件是许多记录(Records)组成的,那么可以通过调用HDFS的sync()方法和append方法结合使用,每隔一定时间生成一个大文件。或者可以通过写一个程序来来合并这些小文件。
2、 MapReduce上的小文件问题
现象:
Map任务(task)一般一次处理一个块大小的输入(input)(默认使用FileInputFormat)。如果文件非常小,并且拥有大量的这种小文件,那么每一个map task都仅仅处理非常小的input数据,因此会产生大量的map tasks,每一个map task都会额外增加bookkeeping开销。一个1GB的文件,拆分成16个块大小文件(默认block size为64M),相对于拆分成10000个100KB的小文件,后者每一个小文件启动一个map task,那么job的时间将会十倍甚至百倍慢于前者。
方案:
I、Hadoop Archive:
Haddop Archive是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样同时在减少Namenode的内存使用。
II、Sequence file:
sequence file由一系列的二进制key/value组成。key为小文件名,value为文件内容,可以将大批小文件合并成一个大文件。
I、II 这里不做介绍可以参考(http://blog.cloudera.com/blog/2009/02/the-small-files-problem)
III、CombineFileInputFormat:
Hadoop内置提供了一个 CombineFileInputFormat 类来专门处理小文件,其核心思想是:根据一定的规则,将HDFS上多个小文件合并到一个 InputSplit中,然后会启用一个Map来处理这里面的文件,以此减少MR整体作业的运行时间。CombineFileInputFormat类继承自FileInputFormat,主要重写了List getSplits(JobContext job)方法;这个方法会根据数据的分布,mapreduce.input.fileinputformat.split.minsize.per.node、mapreduce.input.fileinputformat.split.minsize.per.rack以及mapreduce.input.fileinputformat.split.maxsize 参数的设置来合并小文件,并生成List。其中mapreduce.input.fileinputformat.split.maxsize参数至关重要,如果用户没有设置这个参数(默认就是没设置),那么同一个机架上的所有小文件将组成一个InputSplit,最终由一个Map Task来处理。如果用户设置了这个参数,那么同一个节点(node)上的文件将会组成一个InputSplit。同一个 InputSplit 包含了多个HDFS块文件,这些信息存储在 CombineFileSplit 类中,它主要包含以下信息:
折叠源码
1 2 3 4 5 |
|
从上面的定义可以看出,CombineFileSplit类包含了每个块文件的路径、起始偏移量、相对于原始偏移量的大小以及这个文件的存储节点。因为一个CombineFileSplit包含了多个小文件,所以需要使用数组来存储这些信息。CombineFileInputFormat是抽象类,如果我们要使用它,需要实现createRecordReader方法,告诉MR程序如何读取组合的InputSplit。内置实现了两种用于解析组合InputSplit的类:org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat 和 org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat,我们可以把这两个类理解是 TextInputFormat 和 SequenceFileInputFormat。为了简便,这里主要来介绍CombineTextInputFormat。
在 CombineTextInputFormat 中创建了 org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader,具体如何解析CombineFileSplit中的文件主要在CombineFileRecordReader中实现。CombineFileRecordReader类中其实封装了 TextInputFormat的RecordReader,并对CombineFileSplit中的多个文件循环遍历并读取其中的内容,初始化每个文件的RecordReader主要在initNextRecordReader里面实现;每次初始化新文件的RecordReader都会设置mapreduce.map.input.file、mapreduce.map.input.length以及mapreduce.map.input.start参数,这样我们可以在Map程序里面获取到当前正在处理哪个文件。
样例代码如下:
折叠源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
|
日志输出:
折叠源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
|
可以从日志中很清楚的看出input文件为Total input paths to process : 152,通过 CombineFileInputFormat处理后splits为 mapreduce.JobSubmitter: number of splits:1,Map数为 Launched map tasks=1。注意体会mapreduce.input.fileinputformat.split.maxsize
参数的设置,大家可以不设置这个参数并且和设置这个参数运行情况对比,观察Map Task的个数变化。
3、Hive上的小文件问题
现象1:
输入文件过多,而Hive对文件创建的总数是有限制的,这个限制取决于参数:hive.exec.max.created.files,默认值是10000。如果现在你的表有60个分区,然后你总共有2000个map,在运行的时候,每一个mapper都会创建60个文件,对应着每一个分区,所以60*2000> 120000,就会报错:exceeds 100000.Killing the job 。最简单的解决办法就是调大hive.exec.max.created.files参数。但是如果说数据文件只有400G,那么你调整这个参数比如说40000。平均下来也就差不多每一个文件10.24MB,这样的话就有40000多个小文件,不是一件很好的事情。
方案1:
设置 mapper 输入文件合并参数
折叠源码
1 2 3 4 5 6 7 8 |
|
现象2:
hive执行中间过程生成的文件过多
方案2:
设置中间过程合并参数
折叠源码
1 2 3 4 5 6 7 8 |
|
现象3:
hive结果文件过多
方案3:
设置 reducer 参数 (一种是调整reducer个数,另一种是调整reducer大小)
折叠源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
参考文章:
1、https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
3、http://blog.cloudera.com/blog/2009/02/the-small-files-problem/