[Spark SQL01]Spark SQL入门
1、SQL结合spark有两条线:
Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。
#Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可:
# set hive.execution.engine=spark
2、Spark SQL
a.概述:
Spark SQL是Spark处理数据的一个模块,跟基本的Spark RDD的API不同,Spark SQL中提供的接口将会提供给Spark更多关于结构化数据和计算的信息。其本质是,Spark SQL使用这些额外的信息去执行额外的优化,这儿有几种和Spark SQL进行交互的方法,包括SQL和Dataset API,当使用相同的执行引擎时,API或其它语言对于计算的表达都是相互独立的,这种统一意味着开发人员可以轻松地在不同的API之间进行切换。
b.SQL:
Spark SQL的一大用处就是执行SQL查询语句,Spark SQL也可以用来从Hive中读取数据,当我们使用其它编程语言来运行一个SQL语句,结果返回的是一个Dataset或者DataFrame.你可以使用命令行,JDBC或者ODBC的方式来与SQL进行交互。
c.Dataset和DataFrame
Dataset是一个分布式数据集合。Dataset是一个在Spark 1.6版本之后才引入的新接口,它既拥有了RDD的优点(强类型、能够使用强大的lambda函数),又拥有Spark SQL的优点(用来一个经过优化的执行引擎)。你可以将一个JVM对象构造成一个Dataset,之后就可以使用一些transformations操作啦。我们可以使用scala,java来访问Dataset API,不支持python哦,当然,由于python的动态特性,很多的Dataset API是可以使用的,R语言也是一样哦。
DataFrame是Dataset中一个有名字的列。从概念上,它等价于关系型数据库中的一张表,或者等价于R/Python中的Data Frame,但它在底层做了更好的优化。构造DataFrame的数据源很多:结构化的数据文件、hive表、外部数据库、已经存在的RDD。DataFrame 的API支持java,scal.python,R。
3、面试题
RDD VS DataFrame
esgd
a.基于RDD的编程,不同语言性能是不一样的,而DataFrame是一样的,因为底层会有一个优化器先将代码进行优化。
b.对于RDD,暴露给执行引擎的信息只有数据的类型,如RDD[Student]装的是Student,而对于DataFrame,对于外部可见的信息有字段类型,字段key,字段value等。
c.RDD是一个数组,DataFrame是一个列式表。
4、Spark SQL愿景
a.写更少的代码
b.读更少的数据(压缩,存储格式,列裁剪)
c.对于不同语言的应用程序让优化器自动进行优化
5、Spark SQL架构
客户端->未解析的逻辑执行计划(Schema Catalog 将schema作用在数据上)->逻辑执行计划->优化过后的逻辑执行计划->物理执行计划->Spark引擎。
#Spark SQL 要使用hive中的表,需要将hive-site.xml加入spark的配置文件目录。
6、执行计划(Hive 或Spark SQL)
explain extended +查询语句
7、SparkSession
添加依赖:
<dependency>
<groupId>org.spark.apache</groupId>
<artifactId>spark-sql_2.11</artifactId> ##2.11位scala版本
<version>${spark.version}</version>
</dependency>
Spark中所有功能的入口点是SparkSession类,我们可以使用SparkSession.builder()来创建一个SparkSession,具体如下(scala):
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。
在Spark 2.0之后,SparkSession内置了对于hive特性的支持,允许使用HiveQL来书写查询语句访问UDF,以及从Hive表中读取数据。使用这些特性,你不需要进行任何Hive的设置。
8、创建DataFrame
通过SparkSession,应用程序可以从一个现有的RDD、Hive表、Spark数据源来创建一个DataFrame。
以下创建DataFrame是基于JSON格式的文件:
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。
9、无类型的Dataset操作(又称DataFrame 操作)
上面提到的,在Spark 2.0时,在java或者scala API中,DataFrame是Dataset的行,这些操作也被称为“非类型转换”,与“类型化转换”相比,具有强类型的Scala/Java Dataset。
这儿包括一些使用Dataset处理结构化数据的例子:
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() //groupBy返回一个Dataset,count返回一个DataFrame. // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。
对于可以在数据集上执行的操作类型的完整列表,请参阅API Documentation。
除了简单的列引用和表达式之外,数据集还拥有丰富的函数库,包括字符串操作、日期算术、常见的数学运算等等。完整列表查看 DataFrame Function Reference.
10、以编程方式运行SQL查询语句
SparkSession中的SQL函数可以让应用程序以编程的方式运行SQL查询语句,让结果返回一个DataFrame。
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。
11、全局临时视图
Spark SQL中的临时视图作用域仅仅在于创建该视图的会话窗口,如果窗口关闭,该视图也终止。如果你想要一个在所有会话中都生效的临时视图,并且即使应用程序终止该视图仍然存活,你可以创建一个全局临时视图。 全局临时视图与系统保存数据库global_temp相关联,我们必须使用规范的名字来定义它,比如:SELECT * FROM global_temp.view1
.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
12、创建Dataset
Dataset有点像RDD,但它并不是使用java或Kryo这样的序列化方式,而是使用专用的编码器将对象进行序列化,以便于在网络上进行处理和传输。虽然编码器和标准的序列化都可以将对象转成字节,但编码器产生动态的代码,它使用的格式允许Spark在不执行反序列化的情况下去执行像过滤、排序、哈希等许许多多的操作。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+