将纯文本文件转换为Spark中的Hadoop序列文件
问题描述:
我现有的项目使用Hadoop map-reduce生成具有XML格式的自定义键和值的序列文件。将纯文本文件转换为Spark中的Hadoop序列文件
通过从输入源一次读取一行来生成XML值,并且实现RecordReader以从纯文本返回XML格式的下一个值。
例如输入源文件有3行(第一行是头,并具有实际的数据休息行)
id|name|value
1|Vijay|1000
2|Gaurav|2000
3|Ashok|3000
发布地图的方法的序列文件具有如下数据:
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>1</id><name>Vijay</name><value>1000</value></bars>
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>2</id><name>Gaurav</name><value>2000</value></bars>
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>3</id><name>Ashok</name><value>3000</value></bars>
问题:我想在Spark中实现相同。基本上,读取输入文件并如上所述生成键值对。
是否有任何方式/可能重用现有的InputFormat,并因此重用我的Hadoop映射器类中使用的RecordReader。
RecordReader负责/有逻辑将纯文本行转换为XML并返回值作为Hadoop映射方法用于写入context.write()
方法。
请提出建议。
答
这包含在External Datasets部分的Spark文档中。对你最重要的部分是:
对于其他的Hadoop InputFormats,您可以使用 JavaSparkContext.hadoopRDD方法,它接受一个任意JobConf 和输入格式类,重点类和价值类。将这些与您的输入源一起使用的Hadoop作业的方式设置为相同的 。您还可以使用基于 “新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats的JavaSparkContext.newAPIHadoopRDD 。
这里有一个简单的例子demostrating如何使用它:
public final class ExampleSpark {
public static void main(String[] args) throws Exception {
JavaSparkContext spark = new JavaSparkContext();
Configuration jobConf = new Configuration();
JavaPairRDD<LongWritable, Text> inputRDD = spark.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, jobConf);
System.out.println(inputRDD.count());
spark.stop();
System.exit(0);
}
}
你可以看到的Javadoc JavaSparkContext here。