多个文件作为输入到Hadoop Dfs和mapreduce

多个文件作为输入到Hadoop Dfs和mapreduce

问题描述:

通常我们在一个文本文件中输入到java文件(比如简单的字数问题)。相反,现在我有100个csv文件,我想给作为我的java代码的输入(所有文件不能简单地合并为一个文件)。试图预测给定100个股票的最大/最小股票波动率,因此每个csv文件是唯一的。 那么,如何将csv文件的整个文件夹作为java程序的输入流。多个文件作为输入到Hadoop Dfs和mapreduce

+0

的MapReduce已经接受文件夹 –

+0

理想情况下,你会把CSV到HDFS的功能,那么使用蜂巢或火花查询过这... –

+0

如何让MapReduce的接受文件夹? 它是使用/ *运算符吗? – user2336157

解决方案1:为了解决这个问题,我们可以使用FileInputFormat.addInputPaths()方法,可以采取的多个输入的逗号分隔的列表,我们可以写为

FileInputFormat.addInputPaths(“file0,file1,....”) 

假设需要分析2个文件以及使用Facebook和YouTube服务的人员列表(需要单个输出文件)

我们有两个文件facebook.txt和youtube.txt

Path YoutubePath = new Path(args[0]); 
Path FacebookPath = new Path(args[1]); 
Path outputPath = new Path(args[2]); 
MultipleInputs.addInputPath(job, FacebookPath, TextInputFormat.class, JoinFacebookMapper.class); 
MultipleInputs.addInputPath(job, YoutubePath, TextInputFormat.class, YoutubeMapper.class); 
FileOutputFormat.setOutputPath(job, outputPath); 

将下列代码行添加到代码中将生成多个文件,以便在单个映射reduce作业中传递。

您可以通过整个文件夹作为参数

这里是我的测试代码到许多文件复制到HDFS和合并它们,也可以过滤其他文件格式,我想这可能是帮助你!

public class FilesMergeToHDFS { 
private static FileSystem fs = null; 
private static FileSystem local = null; 

public static void main(String[] args) throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 
    list(); 
} 

private static void list() throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 

      Configuration conf = new Configuration(); 
      URI uri = new URI("hdfs://xxx:9000");//HDFS address 
      fs = FileSystem.get(uri,conf); 


      local = FileSystem.getLocal(conf); 

      FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"), new RegexExcludePathFilter("^.*svn$")); 
      Path[] dirs = FileUtil.stat2Paths(dirsStatus); 
      FSDataInputStream in = null; 
      FSDataOutputStream out = null; 
      for(Path p:dirs){ 
        //upload 
       String filename = p.getName(); 
       FileStatus[] localStatus = local.globStatus(new Path(p+"/*"),new RegexAcceptPathFilter("^.*txt$")); 
       Path[] listedPaths = FileUtil.stat2Paths(localStatus); 
       //set outputpath 
       Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/"+filename+".txt"); 
       out =fs.create(block); 
       for(Path path:listedPaths){ 
        in = local.open(path); 
        IOUtils.copyBytes(in, out, 4096, false); // copydata 
        in.close(); 
       } 
       if (out != null) { 
        out.close(); 
       } 
      } 
} 

private static class RegexAcceptPathFilter implements PathFilter { 

private final String regex; 

    public RegexAcceptPathFilter(String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return flag; 
    } 

} 

private static class RegexExcludePathFilter implements PathFilter { 
private final String regex; 
    public RegexExcludePathFilter (String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return !flag; 
    } 
} 
}