SparkSQL实战8——综合实战完成日志分析4
需求:按流量统计主站最受欢迎的TopN课程,并将结果保存到MySQL
创建一张表:
-- Per-day traffic total for each course; the primary key allows one row per (day, cms_id).
CREATE TABLE day_video_traffics_topn_stat (
  day      VARCHAR(8) NOT NULL,  -- statistics day, at most 8 characters
  cms_id   BIGINT(10) NOT NULL,  -- course id
  traffics BIGINT(20) NOT NULL,  -- traffic total for that day and course
  PRIMARY KEY (day, cms_id)
);
创建一个实体类DayVideoTrafficsStat:
/**
 * One record of the per-day video traffic statistics.
 *
 * @param day      day the record belongs to
 * @param cmsId    id of the course
 * @param traffics traffic total for that day and course
 */
case class DayVideoTrafficsStat(
  day: String,
  cmsId: Long,
  traffics: Long
)
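在StatDAO中添加方法insertDayVideoTrafficsAccessTopN(写法与下面已有的insertDayCityVideoAccessTopN相同,只需把参数类型换成ListBuffer[DayVideoTrafficsStat],并把SQL换成向day_video_traffics_topn_stat表插入day、cms_id、traffics三列):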
/**
 * Batch-inserts DayCityVideoAccessStat records into day_video_city_access_topn_stat.
 *
 * All rows are sent as a single JDBC batch inside one manually committed
 * transaction. If anything fails, the transaction is rolled back explicitly
 * and the exception is printed; it is not rethrown to the caller.
 *
 * @param list records to persist; a null or empty list is a no-op
 */
def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
  // Nothing to write: do not open a connection at all.
  if (list != null && list.nonEmpty) {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // commit manually so the whole batch is one transaction
      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }
      pstmt.executeBatch() // send the whole batch
      connection.commit()  // manual commit
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // With auto-commit off, what happens to an open transaction on close is
        // driver-defined, so undo the partial batch explicitly.
        if (connection != null) {
          try {
            connection.rollback()
          } catch {
            case rollbackError: Exception => rollbackError.printStackTrace()
          }
        }
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
在Spark主应用程序中添加一个方法,并且在主函数中调用这个方法:
/**
 * Computes the traffic total per video course for one day and writes the
 * result to MySQL through StatDAO.insertDayVideoTrafficsAccessTopN.
 *
 * @param spark    active SparkSession; its implicits provide the `$"col"` syntax
 * @param accessDF access records; the day, cmsType, cmsId and traffic columns are read
 * @param day      day to report on; defaults to "20170511", the value that was
 *                 previously hard-coded, so existing two-argument calls are unchanged
 */
def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String = "20170511"): Unit = {
  // DataFrame API is used for the aggregation.
  import spark.implicits._

  val videoTrafficsTopNDF = accessDF
    .filter($"day" === day && $"cmsType" === "video")
    .groupBy("day", "cmsId")
    .agg(sum("traffic").as("traffics"))
    .orderBy($"traffics".desc)
  // Append .show(false) to the chain above to inspect the result on the console.

  // Write the aggregated rows to MySQL, one batch insert per partition.
  try {
    videoTrafficsTopNDF.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayVideoTrafficsStat]
      partitionOfRecords.foreach(info => {
        val statDay = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val traffics = info.getAs[Long]("traffics")
        list.append(DayVideoTrafficsStat(statDay, cmsId, traffics))
      })
      StatDAO.insertDayVideoTrafficsAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
运行程序,观察数据库插入结果:
代码重构:重复运行统计作业时,同一天的数据会与表中已有记录发生主键冲突(例如day_video_traffics_topn_stat的主键是(day,cms_id)),因此写入之前先删除表中指定日期的已有数据
在DAO层新增一个方法deleteData,用来删除指定日期的数据
/**
 * Deletes the rows of the given day from the three statistics tables
 * (day_video_access_topn_stat, day_video_city_access_topn_stat,
 * day_video_traffics_topn_stat).
 *
 * Failures are printed and not rethrown.
 *
 * @param day value matched against each table's day column
 */
def deleteData(day: String): Unit = {
  val tables = Array("day_video_access_topn_stat",
    "day_video_city_access_topn_stat",
    "day_video_traffics_topn_stat")
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MySQLUtils.getConnection()
    for (table <- tables) {
      // Close the previous table's statement before replacing it; otherwise
      // only the last statement would be handed to release() in finally.
      if (pstmt != null) {
        pstmt.close()
      }
      // The table name comes from the fixed array above; day is bound as a parameter.
      val deleteSQL = s"delete from $table where day = ?"
      pstmt = connection.prepareStatement(deleteSQL)
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}