Spark基础
spark概述
1.1 什么是Spark(官网:http://spark.apache.org)
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。– 也是一个分布式的并行计算框架
– spark是下一代的map-reduce,扩展了mr的数据处理流程。
– executor都是装载在container里运行,container默认内存是1G(参数yarn.scheduler.minimum-allocation-mb定义)
– executor分配的内存是executor-memory,向YARN申请的内存是executor-memory * num-executors。
– AM在Spark中叫driver,AM向RM申请的是executor资源,当分配完资源后,executor启动后,由spark的AM向executor分配task,分配多少task、分配到哪个executor由AM决定,可理解为spark也有个调度过程,这些task都运行在executor的坑里
– Executor有线程池多线程管理这些坑内的task
1.2 为什么要学习Spark
中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
1.3 Spark特点
1.3.1 快
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
1.3.2 易用
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
1.3.3 通用
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
1.3.4 兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
1.4 Spark和Hadoop作业之间的区别
– 一个MapReduce程序就是一个job,而一个job里面可以有一个或多个Task,Task又可以区分为Map Task和Reduce Task
– MapReduce中的每个Task分别在自己的进程中运行,当该Task运行完时,进程也就结束
– Application:spark-submit提交的程序,一个作业中描述完整的APP作业流
– Driver:完成任务的调度以及和executor和cluster manager进行协调
– Executor:每个Spark executor作为一个YARN容器(container)运行
– Job:和MR中Job不一样。MR中Job主要是Map或者Reduce Job。而Spark的Job其实很好区别,一个action算子就算一个Job,比方说count,first等
– Task:是Spark中最新的执行单元。RDD一般是带有partitions的,每个partition在一个executor上的执行可以认为是一个Task
– Stage:是spark中独有的。一般而言一个Job会切换成一定数量的stage。各个stage之间按照顺序执行
– 应用程序: 由一个driver program和多个job构成
– Job: 由多个stage组成
– Stage: 对应一个taskset
– Taskset: 对应一组关联的相互之间没有shuffle依赖关系的task组成
– Task: 任务最小的工作单元
– (驱动程序)是Spark的核心组件
– 构建SparkContext(Spark应用的入口,创建需要的变量,还包含集群的配置信息等)
– 将用户提交的job转换为DAG图(类似数据处理的流程图)
– 根据策略将DAG图划分为多个stage,根据分区从而生成一系列tasks
– 根据tasks要求向RM申请资源
– 提交任务并检测任务状态
• Executor:
– 真正执行task的单元,一个Work Node上可以有多个Executor
Spark核心
• Spark基于弹性分布式数据集(RDD)模型
具有良好的通用性、容错性与并行处理数据的能力
• RDD(Resilient Distributed Dataset )
弹性分布式数据集(相当于集合),它的本质是数据集的描述(只读的、可分区的分布式数据集),而不是数据集本身• RDD的关键特征:
– RDD使用户能够显式将计算结果保存在内存中,控制数据的划分,并使用更丰富的操作集合来处理
– 使用更丰富的操作来处理,只读(由一个RDD变换得到另一个RDD,但是不能对本身的RDD修改)
– 记录数据的变换而不是数据本身保证容错(lineage)
• 通常在不同机器上备份数据或者记录数据更新的方式完成容错,但这种对任务密集型任务代价很高
• RDD采用数据应用变换(map,filter,join),若部分数据丢失,RDD拥有足够的信息得知这部分数据是如何计算得到的,可通过重新计算来得到丢失的数据
• 这种恢复数据方法很快,无需大量数据复制操作,可以认为Spark是基于RDD模型的系统
– 懒操作,延迟计算,action的时候才操作
– 瞬时性,用时才产生,用完就释放
• Spark允许从以下四个方面构建RDD
– 从共享文件系统中获取,如从HDFS中读数据构建RDD• val a = sc.textFile(“/xxx/yyy/file”)
Sparkcontext是spark的入口,编写spark程序用到的第一个类,包含sparkconf sparkenv等类
– 通过现有RDD转换得到
• val b = a.map(x => (x, 1))
– 定义一个scala数组
• val c = sc.parallelize(1 to 10, 1)
– 有一个已经存在的RDD通过持久化操作生成
• val d = a.persist(), a. saveAsHadoopFile(“/xxx/yyy/zzz”)
• Spark针对RDD提供两类操作:transformations和action
– transformations是RDD之间的变换,action会对数据执行一定的操作– transformations采用懒策略,仅在对相关RDD进行action提交时才触发计算
• 每个RDD包含了数据分块/分区(partition)的集合,每个partition是不可分割的
– 实际数据块的描述(实际数据到底存在哪,或者不存在)
– 其值依赖于哪些partition
• 与父RDD的依赖关系(rddA=>rddB)
– 宽依赖: B的每个partition依赖于A的所有partition
• 比如groupByKey、reduceByKey、join……,由A产生B时会先对A做shuffle分桶
– 窄依赖: B的每个partition依赖于A的常数个partition
• 比如map、filter、union……
• RDD依赖关系
• 每个partition的计算就是一个task,task是调度的基本单位• 若一个stage包含的其他stage中的任务已经全部完成,这个stage中的任务才会被加入调度
• 遵循数据局部性原则,使得数据传输代价最小
– 如果一个任务需要的数据在某个节点的内存中,这个任务就会被分配至那个节点
– 需要的数据在某个节点的文件系统中,就分配至那个节点
• 此时的调度指的是:由spark的AM来决定计算partition的task,分配到哪个executor上
• Spark容错
• 如果此task失败,AM会重新分配task
• 如果task依赖的上层partition数据已经失效了,会先将其依赖的partition计算任务再重算一遍
• 宽依赖中被依赖partition,可以将数据保存HDFS,以便快速重构(checkpoint)
– 窄依赖只依赖上层一个partition,恢复代价较少;宽依赖依赖上层所有partition,如果数据丢失,上层所有partiton要重算
• 可以指定保存一个RDD的数据至节点的cache中,如果内存不够,会LRU释放一部分,仍有重构的可能
这是一个递归过程,会 一直追本溯源,甚至直到最初的输入数据
• RDD的缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
• RDD练习
启动spark-shell
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell--master spark://node1.itcast.cn:7077
练习1:
//通过并行化生成rdd
val rdd1 =sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每一个元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于十的元素
val rdd3 =rdd2.filter(_ >= 10)
//将元素以数组的方式在客户端显示
rdd3.collect
练习2:
val rdd1 =sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd1里面的每一个元素先切分在压平
val rdd2 =rdd1.flatMap(_.split(' '))
rdd2.collect
练习3:
val rdd1 =sc.parallelize(List(5, 6, 4, 3))
val rdd2 =sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 =rdd1.union(rdd2)
//求交集
val rdd4 =rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
练习4:
val rdd1 =sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry",2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 =rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
rdd4.groupByKey
rdd4.collect
练习5:
val rdd1 =sc.parallelize(List(("tom", 1), ("tom", 2),("jerry", 3), ("kitty", 2)))
val rdd2 =sc.parallelize(List(("jerry", 2), ("tom", 1),("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
rdd3.collect
练习6:
val rdd1 = sc.parallelize(List(1,2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
练习7:
val rdd1 =sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2), ("shuke", 1)))
val rdd2 =sc.parallelize(List(("jerry", 2), ("tom", 3),("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
//想要了解更多,访问下面的地址
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
点击打开链接
Spark实践
5.1 环境准备
• SBT编译器安装• 安装包: sbt-0.13.15.tgz
• [[email protected] spark_test]# mkdir -p spark_wordcount/project
• [[email protected] spark_test]# mkdir -p spark_wordcount/src
• [[email protected] spark_test]# mkdir -p spark_wordcount/target
• [[email protected] spark_test]# mkdir -p spark_wordcount/src/main/scala
• 拷贝spark-assembly-1.6.0-hadoop2.6.0.jar到spark_wordcount/lib目录下
• 写完code后,执行编译:
• ]# sbt compile
• ]# sbt package
5.2 任务一:Word Count
• 完成基于scala的spark任务,完成wordcount任务
5.3 任务二:重构协同过滤推荐算法
• 完成基于Spark的协同过滤算法– 另外:杀死一个任务的方法:
– ]# yarn application -kill application_1491925179093_0012