SBT 构建 spark streaming集成kafka (scala版本)

前言:

    
     最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家!

 

环境准备:

        操作系统 : ubuntu14.04 LTS

      hadoop 2.7.1   伪分布式搭建

      sbt-0.13.9

      kafka_2.11-0.8.2.2

      spark-1.3.1-bin-hadoop2.6

      scala 版本 : 2.10.4

     

      注: 请重视版本问题,之前作者用的是spark-1.4.1 ,scala版本是2.11.7  结果作业提交至spark-submit 总是失败,所以大家这点注意下!             

     

                   hadooop 2.7.1 伪分布式搭建 大家可以参照 http://www.wjxfpf.com/2015/10/517149.html

 

    kafka安装与测试:        

  1. 到官网http://kafka.apache.org/downloads.html 下载 kafka_2.11-0.8.2.2.tgz    
  2. 进入下载目录,打开终端,输入以下命令,将其解压至 /usr/local 目录: sudo tar -xvzf   kafka_2.11-0.8.2.2.tgz -C /usr/local
  3. 敲入用户密码后,kafka 成功解压,继续输入以下命令:
    1. cd  /usr/local    跳转至/usr/local/ 目录;
    2. sudo  chmod 777 -R  kafka_2.11-0.8.2.2   获得该目录的所有执行权;  gedit  ~/.bashrc  打开个人配置 末尾添加 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.2 
      export PATH=$PATH:$KAFKA_HOME/bin
    3. 保存,终端输入 source ~/.bashrc

 

kafka 有其自带默认的zookeeper 所以省去了我们一些功夫,现在可以开始测试下kafka:

  • 新建终端输入 cd $KAFKA_HOME 进入kafka 目录    (为了方便,我们称此终端为1号终端)
  • bin/zookeeper-server-start.sh config/zookeeper.properties &   后台运行zookeeper

  •  

    bin/kafka-server-start.sh  config/server.properties & 后台启动kafka-server 
  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test    新建一个叫test的topic 
  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
  • 开启一个新的终端(为了方便,我们称此终端为2号终端),并进入kafka 目录 ,输入:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 
  • 现在,在1号终端输入HAHA,如果2号终端能输出HAHA,说明kafka测试成功!

 

SBT 构建 spark streaming集成kafka (scala版本)

 

SBT构建一个关于单词计数的scala程序

  • 新建一个文件夹,命名为spark_kafka      
  • 进入spark_kafka,按/src/main/scala/KafkaDemo.scala层级目录   新建KafkaDemo.scala
  • 在spark_kafka目录下 新建project 目录 在project下新建plugins.sbt
  • 在spark_kafka目录下新建assembly.sbt
  • 最后,你所看到的目录结构如下 
      • spark_kafka/
      • spark_kafka/src
      • spark_kafka/src/main
      • spark_kafka/src/main/scala
      • spark_kafka/src/main/scala/KafkaDemo.scala
      • spark_kafka/project
      • spark_kafka/project/plugins.sbt
      • spark_kafka/assembly.sbt
                            

其中,KafkaDemo.scala 代码如下

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object KafkaDemo {
    def main(args: Array[String]) {
  val zkQuorum = "127.0.0.1:2181"
  val group = "test-consumer-group"
  val topics = "test"
  val numThreads = 2
  val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
  val ssc =  new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint("checkpoint")

  val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
  }
}

 

assmebly.sbt 代码如下

name := "KafkaDemo"
version := "1.0"
scalaVersion := "2.10.4"


libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
)



libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"


mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
    case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
    case x => old(x)
  }
}



resolvers += "OSChina Maven Repository" at "http://maven.oschina.net/content/groups/public/"



externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)

  

plugins.sbt 内容如下:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

 

 

 请大家注意 :

  

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
case x => old(x)
}
}

这段代码只是针对我本机的解决依赖冲突的方法,如果没有这段代码,那么我打包的时候会有依赖冲突的发生,原因是不同包下有相同的类,解决的方法是合并依赖,下面是贴出没有这段代码的错误:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/hadoop/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.0.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /home/hadoop/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class

 

大家注意红色高亮的代码,当大家发生其他依赖冲突的时候,可以照猫画虎,解决依赖冲突

 

接下来,就是在较好的网络环境下进行打包,终端进入spark_kafka 目录 ,敲入sbt assembly , 耐心等代下载打包

 

spark streaming 对接 kafka 生产消息端口

  • 启动hadoop
  • 后台启动kafka zookeeper 和 server 端
  • 启动producer 命令行(后续通过输入字符串,spark对其进行单词计数处理)
  • 新建终端进入spark_kafka 目录,输入

    $SPARK_HOME/bin/spark-submit --class "KafkaDemo" target/scala-2.10/KafkaDemo-assembly-1.0.jar

    (打包成功的话,会有一个target 目录,而且target下有scala-2.10/KafkaDemo-assembly-1.0.jar ) 。  
  • 然后在producer 输入一系列字符串,spark streaming会进行处理

SBT 构建 spark streaming集成kafka (scala版本)

 

如果能看到该结果,那就恭喜你了。

弄这个其实弄了有一段时间,主要问题是依赖的解决,以及版本的问题。如果大家在做的过程发现出现有scala :no such method...    等问题的时候,说明是scala版本不符合了

其他的问题大家可以谷歌,此外强调一点,以上命令跟我个人目录环境有关,比如$SPARK_HOME代表我自己的spark 路径,如果你的目录跟我不一样,自己要换一换;

此文是面向有linux基础的同学,懂基本环境配置,这是最起码的要求!此文也给自己,毕竟确实辛苦!