大数据实时流处理日志Spark项目实战(非常详细)

一、实战内容

  • 编写python脚本,源源不断产生学习网站的用户行为日志。
  • 启动 Flume 收集产生的日志。
  • 启动 Kfka 接收 Flume 接收的日志。
  • 使用 Spark Streaming 消费 Kafka 的用户日志。
  • Spark Streaming将数据清洗过滤非法数据,然后分析日志中用户的访问课程,统计课程数,得到最受欢迎课程TOP5,第二个业务是统计各个搜索引擎的搜索量。(通过分析日志还可以实现很多其他的业务,基于篇幅,本项目只实现其中两个)。
  • 将 Spark Streaming 处理的结果写入 HBase 中。
  • 前端使用 Spring MVC、 Spring、 MyBatis 整合作为数据展示平台。
  • 使用Ajax异步传输数据到jsp页面,并使用 Echarts 框架展示数据。
  • 本实战使用IDEA2018作为开发工具,JDK版本为1.8,Scala版本为2.11。

二、大数据实时流处理分析系统简介

  • 1.需求

    如今大数据技术已经遍布生产的各个角落,其中又主要分为离线处理实时流处理。本实战项目则是使用了实时流处理,而大数据的实时流式处理的特点:
    1.数据会不断的产生,且数量巨大。
    2.需要对产生额数据实时进行处理。
    3.处理完的结果需要实时读写进数据库或用作其他分析。
    针对以上的特点,传统的数据处理结构已经无力胜任,因而产生的大数据实时流处理的架构思想。
  • 2.背景及架构

    数据的处理一般涉及数据的聚合,数据的处理和展现能够在秒级或者毫秒级得到响应。针对这些问题目前形成了Flume + kafka + Storm / Spark /Flink + Hbase / Redis 的技架构。本实战采用Flume + kafka + Spark + Hbase的架构。

三、实战所用到的架构和涉及的知识

  • 1.后端架构

    1.Hadoop-2.6.0-cdh5.7.0
    2.Spark-2.2.0-bin-2.6.0-cdh5.7.0
    3.Apache-flume-1.9.0-bin
    4.Kafka_2.11-1.0.0
    5.Hbase-1.2.0-cdh5.7.0
  • 2.前端框架

    1.Spring MVC-4.3.3.RELEASE
    2.Spring-4.3.3.RELEASE
    3.MyBatis-3.2.1
    4.Echarts

