SparkSQL in Action 8: Log Analysis Project, Part 4

Requirement: compute the TopN most popular courses on the main site, ranked by traffic, and save the results to MySQL.

Create a table (the composite primary key on (day, cms_id) guarantees at most one row per course per day):

create table day_video_traffics_topn_stat(
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
);

Create an entity class DayVideoTrafficsStat:

case class DayVideoTrafficsStat(day:String,cmsId:Long,traffics:Long)

Add a method to StatDAO:

    /**
      * Batch-insert DayVideoTrafficsStat records into the database
      */
    def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {

        var connection: Connection = null
        var pstmt: PreparedStatement = null

        try {
            connection = MySQLUtils.getConnection()

            connection.setAutoCommit(false) // switch to manual commit so the batch is atomic

            val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?)"
            pstmt = connection.prepareStatement(sql)

            for (ele <- list) {
                pstmt.setString(1, ele.day)
                pstmt.setLong(2, ele.cmsId)
                pstmt.setLong(3, ele.traffics)
                pstmt.addBatch()
            }
            pstmt.executeBatch() // execute the whole batch in one round trip
            connection.commit() // commit manually
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            MySQLUtils.release(connection, pstmt)
        }
    }
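
The DAO depends on the MySQLUtils helper introduced in an earlier part of this series. If you don't have it at hand, a minimal sketch could look like the following; the JDBC URL, user, and password below are placeholders to replace with your own settings:

    import java.sql.{Connection, DriverManager, PreparedStatement}

    object MySQLUtils {

        // Obtain a JDBC connection; URL, user, and password are placeholders
        def getConnection(): Connection = {
            DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/log_analysis?useSSL=false",
                "root", "root")
        }

        // Close the statement first, then the connection
        def release(connection: Connection, pstmt: PreparedStatement): Unit = {
            try {
                if (pstmt != null) pstmt.close()
            } catch {
                case e: Exception => e.printStackTrace()
            } finally {
                if (connection != null) connection.close()
            }
        }
    }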

Add a method to the Spark main application and call it from the main function:


    // TopN courses on the main site, ranked by traffic
    def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
        // Aggregate with the DataFrame API; sum comes from org.apache.spark.sql.functions._
        import spark.implicits._
        val videoTrafficsTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
          .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
          .orderBy($"traffics".desc)
          //.show(false)

        /**
          * Write the statistics to MySQL
          */
        try {
            videoTrafficsTopNDF.foreachPartition(partitionOfRecords => {
                val list = new ListBuffer[DayVideoTrafficsStat]

                partitionOfRecords.foreach(info => {
                    val day = info.getAs[String]("day")
                    val cmsId = info.getAs[Long]("cmsId")
                    val traffics = info.getAs[Long]("traffics")
                    list.append(DayVideoTrafficsStat(day, cmsId, traffics))
                })

                StatDAO.insertDayVideoTrafficsAccessTopN(list)
            })
        } catch {
            case e: Exception => e.printStackTrace()
        }
    }
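
For reference, wiring this into the main function can look roughly like the sketch below; the app name and the path to the cleaned log data are illustrative, so use whatever your earlier parts produced:

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("TopNStatJob")
          .master("local[2]") // local mode for development; drop this when submitting to a cluster
          .getOrCreate()

        // Cleaned access log produced in an earlier part -- adjust the path to your environment
        val accessDF = spark.read.format("parquet").load("/path/to/clean")

        videoTrafficsTopNStat(spark, accessDF)

        spark.stop()
    }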

Run the program and check the inserted rows in the database, for example with select * from day_video_traffics_topn_stat order by traffics desc; in the MySQL client.


Code refactoring: delete existing rows for a given day before re-inserting.

Add a method deleteData to the DAO layer that deletes the data for a given day:


    // Delete any existing rows for the given day
    def deleteData(day: String): Unit = {

        val tables = Array("day_video_access_topn_stat",
            "day_video_city_access_topn_stat",
            "day_video_traffics_topn_stat")

        var connection: Connection = null
        var pstmt: PreparedStatement = null

        try {
            connection = MySQLUtils.getConnection()
            for (table <- tables) {
                val deleteSQL = s"delete from $table where day = ?"
                pstmt = connection.prepareStatement(deleteSQL)
                pstmt.setString(1, day)
                pstmt.executeUpdate()
                pstmt.close() // close each statement here; finally only sees the last one
            }
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            MySQLUtils.release(connection, pstmt)
        }
    }
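
With this in place, calling deleteData at the start of the main flow, before any of the insert methods run, makes the job re-runnable for the same day without violating the primary keys. A sketch of the call site:

    // At the start of the main flow, before any inserts:
    val day = "20170511"
    StatDAO.deleteData(day) // purge that day's old rows so re-runs are idempotent

    // ...then run the stat jobs as before, e.g.:
    videoTrafficsTopNStat(spark, accessDF)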