SBT 构建 spark streaming集成kafka (scala版本)
前言:
最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家!
环境准备:
操作系统 : ubuntu14.04 LTS
hadoop 2.7.1 伪分布式搭建
sbt-0.13.9
kafka_2.11-0.8.2.2
spark-1.3.1-bin-hadoop2.6
scala 版本 : 2.10.4
注: 请重视版本问题,之前作者用的是spark-1.4.1 ,scala版本是2.11.7 结果作业提交至spark-submit 总是失败,所以大家这点注意下!
hadooop 2.7.1 伪分布式搭建 大家可以参照 http://www.wjxfpf.com/2015/10/517149.html
kafka安装与测试:
- 到官网http://kafka.apache.org/downloads.html 下载 kafka_2.11-0.8.2.2.tgz
-
进入下载目录,打开终端,输入以下命令,将其解压至 /usr/local 目录: sudo tar -xvzf kafka_2.11-0.8.2.2.tgz -C /usr/local
-
敲入用户密码后,kafka 成功解压,继续输入以下命令:
- cd /usr/local 跳转至/usr/local/ 目录;
-
sudo chmod 777 -R kafka_2.11-0.8.2.2 获得该目录的所有执行权; gedit ~/.bashrc 打开个人配置 末尾添加 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.2
export PATH=$PATH:$KAFKA_HOME/bin - 保存,终端输入 source ~/.bashrc
kafka 有其自带默认的zookeeper 所以省去了我们一些功夫,现在可以开始测试下kafka:
- 新建终端输入 cd $KAFKA_HOME 进入kafka 目录 (为了方便,我们称此终端为1号终端)
-
bin/zookeeper-server-start.sh config/zookeeper.properties & 后台运行zookeeper
-
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 新建一个叫test的topic
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
- 开启一个新的终端(为了方便,我们称此终端为2号终端),并进入kafka 目录 ,输入:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
-
现在,在1号终端输入HAHA,如果2号终端能输出HAHA,说明kafka测试成功!
SBT构建一个关于单词计数的scala程序
- 新建一个文件夹,命名为spark_kafka
- 进入spark_kafka,按/src/main/scala/KafkaDemo.scala层级目录 新建KafkaDemo.scala
- 在spark_kafka目录下 新建project 目录 在project下新建plugins.sbt
- 在spark_kafka目录下新建assembly.sbt
-
最后,你所看到的目录结构如下
-
- spark_kafka/
- spark_kafka/src
- spark_kafka/src/main
- spark_kafka/src/main/scala
- spark_kafka/src/main/scala/KafkaDemo.scala
- spark_kafka/project
- spark_kafka/project/plugins.sbt
- spark_kafka/assembly.sbt
-
其中,KafkaDemo.scala 代码如下
import java.util.Properties import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object KafkaDemo { def main(args: Array[String]) { val zkQuorum = "127.0.0.1:2181" val group = "test-consumer-group" val topics = "test" val numThreads = 2 val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
assmebly.sbt 代码如下
name := "KafkaDemo" version := "1.0" scalaVersion := "2.10.4" libraryDependencies ++= Seq( ("org.apache.spark" %% "spark-core" % "1.3.1" % "provided") ) libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1" % "provided" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0" mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first case x => old(x) } } resolvers += "OSChina Maven Repository" at "http://maven.oschina.net/content/groups/public/" externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
plugins.sbt 内容如下:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
请大家注意 :
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
case x => old(x)
}
}
这段代码只是针对我本机的解决依赖冲突的方法,如果没有这段代码,那么我打包的时候会有依赖冲突的发生,原因是不同包下有相同的类,解决的方法是合并依赖,下面是贴出没有这段代码的错误:
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/hadoop/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.0.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /home/hadoop/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class
大家注意红色高亮的代码,当大家发生其他依赖冲突的时候,可以照猫画虎,解决依赖冲突
接下来,就是在较好的网络环境下进行打包,终端进入spark_kafka 目录 ,敲入sbt assembly , 耐心等代下载打包
spark streaming 对接 kafka 生产消息端口
- 启动hadoop
- 后台启动kafka zookeeper 和 server 端
- 启动producer 命令行(后续通过输入字符串,spark对其进行单词计数处理)
- 新建终端进入spark_kafka 目录,输入
$SPARK_HOME/bin/spark-submit --class "KafkaDemo" target/scala-2.10/KafkaDemo-assembly-1.0.jar
(打包成功的话,会有一个target 目录,而且target下有scala-2.10/KafkaDemo-assembly-1.0.jar ) 。 - 然后在producer 输入一系列字符串,spark streaming会进行处理
如果能看到该结果,那就恭喜你了。
弄这个其实弄了有一段时间,主要问题是依赖的解决,以及版本的问题。如果大家在做的过程发现出现有scala :no such method... 等问题的时候,说明是scala版本不符合了
其他的问题大家可以谷歌,此外强调一点,以上命令跟我个人目录环境有关,比如$SPARK_HOME代表我自己的spark 路径,如果你的目录跟我不一样,自己要换一换;
此文是面向有linux基础的同学,懂基本环境配置,这是最起码的要求!此文也给自己,毕竟确实辛苦!