spark 基础 二 数据读取与保存
spark 本身基于Hadoop生态圈构建,可以通过hadoop mapreduce框架的InputFormat 和OutputFormat 接口访问数据,大部分的文件格式与存储系统如S3、HDFS、Cassandra、HBase等都支持这种接口
spark会根据文件扩展名选择对应处理方式
1读取/保存文本文件
textFIle()读取一个文本文件,生成RDD,输入的每一行成为RDD中的一个元素;
wholeTextFIles() 读取一个文件夹内的多个文本文件 ,生成PairRDD 键是文件名,value是每一行
saveAsTextFIle 将RDD内容都输入到对应的文件中
2JSON
读取JSON可以将数据作为文本文件读取,然后使用JSON解析器来对RDD中的值进行映射操作。也可以使用JSON序列化库将数据转为字符串,在java和scala中也可以使用自定义的Hadoop格式来操作JSON数据
python 中读取非结构化JSON
import json
data = input.map(lambda x: json.loads(x))
Java中读取JSON
将记录读取到一个代表结构信息的类中
class ParseJson implements FlatMapFunction<Iterator<String>,Person>{
public Iterable<Person> call(Iterator<String> lines) throws Exception {
ArrayList<Person> people = new ArrayList<Person>();
ObjectMapper mapper= new ObjectMapper();
while (lines.hasNext()){
String line = lines.next();
try{
people.add(mapper.readValue(line,Person.class));
}catch(Exception e){
//跳过失败的数据
}
}
return people;
}
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson())
保存为JSON
python
(data.filer(lambda x:x["loversPandas"]).map(lambda x; json.dumps(x)).saveAsTextFile(outputFile))
Java
class WriteJson implements FlatMapFunction<Iterator<Person>, String>{
public Iterable<String> call(Iterator<Person> people) throws Exception{
ArrayList<String> text = new ArrayList<String>();
ObjectMapper mapper = new ObjectMapper();
while(people.hasNext()){
Person person = people.next();
text.add(mapper.writeValueAsString(person));
}
return text;
}
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.svaAsTextFile(outfile);
3 sequenceFile
sequenceFIle 由wirtable接口实现
sequenceFile(path, keyClass, valueClass, minPartitions)
python 读取
sc.sequenceFile(infile,"org.apache.hadoop.io.Text",org.apache.hadoop.io.IntWritable)
java 读取
public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text,IntWritable>,String,Integer>{
public Tuple2<String,Integer> call(Tuple2<Text,IntWritable> record){
return new Tuple2(record._1.toString(),record._2.get());
}
}
JavaPairRDD<text,IntWritable> input=sc.sequenceFile(fileName,Text.class,IntWritable.class);
JavaPairRDD<String,Integer> result = input.mapToPair(new ConvertToNativeTypes());
4文件系统
本地 file://...
s3 s3://...
hdfs hdfs://master:port/...
5 spark SQL
将SQL查询给spark SQL,让它对一个数据源执行查询,得到一个由ROW对象组成的RDD,每个Row对象表示一条记录。在java和scala中对象访问基于下标,row有get芳芳,返回一般类型提供转换。在python中可以使用row[column_number]和 row.column_name访问元素
Hive可以在HDFS或其他存储系统中存储多种格式的表,是hadoop中常见的结构化数据源,要把spark SQL连接到已有的Hive上,需要提供Hive的配置文件,将hive-site.xml复制到spark的conf目录下。再创建HiveContext对象,就可以使用HQL进行查询,并以行组成的RDD得到返回数据
python
hivectx=HiveContext(sc)
rows=hivectx.sql("SELECT name, age FROM users")
firstRow=rows.first()
Java
HiveContext hivectx=new HiveContext(sc);
SchemaRDD rows=hivectx.sql("SELECT name, age FROM users");
Row firstRow=rows.first();
对JSON查询需要创建HiveContext 然后使用jsonfile读取整个文件获取由row对象组成的RDD,也可以将RDD注册为一张表,再从中选出特定的字段
python
tweets=hivectx.jsonFIle("tweets.json")
tweets.registerTempTable("tweets")
results=hivectx.sql("SELECT user.name,text FROM tweets")