Spark Streaming示例(scala篇)
本段代码运行于Intellij IDEA中,与linux 中nc进行交互
1.Scala代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
//程序在运行时receiver会独占一个线程,所以streaming程序至少要两个线程,防止starvation scenario
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
//所有流功能的主要入口
val ssc: StreamingContext = new StreamingContext(conf , Seconds(5))
//指定从TCP源数据流的离散流,接收到的每一行数据都是一行文本
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop-1707-003",6666)
//将接收到的文本压平,转换,聚合
val dStream: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
dStream.print()
// Spark Streaming 只有建立在启动时才会执行计算,在它已经开始之后,并没有真正地处理。
//---------------------------------------
//启动计算
ssc.start()
//等待计算终止
ssc.awaitTermination()
//true 会把内部的sparkcontext同时停止
//false 只会停止streamingcontext 不会停sparkcontext
ssc.stop(true)
}
}
2.linux中nc
2.1.下载nc
下载地址:http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm
2.2.解压nc
rpm -ivh nc-1.84-22.el6.x86_64.rpm
2.3.开启nc命令
nc -lk 6666
3.运行scala代码,并在nc上输入
运行结果如图: