Practicing Spark Streaming Programs in the IDEA Development Environment
Preface
For setting up the Spark + IDEA development environment, see:
https://blog.****.net/qq_38038143/article/details/89926205
1. Maven Dependencies
<properties>
    <scala.version>2.12.8</scala.version>
    <spark.version>2.4.0</spark.version>
    <mysql.version>5.1.46</mysql.version>
</properties>

<dependencies>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.12</artifactId>
        <version>2.6.7.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Examples
- NetworkWordCount
Counts words read from a socket. First start nc -lk 9999, then run the program and type in words:
Code: NetworkWordCount.scala
Note: set your own hostname. local[2] is needed because one thread receives the data and another processes it; with only one thread the job fails, since no thread is left to process the data.
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    val lines = ssc.socketTextStream("master", 9999)
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
- FileWordCount
Uses local files as the input source. Create a directory named test:
After starting the program, copy files into that directory (appending to files that already exist in the directory is not detected; only new files copied into the directory are processed):
Code: FileWordCount.scala
Note: set your own directory. Here local (a single thread) is enough, because reading from a local directory does not require a receiver.
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    val lines = ssc.textFileStream("file:///home/hadoop/spark-2.4.0/test")
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
- StatefulWordCount
Stateful word count: previous state is retained, so the counts cover all words seen so far rather than only the words in the current batch.
Code: StatefulWordCount.scala
Official documentation of updateStateByKey: http://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#transformations-on-dstreams
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Stateful operations require a checkpoint directory
    // (usually on HDFS; here the local ./checkpoint is used).
    ssc.checkpoint("./checkpoint")

    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_, 1))
    // updateFunction _ turns the method into a function value (eta-expansion)
    val state = result.updateStateByKey(updateFunction _)
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def updateFunction(currentValues: Seq[Int], previousValues: Option[Int]): Option[Int] = {
    val newCount = currentValues.sum
    val previousCount = previousValues.getOrElse(0) // previous count if present, otherwise 0
    Some(newCount + previousCount)
  }
}
- ForEachWordCount
Counts words and writes the results to MySQL; this practices foreachRDD.
Official documentation: http://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#output-operations-on-dstreams
The official documentation walks through four examples:
The first creates the connection in the driver; running it fails with a serialization error, because the connection cannot be serialized.
The second works, but it creates a new connection for every record, which wastes a lot of resources and is not recommended.
The third creates one connection per partition, shared by all records in that partition, which is much better.
The fourth uses a connection pool: connections are borrowed from the pool when needed and returned afterwards, which has the lowest overhead; see the sketch after this list.
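The fourth pattern is not shown in the code below. As a minimal sketch of what such a pool might look like, assume a hypothetical ConnectionPool object backed by a simple thread-safe queue (in practice a real pool such as DBCP or HikariCP would be used):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical minimal connection pool: reuses connections via a thread-safe queue.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    if (conn != null) conn
    else {
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount", "root", "123456")
    }
  }

  // Return the connection for reuse instead of closing it.
  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}

Used inside the streaming job, this keeps the per-partition structure of the third pattern, but getConnection()/returnConnection() replace createConnection()/close().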
Run the program:
One drawback: newly inserted rows are not merged with the rows inserted in earlier batches. Possible fixes: 1. before inserting, check whether the word already exists and update it instead (ForeachWordCount2 below uses this approach); 2. use a store such as HBase or Redis that merges by key automatically. A sketch of a MySQL-side alternative follows.
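With a UNIQUE index on the word column (which the table created below does not have by default), MySQL's INSERT ... ON DUPLICATE KEY UPDATE can merge new counts into existing rows in a single statement. The helper object below is hypothetical and only illustrates that statement:

import java.sql.Connection

object UpsertHelper {
  // Assumes: ALTER TABLE wordcount ADD UNIQUE KEY uk_word (word);
  // Without the unique index MySQL cannot detect the duplicate and will simply insert.
  def upsert(record: (String, Int), connection: Connection): Unit = {
    val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" + record._1 + "'," +
      record._2 + ") ON DUPLICATE KEY UPDATE wordcount = wordcount + " + record._2 + ";"
    connection.createStatement().execute(sql)
  }
}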
Create the database and table in MySQL:
CREATE DATABASE gyt_wordcount;
USE gyt_wordcount;
CREATE TABLE wordcount(
    id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
    word VARCHAR(50) DEFAULT NULL,
    wordcount INT DEFAULT NULL
);
Code: ForEachWordCount.scala
package com.gyt.sparkstream

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Write the word counts to MySQL.
 *
 * CREATE DATABASE gyt_wordcount;
 * USE gyt_wordcount;
 * CREATE TABLE wordcount(
 *     id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
 *     word VARCHAR(50) DEFAULT NULL,
 *     wordcount INT DEFAULT NULL
 * );
 */
object ForEachWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForEachWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // TODO... write the results to MySQL
    // The connection cannot be serialized, so creating it in the driver fails:
    // result.foreachRDD(rdd => {
    //   val connection = createConnection()
    //   rdd.foreach(record => {
    //     val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" +
    //       record._1 + "'," + record._2 + ");"
    //     connection.createStatement().execute(sql)
    //   })
    // })

    // One connection per partition:
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val connection = createConnection()
        partitionRecords.foreach(record => {
          val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" +
            record._1 + "'," + record._2 + ");"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount",
      "root", "123456")
  }
}
- ForeachWordCount2
Updates rows that have already been inserted, instead of inserting duplicates.
Run the program:
Code: ForeachWordCount2.scala
package com.gyt.firstspark

import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Write the word counts to MySQL, updating rows that already exist.
 */
object ForEachWordCount2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForEachWordCount2")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("master", 9999)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // TODO... write the results to MySQL
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val connection = createConnection()
        partitionRecords.foreach(record => {
          // Already stored in MySQL: update the existing row
          if (hasAlreadyStored(record._1, connection)) {
            val sql = "UPDATE wordcount SET wordcount = wordcount + " +
              record._2 + " WHERE word = '" + record._1 + "';"
            connection.createStatement().executeUpdate(sql)
          } else {
            val sql = "INSERT INTO wordcount(word, wordcount) VALUES('" +
              record._1 + "'," + record._2 + ");"
            connection.createStatement().execute(sql)
          }
        })
        connection.close()
      })
    })

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/gyt_wordcount",
      "root", "123456")
  }

  def hasAlreadyStored(word: String, connection: Connection): Boolean = {
    val sql = "SELECT * FROM wordcount WHERE word = '" + word + "';"
    val results = connection.createStatement().executeQuery(sql)
    results.next()
  }
}
- BlankFilter
Blacklist filtering: access records of blacklisted users are not output; this practices transform.
Official documentation: see the transform operation in the streaming programming guide linked above.
Requirement:
Input: DStream
20190507,zs
20190507,ls
20190507,ww
Transformed to: (zs, (20190507,zs)) (ls, (20190507,ls)) (ww, (20190507,ww))
Blacklist: RDD
zs
ls
Transformed to: (zs, true) (ls, true)
Left outer join (leftOuterJoin):
(zs, ((20190507,zs), Some(true)))
(ls, ((20190507,ls), Some(true)))
(ww, ((20190507,ww), None)) --> records whose flag is not true are the ones that get output
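To make the Option handling concrete, here is a minimal standalone sketch (assumed object name) of the same join and filter on plain RDDs:

import org.apache.spark.{SparkConf, SparkContext}

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("LeftOuterJoinSketch"))

    // (user, full record) pairs, as in the DStream transformation above
    val visits = sc.parallelize(Seq("20190507,zs", "20190507,ls", "20190507,ww"))
      .map(x => (x.split(",")(1), x))
    val blacklist = sc.parallelize(Seq("zs", "ls")).map((_, true))

    // leftOuterJoin yields (user, (record, Option[Boolean]))
    val joined = visits.leftOuterJoin(blacklist)
    joined.collect().foreach(println)
    // (zs,(20190507,zs,Some(true))) (ls,(20190507,ls,Some(true))) (ww,(20190507,ww,None))

    // Keep only records whose flag is not true, i.e. non-blacklisted users
    val allowed = joined.filter(x => !x._2._2.getOrElse(false)).map(_._2._1)
    allowed.collect().foreach(println) // 20190507,ww

    sc.stop()
  }
}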
Run:
Only ww is printed; the blacklisted users have been filtered out:
Code: BlankFilter.scala
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Blacklist filtering with transform.
 *
 * Input: DStream
 *   20190507,zs
 *   20190507,ls
 *   20190507,ww
 * Transformed to: (zs, (20190507,zs)) (ls, (20190507,ls)) (ww, (20190507,ww))
 *
 * Blacklist: RDD
 *   zs
 *   ls
 * Transformed to: (zs, true) (ls, true)
 *
 * leftOuterJoin:
 *   (zs, ((20190507,zs), Some(true)))
 *   (ls, ((20190507,ls), Some(true)))
 *   (ww, ((20190507,ww), None)) --> output
 */
object BlankFilter {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("BlankFilter")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    val blanks = List("zs", "ls")
    val blanksRDD = ssc.sparkContext.parallelize(blanks).map(x => (x, true))

    val lines = ssc.socketTextStream("master", 9999)
    /*
     * lines = 20190507,zs
     * leftOuterJoin = (zs, ((20190507,zs), Some(true)))
     */
    val whiteVisitor = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blanksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })
    whiteVisitor.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
- SqlNetworkWordCount
Word count using Spark Streaming together with Spark SQL.
Note: Streaming produces data as a DStream; converting each of its RDDs to a DataFrame lets it be registered as a SQL table (temporary view).
Source code and explanation on the official site:
https://github.com/apache/spark/blob/v2.4.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
Run:
Code: SqlNetworkWordCount.scala
package com.gyt.firstspark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[2]").setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    val lines = ssc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
Project on GitHub:
https://github.com/GYT0313/Spark-Learning/tree/master/sparkstream