多个文件作为输入到Hadoop Dfs和mapreduce
问题描述:
通常我们在一个文本文件中输入到java文件(比如简单的字数问题)。相反,现在我有100个csv文件,我想给作为我的java代码的输入(所有文件不能简单地合并为一个文件)。试图预测给定100个股票的最大/最小股票波动率,因此每个csv文件是唯一的。 那么,如何将csv文件的整个文件夹作为java程序的输入流。多个文件作为输入到Hadoop Dfs和mapreduce
答
解决方案1:为了解决这个问题,我们可以使用FileInputFormat.addInputPaths()方法,可以采取的多个输入的逗号分隔的列表,我们可以写为
FileInputFormat.addInputPaths(“file0,file1,....”)
或
假设需要分析2个文件以及使用Facebook和YouTube服务的人员列表(需要单个输出文件)
我们有两个文件facebook.txt和youtube.txt
Path YoutubePath = new Path(args[0]);
Path FacebookPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
MultipleInputs.addInputPath(job, FacebookPath, TextInputFormat.class, JoinFacebookMapper.class);
MultipleInputs.addInputPath(job, YoutubePath, TextInputFormat.class, YoutubeMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
将下列代码行添加到代码中将生成多个文件,以便在单个映射reduce作业中传递。
或
您可以通过整个文件夹作为参数
答
这里是我的测试代码到许多文件复制到HDFS和合并它们,也可以过滤其他文件格式,我想这可能是帮助你!
public class FilesMergeToHDFS {
private static FileSystem fs = null;
private static FileSystem local = null;
public static void main(String[] args) throws IOException, URISyntaxException {
// TODO Auto-generated method stub
list();
}
private static void list() throws IOException, URISyntaxException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
URI uri = new URI("hdfs://xxx:9000");//HDFS address
fs = FileSystem.get(uri,conf);
local = FileSystem.getLocal(conf);
FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"), new RegexExcludePathFilter("^.*svn$"));
Path[] dirs = FileUtil.stat2Paths(dirsStatus);
FSDataInputStream in = null;
FSDataOutputStream out = null;
for(Path p:dirs){
//upload
String filename = p.getName();
FileStatus[] localStatus = local.globStatus(new Path(p+"/*"),new RegexAcceptPathFilter("^.*txt$"));
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
//set outputpath
Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/"+filename+".txt");
out =fs.create(block);
for(Path path:listedPaths){
in = local.open(path);
IOUtils.copyBytes(in, out, 4096, false); // copydata
in.close();
}
if (out != null) {
out.close();
}
}
}
private static class RegexAcceptPathFilter implements PathFilter {
private final String regex;
public RegexAcceptPathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return flag;
}
}
private static class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter (String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return !flag;
}
}
}
的MapReduce已经接受文件夹 –
理想情况下,你会把CSV到HDFS的功能,那么使用蜂巢或火花查询过这... –
如何让MapReduce的接受文件夹? 它是使用/ *运算符吗? – user2336157