Writing a Spark Structured Streaming Word-Count Program from Scratch
This article walks through writing a Spark Structured Streaming program from scratch that counts word occurrences. The words come from a socket (readers can swap in Kafka instead), and the results are written to the console (readers can likewise change the output to a Kafka topic).
Preparing the environment
Install the JDK and Scala, and configure the JAVA_HOME and SCALA_HOME environment variables.
Here is the configured environment:
pilafiMac:~ pilaf$ java -version
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)
pilafiMac:~ pilaf$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
Downloading and installing Spark
Download it from the official Spark website (http://spark.apache.org/downloads.html).
Extract and install it:
pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ sudo tar -xzf spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7
spark-2.3.2-bin-hadoop2.7.tgz
pilafiMac:opt pilaf$ sudo rm -f spark-2.3.2-bin-hadoop2.7.tgz
Check that the installed Spark works:
pilafiMac:opt pilaf$ ls
spark-2.3.2-bin-hadoop2.7
pilafiMac:opt pilaf$ cd spark-2.3.2-bin-hadoop2.7/
pilafiMac:spark-2.3.2-bin-hadoop2.7 pilaf$
pilafiMac:spark-2.3.2-bin-hadoop2.7 pilaf$ ls
LICENSE README.md conf jars python
NOTICE RELEASE data kubernetes sbin
R bin examples licenses yarn
pilafiMac:spark-2.3.2-bin-hadoop2.7 pilaf$ sudo ./bin/spark-shell
2018-10-30 14:08:11 WARN Utils:66 - Your hostname, pilafiMac resolves to a loopback address: 127.0.0.1; using 192.168.1.189 instead (on interface en1)
2018-10-30 14:08:11 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-30 14:08:15 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.189:4040
Spark context available as 'sc' (master = local[*], app id = local-1540879704574).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Everything works as expected.
Writing the Spark Structured Streaming program
Create a new Maven project in IDEA; its pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.pilaf</groupId>
    <artifactId>spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven shade plugin: bundles all dependencies into a single (uber) jar.
                 The signature files are excluded to avoid security errors at runtime. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Write the code; there is only a single class:
package com.pilaf.demo;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @author pilaf
 * @create: 2018-10-30 10:22
 */
public class SparkDemo {

    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .appName("PilafSparkDemo")
                .getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        // Read the input stream from a socket; run `nc -l 9999` locally to type in words.
        // The source could also be a Kafka topic.
        Dataset<Row> lines = sparkSession.readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", "9999")
                .load();
        // Split each line into words on single spaces.
        Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String s) throws Exception {
                        return Arrays.asList(s.split(" ")).iterator();
                    }
                }, Encoders.STRING());
        // Count the occurrences of each word.
        Dataset<Row> wordCounts = words.groupBy("value").count();
        StreamingQuery query = wordCounts.writeStream()
                // Complete mode: every time there are updates to the wordCounts Dataset,
                // all rows of the result table are written to the sink, including unchanged rows.
                .outputMode("complete")
                // Write to the console; the sink could also be Kafka.
                .format("console")
                .start();
        query.awaitTermination();
    }
}
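Incidentally, since the project targets Java 8, the anonymous FlatMapFunction above can also be written as a lambda. A behavior-equivalent sketch (the explicit cast is needed because Dataset.flatMap is overloaded for both the Java and Scala function types, so a bare lambda would be ambiguous):

// Same word-splitting step as above, written as a Java 8 lambda.
Dataset<String> words = lines.as(Encoders.STRING())
        .flatMap((FlatMapFunction<String, String>) s ->
                Arrays.asList(s.split(" ")).iterator(), Encoders.STRING());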
Run the maven clean package command (or use clean and package under Lifecycle in the Maven panel on the right side of IDEA); a jar is then generated in the project's target directory:
pilafiMac:target pilaf$ pwd
/Users/pilaf/project/sparkdemo/target
pilafiMac:target pilaf$ ls
classes spark-demo-1.0-SNAPSHOT.jar
maven-archiver surefire
original-spark-demo-1.0-SNAPSHOT.jar
pilafiMac:target pilaf$ ls -lh
total 254536
drwxr-xr-x 3 pilaf staff 102B 10 30 14:16 classes
drwxr-xr-x 3 pilaf staff 102B 10 30 14:16 maven-archiver
-rw-r--r-- 1 pilaf staff 4.2K 10 30 14:16 original-spark-demo-1.0-SNAPSHOT.jar
-rw-r--r-- 1 pilaf staff 124M 10 30 14:17 spark-demo-1.0-SNAPSHOT.jar
drwxr-xr-x 2 pilaf staff 68B 10 30 14:17 surefire
pilafiMac:target pilaf$
As you can see, spark-demo-1.0-SNAPSHOT.jar is 124 MB: it is the full jar with all dependencies bundled in. (Since spark-submit already supplies the Spark classes at runtime, you could shrink it by marking the Spark dependencies with provided scope.)
Run the nc -l 9999 command to feed strings into the socket:
pilafiMac:~ pilaf$ nc -l 9999
nihao
pilaf
apple
nihao
nihao apple orange
What would you like to eat ?
Run the Spark program to perform the computation and print the results. (The INFO lines below are emitted while the SparkContext is still starting up; setLogLevel("WARN") only takes effect once the session exists, after which just warnings appear.)
pilafiMac:target pilaf$ ls
classes original-spark-demo-1.0-SNAPSHOT.jar surefire
maven-archiver spark-demo-1.0-SNAPSHOT.jar
pilafiMac:target pilaf$ export SPARK_HOME=/opt/spark-2.3.2-bin-hadoop2.7
pilafiMac:target pilaf$ /opt/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
> --class com.pilaf.demo.SparkDemo \
> --master local[4] \
> spark-demo-1.0-SNAPSHOT.jar
2018-10-30 15:05:41 WARN Utils:66 - Your hostname, pilafiMac resolves to a loopback address: 127.0.0.1; using 192.168.1.189 instead (on interface en1)
2018-10-30 15:05:41 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-30 15:05:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-10-30 15:05:42 INFO SparkContext:54 - Running Spark version 2.3.2
2018-10-30 15:05:42 INFO SparkContext:54 - Submitted application: PilafSparkDemo
2018-10-30 15:05:43 INFO SecurityManager:54 - Changing view acls to: pilaf
2018-10-30 15:05:43 INFO SecurityManager:54 - Changing modify acls to: pilaf
2018-10-30 15:05:43 INFO SecurityManager:54 - Changing view acls groups to:
2018-10-30 15:05:43 INFO SecurityManager:54 - Changing modify acls groups to:
2018-10-30 15:05:43 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pilaf); groups with view permissions: Set(); users with modify permissions: Set(pilaf); groups with modify permissions: Set()
2018-10-30 15:05:43 INFO Utils:54 - Successfully started service 'sparkDriver' on port 54653.
2018-10-30 15:05:43 INFO SparkEnv:54 - Registering MapOutputTracker
2018-10-30 15:05:44 INFO SparkEnv:54 - Registering BlockManagerMaster
2018-10-30 15:05:44 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-10-30 15:05:44 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-10-30 15:05:44 INFO DiskBlockManager:54 - Created local directory at /private/var/folders/nx/8gkdvscn5r1_jlhwshgjr8j40000gp/T/blockmgr-9677604e-639d-4e43-9291-5b065f553c46
2018-10-30 15:05:44 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-10-30 15:05:44 INFO SparkEnv:54 - Registering OutputCommitCoordinator
2018-10-30 15:05:44 INFO log:192 - Logging initialized @5251ms
2018-10-30 15:05:44 INFO Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2018-10-30 15:05:44 INFO Server:419 - Started @5480ms
2018-10-30 15:05:44 INFO AbstractConnector:278 - Started ServerConnector@...{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-10-30 15:05:44 INFO Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs/job,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs/job/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/stage,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/stage/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/pool,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/pool/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/storage,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/storage/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/storage/rdd,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/storage/rdd/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/environment,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/environment/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/executors,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/executors/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/executors/threadDump,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/executors/threadDump/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/static,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/api,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/jobs/job/kill,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/stages/stage/kill,null,AVAILABLE,@Spark}
2018-10-30 15:05:44 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://192.168.1.189:4040
2018-10-30 15:05:45 INFO SparkContext:54 - Added JAR file:/Users/pilaf/project/sparkdemo/target/spark-demo-1.0-SNAPSHOT.jar at spark://192.168.1.189:54653/jars/spark-demo-1.0-SNAPSHOT.jar with timestamp 1540883145058
2018-10-30 15:05:45 INFO Executor:54 - Starting executor ID driver on host localhost
2018-10-30 15:05:45 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54658.
2018-10-30 15:05:45 INFO NettyBlockTransferService:54 - Server created on 192.168.1.189:54658
2018-10-30 15:05:45 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-10-30 15:05:45 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO BlockManagerMasterEndpoint:54 - Registering block manager 192.168.1.189:54658 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 192.168.1.189, 54658, None)
2018-10-30 15:05:45 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@...{/metrics/json,null,AVAILABLE,@Spark}
2018-10-30 15:05:48 WARN TextSocketSourceProvider:66 - The socket source should not be used for production applications! It does not support recovery.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|nihao|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|apple|    1|
|nihao|    2|
|pilaf|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|orange|    1|
| apple|    2|
| nihao|    3|
| pilaf|    1|
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|orange|    1|
|   you|    1|
| apple|    2|
|   eat|    1|
| nihao|    3|
|  like|    1|
|  What|    1|
| pilaf|    1|
| would|    1|
|     ?|    1|
|    to|    1|
+------+-----+

(Because the program splits lines on single spaces, the standalone "?" in the last input line is counted as a word of its own.)
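As mentioned at the beginning, either end of this pipeline can be swapped for Kafka. Below is a minimal, untested sketch of that variant. It assumes a broker at localhost:9092, an input topic named words, an output topic named wordcounts, and a checkpoint directory /tmp/wordcount-ckpt (all four are placeholder names of my choosing), and it additionally requires the spark-sql-kafka-0-10_2.11 artifact (same ${spark.version}) in the pom:

// Source: subscribe to a Kafka topic instead of a socket.
// Kafka records carry key/value as binary, so cast value to a string first.
Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
        .option("subscribe", "words")                        // assumed input topic
        .load()
        .selectExpr("CAST(value AS STRING)");

// ...the as/flatMap/groupBy/count steps stay exactly as in SparkDemo...

// Sink: the Kafka sink expects a string (or binary) "value" column and
// requires a checkpoint location so it can recover its progress.
StreamingQuery query = wordCounts
        .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
        .writeStream()
        .outputMode("complete")
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "wordcounts")                       // assumed output topic
        .option("checkpointLocation", "/tmp/wordcount-ckpt") // assumed scratch dir
        .start();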
Reference: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html