Spark Streaming示例(scala篇)

本段代码运行于Intellij IDEA中,与linux 中nc进行交互


1.Scala代码:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamingWordCount {
  def main(args: Array[String]) {
  
    //程序在运行时receiver会独占一个线程,所以streaming程序至少要两个线程,防止starvation scenario
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")

//所有流功能的主要入口
    val ssc: StreamingContext = new StreamingContext(conf , Seconds(5))

//指定从TCP源数据流的离散流,接收到的每一行数据都是一行文本
    val stream: ReceiverInputDStream[String] =  ssc.socketTextStream("hadoop-1707-003",6666)

//将接收到的文本压平,转换,聚合
    val dStream: DStream[(String, Int)] =  stream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
    
dStream.print()

// Spark Streaming 只有建立在启动时才会执行计算,在它已经开始之后,并没有真正地处理。
//---------------------------------------
//启动计算
    ssc.start()
//等待计算终止
    ssc.awaitTermination()
    //true    会把内部的sparkcontext同时停止
    //false  只会停止streamingcontext  不会停sparkcontext
    ssc.stop(true)
  }

}

2.linux中nc

        2.1.下载nc

        下载地址:http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

        2.2.解压nc

        rpm  -ivh  nc-1.84-22.el6.x86_64.rpm

        2.3.开启nc命令

        nc    -lk    6666

3.运行scala代码,并在nc上输入

运行结果如图:Spark Streaming示例(scala篇)