RDD、DataFrame、DataSet原理解析
一、RDD、DataFrame、DataSet三者概念
1. RDD:全称Resilient Distributed Dataset,弹性分布式数据集,Spark中最基础的数据抽象,特点是RDD只包含数据本身,没有数据结构。
2. DataFrame:也是一个分布式数据容器,除数据本身,还记录了数据的结构信息,即schema;结构信息便于Spark知道该数据集中包含了哪些列,每一列的类型和数据是什么。
3. DataSet:Spark中最上层的数据抽象,不仅包含数据本身,记录了数据的结构信息schema,还包含了数据集的类型,也就是真正把数据集做成了一个java对象的形式,需要先创建一个样例类case class,把数据做成样例类的格式,每一列就是样例类里的属性。
ps:
(1)DataSet是面向对象的思想,把数据变成了对象的属性。
(2)DataSet是强类型,比如可以有DataSet[Car],DataSet[Person](汽车对象数据集,人对象数据集);DataFrame=DataSet[Row],DataFrame是DataSet的特例。
(3)在后期的Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口。
二、RDD、DataFrame、DataSet的创建和相互转换
首先是创建方式有区别,
· RDD的创建方式有三种:
(1)从集合创建:sc.makeRDD(),sc.parallelize()
(2)从外部存储系统的数据集创建:sc.textFile("path或者hdfs://hadoop100:9000/path")
(3)从其他RDD转换
· DataFrame的创建方式也有三种:
(1)通过spark的数据源进行创建:spark.read.~(“path”)
(2)由RDD转换得来
(3)从HiveTable中查询返回
ps:SparkSQL专用的json格式:{“A”:“B”},去掉了头尾的[ ]
· DataSet的创建方式有两种:
(1)首先创建样例类(样例类整数类型一般设定为Long,或者BigInt),然后创建DataSet
(2)通过DataFrame转换
注意:当涉及RDD与DF和DS之间互相转换操作时,需要引入import spark.implicits._
scala> import spark.implicits._
1. RDD和DataFrame互转
(1)RDD转DataFrame
1. 首先创建RDD
scala> val personRDD = sc.textFile("/opt/module/datas/people1.txt")
2. 查看RDD
scala> personRDD.collect
res0: Array[String] = Array(zhangsan,1, lisi,10, wangwu,20, zhaoliu,30)
3. RDD=>DataFrame
scala> personRDD.map{x => val per = x.split(",") ; (per(0),per(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
4. 查看DF
scala> res1.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 1|
| lisi| 10|
| wangwu| 20|
| zhaoliu| 30|
+--------+---+
(2)DataFrame转RDD
1. 通过spark数据源创建DataFrame
scala> val df = spark.read.json("/opt/module/datas/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2. DF=>RDD
scala> val rdd1 = df.rdd
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:28
3. 查看RDD
scala> rdd1.collect
res4: Array[org.apache.spark.sql.Row] = Array([10,Michael], [30,Andy], [20,Justin])
2. RDD与DataSet互转
(1)RDD转DataSet
scala> val personRDD = sc.textFile("/opt/module/datas/people1.txt")
1. 编写样例类Person
scala> case class Person(name:String,age:Long)
2. RDD=>DS
scala> personRDD.map{x => val per = x.split(",") ; Person(per(0),per(1).trim.toInt)}.toDS()
res9: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
3. 查看DS
scala> res9.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 1|
| lisi| 10|
| wangwu| 20|
| zhaoliu| 30|
+--------+---+
(2)DataSet转RDD
1. 编写样例类Person
scala> case class Person(name:String,age:Long)
2. 创建DataSet
scala> val ds = Seq(Person("Andy",32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
3. DataSet=>RDD
scala> val rdd2 = ds.rdd
rdd2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[27] at rdd at <console>:33
4. 查询RDD
scala> rdd2.collect
res14: Array[Person] = Array(Person(Andy,32))
ps:对比一下RDD转DF,RDD转DS的区别
1. RDD转DF:
personRDD.map{x => val per = x.split(",") ; (per(0),per(1).trim.toInt)}.toDF("name","age")
2. RDD转DS:
case class Person(name:String,age:Long)
personRDD.map{x => val per = x.split(",") ; Person(per(0),per(1).trim.toInt)}.toDS()
3. DataFrame和DataSet的互转
(1)DF=>DS(样例类、df.as[样例类])
1. 通过spark数据源创建DataFrame
scala> val df = spark.read.json("/opt/module/datas/people.json")
2. 创建样例类Person
scala> case class Person(name:String,age:Long)
3. DF=>DS
scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
4. 查看DS
scala> ds.show
+---+-------+
|age| name|
+---+-------+
| 10|Michael|
| 30| Andy|
| 20| Justin|
+---+-------+
(2)DS=>DF(ds.toDF)
scala> val df2 = ds.toDF
df2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df2.show
+---+-------+
|age| name|
+---+-------+
| 10|Michael|
| 30| Andy|
| 20| Justin|
+---+-------+
三、RDD、DataFrame、DataSet互转示意图
1. RDD是最基础的数据类型,在向上转换时,需要添加必要的信息;
转DataFrame:需要添加结构信息并加上列名 toDF("id","name")
转DataSet:需要添加类型信息,写样例类 map(x=>{Emp(x)}).toDS()
2. DataFrame在向上转换时,本身包含结构信息,只添加类型信息即可;
转DataSet:先写样例类,as[Emp]
转RDD:df.rdd
3. DataSet作为最上层的抽象,转换其他对象直接可以往下转;
转DataFrame:ds.toDF
转RDD:ds.rdd
四、RDD、DataFrame、DataSet异同点
1. 相同点
(1)都是spark平台下的弹性分布式数据集,为处理超大型数据提供便利
(2)都是懒执行,在创建、转换如map时,不会立即执行,只有在遇到action算子如foreach时,才会开始运算
(3)都有partition的概念
(4)对DataFrame、DataSet和RDD之间转换需要import spark.implicits._ 这个包的支持
(5)DF和DS都可以通过模式匹配获取各个字段的值和类型
2. 区别
(1)RDD不支持sparksql操作,DF和DS支持sparksql
(2)DF和DS可以注册临时表/视窗,支持sql查询
DF.createOrReplaceTempView("Tmp")
spark.sql("select row,date from Tmp where date is not null order by date").show
(3)DataFrame行的类型都为Row,适配性很强
(4)DataSet很方便于访问列字段的值和类型,但是需要先写明样例类,给DS声明类型