四、项目实战

  • 1.后端开发实战

    1.构建项目

    构建Scala工程项目,并添加Maven支持。

    2.引入依赖
      <properties>
          <scala.version>2.11.8</scala.version>
          <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
          <spark.version>2.2.0</spark.version>
          <hbase.version>1.2.0-cdh5.7.0</hbase.version>
      </properties>
    
      <repositories>
          <repository>
              <id>cloudera</id>
              <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
          </repository>
      </repositories>
    
     <dependencies>
          <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
          </dependency>
      
          <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
          </dependency>
      
          <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
          </dependency>
      
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId> spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
          </dependency>
      
          <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.6.5</version>
          </dependency>
      
          <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
          </dependency>
          
          <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
          </dependency>
      
          <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
          </dependency>
      
          <dependency>
                <groupId>com.ggstar</groupId>
                <artifactId>ipdatabase</artifactId>
                <version>1.0-SNAPSHOT</version>
          </dependency>
     </dependencies>
    
    3.创建工程包结构

    在scala目录下创建如下包结构:
    大数据实时流处理日志Spark项目实战(非常详细)

    4.编写代码

    (1). 在util包下创健Java类HBaseUtils,用于连接HBase,存储处理的结果:
    大数据实时流处理日志Spark项目实战(非常详细)
    HBaseUtils完整代码如下:

     /**
     * Hbase工具类,用于:
     * 连接HBase,获得表实例
     */
    public class HBaseUtils {
    
        private Configuration configuration = null;
        private Connection connection = null;
        private static HBaseUtils instance = null;
    
        /**
         * 在私有构造方法中初始化属性
         */
        private HBaseUtils(){
            try {
                configuration = new Configuration();
                //指定要访问的zk服务器
                configuration.set("hbase.zookeeper.quorum", "hadoop01:2181");
                //得到Hbase连接
                connection = ConnectionFactory.createConnection(configuration);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    	
    	/**
    	* 获得HBase连接实例
    	*/
    	
       public static synchronized HBaseUtils getInstance(){
            if(instance == null){
                instance = new HBaseUtils();
            }
            return instance;
        }
    
        /**
         *由表名得到一个表的实例
         * @param tableName
         * @return
         */
        public  HTable getTable(String tableName) {
            HTable hTable = null;
            try {
                hTable = (HTable)connection.getTable(TableName.valueOf(tableName));
            }catch (Exception e){
                e.printStackTrace();
            }
            return hTable;
        }
    }
    

    在util包下创Scala object类DateUtils,用于格式化日志的日期,代码如下:

    /**
    * 格式化日期工具类
    */
    object DateUtils {
    
    //指定输入的日期格式
    val YYYYMMDDHMMSS_FORMAT= FastDateFormat.getInstance("yyyy-MM-dd hh:mm:ss")
    //指定输出格式
    val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddhhmmss")
    
    //输入String返回该格式转为log的结果
    def getTime(time:String) = {
      YYYYMMDDHMMSS_FORMAT.parse(time).getTime
    }
    
    def parseToMinute(time:String) = {
      //调用getTime
      TARGET_FORMAT.format(getTime(time))
     }
    }
    

    (2). 在domain包下创建以下几个Scala样例类:
    ClickLog:用于封装清洗后的日志信息:
    大数据实时流处理日志Spark项目实战(非常详细)
    然后将该类声明为样例类,在class关键字前增加case关键字:
    大数据实时流处理日志Spark项目实战(非常详细)
    ClickLog 类完整代码如下:

     /**
      * 封装清洗后的数据
      * @param ip 日志访问的ip地址
      * @param time 日志访问的时间
      * @param courseId 日志访问的实战课程编号
      * @param statusCode 日志访问的状态码
      * @param referer 日志访问的referer信息
      */
    case class ClickLog (ip:String,time:String,courseId:Int,statusCode:Int,referer:String)
    

    再创建样例类 CourseClickCount 用于封装课程统计的结果,样例类 CourseSearchClickCount 用于封装搜索引擎的统计结果,因为创建过程与上面的ClickLog 类一样,这里不再展示,直接给出完整代码:
    CourseClickCount 类完整代码如下:

       /**
        * 封装实战课程的总点击数结果
         * @param day_course 对应于Hbase中的RowKey
         * @param click_count 总点击数
         */
       case class CourseClickCount(day_course:String,click_count:Int)
    

    CourseSearchClickCount 类完整代码如下:

        /**
      * 封装统计通过搜索引擎多来的实战课程的点击量
      * @param day_serach_course 当天通过某搜索引擎过来的实战课程
      * @param click_count 点击数
      */
    case class CourseSearchClickCount(day_serach_course:String,click_count:Int)
    

    (3). 在dao包下创建以下Scala的object类:
    CourseClickCountDao :用于交互HBase,把课程点击数的统计结果写入HBase:
    大数据实时流处理日志Spark项目实战(非常详细)
    CourseClickCountDao 类的完整代码如下:

    	/**
      * 实战课程点击数统计访问层
      */
    object CourseClickCountDao {
    
        val tableName = "ns1:courses_clickcount"  //表名
        val cf = "info"   //列族
        val qualifer = "click_count"   //列
    
      /**
        * 保存数据到Hbase
        * @param list (day_course:String,click_count:Int) //统计后当天每门课程的总点击数
        */
      def save(list:ListBuffer[CourseClickCount]): Unit = {
      	//调用HBaseUtils的方法,获得HBase表实例
        val table = HBaseUtils.getInstance().getTable(tableName)
        for(item <- list){
          //调用Hbase的一个自增加方法
          table.incrementColumnValue(Bytes.toBytes(item.day_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            item.click_count)  //赋值为Long,自动转换
        }
      }
    }
    

    CourseClickCountDao :用于交互HBase,把搜引擎搜索数量的统计结果写入HBase,创建过程与CourseClickCountDao 类一致故不再展示,完整代码如下:

    object CourseSearchClickCountDao {
      val tableName = "ns1:courses_search_clickcount"
      val cf = "info"
      val qualifer = "click_count"
    
      /**
        * 保存数据到Hbase
        * @param list (day_course:String,click_count:Int) //统计后当天每门课程的总点击数
        */
      def save(list:ListBuffer[CourseSearchClickCount]): Unit = {
        val table = HBaseUtils.getInstance().getTable(tableName)
        for(item <- list){
          table.incrementColumnValue(Bytes.toBytes(item.day_serach_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            item.click_count)  //赋值为Long,自动转换
        }
      }
    }
    

    注意: 代码中的HBase表需要提前创建好,详情请看本节的 8.在HBase中创建项目需要的表
    (4). 在application包下创建Scala的object类 CountByStreaming,用于处理数据,是本项目的程序入口,最为核心的类:
    大数据实时流处理日志Spark项目实战(非常详细)
    CountByStreaming 类的完整代码如下:

    object CountByStreaming {
    
      def main(args: Array[String]): Unit = {
    
        /**
          * 最终该程序将打包在集群上运行,
          * 需要接收几个参数:zookeeper服务器的ip,kafka消费组,
          * 主题,以及线程数
          */
        if(args.length != 4){
            System.err.println("Error:you need to input:<zookeeper> <group> <toplics> <threadNum>")
            System.exit(1)
          }
    
          //接收main函数的参数,外面的传参
          val Array(zkAdderss,group,toplics,threadNum) = args
    
        /**
          * 创建Spark上下文,下本地运行需要设置AppName
          * Master等属性,打包上集群前需要删除
          */
        val sparkConf = new SparkConf()
            .setAppName("CountByStreaming")
            .setMaster("local[4]")
          
          //创建Spark离散流,每隔60秒接收数据
          val ssc = new StreamingContext(sparkConf,Seconds(60))
          //使用kafka作为数据源
          val topicsMap = toplics.split(",").map((_,threadNum.toInt)).toMap
          //创建kafka离散流,每隔60秒消费一次kafka集群的数据
          val kafkaInputDS = KafkaUtils.createStream(ssc,zkAdderss,group,topicsMap)
    
           //得到原始的日志数据
          val logResourcesDS = kafkaInputDS.map(_._2)
        /**
          * (1)清洗数据,把它封装到ClickLog中
          * (2)过滤掉非法的数据
          */
          val cleanDataRDD = logResourcesDS.map(line => {
            val splits = line.split("\t")
            if(splits.length != 5) {      //不合法的数据直接封装默认赋予错误值,filter会将其过滤
              ClickLog("", "", 0, 0, "")
            }
            else {
            val ip = splits(0)   //获得日志中用户的ip
            val time = DateUtils.parseToMinute(splits(1)) //获得日志中用户的访问时间,并调用DateUtils格式化时间
            val status = splits(3).toInt  //获得访问状态码
            val referer = splits(4)
            val url = splits(2).split(" ")(1)  //获得搜索url
            var courseId = 0
            if(url.startsWith("/class")){
              val courseIdHtml = url.split("/")(2)
              courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
            }
            ClickLog(ip,time,courseId,status,referer)  //将清洗后的日志封装到ClickLog中
            }
          }).filter(x => x.courseId != 0 )   //过滤掉非实战课程
    
        /**
          * (1)统计数据
          * (2)把计算结果写进HBase
          */
          cleanDataRDD .map(line => {
            //这里相当于定义HBase表"ns1:courses_clickcount"的RowKey,
            // 将‘日期_课程’作为RowKey,意义为某天某门课的访问数
            (line.time.substring(0,8) + "_" + line.courseId,1)   //映射为元组
          }).reduceByKey(_ + _)   //聚合
            .foreachRDD(rdd =>{    //一个DStream里有多个RDD
            rdd.foreachPartition(partition => {   //一个RDD里有多个Partition
              val list = new ListBuffer[CourseClickCount]
              partition.foreach(item => {   //一个Partition里有多条记录
                list.append(CourseClickCount(item._1,item._2))
              })
              CourseClickCountDao.save(list)   //保存至HBase
            })
          })
    
        /**
          * 统计至今为止通过各个搜索引擎而来的实战课程的总点击数
          * (1)统计数据
          * (2)把统计结果写进HBase中去
          */
        cleanDataRDD.map(line => {
          val referer = line.referer
          val time = line.time.substring(0,8)
          var url = ""
          if(referer == "-"){     //过滤非法url
            (url,time)
          }else {
            //取出搜索引擎的名字
            url = referer.replaceAll("//","/").split("/")(1)
            (url,time)
          }
        }).filter(x => x._1 != "").map(line => {
          //这里相当于定义HBase表"ns1:courses_search_clickcount"的RowKey,
          // 将'日期_搜索引擎名'作为RowKey,意义为某天通过某搜搜引擎访问课程的次数
          (line._2 + "_" + line._1,1)   //映射为元组
        }).reduceByKey(_ + _)   //聚合
          .foreachRDD(rdd => {
            rdd.foreachPartition(partition => {
              val list = new ListBuffer[CourseSearchClickCount]
              partition.foreach(item => {
                list.append(CourseSearchClickCount(item._1,item._2))
              })
              CourseSearchClickCountDao.save(list)
            })
          })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    5.编写python脚本产生数据

    编写一个python脚本,命名为 generate_log.py,用于产生用户日志:

     import random
     import time
     
     //创建url访问数组class/112,数字代表的是实战课程id
     url_paths = [
     	"class/112.html",
     	"class/128.html",
     	"class/145.html",
     	"class/146.html",
     	"class/500.html",
     	"class/250.html",
     	"class/131.html",
     	"class/130.html",
     	"class/271.html",
     	"class/127.html",
     	"learn/821",
     	"learn/823",
     	"learn/987",
     	"learn/500",
     	"course/list"
     ]
     
     //创建ip数组,随机选择4个数字作为ip如132.168.30.87
     ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168,192,134,111,54,64,110,43]
     
     //搜索引擎访问数组{query}代表搜索关键字
     http_referers = [
     	"http://www.baidu.com/s?wd={query}",
     	"https://www.sogou.com/web?query={query}",
     	"http://cn.bing.com/search?q={query}",
     	"https://search.yahoo.com/search?p={query}",
     ]
     
     //搜索关键字数组
     search_keyword = [
     	"Spark SQL实战",
     	"Hadoop基础",
     	"Storm实战",
     	"Spark Streaming实战",
     	"10小时入门大数据",
     	"SpringBoot实战",
     	"Linux进阶",
     	"Vue.js"
     ]
     
     //状态码数组
     status_codes = ["200","404","500","403"]
     
     //随机选择一个url
     def sample_url():
     	return random.sample(url_paths, 1)[0]
     	
     //随机组合一个ip
     def sample_ip():
     	slice = random.sample(ip_slices , 4)
     	return ".".join([str(item) for item in slice])
     
     //随机产生一个搜索url
     def sample_referer():
     	//一半的概率会产生非法url,用于模拟非法的用户日志
     	if random.uniform(0, 1) > 0.5:
     		return "-"
     
     	refer_str = random.sample(http_referers, 1)
     	query_str = random.sample(search_keyword, 1)
     	return refer_str[0].format(query=query_str[0])
     
     //随机产生一个数组
     def sample_status_code():
     	return random.sample(status_codes, 1)[0]
     
     //组合以上的内容,产生一条简单的用户访问日志
     def generate_log(count = 10):
     
     	//获取本机时间并将其作为访问时间写进访问日志中
     	time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
     	
     	//存储日志的目标文件(换成你自己的)
     	f = open("/home/hadoop/data/click.log","w+")
     	
     	//组合用户日志
     	while count >= 1:
     		query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(), status_code=sample_status_code(),local_time=time_str)
     
     		f.write(query_log + "\n")
     
     		count = count - 1 
     
     //执行main,每次产生100条用户日志
     if __name__ == '__main__':
     	generate_log(100)
    
    6.创建日志存放目录并编写Flume的配置文件

    (1). 创建日志存储的目录用于存放日志,位置自定义,但是一定要与python脚本及即将要编写的Flume配置文件中监控的目录一致,否则将无法收集。在home目录下创建data文件夹,并创建click.log文件,即最后日志存放在~/data/click.log中。
    (2). 在Flume的配置文件目录下新建一个文件,即在$FLUME_HOME/conf下新建一个文件 streaming_project.conf
    streaming_project.conf的完整配置如下:

      exec-memory-kafka.sources = exec-source  #exec的源,用于监控某个文件是否有     数据追加
     exec-memory-kafka.sinks = kafka-sink
     exec-memory-kafka.channels = memory-channel
     
     exec-memory-kafka.sources.exec-source.type = exec
     exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/data/click.log  #被监控的文件,目录必须正确
     exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
     
     exec-memory-kafka.channels.memory-channel.type = memory 
     
     exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink 
     exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop01:9092   #Kafka集群的某个活动节点的ip
     exec-memory-kafka.sinks.kafka-sink.topic = streamtopic  #Kafka的主题
     exec-memory-kafka.sinks.kafka-sink.batchSize = 10
     exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
     
     exec-memory-kafka.sources.exec-source.channels = memory-channel   #关联三大组件
     xec-memory-kafka.sinks.kafka-sink.channel = memory-channel
    
    7.创建Kafka主题

    在Linux终端执行以下命令:
    $KAFKA_HOME/bin/./kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 3 --topic streamtopic
    主题的名字自定义,副本的个数小于等于Kafka集群中存活节点的个数,zookeeper写任意一台存活的zookeeper的主机ip。

    8.在HBase中创建项目需要的表

    在Linux终端执行以下命令:
    (1). 进入Hbase shell:$HBASE_HOME/bin/./hbase shell
    在Hbase shell中执行以下命令:
    (2). 创建名字空间 ns1:create_namespace 'ns1'
    (3). 创建课程访问统计表,列族为info:create 'ns1:courses_clickcoun', 'info'
    (4). 创建搜索引擎统计表,列族为info:create 'ns1:courses_search_clickcount', 'info'

    9.测试后端代码

    (1). 启动Flume,监控存储日志的文件。
    在Linux终端下执行以下命令:
    $FLUME_HOME/bin/./flume-ng agent -n exec-memory-kafka -f FLUME_HOME/conf/streaming_project.conf
    (2). 调用Linux的crontab命令,周期性调用python脚本,源源不断产生数据,有关于crontab命令的使用可自行查找相关资料,这里不详细展,在LInux终端下执行命令:crontab -e,进入任务设置,键盘输入 i ,进入insert模式,编写脚本如下:
    大数据实时流处理日志Spark项目实战(非常详细)
    编写完后在键盘下敲esc健退出insert模式,输入 :wq保存退出。
    注意: 如果要停止产生日志,可以执行命令:crontab -e,进入任务设置,在那一行脚本前输入 #,代表注释那一行脚本。
    (3). 启动一个kafak控制台消费者,检测Flume是否成功收集到日志:
    在Linux终端输入命令:$KAFKA_HOME/bin/./kafka-console-consumer.sh --topic streamtopic --zookeeper hadoop01:2181,如果成功则终端控制台会有如下显示:
    大数据实时流处理日志Spark项目实战(非常详细)
    (4). 运行本项目的核心类 CountByStreaming ,因为我们设置main方法要接收许多参数,因此在运行前需要配置一下该类:
    大数据实时流处理日志Spark项目实战(非常详细)
    大数据实时流处理日志Spark项目实战(非常详细)
    配置完成后运行程序即可。
    注意: 有的小伙伴的电脑可能运行内存不足报如下异常:
    大数据实时流处理日志Spark项目实战(非常详细)
    只需要在代码中增加如下配置就可以解决:
    大数据实时流处理日志Spark项目实战(非常详细)
    (5). 进入hbase shell,查看结果:
    在hbase shell下输入以下命令:
    scan 'ns1:courses_clickcount' :扫描课程点击统计表,得到如下结果:
    大数据实时流处理日志Spark项目实战(非常详细)
    scan 'ns1:courses_search_clickcount' :扫描搜索引擎搜索统计表,得到如下结果:
    大数据实时流处理日志Spark项目实战(非常详细)
    至此,后端的代码及测试已经完成。

  • 2.前端开发实战

    1.构建工程

    构建Java Web工程项目,并添加Maven支持。

    2.引入依赖
    <properties>
       <hbase.version>1.2.0-cdh5.7.0</hbase.version>
       <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
       <spring.version>4.3.3.RELEASE</spring.version>
       <spring_mvc.vserion>4.3.3.RELEASE</spring_mvc.vserion>
       <mybatis.version>3.2.1</mybatis.version>
       <mysql.version>5.1.17</mysql.version>
    </properties>
    
    <repositories>
       <repository>
           <id>cloudera</id>
           <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
       </repository>
    </repositories>
    
    <dependencies>
       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>${mysql.version}</version>
       </dependency>
    
       <dependency>
           <groupId>c3p0</groupId>
           <artifactId>c3p0</artifactId>
           <version>0.9.1.2</version>
       </dependency>
    
       <dependency>
           <groupId>org.mybatis</groupId>
           <artifactId>mybatis</artifactId>
           <version>${mybatis.version}</version>
       </dependency>
    
       <dependency>
           <groupId>junit</groupId>
           <artifactId>junit</artifactId>
           <version>4.11</version>
       </dependency>
    
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context-support</artifactId>
           <version>${spring.version}</version>
       </dependency>
       
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-jdbc</artifactId>
           <version>${spring.version}</version>
       </dependency>
    
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-webmvc</artifactId>
           <version>${spring_mvc.vserion}</version>
       </dependency>
    
       <dependency>
           <groupId>org.mybatis</groupId>
           <artifactId>mybatis-spring</artifactId>
           <version>1.3.0</version>
       </dependency>
    
       <dependency>
           <groupId>org.aspectj</groupId>
           <artifactId>aspectjweaver</artifactId>
           <version>1.8.10</version>
       </dependency>
    
       <dependency>
           <groupId>javax.servlet</groupId>
           <artifactId>jstl</artifactId>
           <version>1.2</version>
       </dependency>
    
       <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-client</artifactId>
           <version>${hbase.version}</version>
       </dependency>
    
       <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
           <version>${hadoop.version}</version>
       </dependency>
    
       <dependency>
           <groupId>net.sf.json-lib</groupId>
           <artifactId>json-lib</artifactId>
           <version>2.4</version>
           <classifier>jdk15</classifier>
       </dependency>
    
       <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
           <version>2.9.3</version>
       </dependency>
    
       <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-core</artifactId>
           <version>2.9.3</version>
       </dependency>
    
       <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-annotations</artifactId>
           <version>2.9.3</version>
       </dependency>
    </dependencies>
    
    3.创建MySQL表

    创建course用于根据课程id(HBase存储的数据)查询课程名:
    create table course(id int,name varchar(30))
    插入数据后得到如下表格:
    大数据实时流处理日志Spark项目实战(非常详细)
    创建course用于根据搜索引擎id(HBase存储的数据)查询搜索引擎名:
    create table search_engine(id varchar(10),name varchar(30))
    插入数据后得到如下表格:
    大数据实时流处理日志Spark项目实战(非常详细)

    4.配置SSM框架

    (1). 在resources目录下创建如下xml文件:
    大数据实时流处理日志Spark项目实战(非常详细)
    (2). 在Web/WEB-INF 下创建dispatcher-servlet.xml
    大数据实时流处理日志Spark项目实战(非常详细)
    (3). 以上xml文件的完整内容:
    beans.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
          xmlns:aop="http://www.springframework.org/schema/aop"
          xsi:schemaLocation="http://www.springframework.org/schema/beans
       							 http://www.springframework.org/schema/beans/spring-beans.xsd
       							   http://www.springframework.org/schema/context
       							   http://www.springframework.org/schema/context/spring-context-4.3.xsd
       							   http://www.springframework.org/schema/tx
       							   http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
       							   http://www.springframework.org/schema/aop
       							   http://www.springframework.org/schema/aop/spring-aop-4.3.xsd"
          default-autowire="byType">
       <!-- 配置数据源 换成你的数据库url-->
       <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
           <property name="driverClass" value="com.mysql.jdbc.Driver"/>
           <property name="jdbcUrl" value="jdbc:mysql://hadoop01:3306/jdbc?user=root&amp;password=root"/>
           <property name="user" value="root"/>
           <property name="password" value="root"/>
           <property name="maxPoolSize" value="20"/>
           <property name="minPoolSize" value="2"/>
           <property name="initialPoolSize" value="3"/>
           <property name="acquireIncrement" value="2"/>
       </bean>
    
       <!-- 配置SessionFactory -->
       <bean id="SessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
           <property name="dataSource" ref="dataSource"/>
           <property name="configLocation" value="classpath:mybatis-config.xml"/>
       </bean>
       <!-- 包扫描 -->
       <context:component-scan base-package="com.ssm.dao,com.ssm.service,com.ssm.dao" />
    
       <!-- 事务管理器 -->
       <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
           <property name="dataSource" ref="dataSource"/>
       </bean>
    
       <!-- 配置事务 -->
       <tx:advice id="txAdvice" transaction-manager="txManager">
           <tx:attributes>
               <tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/>
           </tx:attributes>
       </tx:advice>
    
       <!-- 配置事务切面 -->
       <aop:config>
           <aop:advisor advice-ref="txAdvice" pointcut="execution(* *..*Service.*(..))" />
       </aop:config>
    

    mybatis-config.xml

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE configuration
            PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
    
        <!-- 创建别名关系 -->
        <typeAliases>
            <typeAlias type="com.ssm.domain.Course" alias="_Course"/>
            <typeAlias type="com.ssm.domain.SearchEngine" alias="_SearchEngine"/>
        </typeAliases>
    
        <!--   创建映射关系   -->
        <mappers>
            <mapper resource="CourseMapper.xml"/>
            <mapper resource="SearchEngineMapper.xml"/>
        </mappers>
    </configuration>
    

    CourseMapper.xml

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper
            PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <!-- 定义名字空间 -->
    <mapper namespace="course">
        <select id="findCourseName" parameterType="int" resultType="_Course">
            select id,`name`
            from course
            where id = #{id}
        </select>
    </mapper>
    

    SearchEngineMapper.xml

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper
            PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <!-- 定义名字空间 -->
    <mapper namespace="searchEngine">
        <select id="findEngineName" parameterType="String" resultType="_SearchEngine">
            select `name`
            from search_engine
            where id = #{name}
        </select>
    </mapper>
    

    dispatcher-servlet.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
    								http://www.springframework.org/schema/beans/spring-beans.xsd
    								http://www.springframework.org/schema/mvc
    								http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
    								http://www.springframework.org/schema/context
    								http://www.springframework.org/schema/context/spring-context-4.3.xsd">
        <!-- 配置包扫描路径 -->
        <context:component-scan base-package="com.ssm.controller"/>
        <!-- 配置使用注解驱动 -->
        <mvc:annotation-driven />
        <mvc:default-servlet-handler/>
        <!-- 配置视图解析器 -->
        <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
            <property name="prefix" value="/"/>
            <property name="suffix" value=".jsp"/>
        </bean>
    </beans>
    

    (4). 配置Web/WEB-INF/web.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
             version="4.0">
    
        <!-- 指定spring的配置文件beans.xml -->
        <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:beans.xml</param-value>
        </context-param>
    
        <!-- 确保web服务器启动时,完成spring的容器初始化 -->
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>
    
        <!-- 配置分发器 -->
        <servlet>
            <servlet-name>dispatcher</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        </servlet>
    
        <servlet-mapping>
            <servlet-name>dispatcher</servlet-name>
            <url-pattern>/</url-pattern>
        </servlet-mapping>
    </web-app>
    

    至此,SSM整合配置已经完成。

    5.创建工程包结构

    在java目录下创建以下包结构:
    大数据实时流处理日志Spark项目实战(非常详细)

    6.编写代码

    (1). 在util包下新健Java类 HBaseUtils用于访问HBase,完整代码如下:

        public class HBaseUtils {
      
          private Configuration configuration = null;
          private Connection connection = null;
          private static HBaseUtils hBaseUtil = null;
      
          private HBaseUtils(){
              try {
                  configuration = new Configuration();
                  //zookeeper服务器的地址
                  configuration.set("hbase.zookeeper.quorum","hadoop01:2181");
                  connection =  ConnectionFactory.createConnection(configuration);
              }catch (Exception e){
                  e.printStackTrace();
              }
          }
      
          /**
           * 获得HBaseUtil实例
           * @return
           */
          public static synchronized HBaseUtils getInstance(){
              if(hBaseUtil == null){
                  hBaseUtil = new HBaseUtils();
              }
              return hBaseUtil;
          }
      
          /**
           * 根据表名获得表对象
           */
          public HTable getTable(String tableName){
              try {
                  HTable table = null;
                  table = (HTable)connection.getTable(TableName.valueOf(tableName));
                  return table;
              }catch (Exception e){
                  e.printStackTrace();
              }
              return null;
          }
      
          /**
           * 根据日期查询统计结果
           */
          public Map<String,Long> getClickCount(String tableName,String date){
              Map<String,Long> map = new HashMap<String, Long>();
              try {
                  //得到表实例
                  HTable table = getInstance().getTable(tableName);
                  //列族
                  String cf = "info";
                  //列
                  String qualifier = "click_count";
                  //定义扫描器前缀过滤器,只扫描给定日期的row
                  Filter filter =  new PrefixFilter(Bytes.toBytes(date));
                  //定义扫描器
                  Scan scan = new Scan();
                  scan.setFilter(filter);
                  ResultScanner results =  table.getScanner(scan);
                  for(Result result:results){
                      //取出rowKey
                      String rowKey = Bytes.toString(result.getRow());
                      //取出点击次数
                      Long clickCount = Bytes.toLong(result.getValue(cf.getBytes(),qualifier.getBytes()));
                      map.put(rowKey,clickCount);
                  }
              }catch (Exception e){
                  e.printStackTrace();
                  return null;
              }
              return map;
          }
       }
    

    (2). 在domain包下新建以下JavaBean用于封装信息:
    Course

       /**
      *课程类,实现Comparable接口用于排序
      */
     public class Course implements Comparable<Course>{
     
         private int id;   //课程编号
         private String name;    //课程名
         private Long count;     //点击次数
     
         public Long getCount() {
             return count;
         }
     
         public void setCount(Long count) {
             this.count = count;
         }
     
         public int getId() {
             return id;
         }
     
         public void setId(int id) {
             this.id = id;
         }
     
         public String getName() {
             return name;
         }
     
         public void setName(String name) {
             this.name = name;
         }
     
         /**
          * 降序排序
          */
         public int compareTo(Course course) {
             return course.count.intValue() - this.count.intValue();
         }
     }
    

    SearchEngine

      /**
     * 搜索引擎类
     */
    public class SearchEngine {
    
        private String name;
        private Long count;
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Long getCount() {
            return count;
        }
    
        public void setCount(Long count) {
            this.count = count;
        }
    }
    

    (3). 在dao包下新建以下接口:
    CourseClickCountDao:

        /**
     * 课程统计结果Dao
     */
    public interface CourseClickCountDao {
    
        public Set<Course> findCourseClickCount(String tableName, String date);
    }
    

    SearchEngineCountDao

    /**
    * 索索引擎统计Dao
    */
    public interface SearchEngineCountDao {
    
       public List<SearchEngine> findSearchEngineCount(String tableName, String date);
    }
    

    (4). 在dao包下的impl包新建以下Java类实现dao下的接口:
    CourseClickCountDaoImpl:

    /**
    * 课程点击统计Dao实现类
    */
    @Repository("CourseClickCountDao")
    public class CourseClickCountDaoImpl extends SqlSessionDaoSupport implements CourseClickCountDao {
    
       public Set<Course> findCourseClickCount(String tableName, String date) {
           Map<String,Long> map = new HashMap<String, Long>();
           Course course = null;
           Set<Course> set = new TreeSet<Course>();
           map = HBaseUtils.getInstance().getClickCount(tableName,date);
           for (Map.Entry<String, Long> entry : map.entrySet()) {
               //Rowkey的结构:20190330_112,112为课程的id
             int id = Integer.parseInt(entry.getKey().substring(9));
               //查询课程名并封装进bean
             course = getSqlSession().selectOne("course.findCourseName",id);
               //将课程名封装进bean
             course.setCount(entry.getValue());
             set.add(course);
           }
           return set;
       }
    }
    

    CourseClickCountDaoImpl

    /**
    * 搜索引擎统计Dao实现类
    */
    @Repository("SearchEngineCountDao")
    public class SearchEngineCountDaoImpl extends SqlSessionDaoSupport implements SearchEngineCountDao {
    
       public List<SearchEngine> findSearchEngineCount(String tableName, String date) {
           Map<String,Long> map = new HashMap<String, Long>();
           List<SearchEngine> list = new ArrayList<SearchEngine>();
           SearchEngine searchEngine = null;
           //调用HBaseUtil查询结果
           map = HBaseUtils.getInstance().getClickCount(tableName,date);
           System.out.println(map);
           for ( Map.Entry<String, Long> entry : map.entrySet()){
               //取出搜索引擎的名字
               String name = entry.getKey().split("\\.")[1];
               //查询引擎名字,封装进bean
               searchEngine = getSqlSession().selectOne("searchEngine.findEngineName",name);
               //把搜索引擎的点击次数封装进bean
               searchEngine.setCount(entry.getValue());
               list.add(searchEngine);
           }
           return list;
       }
    }
    

    (5). 在service包新建如下接口,用于为controller层提供服务:
    CourseClickService

    /**
    * 课程点击统计Service类
    */
    public interface CourseClickService {
    
       public List<Course> findCourseClickCount(String tableName, String date);
    }
    

    SearchEngineService

    /**
    * 搜索引擎点击Service实现类
    */
    public interface SearchEngineService {
    
       public List<SearchEngine> findSearchEngineCount(String tableName, String date);
    }
    

    (6). 在service包下的impl包新建如Java类,实现service包下的接口:
    CourseClickServiceImpl

    /**
     * 课程点击统计Service实现类
     */
    @Service("CourseClickService")
    public class CourseClickServiceImpl implements CourseClickService {
    
        //注入CourseClickCountDaoImpl类
        @Resource(name = "CourseClickCountDao")
        private CourseClickCountDao  courseClickCountDao;
    
        /**
         * 将点击率TOP5的课程封装进list
         */
        public List<Course> findCourseClickCount(String tableName, String date) {
            List<Course> list = new ArrayList<Course>();
            //Set集合里的bean根据点击数进行的降序排序
            Set<Course> set = courseClickCountDao.findCourseClickCount(tableName,date);
            Iterator<Course> iterator = set.iterator();
            int i = 0;
            //将TOP5课程封装进list
            while (iterator.hasNext() && i++ < 5){
                list.add(iterator.next());
            }
            return list;
        }
    }
    

    SearchEngineServiceImpl

    /**
    * 搜索引擎点击统计Service实现类
    */
       @Service("SearchEngineService")
       public class SearchEngineServiceImpl implements SearchEngineService {
       
           //注入SearchEngineCountDao
           @Resource(name = "SearchEngineCountDao")
           private SearchEngineCountDao  searchEngineDao;
       
           public List<SearchEngine> findSearchEngineCount(String tableName, String date) {
               return searchEngineDao.findSearchEngineCount(tableName,date);
           }
       }
    

    (6). 在controller包中新建以下Java类,用于响应Web的请求:
    CourseClickController

    /**
    * 课程点击Controller
    */
    @Controller
    public class CourseClickController {
    
       //注入CourseClickService
       @Resource(name = "CourseClickService")
       private CourseClickService service;
    
       //页面跳转
       @RequestMapping("/courseclick")
       public String toGetCourceClick(){
           return "courseclick";
       }
    
       /**
        *  sponseBody注解的作用是将controller的方法返回的对象通过适当的转换器转
        *  换为指定的格式之后,写入到response对象的body区,通常用来返回JSON数据或者是XML
        */
       @ResponseBody
       @RequestMapping(value = "/getCourseClickCount",method = RequestMethod.GET)
       public JSONArray courseClickCount(String date){
           //如果url没有传值,传入一个默认值
           if(date == null || date.equals("")){
               date = "20190330";
           }
           List<Course> list = null;
           list = service.findCourseClickCount("ns1:courses_clickcount",date);
           //封装JSON数据
           return JSONArray.fromObject(list);
       }
    }
    

    SearchEngineController

     /**
     * 搜索引擎点击Controller
     */
    @Controller
    public class SearchEngineController {
    
        //注入SearchEngineService
        @Resource(name = "SearchEngineService")
        private SearchEngineService searchEngineService;
    
        //页面跳转
        @RequestMapping("/searchclick")
        public String toGetCourceClick(){
            return "searchclick";
        }
    
        /**
         *  sponseBody注解的作用是将controller的方法返回的对象通过适当的转换器转
         *  换为指定的格式之后,写入到response对象的body区,通常用来返回JSON数据或者是XML
         */
        @ResponseBody
        @RequestMapping(value = "/getSearchClickCount",method = RequestMethod.GET)
        public JSONArray searchClickCount(String date){
            //如果url没有传值,传入一个默认值
            if(date == null || date.equals("")){
                date = "20190330";
            }
            List<SearchEngine> list = null;
            list = searchEngineService.findSearchEngineCount("ns1:courses_search_clickcount",date);
            //封装JSON数据
            return JSONArray.fromObject(list);
        }
    }
    

    (6). 在Web目录下新建js目录,引入echarts和jquery(自行去官网下载),在Web目录下新建两个jsp文件courseclick.jspsearchclick.jsp用于展示数据,得到如下目录:
    大数据实时流处理日志Spark项目实战(非常详细)
    courseclick.jsp

     <%@ page contentType="text/html;charset=UTF-8" language="java" %>
     <head>
         <meta charset="UTF-8"/>
         <!-- 设置每隔60秒刷新一次页面-->
         <meta http-equiv="refresh" content="60">
         <title>学习网实战课程实时访问统计</title>
         <script src="js/echarts.min.js"></script>
         <script src="js/jquery-1.11.3.min.js"></script>
         <style type="text/css">
             div{
                 display: inline;
             }
         </style>
     </head>
     <body>
     <div id="main" style="width: 600px;height:400px;float: left;margin-top:50px"></div>
     <div id="main2" style="width: 700px;height:400px;float: right;margin-top:50px"></div>
     <script type="text/javascript">
         var scources = [];
         var scources2 = [];
         var scources3 = [];
         var scources4 = [];
          //获得url上参数date的值
         function GetQueryString(name)
         {
             var reg = new RegExp("(^|&)"+ name +"=([^&]*)(&|$)");
             var r = window.location.search.substr(1).match(reg);//search,查询?后面的参数,并匹配正则
             if(r!=null)return  unescape(r[2]); return null;
         }
         var date = GetQueryString("date");
         $.ajax({
             type:"GET",
             url:"/getCourseClickCount?date="+date,
             dataType:"json",
             async:false,
             error: function (data) {
                 alert("失败啦");
             },
             success:function (result) {
                 if(scources.length != 0){
                     scources.clean();
                     scources2.clean();
                     scources3.clean();
                     scources4.clean();
                 }
                 for(var i = 0; i < result.length; i++){
                     scources.push(result[i].name);
                     scources2.push(result[i].count);
                     scources3.push({"value":result[i].count,"name":result[i].name});
     
                 }
                 for(var i = 0; i < 3; i++){
                     scources4.push({"value":result[i].count,"name":result[i].name});
                 }
             }
         })
     
         // 基于准备好的dom,初始化echarts实例
         var myChart = echarts.init(document.getElementById('main'));
     
         // 指定图表的配置项和数据
         var option = {
                 title: {
                     text: '学习网实时实战课程访问量',
                     subtext: '课程点击数',
                     x:'center'
                 },
             tooltip: {},
            /* legend: {
                 data:['点击数']
             },*/
             xAxis: {
                 data: scources
             },
             yAxis: {},
             series: [{
                 name: '点击数',
                 type: 'bar',
                 data: scources2
             }]
         };
     <!--------------------------------------------------------------------------- -->
         // 使用刚指定的配置项和数据显示图表。
         myChart.setOption(option);
     
         var myChart = echarts.init(document.getElementById('main2'));
     
         // 指定图表的配置项和数据
         var option = {
             title: {
                 text: '学习网实时实战课程访问量',
                 subtext: '课程点击数',
                 x:'center'
             },
             tooltip: {
                 trigger: 'item',
                 formatter: "{a} <br/>{b}: {c} ({d}%)"
             },
             legend: {
                 orient: 'vertical',
                 x: 'left'/*,
                 data:scources*/
             },
             series: [
                 {
                     name:'课程点击数',
                     type:'pie',
                     selectedMode: 'single',
                     radius: [0, '30%'],
     
                     label: {
                         normal: {
                             position: 'inner'
                         }
                     },
                     labelLine: {
                         normal: {
                             show: false
                         }
                     },
                     data:scources4
                 },
                 {
                     name:'课程点击数',
                     type:'pie',
                     radius: ['40%', '55%'],
                     label: {
                         normal: {
                             formatter: '{a|{a}}{abg|}\n{hr|}\n  {b|{b}:}{c}  {per|{d}%}  ',
                             backgroundColor: '#eee',
                             borderColor: '#aaa',
                             borderWidth: 1,
                             borderRadius: 4,
     
                             rich: {
                                 a: {
                                     color: '#999',
                                     lineHeight: 22,
                                     align: 'center'
                                 },
                                 hr: {
                                     borderColor: '#aaa',
                                     width: '100%',
                                     borderWidth: 0.5,
                                     height: 0
                                 },
                                 b: {
                                     fontSize: 16,
                                     lineHeight: 33
                                 },
                                 per: {
                                     color: '#eee',
                                     backgroundColor: '#334455',
                                     padding: [2, 4],
                                     borderRadius: 2
                                 }
                             }
                         }
                     },
                     data:scources3
                 }
             ]
         };
     
         // 使用刚指定的配置项和数据显示图表。
         myChart.setOption(option);
     
       </script>
      </body>
     </html>
    

    courseclick.jsp

    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8"/>
        <!-- 设置每隔60秒刷新一次页面-->
        <meta http-equiv="refresh" content="60">
        <title>学习网课程搜索引擎访问统计</title>
        <script src="js/echarts.min.js"></script>
        <script src="js/jquery-1.11.3.min.js"></script>
        <style type="text/css">
            div{
                display: inline;
            }
        </style>
    </head>
    <body>
    <div id="main" style="width: 600px;height:400px;float: left;margin-top:50px"></div>
    <div id="main2" style="width: 700px;height:400px;float: right;margin-top:50px"></div>
    <script type="text/javascript">
        var scources = [];
        var scources2 = [];
        var scources3 = [];
        //获得url上参数date的值
        function GetQueryString(name)
        {
            var reg = new RegExp("(^|&)"+ name +"=([^&]*)(&|$)");
            var r = window.location.search.substr(1).match(reg);//search,查询?后面的参数,并匹配正则
            if(r!=null)return  unescape(r[2]); return null;
        }
        var date = GetQueryString("date");
        $.ajax({
            type:"GET",
            url:"/getSearchClickCount?date="+date,
            dataType:"json",
            async:false,
            success:function (result) {
                if(scources.length != 0){
                    scources.clean();
                    scources2.clean();
                    scources3.clean();
                }
                for(var i = 0; i < result.length; i++){
                    scources.push(result[i].name);
                    scources2.push(result[i].count);
                    scources3.push({"value":result[i].count,"name":result[i].name});
                }
            }
        })
    
        // 基于准备好的dom,初始化echarts实例
        var myChart = echarts.init(document.getElementById('main'));
    
        // 指定图表的配置项和数据
        var option = {
            title: {
                text: '学习网实时课程搜索引擎访问量',
                subtext: '搜索引擎搜索数',
                x:'center'
            },
            color: ['#3398DB'],
            tooltip : {
                trigger: 'axis',
                axisPointer : {            // 坐标轴指示器,坐标轴触发有效
                    type : 'shadow'        // 默认为直线,可选为:'line' | 'shadow'
                }
            },
            grid: {
                left: '3%',
                right: '4%',
                bottom: '3%',
                containLabel: true
            },
            xAxis : [
                {
                    type : 'category',
                    data : scources,
                    axisTick: {
                        alignWithLabel: true
                    }
                }
            ],
            yAxis : [
                {
                    type : 'value'
                }
            ],
            series : [
                {
                    name:'直接访问',
                    type:'bar',
                    barWidth: '60%',
                    data:scources2
                }
            ]
        };
    
        <!--------------------------------------------------------------------------- -->
        // 使用刚指定的配置项和数据显示图表。
        myChart.setOption(option);
    
        var myChart = echarts.init(document.getElementById('main2'));
    
        // 指定图表的配置项和数据
        var option = {
            title : {
                text: '学习网搜索引擎搜索图',
                subtext: '搜索引擎使用比例',
                x:'center'
            },
            tooltip : {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            legend: {
                x : 'center',
                y : 'bottom',
                data:scources
            },
            toolbox: {
                show : true,
                feature : {
                    mark : {show: true},
                    dataView : {show: true, readOnly: false},
                    magicType : {
                        show: true,
                        type: ['pie', 'funnel']
                    },
                    restore : {show: true},
                    saveAsImage : {show: true}
                }
            },
            calculable : true,
            series : [
                {
                    name:'搜索数',
                    type:'pie',
                    radius : [20, 110],
                    center : ['25%', '50%'],
                    roseType : 'radius',
                    label: {
                        normal: {
                            show: false
                        },
                        emphasis: {
                            show: true
                        }
                    },
                    lableLine: {
                        normal: {
                            show: false
                        },
                        emphasis: {
                            show: true
                        }
                    },
                    data:scources3
                },
                {
                    name:'搜索数',
                    type:'pie',
                    radius : [30, 110],
                    center : ['75%', '50%'],
                    roseType : 'area',
                    data:scources3
                }
            ]
        };
    
        // 使用刚指定的配置项和数据显示图表。
        myChart.setOption(option);
    
    </script>
    </body>
    </html>
    

五、项目结果展示

1.启动前端Web项目的TomCat服务器,在url地址栏中输入查询的日期:
大数据实时流处理日志Spark项目实战(非常详细)
2.得到如下漂亮的展示结果:
大数据实时流处理日志Spark项目实战(非常详细)
大数据实时流处理日志Spark项目实战(非常详细)
配合页面每隔60秒刷新一次数据,后台数据每隔60秒产生一批,Spark Streaming每隔60秒处理一批数据,形成了数据实时产生实时处理实时展示的。

六、总结

1.在本项目中,我们从后端到前端,由里到外系统地学习了大数据实时流处理的流程,总结一下过程如下后台每隔60s执行一次python脚本产生一批用户日志存储在目标文件中 -> Flume监控目标文件并收集 -> 收集后的日志传给Kafka -> Spark Streaming消费Kafka的数据处理完将结果写入HBase,前端SSM整合读取HBase的数据,并且响应jsp页面的Ajax请求,将数据以JSON格式发给前端页面,最后前端页面使用Echarts展示数据。
2.本项目只是实现了其中的两个功能,但是还可以挖掘更多的功能,比如统计IP
地址来获取每个省份最受欢迎课程TOP3,及每门课程在哪个省份最受欢迎等等,能够分析中很多具有价值的信息,这也是大数据魅力值所在!