Practicing Spark Streaming Programs in the IDEA Development Environment

Preface

For setting up the Spark + IDEA development environment, see:
https://blog.****.net/qq_38038143/article/details/89926205

1. Maven Dependencies

  <properties>
    <scala.version>2.12.8</scala.version>
    <spark.version>2.4.0</spark.version>
    <mysql.version>5.1.46</mysql.version>
  </properties>

  <dependencies>
    <!--Spark Streaming-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!--Spark SQL-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.12</artifactId>
      <version>2.6.7.1</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

2. Examples

  1. NetworkWordCount
    Count words received over a socket. Start nc -lk 9999 first, then run the program and type words into the nc terminal.
    Code: NetworkWordCount.scala
    Note: use your own hostname. local[2] is required because one thread receives the data and another processes it; with only one thread the program fails, as there is no spare thread left to process the data.
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    val lines = ssc.socketTextStream("master", 9999)
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
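
    For illustration (this input is made up), typing "hello spark hello" into the nc terminal would produce per-batch output of roughly this shape, with the timestamp depending on when the batch fires:

-------------------------------------------
Time: 1557212344000 ms
-------------------------------------------
(hello,2)
(spark,1)
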
  2. FileWordCount
    Use a local directory as the input source. Create a directory named test.
    After the program is running, copy files into that directory (appending to a file that already exists there is not picked up; only newly copied files are processed).
    Code: FileWordCount.scala
    Note: set your own directory path. The master can be plain local with a single thread here, because reading from a local directory does not need a receiver.
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    val lines = ssc.textFileStream("file:///home/hadoop/spark-2.4.0/test")
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  3. StatefulWordCount
    Stateful word count: earlier state is kept, so the counts cover all words seen so far rather than just the current batch.
    Code: StatefulWordCount.scala
    The official documentation on updateStateByKey: http://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#transformations-on-dstreams
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Stateful operations require a checkpoint directory (usually on HDFS; here the local ./checkpoint is used)
    ssc.checkpoint("./checkpoint")
    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_,1))
    val state = result.updateStateByKey(updateFunction _) // updateFunction _ turns the method into a function value
    state.print()
    ssc.start()
    ssc.awaitTermination()
  }
  def updateFunction(currentValues: Seq[Int], previousValues: Option[Int]): Option[Int] = {
    val newCount = currentValues.sum
    val previousCount = previousValues.getOrElse(0) // previous count, or 0 if there is none yet
    Some(newCount + previousCount)
  }
}
  4. ForEachWordCount
    Count words and write the results to MySQL, practicing foreachRDD.
    Official documentation: http://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#output-operations-on-dstreams
    The documentation walks through four examples:
    The first creates the connection in the driver; running it throws a serialization error, because a connection cannot be serialized.
    The second runs, but it opens a new connection for every record, which is very costly in resources and not recommended.
    The third opens one connection per partition, and every record in that partition reuses it; this is better.
    The fourth uses a connection pool: connections are taken from the pool when needed and returned afterwards, giving the lowest overhead (see the sketch below).
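    The fourth pattern is not reproduced in the code below, so here is a minimal sketch of it. The ConnectionPool object is a hand-rolled illustration written for this post rather than an existing library (a real project would normally use a proper pooling library), and it reuses the same JDBC URL and credentials as createConnection() further down.

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative static pool: connections are handed back instead of closed,
// so later batches can reuse them.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val pooled = pool.poll()
    if (pooled != null) pooled
    else DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount", "root", "123456")
  }

  def returnConnection(connection: Connection): Unit = pool.offer(connection)
}

// Inside main(), after result has been computed:
result.foreachRDD { rdd =>
  rdd.foreachPartition { partitionRecords =>
    val connection = ConnectionPool.getConnection()
    partitionRecords.foreach { record =>
      val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" +
        record._1 + "'," + record._2 + ");"
      connection.createStatement().execute(sql)
    }
    ConnectionPool.returnConnection(connection) // keep the connection around for the next batch
  }
}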

Run the program:
One drawback shows up: newly inserted rows are not merged with rows written by earlier batches. Two fixes: 1. before inserting, check whether the word is already stored and update that row instead; 2. write to a store such as HBase or Redis whose increment operations accumulate the counts for you. (ForeachWordCount2 uses the first approach; a sketch of the second follows below.)
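For the second option, a rough sketch using the Jedis Redis client (an extra dependency that is not in the pom above; the hash key name wordcount is arbitrary). Redis's HINCRBY adds each batch's count to whatever is already stored, so no read-before-write is needed:

import redis.clients.jedis.Jedis

result.foreachRDD { rdd =>
  rdd.foreachPartition { partitionRecords =>
    val jedis = new Jedis("localhost", 6379) // one client per partition
    partitionRecords.foreach { case (word, count) =>
      // accumulate this batch's count into the running total for the word
      jedis.hincrBy("wordcount", word, count.toLong)
    }
    jedis.close()
  }
}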

Create the database and table in MySQL:

CREATE DATABASE gyt_wordcount;
USE gyt_wordcount;
CREATE TABLE wordcount(
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
word VARCHAR(50) DEFAULT NULL,
wordcount INT DEFAULT NULL
);

Code: ForEachWordCount.scala

package com.gyt.sparkstream

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Write the word-count results to MySQL. The table is created with:
 * 
CREATE DATABASE gyt_wordcount;
USE gyt_wordcount;
CREATE TABLE wordcount(
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
word VARCHAR(50) DEFAULT NULL,
wordcount INT DEFAULT NULL
);
 */

object ForEachWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForEachWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    // Write the results to MySQL
    // First pattern from the docs: create the connection in the driver; this fails because the connection cannot be serialized
//    result.foreachRDD(rdd => {
//      val connection = createConnection()
//      rdd.foreach(record => {
//        val sql ="INSERT INTO wordcount(word, wordcount) VALUES('" +
//              record._1 + "'," + record._2 + ");"
//        connection.createStatement().execute(sql)
//      })
//    })

    result.foreachRDD( rdd => {
      rdd.foreachPartition( partitionRecords => {
        val connection = createConnection()
        partitionRecords.foreach( record => {
          val sql ="INSERT INTO wordcount(word, wordcount) VALUES('" +
                record._1 + "'," + record._2 + ");"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount",
      "root", "123456")
  }
}
  5. ForeachWordCount2
    Update rows that have already been inserted, instead of inserting duplicates.
    Run the program the same way as above.
    Code: ForeachWordCount2.scala
package com.gyt.firstspark

import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Write the word-count results to MySQL, updating rows that already exist
 */
object ForEachWordCount2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForEachWordCount2")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    // Write the results to MySQL
    result.foreachRDD( rdd => {
      rdd.foreachPartition( partitionRecords => {
        val connection = createConnection()
        partitionRecords.foreach( record => {
          // The word is already stored in MySQL: run an UPDATE instead
          if (hasAlreadyStored(record._1, connection)) {
            val sql = "UPDATE wordcount SET wordcount = wordcount + " +
                  record._2 + " WHERE word = '" + record._1 + "';"
            connection.createStatement().executeUpdate(sql)
          } else {
            val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" +
              record._1 + "'," + record._2 + ");"
            connection.createStatement().execute(sql)
          }
        })
        connection.close()
      })
    })

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount",
      "root", "123456")
  }

  def hasAlreadyStored(word: String, connection: Connection): Boolean = {
    val sql = "SELECT * FROM wordcount WHERE word = '" + word + "';"
    val results = connection.createStatement().executeQuery(sql)
    results.next()
  }
}
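
The SELECT-then-INSERT/UPDATE round trip above can also be collapsed into a single statement. Below is a minimal sketch, assuming a UNIQUE index is added on the word column and reusing the createConnection() helper; MySQL's INSERT ... ON DUPLICATE KEY UPDATE then merges the counts server-side, and the PreparedStatement avoids hand-built SQL strings.

// Alternative foreachRDD body; requires e.g. ALTER TABLE wordcount ADD UNIQUE KEY uk_word (word);
result.foreachRDD { rdd =>
  rdd.foreachPartition { partitionRecords =>
    val connection = createConnection()
    val statement = connection.prepareStatement(
      "INSERT INTO wordcount(word, wordcount) VALUES(?, ?) " +
        "ON DUPLICATE KEY UPDATE wordcount = wordcount + VALUES(wordcount)")
    partitionRecords.foreach { case (word, count) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.executeUpdate() // inserts a new row or bumps the existing count
    }
    statement.close()
    connection.close()
  }
}
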
  6. BlankFilter
    Blacklist filtering: access records from blacklisted users are not output. Practices transform.
    transform is covered in the official streaming programming guide under Transformations on DStreams (same link as above).
    Requirement:
    Input (DStream):
    20190507,zs
    20190507,ls
    20190507,ww
    mapped to: (zs, (20190507,zs)) (ls, (20190507,ls)) (ww, (20190507,ww))
    Blacklist (RDD):
    zs
    ls
    mapped to: (zs, true) (ls, true)
    Left outer join (leftOuterJoin):
    (zs, ((20190507,zs), Some(true)))
    (ls, ((20190507,ls), Some(true)))
    (ww, ((20190507,ww), None)) --> records whose flag is not true are kept and output

Run the program:
Only ww's record is printed; the blacklisted users have been filtered out.
Code: BlankFilter.scala

package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Blacklist filtering with transform
  *
  * Input (DStream):
  * 20190507,zs
  * 20190507,ls
  * 20190507,ww
  * mapped to: (zs, (20190507,zs)) (ls, (20190507,ls)) (ww, (20190507,ww))
  *
  * Blacklist (RDD):
  * zs
  * ls
  * mapped to: (zs, true) (ls, true)
  *
  * leftOuterJoin:
  * (zs, ((20190507,zs), Some(true)))
  * (ls, ((20190507,ls), Some(true)))
  * (ww, ((20190507,ww), None))  --> kept and output
  */

object BlankFilter {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("BlankFilter")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    val blanks = List("zs", "ls")
    val blanksRDD = ssc.sparkContext.parallelize(blanks).map(x => (x, true))

    val lines = ssc.socketTextStream("master", 9999)
    /*
     * lines example: 20190507,zs
     * after map + leftOuterJoin: (zs, ((20190507,zs), Some(true)))
     */
    val whiteVisitor = lines.map(x => (x.split(",")(1), x)).transform( rdd => {
      rdd.leftOuterJoin(blanksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })

    whiteVisitor.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  7. SqlNetworkWordCount
    Count words with Spark Streaming + Spark SQL.
    Note: Streaming delivers the data as a DStream; converting each RDD to a DataFrame lets you register it as a temporary SQL view.
    Official example source and explanation:
    https://github.com/apache/spark/blob/v2.4.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
    Run the program the same way as above.
    Code: SqlNetworkWordCount.scala
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    val lines = ssc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

Full project on GitHub:

https://github.com/GYT0313/Spark-Learning/tree/master/sparkstream

Done!