Spark2.X环境准备、编译部署及运行
1.Spark概述
Spark 是一个用于大规模数据处理的快速和通用的计算引擎。
在速度方面, Spark 扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。 在处理大规模数据集时,速度是非常重要的。速度快就意味着我们可以进行交互式的数据操作, 否则我们每次操作就需要等待数分钟甚至数小时。
Spark 的一个主要特点是基于内存进行计算,因而更快。不过即使是必须在磁盘上进行的复杂计算, Spark 依然比 MapReduce 更加高效。
2.Spark生态系统
3.Spark学网站
1)databricks 网站
2)spark 官网
3)github 网站
4.Spark2.x源码下载及编译生成版本(安装在节点2-可自选)
1)Spark2.2源码下载到bigdata-pro02.kfk.com节点的/opt/softwares/目录下。
解压
tar -zxf spark-2.2.0.tgz -C /opt/modules/
2)spark2.2编译所需要的环境:Maven3.3.9和Java8
3)Spark源码编译的方式:Maven编译(我们采用此方式)、SBT编译(暂无)和打包编译make-distribution.sh
a)下载Jdk8并安装(如果以前就是此版本则跳过相应步骤)
tar -zxf jdk8u60-linux-x64.tar.gz -C /opt/modules/
b)JAVA_HOME配置/etc/profile
vi /etc/profile
export JAVA_HOME=/opt/modules/jdk1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
编辑退出之后,使之生效
source /etc/profile
c)如果遇到不能加载当前版本的问题
rpm -qa|grep jdk
rpm -e --nodeps jdk版本
which java 删除/usr/bin/java
d)下载并解压Maven
下载Maven
解压maven
tar -zxf apache-maven-3.3.9-bin.tar.gz -C /opt/modules/
配置MAVEN_HOME
vi /etc/profile
export MAVEN_HOME=/opt/modules/apache-maven-3.3.9
export PATH=$PATH:$MAVEN_HOME/bin
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=1024M -XX:ReservedCodeCacheSize=1024M"
编辑退出之后,使之生效
source /etc/profile
#查看maven版本
[[email protected] modules]$ mvn -version
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1024M; support was removed in 8.0
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /opt/modules/apache-maven-3.3.9
Java version: 1.8.0_60, vendor: Oracle Corporation
Java home: /opt/modules/jdk1.8.0_60/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "2.6.32-431.el6.x86_64", arch: "amd64", family: "unix"
e)在/etc/resolv.conf文件中添加如下内容
nameserver 8.8.8.8
nameserver 8.8.4.4
注:这样做主要是为了一会儿能够正常从外网下载我们所需要的依赖。
f)编辑make-distribution.sh内容,可以让编译速度更快
VERSION=2.2.0
SCALA_VERSION=2.11.8
SPARK_HADOOP_VERSION=2.6.0
#支持spark on hive
SPARK_HIVE=1
4)通过make-distribution.sh源码编译spark(时间很长,一个小时以上,而且可能不会一次成功,请保持网络状况良好!)
[[email protected] modules]$ cd spark-2.2.0/
[[email protected] spark-2.2.0]$ ./dev/make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn
编译成功,历时两个半小时~
#编译完成之后解压
tar -zxf spark-2.2.0-bin-custom-spark.tgz -C /opt/modules/
#重命名
[[email protected] modules]$ mv spark-2.2.0-bin-custom-spark/ spark-2.2.0-bin
[[email protected] modules]$ ls
apache-maven-3.3.9 hadoop-2.6.0 jdk1.8.0_60 kafka_2.11-0.9.0.0 spark-2.2.0 zookeeper-3.4.5-cdh5.10.0
flume-1.7.0-bin hbase-0.98.6-cdh5.3.0 kafka_2.11-0.8.2.1 producer.properties spark-2.2.0-bin
5.scala安装及环境变量设置
1)下载
2)解压
tar -zxf scala-2.11.8.tgz -C /opt/modules/
3)配置环境变量
sudo vi /etc/profile
export SCALA_HOME=/opt/modules/scala-2.11.8
export PATH=$PATH:$SCALA_HOME/bin
4)编辑退出之后,使之生效
source /etc/profile
6.spark2.0本地模式运行测试
1)启动spark-shell测试
/bin/spark-shell
基本操作示例:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.count()
res0: Long = 126
scala> textFile.first()
res1: String = # Apache Spark
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
2)词频统计
a)创建一个本地文件test.txt
vi /opt/datas/test.txt
doop storm spark
hbase spark flume
spark rdd spark
hdfs mapreduce spark
hive hdfs solr
spark flink storm
hbase storm es
b)spark-shell 词频统计
./bin/spark-shell
#加载文件内容,形成RDD
scala> val rdd = spark.read.textFile("/opt/datas/test.txt")
rdd: org.apache.spark.sql.Dataset[String] = [value: string]
#以空格为分割单位,形成若干元组
scala> var lines = rdd.flatMap(x => x.split(" ")).collect
lines: Array[String] = Array(doop, storm, spark, hbase, spark, flume, spark, rdd, spark, hdfs, mapreduce, spark, hive, hdfs, solr, spark, flink, storm, hbase, storm, es)
#形成(key, value)元组对
scala> var lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).collect
lines: Array[(String, Int)] = Array((doop,1), (storm,1), (spark,1), (hbase,1), (spark,1), (flume,1), (spark,1), (rdd,1), (spark,1), (hdfs,1), (mapreduce,1), (spark,1), (hive,1), (hdfs,1), (solr,1), (spark,1), (flink,1), (storm,1), (hbase,1), (storm,1), (es,1))7.spark
#根据key值进行统计合并
scala> var lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).collect
lines: Array[(String, Int)] = Array((spark,6), (hive,1), (es,1), (flume,1), (doop,1), (flink,1), (mapreduce,1), (solr,1), (rdd,1), (hdfs,2), (storm,3), (hbase,2))
#可以交换原来的key和value的位置
scala> var lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).collect
lines: Array[(Int, String)] = Array((6,spark), (1,hive), (1,es), (1,flume), (1,doop), (1,flink), (1,mapreduce), (1,solr), (1,rdd), (2,hdfs), (3,storm), (2,hbase))
#排序
scala> var lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).sortByKey().collect
lines: Array[(Int, String)] = Array((1,hive), (1,es), (1,flume), (1,doop), (1,flink), (1,mapreduce), (1,solr), (1,rdd), (2,hdfs), (2,hbase), (3,storm), (6,spark))
#降序排序之后再将key、value顺序换回去
scala> var lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).sortByKey().map(x => (x._2,x._1)).collect
lines: Array[(String, Int)] = Array((hive,1), (es,1), (flume,1), (doop,1), (flink,1), (mapreduce,1), (solr,1), (rdd,1), (hdfs,2), (hbase,2), (storm,3), (spark,6))
c)通过web页面查看spark服务情况
bigdata-pro02.kfk.com:4040
以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!同时也欢迎转载,但必须在博文明显位置标注原文地址,解释权归博主所有!