Chapter 9: imooc Log Analysis in Practice
9-1 - Course Outline
9-2 - Overview of User Behavior Logs
Why record user access behavior logs?
Page views of the site
Site stickiness
Recommendations
User behavior logs: generated via Nginx (server access logs) and Ajax (client-side reporting)
User behavior log: all the behavior data produced each time a user visits the site (visits, page views, searches, clicks, ...)
Also called the user behavior trail or traffic log
Contents of the log data
1) System attributes of the visit: operating system, browser, etc.
2) Access characteristics: the URL clicked, the URL the visit came from (referer), dwell time on the page, etc.
3) Access information: session_id, client IP (and the city it resolves to), etc.
Why user behavior log analysis matters
It is the eyes, the nerves, and the brain of a website
9-3 - Offline Data Processing Architecture
Data processing flow
1) Data collection
Flume: write the web logs into HDFS
2) Data cleaning
Filter out dirty data
Use Spark, Hive, MapReduce, or another distributed computing framework
The cleaned data can be stored on HDFS (queried via Hive / Spark SQL)
3) Data processing
Run the statistics and analysis we need
Use Spark, Hive, MapReduce, or another distributed computing framework
4) Persisting the results
Results can be stored in an RDBMS or a NoSQL store
5) Data visualization
Present the results graphically: pie charts, bar charts, maps, line charts
ECharts, HUE, Zeppelin
9-4 - Project Requirements
9-5 Structure of the imooc Main-Site Log
9-6 Data Cleaning, Step 1: Parsing the Raw Log
Project address:
package com.imooc.log
import org.apache.spark.sql.SparkSession
/**
* First cleaning pass: extract only the columns we need
*/
object SparkStatFormatJob {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("SparkStatFormatJob")
.master("local[2]").getOrCreate()
val access = spark.sparkContext.textFile("/Users/rocky/data/imooc/10000_access.log")
//access.take(10).foreach(println)
access.map(line => {
val splits = line.split(" ")
val ip = splits(0)
/**
* Concatenating fields 3 and 4 of the raw log gives the full access time:
* [10/Nov/2016:00:01:02 +0800] ==> yyyy-MM-dd HH:mm:ss
*/
val time = splits(3) + " " + splits(4)
val url = splits(11).replaceAll("\"","")
val traffic = splits(9)
// (ip, DateUtils.parse(time), url, traffic)
DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
}).saveAsTextFile("file:///Users/rocky/data/imooc/output/")
spark.stop()
}
}
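The DateUtils helper referenced above is not reproduced in these notes. A minimal sketch, assuming an implementation based on the thread-safe FastDateFormat from Apache Commons Lang 3 (bundled with Spark):

package com.imooc.log

import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat

/**
 * Date parsing utility (sketch): converts the raw log time
 * [10/Nov/2016:00:01:02 +0800] into yyyy-MM-dd HH:mm:ss.
 */
object DateUtils {

  // Input format of the raw log time, e.g. [10/Nov/2016:00:01:02 +0800]
  val INPUT_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // Target output format
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(time: String): String = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  // Millisecond timestamp extracted from a raw time like [10/Nov/2016:00:01:02 +0800]
  def getTime(time: String): Long = {
    try {
      INPUT_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))  // prints the reformatted access time
  }
}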
9-7 - Data Cleaning: Overview of the Second Pass
For typical log processing we need to partition the data,
partitioning by the access time in the log, e.g. by day (d), hour (h), or five-minute bucket (m5).
9-8 - Data Cleaning: Parsing the Log
The cleaning work:
package com.imooc.log
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Use Spark to perform our data cleaning
*/
object SparkStatCleanJob {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("SparkStatCleanJob")
.config("spark.sql.parquet.compression.codec","gzip")
.master("local[2]").getOrCreate()
val accessRDD = spark.sparkContext.textFile("/Users/rocky/data/imooc/access.log")
//accessRDD.take(10).foreach(println)
//RDD ==> DF
val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
AccessConvertUtil.struct)
// accessDF.printSchema()
// accessDF.show(false)
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
.partitionBy("day").save("/Users/rocky/data/imooc/clean2")
spark.stop
}
}
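AccessConvertUtil, referenced above, supplies the output schema (struct) and the per-line parser (parseLog); it is not reproduced in these notes. A minimal sketch under the following assumptions: the input is the tab-separated output of SparkStatFormatJob (time, url, traffic, ip), and the cleaned record carries the fields the later statistics jobs read (url, cmsType, cmsId, traffic, ip, city, time, day). It relies on the IpUtils class shown in 9-9.

package com.imooc.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

/**
 * Converts one formatted log line into a Row matching the cleaned schema (sketch).
 */
object AccessConvertUtil {

  // Output schema of the cleaned data
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
   * Parse one line of the formatted log (time \t url \t traffic \t ip) into a Row.
   */
  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")

      val time = splits(0)
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // A URL like http://www.imooc.com/video/4500 yields cmsType = video, cmsId = 4500
      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)                       // city resolved from the IP (see 9-9)
      val day = time.substring(0, 10).replaceAll("-", "")  // yyyy-MM-dd HH:mm:ss ==> yyyyMMdd

      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(null, null, null, null, null, null, null, null)  // malformed lines become all-null rows
    }
  }
}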
9-9 - Data Cleaning: Resolving IP Addresses
https://github.com/wzhe06/ipdatabase
package com.imooc.log
import com.ggstar.util.ip.IpHelper
/**
* IP resolution utility class
*/
object IpUtils {
def getCity(ip:String) = {
IpHelper.findRegionByIp(ip)
}
def main(args: Array[String]) {
println(getCity("218.75.35.226"))
}
}
9-10 - Writing the Cleaned Data to the Target Location
package com.imooc.log
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* 使用Spark完成我们的数据清洗操作
*/
object SparkStatCleanJob {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("SparkStatCleanJob")
.config("spark.sql.parquet.compression.codec","gzip")
.master("local[2]").getOrCreate()
val accessRDD = spark.sparkContext.textFile("/Users/rocky/data/imooc/access.log")
//accessRDD.take(10).foreach(println)
//RDD ==> DF
val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
AccessConvertUtil.struct)
// accessDF.printSchema()
// accessDF.show(false)
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
.partitionBy("day").save("/Users/rocky/data/imooc/clean2")
spark.stop
}
}
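To sanity-check the write above, the day-partitioned Parquet output can be read back; a small sketch, using the same load pattern that TopNStatJob uses later:

// Read the cleaned, day-partitioned Parquet back and inspect it
val cleanDF = spark.read.format("parquet").load("/Users/rocky/data/imooc/clean2")
cleanDF.printSchema()                    // should match AccessConvertUtil.struct
cleanDF.select("day").distinct().show()  // one row per day partition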
9-11 - Requirement 1: Implementing the Statistics
Code address:
Source code:
package com.imooc.log
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* Spark job for the TopN statistics
*/
object TopNStatJob {
def videoAccessTopNStat(spark: SparkSession, accessDF:DataFrame, day:String): Unit = {
/**
* Statistics using the DataFrame API
*/
import spark.implicits._
val videoAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
videoAccessTopNDF.show(false)
/**
* Statistics using the SQL API
*/
// accessDF.createOrReplaceTempView("access_logs")
// val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs " +
// "where day='20170511' and cmsType='video' " +
// "group by day,cmsId order by times desc")
//
// videoAccessTopNDF.show(false)
/**
* Write the statistics results into MySQL
*/
try {
videoAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val times = info.getAs[Long]("times")
/**
* Inserting into the database row by row here is not recommended; collect into the list and batch-insert below
*/
list.append(DayVideoAccessStat(day, cmsId, times))
})
StatDAO.insertDayVideoAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
}
}
9-12 - Developing a Scala Utility Class for MySQL
Source code address:
package com.imooc.log
import java.sql.{Connection, PreparedStatement, DriverManager}
/**
* MySQL utility class
*/
object MySQLUtils {
/**
* Get a database connection
*/
def getConnection() = {
DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=root")
}
/**
* Release the database connection and related resources
* @param connection
* @param pstmt
*/
def release(connection: Connection, pstmt: PreparedStatement): Unit = {
try {
if (pstmt != null) {
pstmt.close()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
}
}
def main(args: Array[String]) {
println(getConnection())
}
}
9-13 - Requirement 1: Writing the Results to MySQL
Step 1: create the table (day_video_access_topn_stat, with columns day, cms_id, times, matching the insert statement below)
Step 2: create the model
package com.imooc.log
/**
* Entity class for per-day course access counts
*/
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)
Step 3: develop the DAO layer
Insert the rows in batches, committing once per batch:
def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MySQLUtils.getConnection()
connection.setAutoCommit(false) // switch to manual commit
val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values (?,?,?) "
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setLong(3, ele.times)
pstmt.addBatch()
}
pstmt.executeBatch() // execute the batch
connection.commit() // commit manually
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
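The insert and delete methods shown in 9-13, 9-15, 9-17 and 9-18 live in a StatDAO object that is not printed in full here; a minimal skeleton of the assumed surrounding object and imports:

package com.imooc.log

import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable.ListBuffer

/**
 * DAO for the per-day statistics tables (skeleton only).
 */
object StatDAO {
  // def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = { ... }           // 9-13
  // def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = { ... }   // 9-15
  // def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = { ... } // 9-17
  // def deleteData(day: String): Unit = { ... }                                                  // 9-18
}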
Step 4: write the data
/**
* Write the statistics results into MySQL
*/
try {
videoAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val times = info.getAs[Long]("times")
/**
* Inserting into the database row by row here is not recommended; collect into the list and batch-insert below
*/
list.append(DayVideoAccessStat(day, cmsId, times))
})
StatDAO.insertDayVideoAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
9-14 - Requirement 2: Implementing the Statistics
//TopN courses per city
cityAccessTopNStat(spark, accessDF, day)
/**
* TopN courses per city
*/
def cityAccessTopNStat(spark: SparkSession, accessDF:DataFrame, day:String): Unit = {
import spark.implicits._
val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","city","cmsId")
.agg(count("cmsId").as("times"))
//cityAccessTopNDF.show(false)
//Using a window function in Spark SQL
val top3DF = cityAccessTopNDF.select(
cityAccessTopNDF("day"),
cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"),
cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
.orderBy(cityAccessTopNDF("times").desc)
).as("times_rank")
).filter("times_rank <=3") //.show(false) //Top3
}
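For reference, the same top-3-per-city ranking can be expressed with a window function in Spark SQL instead of the DataFrame API; a sketch that assumes cityAccessTopNDF has been registered as a temporary view:

// Equivalent ranking via a Spark SQL window function (sketch)
cityAccessTopNDF.createOrReplaceTempView("city_access")
val top3SqlDF = spark.sql(
  """
    |select day, city, cmsId, times, times_rank
    |from (
    |  select day, city, cmsId, times,
    |         row_number() over (partition by city order by times desc) as times_rank
    |  from city_access
    |) t
    |where times_rank <= 3
  """.stripMargin)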
9-15 - Requirement 2: Writing the Results to MySQL
Step 1: create the table (day_video_city_access_topn_stat, with columns day, cms_id, city, times, times_rank)
Step 2: create the model
package com.imooc.log
case class DayCityVideoAccessStat(day:String, cmsId:Long, city:String,times:Long,timesRank:Int)
Step 3: develop the DAO layer
/**
* Save DayCityVideoAccessStat records to the database in batches
*/
def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MySQLUtils.getConnection()
connection.setAutoCommit(false) // switch to manual commit
val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setString(3, ele.city)
pstmt.setLong(4, ele.times)
pstmt.setInt(5, ele.timesRank)
pstmt.addBatch()
}
pstmt.executeBatch() // execute the batch
connection.commit() // commit manually
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
Step 4: write the data
try {
top3DF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayCityVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val city = info.getAs[String]("city")
val times = info.getAs[Long]("times")
val timesRank = info.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
})
StatDAO.insertDayCityVideoAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
9-16 - Requirement 3: Implementing the Statistics
//Statistics by traffic
videoTrafficsTopNStat(spark, accessDF, day)
/**
* Statistics by traffic (bytes served)
*/
def videoTrafficsTopNStat(spark: SparkSession, accessDF:DataFrame, day:String): Unit = {
import spark.implicits._
val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
.groupBy("day","cmsId").agg(sum("traffic").as("traffics"))
.orderBy($"traffics".desc)
//.show(false)
/**
* Write the statistics results into MySQL
*/
try {
cityAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoTrafficsStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val traffics = info.getAs[Long]("traffics")
list.append(DayVideoTrafficsStat(day, cmsId,traffics))
})
StatDAO.insertDayVideoTrafficsAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
}
9-17 - Requirement 3: Writing the Results to MySQL
Step 1: create the table (day_video_traffics_topn_stat, with columns day, cms_id, traffics)
Step 2: create the model
package com.imooc.log
case class DayVideoTrafficsStat(day:String,cmsId:Long,traffics:Long)
Step 3: develop the DAO layer
/**
* Save DayVideoTrafficsStat records to the database in batches
*/
def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MySQLUtils.getConnection()
connection.setAutoCommit(false) // switch to manual commit
val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?) "
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setLong(3, ele.traffics)
pstmt.addBatch()
}
pstmt.executeBatch() // execute the batch
connection.commit() // commit manually
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
Step 4: write the data
/**
* Write the statistics results into MySQL
*/
try {
cityAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoTrafficsStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val traffics = info.getAs[Long]("traffics")
list.append(DayVideoTrafficsStat(day, cmsId,traffics))
})
StatDAO.insertDayVideoTrafficsAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
9-18 - Refactoring: Deleting Existing Data for the Given Day
StatDAO.deleteData(day)
/**
* Delete the data for the given day
*/
def deleteData(day: String): Unit = {
val tables = Array("day_video_access_topn_stat",
"day_video_city_access_topn_stat",
"day_video_traffics_topn_stat")
var connection:Connection = null
var pstmt:PreparedStatement = null
try{
connection = MySQLUtils.getConnection()
for(table <- tables) {
// delete from each statistics table for the given day
val deleteSQL = s"delete from $table where day = ?"
pstmt = connection.prepareStatement(deleteSQL)
pstmt.setString(1, day)
pstmt.executeUpdate()
pstmt.close() // close each statement here; the finally block only releases the last one
}
}catch {
case e:Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
object TopNStatJob {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]").getOrCreate()
val accessDF = spark.read.format("parquet").load("/Users/rocky/data/imooc/clean")
// accessDF.printSchema()
// accessDF.show(false)
val day = "20170511"
StatDAO.deleteData(day)
//TopN most popular courses
videoAccessTopNStat(spark, accessDF, day)
//TopN courses per city
cityAccessTopNStat(spark, accessDF, day)
//Statistics by traffic
videoTrafficsTopNStat(spark, accessDF, day)
spark.stop()
}
}
9-19 - Overview of Data Visualization
Data visualization: the greatest value of a picture is when it forces us to notice what we never expected to see.
Common visualization frameworks
1) ECharts
2) Highcharts
3) D3.js
4) HUE
5) Zeppelin
9-20 - ECharts Pie Chart with Static Data
Source code address:
9-21 - ECharts Pie Chart with Dynamic Data, Part 1: Querying the Data in MySQL
Source code address:
9-22 - ECharts Pie Chart with Dynamic Data, Part 2: Front-End Development
Source code address:
9-23 - Displaying the Results with Zeppelin
9-24 - Spark on YARN Basics
Spark supports four run modes:
1) local: used during development
2) Standalone: Spark's built-in cluster manager; a Standalone cluster requires deploying Spark on every machine
3) YARN: recommended for production, so that YARN schedules resources for all cluster jobs (MapReduce, Spark) uniformly
4) Mesos
Whichever mode is used, the application code stays the same.
9-25 - Running the Data Cleaning Job on YARN
9-26 - Running the Statistics Job on YARN
Code address:
Code:
package com.imooc.log
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Use Spark to perform our data cleaning, running on YARN
*/
object SparkStatCleanJobYARN {
def main(args: Array[String]) {
if(args.length !=2) {
println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")
System.exit(1)
}
val Array(inputPath, outputPath) = args
val spark = SparkSession.builder().getOrCreate()
val accessRDD = spark.sparkContext.textFile(inputPath)
//RDD ==> DF
val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
AccessConvertUtil.struct)
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
.partitionBy("day").save(outputPath)
spark.stop
}
}
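Section 9-26 covers running the statistics job itself on YARN; that YARN variant is not printed in these notes. A minimal sketch that follows the same pattern as SparkStatCleanJobYARN (input path and day passed as arguments, no hard-coded master) and reuses the statistics methods from TopNStatJob:

package com.imooc.log

import org.apache.spark.sql.SparkSession

/**
 * TopN statistics job intended to run on YARN (sketch).
 * The master is supplied by spark-submit rather than hard-coded.
 */
object TopNStatJobYARN {
  def main(args: Array[String]) {
    if (args.length != 2) {
      println("Usage: TopNStatJobYARN <inputPath> <day>")
      System.exit(1)
    }
    val Array(inputPath, day) = args

    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()

    // Cleaned, day-partitioned Parquet produced by the cleaning job
    val accessDF = spark.read.format("parquet").load(inputPath)

    StatDAO.deleteData(day)
    // Reuse the three statistics implemented in 9-11 / 9-14 / 9-16
    TopNStatJob.videoAccessTopNStat(spark, accessDF, day)
    TopNStatJob.cityAccessTopNStat(spark, accessDF, day)
    TopNStatJob.videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }
}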
9-27 - Performance Tuning: Choosing the Storage Format
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite) .partitionBy("day").save(outputPath)
9-28 - Performance Tuning: Choosing the Compression Codec
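The cleaning job in 9-8 already sets the Parquet codec to gzip through spark.sql.parquet.compression.codec; switching codecs is a one-line config change. A sketch (snappy chosen here only as an illustration, trading a larger file for faster compression):

val spark = SparkSession.builder().appName("SparkStatCleanJob")
  .config("spark.sql.parquet.compression.codec", "snappy") // e.g. snappy instead of gzip
  .master("local[2]").getOrCreate()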
9-29 - Performance Tuning: Code Optimization
Source code:
package com.imooc.log
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* Spark job for the TopN statistics: reuse the shared filtered data
*/
object TopNStatJob2 {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]").getOrCreate()
val accessDF = spark.read.format("parquet").load("/Users/rocky/data/imooc/clean")
// accessDF.printSchema()
// accessDF.show(false)
val day = "20170511"
import spark.implicits._
val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
commonDF.cache()
StatDAO.deleteData(day)
//TopN most popular courses
videoAccessTopNStat(spark, commonDF)
//TopN courses per city
cityAccessTopNStat(spark, commonDF)
//Statistics by traffic
videoTrafficsTopNStat(spark, commonDF)
commonDF.unpersist(true)
spark.stop()
}
/**
* Statistics by traffic
*/
def videoTrafficsTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {
import spark.implicits._
val cityAccessTopNDF = commonDF.groupBy("day","cmsId")
.agg(sum("traffic").as("traffics"))
.orderBy($"traffics".desc)
//.show(false)
/**
* Write the statistics results into MySQL
*/
try {
cityAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoTrafficsStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val traffics = info.getAs[Long]("traffics")
list.append(DayVideoTrafficsStat(day, cmsId,traffics))
})
StatDAO.insertDayVideoTrafficsAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
}
/**
* TopN courses per city
*/
def cityAccessTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {
val cityAccessTopNDF = commonDF
.groupBy("day","city","cmsId")
.agg(count("cmsId").as("times"))
//cityAccessTopNDF.show(false)
//Using a window function in Spark SQL
val top3DF = cityAccessTopNDF.select(
cityAccessTopNDF("day"),
cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"),
cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
.orderBy(cityAccessTopNDF("times").desc)
).as("times_rank")
).filter("times_rank <=3") //.show(false) //Top3
/**
* Write the statistics results into MySQL
*/
try {
top3DF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayCityVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val city = info.getAs[String]("city")
val times = info.getAs[Long]("times")
val timesRank = info.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
})
StatDAO.insertDayCityVideoAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
}
/**
* TopN most popular courses
*/
def videoAccessTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {
/**
* Statistics using the DataFrame API
*/
import spark.implicits._
val videoAccessTopNDF = commonDF
.groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
videoAccessTopNDF.show(false)
/**
* Statistics using the SQL API
*/
// accessDF.createOrReplaceTempView("access_logs")
// val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs " +
// "where day='20170511' and cmsType='video' " +
// "group by day,cmsId order by times desc")
//
// videoAccessTopNDF.show(false)
/**
* Write the statistics results into MySQL
*/
try {
videoAccessTopNDF.foreachPartition(partitionOfRecords => {
val list = new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info => {
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val times = info.getAs[Long]("times")
/**
* Inserting into the database row by row here is not recommended; collect into the list and batch-insert below
*/
list.append(DayVideoAccessStat(day, cmsId, times))
})
StatDAO.insertDayVideoAccessTopN(list)
})
} catch {
case e:Exception => e.printStackTrace()
}
}
}
9-30 - Performance Tuning: Parameter Tuning
val spark = SparkSession.builder().appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]").getOrCreate()