Spark SQL Demo and the Distributed SQL Query Engine
(1) Accessing data in MySQL through Spark SQL
bin/spark-submit \
--class bigdata.spark.sql.test.DFTest \
--master local \
./FirstSparkPro.jar
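For reference, the essential call inside DFTest.scala (full listing at the bottom) is a JDBC read against MySQL. A minimal sketch that could be pasted into spark-shell, reusing the same connection details as the code below (the MySQL driver jar must be on the classpath, e.g. via --jars):
import java.util.Properties
import org.apache.spark.sql.SparkSession

// Sketch of the core JDBC read from DFTest.scala: load the Hive metastore table
// hive.TBLS from MySQL into a DataFrame and print a few rows.
val spark = SparkSession.builder().master("local[2]").appName("jdbc-read-sketch").getOrCreate()
val connconf = new Properties()
connconf.put("user", "root")
connconf.put("password", "haohu123")
spark.read.jdbc("jdbc:mysql://192.168.137.100:3306", "hive.TBLS", connconf).show()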
(2) Spark SQL integrated with Hive (similar to how Hive itself handles SQL, executing it directly with MapReduce)
Using Spark SQL to access Hive:
Copy the conf/hive-site.xml configuration file from the Hive installation into the conf directory of the Spark installation.
Copy the MySQL driver jar into the jars directory of the Spark installation.
Start it with bin/spark-sql under the Spark installation directory.
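With hive-site.xml and the MySQL driver in place (the two copy steps above), Hive tables can also be queried from application code by enabling Hive support on the SparkSession. A minimal sketch, assuming a Spark build with Hive support (or spark-hive_2.11 added to the pom) and using the table name from the JDBC example further down:
import org.apache.spark.sql.SparkSession

object HiveQueryTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive-query-test")
      .enableHiveSupport()   // picks up hive-site.xml from Spark's conf directory
      .getOrCreate()

    spark.sql("show databases").show()
    // Table name is illustrative; replace it with one of your own Hive tables.
    spark.sql("select * from rel.user_core_info limit 10").show()

    spark.stop()
  }
}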
Distributed SQL query engine (the most commonly used approach in practice)
(3) Configure the bind address and port of the HiveServer2 Thrift service
Method 1: add the HiveServer2 settings to hive-site.xml
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>192.168.137.100</value>
</property>
Method 2: configure them as environment variables
export HIVE_SERVER2_THRIFT_PORT=10010
export HIVE_SERVER2_THRIFT_BIND_HOST=192.168.137.100
Method 3: specify them as parameters when starting the Spark Thrift Server
--hiveconf hive.server2.thrift.port=10010
--hiveconf hive.server2.thrift.bind.host=192.168.137.100
Start the Spark Thrift Server
# Start in local mode:
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10010 \
--hiveconf hive.server2.thrift.bind.host=192.168.137.100
# Start in yarn-client mode:
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10010 \
--hiveconf hive.server2.thrift.bind.host=192.168.137.100 \
--master yarn \
--deploy-mode client \
--executor-memory 3g \
--executor-cores 1 \
--num-executors 2 \
--driver-cores 1 \
--driver-memory 1g
To check for errors and verify that the server is listening, use: tail -100 <log file path> and lsof -i:10010
Connecting to the distributed SQL query engine
# (1) Connect with beeline over JDBC
./bin/beeline
beeline>!connect jdbc:hive2://192.168.137.100:10010
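# After !connect, beeline prompts for a user name and password (e.g. the hadoop user used in the JDBC code below); once connected, SQL can be issued at the prompt, for example:
0: jdbc:hive2://192.168.137.100:10010> show databases;
0: jdbc:hive2://192.168.137.100:10010> select * from rel.user_core_info limit 10;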
# (2) Access via the Java API using JDBC (code at the bottom)
Accessing the Spark Thrift Server via JDBC:
bin/spark-submit \
--class bigdata.spark.sql.SparkSqlJdbc \
--master local \
./FirstSparkPro.jar
Code:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.chinahadoop</groupId>
<artifactId>FirstSparkPro</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.11</scala.version>
<spark.version>2.2.1</spark.version>
<hadoop.version>2.7.4</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.45</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<classifier>dist</classifier>
<appendAssemblyId>true</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<executions>
<execution>
<id>default-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>${maven.compiler.source}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${maven.compiler.target}</javacArg>
<javacArg>-Xlint:all,-serial,-path</javacArg>
</javacArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
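The jar referenced by the spark-submit commands above is built with Maven; the assembly plugin is bound to the package phase, so a plain package build produces a jar with dependencies under target/ (the exact artifact name depends on the assembly configuration):
mvn clean package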
DFTest.scala:
package bigdata.spark.sql.test
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
* Created by pc on 2018/8/6.
*/
object DFTest {
case class UserCoreDF(userId : String,age : Int,gender : String,core : Int)
def main(args: Array[String]) {
val conf = new SparkConf()
//Create the SparkSession
val spark = SparkSession.builder()
.master("local[2]")
.appName("dftest")
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
//RDD -> DataFrame via reflection (case class)
val userCoresRdd = sc.textFile("hdfs://192.168.137.100:9000/data/sparksql/user")
val userCoreRdd = userCoresRdd.map(_.split("\t")).map(cols => UserCoreDF(cols(0),cols(1).toInt,cols(2),cols(3).toInt))
//Import implicit conversions so the RDD can be converted to a DataFrame with toDF()
import spark.implicits._
val userCoreDF = userCoreRdd.toDF()
// userCoreDF.take(2).foreach(println(_))
// println("userCoreDF count --------->"+userCoreDF.count())
//RDD -> DataFrame with an explicitly defined schema
//Define the schema
val userCoreSchema = StructType(
List(
StructField("userId",StringType,true),
StructField("age",IntegerType,true),
StructField("gender",StringType,true),
StructField("core",IntegerType,true)
)
)
val userCoreRdd2 = userCoresRdd.map(_.split("\t")).map(cols => Row(cols(0),cols(1).toInt,cols(2),cols(3).toInt))
val userCoreDF2 = spark.createDataFrame(userCoreRdd2,userCoreSchema)
// println("userCoreDF2 count --------->"+userCoreDF2.count())
/*
//DataFrame -> JSON files on HDFS
userCoreDF2.write.mode(SaveMode.Overwrite).json("hdfs://192.168.137.100:9000/tmp/user_json" )
//DataFrame -> Parquet files on HDFS
userCoreDF2.write.mode(SaveMode.Overwrite).parquet("hdfs://192.168.137.100:9000/tmp/user_parquet" )
*/
//Create a DataFrame by reading a file of a given format with the built-in read APIs
// val jsonDF1 = spark.read.format("json").load("hdfs://192.168.137.100:9000/tmp/user_json")
// println("jsonDF1 count------------>"+jsonDF1.count())
val jsonDF2 = spark.read.json("hdfs://192.168.137.100:9000/tmp/user_json")
// println("jsonDF2 count------------>"+jsonDF2.count())
// println("jsonDF2 count------------>"+jsonDF2.schema)
//Create a DataFrame by reading from MySQL via JDBC
val connconf = new Properties()
connconf.put("user","root")
connconf.put("password","haohu123")
val jdbcdf = spark.read.jdbc("jdbc:mysql://192.168.137.100:3306","hive.TBLS",connconf)
jdbcdf.show()
//Query via the DataFrame DSL, and via SQL over a temporary view
// jsonDF2.filter("age > 20").select("gender","core").groupBy("gender").sum("core").show()
// jsonDF2.createOrReplaceTempView("user_core")
//
// spark.sql("select gender,sum(core) from user_core where age>20 group by gender").show()
}
}
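DFTest only reads from MySQL. As a complementary sketch (not part of the original code), the reverse direction can be done with write.jdbc; the target database "test" and table "user_core_copy" below are illustrative names:
package bigdata.spark.sql.test

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only: write the JSON data produced by DFTest back into a MySQL table via JDBC.
object DFWriteJdbcSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("df-write-jdbc").getOrCreate()

    val connconf = new Properties()
    connconf.put("user", "root")
    connconf.put("password", "haohu123")

    val df = spark.read.json("hdfs://192.168.137.100:9000/tmp/user_json")
    df.write
      .mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://192.168.137.100:3306/test", "user_core_copy", connconf)

    spark.stop()
  }
}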
SparkSqlJdbc.scala:
package bigdata.spark.sql.test
import java.sql.DriverManager
/**
* Created by pc on 2018/8/6.
*/
object SparkSqlJdbc {
def main(args: Array[String]) {
val driver="org.apache.hive.jdbc.HiveDriver"
Class.forName(driver)
val (url,username,userpasswd)=("jdbc:hive2://192.168.137.100:10010/rel","hadoop","haohu123")
val conn = DriverManager.getConnection(url,username,userpasswd)
val sql ="select * from rel.user_core_info"
val stat= conn.prepareStatement(sql)
val rs = stat.executeQuery()
while(rs.next()){
println("userId->"+rs.getString(1))
println("name->"+rs.getString(2))
}
rs.close()
stat.close()
conn.close()
}
}
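For anything beyond a quick demo, the JDBC resources in SparkSqlJdbc should be closed even when the query fails. A sketch of the same access with the connection, statement, and result set released in finally blocks:
package bigdata.spark.sql.test

import java.sql.DriverManager

// Sketch: same query as SparkSqlJdbc, but resources are closed even if an exception is thrown.
object SparkSqlJdbcSafe {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://192.168.137.100:10010/rel", "hadoop", "haohu123")
    try {
      val stat = conn.prepareStatement("select * from rel.user_core_info")
      try {
        val rs = stat.executeQuery()
        try {
          while (rs.next()) {
            println("userId->" + rs.getString(1))
            println("name->" + rs.getString(2))
          }
        } finally rs.close()
      } finally stat.close()
    } finally conn.close()
  }
}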