Spark介绍
概念:
Spark是一个快速且通用的集群计算平台
特点:
Spark是快速的
扩充了流行的MapReduce计算模型
基于内存计算的
基于事件驱动,通过线程池复用线程提高性能
抽象出分布式内存存储结构数据,弹性分布式数据集RDD
Spark是通用的:
设计容纳了其他分布式系统的拥有的功能
批处理,迭代式处理,交互式查询和流处理
优点:降低了维护成本
Spark高度开放:
Spark提供了Python,Java,Scala,Sql的API和丰富的内置库
和其他的大数据工具整合的很好,包括hadoop,kafka
Spark历史:
诞生于2009,加州大学伯克利分校RAD实验室的一个研究项目
最初是基于Hadoop MapReduce
发现MapReduce在迭代式计算和交互式计算上低效,引入内存存储
2010年3月份Spark开源
2011年AMP实验室在Spark上开发高级组件,Spark Streaming
2013年转移到Apache下,不就便成为顶级项目了
Spark组件:
SparkCore
包含Spark基本功能,包含任务调度,内存管理,容错机制
内部定义了RDDs(弹性分布式数据集)
提供很多APIs来创建和操作这些RDDS
应用场景,为其他组件提供底层的服务。
SparkSql:
处理结构化数据的库,就像Hive Sql,Mysql一样
应用场景,企业中用来做报表统计的
1.0版本之前没有sql
Spark Streaming
是实时数据流处理组件,类似Storm
Spark Streaming 提供了API 来操作实时流数据
应用场景:企业中用来从Kafka接收数据做实时统计
Mlib:
一个包含通用机器学习功能的包,Machine learning lib。
包含分类,聚类,回归等,还包含模型评估,和数据导入
Mlib提供的上面这些方法,都支持集群上的横线拓展
应用场景,机器学习
Graphx:
是处理图的库(例如,社交网路图),并进行图的并行运算
像Spark Streaming,Spark Sql一样,继承了RDD API
它提供了各种图的操作,和常用的图算法,例如PangeRank算法
应用场景,图计算
Cluster Managers:
就是集群管理,Spark自带一个集群管理是单独调度器
常见集群管理包括Hadoop Yarn,Apache Mesos
紧密集成的优点:
Spark底层优化了,基于Spark底层的组件,也得到了相应的优化。
紧密集成,节省了各个组件组合使用时的部署,测试等时间。
向Spark增加新的组件时,向其他组件,可立刻享用新组建的功能。
Spark与Hadoop比较:
hadoop的应用场景:
离线处理
对时效性要求不高
Spark应用场景:
时效性要求高的场景
机器学习等领域
Spark不具有HDFS的存储能力,要借助HDFS等持久化数据
Spark安装:
Spark是Scala写的,运行在JVM上,所以运行环境为Java7+。
如果使用Python API,需要安装Python2.6+或者Python3.4+
Spark 1.6.2-Scala2.1.0-sbt0.13.8-jdk1.8 Spark 2.0.0-Scala 2.11
SparkShell:
Spark的shell使你能够处理分布在集群上的数据。
Spark把数据加载到节点的内存中,因此分布式处理可以在秒级完成。
快速使迭代式计算,实时查询,分析一般能够在shells中完成
Spark提供了Python shell和Scala Shell
编译Spark:
坑1:
was cache in local repository
解决:编译命令后加 -U
坑2:
如果编译的版本是scala-2.10
解决:执行命令./dev/change-scala-version.sh 2.10
如果在编译后,你看到的异常信息不太明显,编译命令后加-X
RDDs:
Driver Program:
包含程序的main()方法,RDDs的定义和操作。
管理很多节点,我们称作executors
SparkContext:
Driver programs通过SparkContext对象访问Spark。
SparkContext对象代表和一个集群的连接
在Shell中SparkContext自动创建好了,就是sc。
RDDs(Redilient distributed datasets 弹性分布式数据集):
这些RDDs并行分布在整个集群中
RDDs是Spark分发数据和计算的基础抽象类
一个RDD是一个不可改变的分布式集合对象
Spark中,所有的计算都是通过RDDs的创建,转换,操作完成的
一个RDD内部由许多partitions(分片)组成
每一个分片包括一部分数据,partitions可在集群上不同节点上计算
分片是Spark并行处理的单元,Spark顺序的,并行的处理分片
RDDs的创建方法:
1.把一个存在的集合传给SparkContext的parallelize()方法,测试用
val rdd=sc.parallelize(Array(1,2,2,4),4)
第一个参数:待并行化处理的集合,第二个参数:分区的个数
2.加载外部数据集
val rddText=sc.textFile("");
RDD基本操作之Transformations介绍:
概念:从之前的RDD构建一个新的RDD,像map()和filter()
1.map()接受函数,把函数应用到RDD的每一个元素,返回新RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!"));
val lines2=lines.map(word=>(word,1))
lines2.foreach(println)
2.filter()接收函数,返回只包含满足filter()函数的元素的新的RDD
val lines=sc.parallelize(Array("hello","spark","hello","world","!"));
val lines3=lines.filter(line=>line.contains("hello"))
3.flatMap()函数对每个输入元素,输出多个输出元素。flat压扁的意思,将RDD中元素压扁后返回一个新的RDD
val inputs=sc.textFile("/home/huawei/data/huawei.txt");
val outputs=inputs.flatMap(line=>line.split(" "))
4.集合运算:
并集:val rdd_union=rdd1.union(rdd2);
交集:val rdd_inter=rdd1.intersection(rdd2);
rdd1有,rdd2没有:val rdd_sub=rdd1.subtract(rdd2);
RDD基本操作之Action介绍:
概念:在RDD上计算出一个结果。把结果返回给driver program或保存在文件系统中,count(),save
reduce()函数:作用在两个类型相同的元素上,返回新元素
可以实现,RDD中元素的累加,计数,和其他类型的聚集操作
collect()函数:遍历整个RDD,向driver program返回RDD的内容
需要单击内存能够容纳下(因为数据要拷贝给driver,测试使用)
在大数据的时候,使用saveAsTextFile()等
take(n)函数:返回RDD的n个元素(同时尝试访问最少的partitions)
返回结果是无序的,测试使用
top(n)函数:排序(根据RDD中数据的比较器)。降序
foreach()函数:计算RDD中的每一个元素,但不返回本地
可以配合println()友好的打印出数据
RDDs的特性:
RDDs的血统关系图
Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图
Spark使用血统关系来计算每一个RDD的需求和恢复丢失的数据
延迟计算:
Spark对RDDs的计算是他们第一次使用action操作的时候
这种方式在处理大数据的时候特别有用,可以减少数据的传输
Spark内部记录metadata表名transformations操作已经被响应了
加载数据也延迟计算,数据只有在必要的时候,才会被加载
RDD.persist():
默认每次在RDDs上面进行action操作时,Spark都重新计算RDDs
如果想重复利用一个RDD,可以使用RDD.persist()
unpersist()方法从缓冲中移除
KeyValue对RDDs:
创建KeyValue对RDDs:
使用map()函数返回键值对
combineByKey():
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样
许多基于key的聚合函数都用到了它,像groupByKey()
遍历partition中的元素,元素中的key,要么之前见过,要么不是
如果是新元素,使用我们提供的createCombiner()函数
如果是这个partition中已经存在的key,就会使用mergeValue()
合计每个分区的结果,使用mergeCombiners()函数