[Spark SQL03]Spark SQL愿景
写更少的代码
加载更少的数据
将优化交给底层
1、写更少的代码
A.从wordcount角度看:
MapReduce(代码量最多)--->hive(代码量少)---->Spark core(代码量更少,但可读性差)----->Spark SQL(代码量少,可读性好,性能更好)
B.从外部数据源角度看:
为文件输入输出提供了访问的接口
C.从schema推导的角度来看:
可以自动推导数据类型,对于数据类型不对的数据,很方便转换,即数据兼容性更好
例如:如果读一些数据 ,字段age有的数据有,有的数据没有,没有的字段会赋值为NULLL
如果age字段有的数据为Int型,有的为double型,则Spark SQL会自动帮你转成double型
分区推导:
假设有一个表所在的目录为:/user/hive/warehouse/gender=male/country=us。数据类型为parquet
如果加载数据时使用:val df=spark.read.format("parquet").load("/user/hive/warehouse")
此时执行df.show ,查看到的表的信息中有gender字段也有country字段
如果加载数据时使用:val df=spark.read.format("parquet").load("/user/hive/warehouse/gender=male")
此时执行df.show ,查看到的表的信息中只有country字段
###分区推导仅支持numeric、String、date、timestamp类型,如果不想对列的类型进行自动推导,可以将参数spark.sql.sources.partitionColumnTypeInference.enabled设置为false,该参数默认为true.
schema合并:
schema合并是相对昂贵的操作,spark1.5之后该属性默认为关闭。开启的两种方式如下:
A.在读取parquet的时候,设置数据源选项mergeSchema为true
B.设置全局SQL选项spark.sql.parquet.mergeSchema为true
例子:
// This is used to implicitly convert an RDD to a DataFrame. import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true)
savemode:
将数据以DataFrame形式写到外部数据源的时候,使用的命令:spark.read.format("目标格式").save("目标路径"),在format和save之间可以加一个mode("参数"),该参数有四种,以下详细讲解:
default 如果目标路径存在数据,则报错
append 如果目标路径存在数据,则以新文件的形式追加到原数据之后
overwrite 如果目标路径存在数据,则新数据覆盖原数据
ignore 如果目标路径存在数据,则新数据跳过原数据
目标参数也可以写全称,比如mode("append")替换成mode(SaveMode.Append),但此时应该导包,import org.apache.spark.sql.savemode
2、读更少的数据
分区、压缩、pushdown、谓词下压、过滤
3、将优化交给底层
有一个join操作:users.join(events,users.id===events.id).filter(evevts.data>="2016-01-01") ##要三个等号哦
该操作的执行计划优化过程如下: