spark 基础 二 数据读取与保存

spark 本身基于Hadoop生态圈构建,可以通过hadoop mapreduce框架的InputFormat 和OutputFormat 接口访问数据,大部分的文件格式与存储系统如S3、HDFS、Cassandra、HBase等都支持这种接口

spark会根据文件扩展名选择对应处理方式

spark 基础 二 数据读取与保存

 1读取/保存文本文件

textFIle()读取一个文本文件,生成RDD,输入的每一行成为RDD中的一个元素;

wholeTextFIles() 读取一个文件夹内的多个文本文件 ,生成PairRDD 键是文件名,value是每一行

saveAsTextFIle 将RDD内容都输入到对应的文件中

2JSON

读取JSON可以将数据作为文本文件读取,然后使用JSON解析器来对RDD中的值进行映射操作。也可以使用JSON序列化库将数据转为字符串,在java和scala中也可以使用自定义的Hadoop格式来操作JSON数据

python 中读取非结构化JSON

import json

data = input.map(lambda x: json.loads(x))

Java中读取JSON

将记录读取到一个代表结构信息的类中

class ParseJson implements FlatMapFunction<Iterator<String>,Person>{

  public Iterable<Person> call(Iterator<String> lines) throws Exception {

      ArrayList<Person> people = new ArrayList<Person>();

      ObjectMapper mapper= new ObjectMapper();

       while (lines.hasNext()){

         String line = lines.next();

          try{

                  people.add(mapper.readValue(line,Person.class));

           }catch(Exception e){

           //跳过失败的数据

           }

          }

          return people;

   }

}

 

JavaRDD<String> input = sc.textFile("file.json");

JavaRDD<Person> result = input.mapPartitions(new ParseJson())

保存为JSON

python

(data.filer(lambda x:x["loversPandas"]).map(lambda x; json.dumps(x)).saveAsTextFile(outputFile))

Java

 class WriteJson implements FlatMapFunction<Iterator<Person>, String>{

    public Iterable<String> call(Iterator<Person> people) throws Exception{

         ArrayList<String> text = new ArrayList<String>();

         ObjectMapper mapper = new ObjectMapper();

         while(people.hasNext()){

              Person person = people.next();

             text.add(mapper.writeValueAsString(person));

         }

         return text;

      }

  }

JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());

JavaRDD<String> formatted = result.mapPartitions(new WriteJson());

formatted.svaAsTextFile(outfile);

 

3 sequenceFile

sequenceFIle 由wirtable接口实现

sequenceFile(path, keyClass, valueClass, minPartitions)

python 读取

sc.sequenceFile(infile,"org.apache.hadoop.io.Text",org.apache.hadoop.io.IntWritable)

java 读取

public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text,IntWritable>,String,Integer>{

    public Tuple2<String,Integer> call(Tuple2<Text,IntWritable> record){

           return new Tuple2(record._1.toString(),record._2.get());

      }

 }

JavaPairRDD<text,IntWritable> input=sc.sequenceFile(fileName,Text.class,IntWritable.class);

JavaPairRDD<String,Integer> result = input.mapToPair(new ConvertToNativeTypes());

 

4文件系统

本地 file://...

s3 s3://...

hdfs hdfs://master:port/...

5 spark SQL

将SQL查询给spark SQL,让它对一个数据源执行查询,得到一个由ROW对象组成的RDD,每个Row对象表示一条记录。在java和scala中对象访问基于下标,row有get芳芳,返回一般类型提供转换。在python中可以使用row[column_number]和 row.column_name访问元素

Hive可以在HDFS或其他存储系统中存储多种格式的表,是hadoop中常见的结构化数据源,要把spark SQL连接到已有的Hive上,需要提供Hive的配置文件,将hive-site.xml复制到spark的conf目录下。再创建HiveContext对象,就可以使用HQL进行查询,并以行组成的RDD得到返回数据

python

hivectx=HiveContext(sc)

rows=hivectx.sql("SELECT name, age FROM users")

firstRow=rows.first()

Java

HiveContext hivectx=new HiveContext(sc);

SchemaRDD rows=hivectx.sql("SELECT name, age FROM users");

Row firstRow=rows.first();

对JSON查询需要创建HiveContext 然后使用jsonfile读取整个文件获取由row对象组成的RDD,也可以将RDD注册为一张表,再从中选出特定的字段

python

tweets=hivectx.jsonFIle("tweets.json")

tweets.registerTempTable("tweets")

results=hivectx.sql("SELECT user.name,text FROM tweets")