Writing a simple Spark Streaming example in IDEA and submitting it as a jar
Before writing the example, make sure the JDK, Scala SDK, etc. are already configured in IDEA.
1. Prerequisites: install nc (netcat)
1) Check whether nc is already installed
$ which nc
/usr/bin/which: no nc in .........................
2) Install nc-1.84-22.el6.x86_64.rpm
$ chmod u+x nc-1.84-22.el6.x86_64.rpm
Note: without sudo, rpm fails with error: can't create transaction lock on /var/lib/rpm/.rpm.lock (Permission denied), because installing packages requires root privileges
$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm
3) Verify that nc is now installed
$ which nc
/usr/bin/nc
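To confirm nc actually works end to end, a quick loopback test in two terminals helps (port 9999 is the same port the demo uses later):
$ nc -lk 9999          # terminal 1: listen on port 9999
$ nc localhost 9999    # terminal 2: connect; lines typed here appear in terminal 1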
2. Write a simple Spark Streaming word count example in IDEA
package com.tom.spark  // must match the --class passed to spark-submit in step 5

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FirstSparStreamingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FirstSparStreamingDemo").setMaster("local[2]")
    // Create the StreamingContext with the desired batch interval
    val scc = new StreamingContext(conf, Seconds(5))
    // Collect data from the socket source
    val lines = scc.socketTextStream("192.168.10.8", 9999)
    // Process the data: split each line into words, pair each word with 1, sum per word
    val word = lines.flatMap(line => line.split(" "))
    val pair = word.map(word => (word, 1))
    val wordCount = pair.reduceByKey((a, b) => a + b)
    // Output each batch's counts
    wordCount.print()
    // Start the receiver and the streaming computation
    scc.start()
    // Block until the computation terminates
    scc.awaitTermination()
    // Only reached after termination; stopping an already-stopped context is a no-op
    scc.stop()
  }
}
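To compile this, the project needs spark-streaming on the classpath. A minimal build.sbt sketch, assuming an sbt-based project; the version numbers below are illustrative and must match the Spark and Scala versions on your cluster:

name := "sparkStreamingStudy"
scalaVersion := "2.11.8"  // assumed; use the Scala version your Spark build targets
// spark-streaming pulls in spark-core transitively; "provided" keeps Spark out of the jar
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided"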
3. Package the code into a jar
Build the artifact in IDEA; the jar appears in the output directory shown in the original screenshot. Then copy the jar to the pseudo-distributed cluster.
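If you build with sbt rather than IDEA's artifact tool, sbt package writes the jar under target/. Either way, a standard scp copies it over; the user and destination path below are inferred from the spark-submit command in step 5:
$ scp sparkStreamingStudy.jar tom@192.168.10.8:/home/tom/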
4. Start the service processes
$ nc -lk 9999
$ sbin/hadoop-daemon.sh start namenode
$ sbin/hadoop-daemon.sh start datanode
$ bin/hive --service metastore &
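You can confirm the HDFS daemons are up with jps; the PIDs below are illustrative:
$ jps
3001 NameNode
3102 DataNode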
5. From the Spark installation directory, run:
$ bin/spark-submit --master spark://192.168.10.8:7077 \
--executor-memory 2g \
--total-executor-cores 10 \
--driver-memory 2G \
--class com.tom.spark.FirstSparStreamingDemo /home/tom/sparkStreamingStudy.jar
Parameters:
--master: URL of the Spark standalone (pseudo-distributed) master; the default master port is 7077, not the 9999 already taken by nc (see the note after this list)
--executor-memory: memory allocated to each executor process
--total-executor-cores: total number of CPU cores used across all executors
--driver-memory: memory allocated to the driver process
--class: the fully qualified main class to run, followed by the path to the jar
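Note that the setMaster("local[2]") hard-coded in the code takes precedence over the --master flag, so as written the job actually runs in local mode. To let spark-submit choose the master, drop setMaster from the code:

// leave the master unset so --master on the spark-submit command line applies
val conf = new SparkConf().setAppName("FirstSparStreamingDemo")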
6. In the open nc window, type space-separated words and press Enter to send each line.
If the window running the jar prints batch results like those in the original screenshot, the job ran successfully.
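For reference, DStream.print() renders each batch in roughly this shape (the timestamp and counts below are made up):

-------------------------------------------
Time: 1496999995000 ms
-------------------------------------------
(hello,2)
(spark,1)
(world,1)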