使用spark对输入目录的文件进行过滤

使用spark进行文件过滤

在使用spark的很多情形下, 我们需要计算某个目录的数据.
但这个文件夹下面并不是所有的文件都是我们想要计算的

比如 : 对于某一天的数据,我们只想计算其中的几个小时,这个时候就需要把剩下的数据过滤掉

更坏的一种情形 : 对于那些正在copy(还没有完成),或者是.tmp临时文件,
程序在读取的过程中,文件发生变化已经复制完成或者被删除,都会导致程序出错而停掉

为了避免上述问题的出现, 我们就需要对 输入目录下的文件进行过滤:
即保证我们textFile的时候, 只读取那些我们想要的数据.
1.基本操作
获取到spark的context,代码如下 :
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark.fileFilter"); 
sparkConf.setMaster("local"); 
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
2. 关键操作
1. 通过spark的context得到hadoop的configuration,并对hadoop的conf进行设置
代码如下 : 
Configuration conf  = jsc.hadoopConfiguration(); //通过spark上下文获取到hadoop的配置
conf.set("fs.defaultFS", "hdfs://192.168.1.31:9000");
conf.set("mapreduce.input.pathFilter.class", "cn.mastercom.bigdata.FileFilter"); // 设置过滤文件的类,这是关键类!!

2. 过滤类FileFilter的写法

关键点 : 实现PathFilter接口,重写里面的accept方法

一个简单的测试demo如下 :

package cn.mastercom.bigdata;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
 * 输入路径的文件过滤
 * 过滤掉临时文件和正在复制的文件
 * @author xmr
 *
 */
public class FileFilter implements PathFilter
{
	@Override
	public boolean accept(Path path) {
	
		String tmpStr = path.getName();
		if(tmpStr.indexOf(".tmp") >= 0)
        {
        	return false;
        }
        else if(tmpStr.indexOf("_COPYING_") >= 0)
        {
        	return false;
        }
        else
        {
        	return true;
        }
	}
}

3. 读取目录(文件), 进行rdd操作,输出结果

经过以上设置,使用spark对输出目录的文件进行过滤的功能就已经实现了!
接下来的rdd操作,就只会针对那些没有被过滤掉的文件了!!

具体的执行又分为以下几种情况 :(测试进行的rdd操作为最简单的map,直接将输入的结果输出出来) 
1. 直接读取被过滤掉的文件(.tmp文件或者是正在复制的文件)
举例 :
JavaRDD<String> testRdd = jsc.textFile("hdfs://192.168.1.31:9000/test/spark/part-00003.tmp");

这种情况下会抛出文件不存在的异常 : 
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: 
Input path does not exist: hdfs://192.168.1.31:9000/test/spark/part-00003.tmp

详细报错情况如下图 : !!
使用spark对输入目录的文件进行过滤

但是这个文件在hdfs上面命名是存在的,为什么会碰到这个问题呢? 去源码里面看,发现代码如下 : 

使用spark对输入目录的文件进行过滤

    红框标注的 p ,就是我们传入的path, inputFilter就是我们FileFilter里面设置的文件过滤
    而这个文件刚好在我们的过滤范围内,globstatus就会返回一个空值
    在matches是一个空值的情况下,就会抛出 : Input path does not exist: 的异常
2. 读取正常文件(不在过滤的范围内)
执行情况与正常执行的spark程序相同,不再赘述
3. 读取一个全部都是要被过滤掉文件的文件夹
举例 : 
 JavaRDD<String> testRdd = jsc.textFile("hdfs://192.168.1.31:9000/test/spark");
 // 这个文件夹下所有的文件都是.tmp文件或者是_COPYING_文件
 
 这种情形下虽然没有任何的结果输出出来(因为所有的文件都被过滤掉)
 但是也并没有抛出 Input path does not exist: 的异常,这是为什么呢?
 
 原因  : 我们的Filter过滤, 过滤掉的是这个目录下面的所有文件
 但是这个文件夹,对于框架来说,确实是存在的,所以才不会报错
 
 如果我们换一种写法 : 
  JavaRDD<String> testRdd = jsc.textFile("hdfs://192.168.1.31:9000/test/spark/*"); // 读取这个文件夹下面全部的文件
  
  这种情况下就会有错误抛出啦!
  Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://192.168.1.31:9000/test/spark/* matches 0 files

详细报错信息如下:!

使用spark对输入目录的文件进行过滤

这错误就是说 : 这个目录下没有文件匹配~

同样的目的,换一种写法,结果就截然不同,还是挺奇妙的吧!! 

4. 读取一个不全都是要被过滤掉文件的文件夹

结果 : 符合过滤条件的文件被过滤掉,其余的文件正常运算跑出结果

总结 : spark的文件过滤最关键的还是得到hadoop的configuration
并且对这个configuration进行设置! 
在读取文件的时候,框架就会自动在最合适的时候进行文件的过滤~
hadoop,spark还是很强大的QAQ!