转自:http://blog.****.net/zfszhangyuan/article/details/52593521
今天简单讲解一下应用spark1.5.2相关读取MySQL数据到DataFrame的接口以及将DF数据存放到mysql中接口实现实例
同样我们的编程开发环境是不需要安装Spark的,但是需要一台安装了mysql的服务器,我这里直接在本机安装了一个mysql,还有就是Scala的编程环境。
注意本次使用的spark版本是1.5.2,相关引用的包请参考下图:
先看代码吧
-
package JDBC_MySql
-
-
import java.util.Properties
-
-
import org.apache.spark.sql.SaveMode
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
/**
-
* Created by zhoubh on 2016/7/20.
-
*/
-
object mysqlDB {
-
-
case class zbh_test(day_id:String, prvnce_id:String,pv_cnts:Int)
-
-
def main(args: Array[String]) {
-
-
-
val conf = new SparkConf().setAppName("mysql").setMaster("local[4]")
-
val sc = new SparkContext(conf)
-
//sc.addJar("D:\\workspace\\sparkApp\\lib\\mysql-connector-java-5.0.8-bin.jar")
-
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-
-
-
//定义mysql信息
-
val jdbcDF = sqlContext.read.format("jdbc").options(
-
Map("url"->"jdbc:mysql://localhost:3306/db_ldjs",
-
"dbtable"->"(select imei,region,city,company,name from tb_user_imei) as some_alias",
-
"driver"->"com.mysql.jdbc.Driver",
-
"user"-> "root",
-
//"partitionColumn"->"day_id",
-
"lowerBound"->"0",
-
"upperBound"-> "1000",
-
//"numPartitions"->"2",
-
"fetchSize"->"100",
-
"password"->"123456")).load()
-
-
-
jdbcDF.collect().take(20).foreach(println) //终端打印DF中的数据。
-
//jdbcDF.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/abi_sum")
-
val url="jdbc:mysql://localhost:3306/db_ldjs"
-
val prop=new Properties()
-
prop.setProperty("user","root")
-
prop.setProperty("password","123456")
-
jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"zfs_test",prop) //写入数据库db_ldjs的表 zfs_test 中
-
//jdbcDF.write.mode(SaveMode.Append).jdbc(url,"zbh_test",prop) //你会发现SaveMode改成Append依然无济于事,表依然会被重建,为了解决这个问题,后期会另开博客讲解
-
-
//org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(jdbcDF,url,"zbh_test",prop)
-
//// #然后进行groupby 操作,获取数据集合
-
// val abi_sum_area = abi_sum.groupBy("date_time", "area_name")
-
//
-
//// #计算数目,并根据数目进行降序排序
-
// val sorted = abi_sum_area.count().orderBy("count")
-
//
-
//// #显示前10条
-
// sorted.show(10)
-
//
-
//// #存储到文件(这里会有很多分片文件。。。)
-
// sorted.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/sparktest/flight_top")
-
//
-
//
-
//// #存储到mysql表里
-
// //sorted.write.jdbc(url,"table_name",prop)
-
-
}
-
}
下面来看看运行结果啥样:

数据库结果如下:

通过这段代码可以实现从mysql关系型数据库中直接读取数据转化成DataFrame参与到sparksql的分析当中这个意义是非常重大的,因为我们日常应用sparksql进行数据分析时经常会用到一些配置表,而这些配置定义表都是存在关系型数据库中,所以以后不用担心了。
另外这里还实现了DataFrame结果回写到mysql数据库中,虽然官方的spark源码的写入有些奇葩,设定的写死模式overwriter,也就是说你确定写入的表,他会重新创建,然后导入数据,这个用起来很不爽,后面博客将讲解如何改写源码,我要怎么写入就怎么写入。(这个意义也很重大,以后分析的结果就可以直接放mysql中,直接对外提供报表,哇 赞
最后感觉华哥的代码和讲解(一个个默默耕耘大数据多年的人)