Spark RDD的缓存机制、CheckPoint机制(容错机制)和RDD的依赖关系
RDD的缓存机制
RDD通过cache方法或者persist方法可以将前面的计算结果缓存,但并不是立即缓存,而是在接下来调用Action类的算子的时候,该RDD将会被缓存在计算节点的内存中,并供后面使用。它既不是transformation也不是action类的算子。
注意:缓存结束后,不会产生新的RDD
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
使用缓存的条件:(或者说什么时候进行缓存)
1.要求的计算速度快,对效率要求高的时候
2.集群的资源要足够大,能容得下要被缓存的数据
3.被缓存的数据会多次的触发Action(多次调用Action类的算子)
4.先进行过滤,然后将缩小范围后的数据缓存到内存中
在使用完数据之后,要释放缓存,否则会一直在内存中占用资源。
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
源码显示的不全,简单说下等级中的五个参数的含义:
第一个:是否存储在磁盘
第二个:是否存储在内存
第三个:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
第四个:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
第五个:备份数,在多个节点上备份
这是存储等级的比较:
CheckPoint机制(容错机制)
在上面我们说到,RDD的缓存容错机制能保证数据丢失也能正常的运行,是因为在每一个RDD中,都存有上一个RDD的信息,当RDD丢失以后,可以追溯到元数据,再进行计算。
检查点(本质是通过将RDD写入高可用的地方(例如 hdfs)做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。
设置checkpoint的目录,可以是本地的文件夹、也可以是HDFS。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据。
在设置检查点之后,该RDD之前的有依赖关系的父RDD都会被销毁,下次调用的时候直接从检查点开始计算。
checkPoint和cache一样,都是通过调用一个Action类的算子才能运行。
checkPoint减少运行时间的原因:第一次调用检查点的时候,会产生两个executor,两个进程分别是从hdfs读文件和计算(调用的Action类的算子),在第二次调用的时候会发现,运行的时间大大减少,是由于第二次调用算子的时候,不会再从hdfs读文件,而读取的是缓存到的数据,同样是从hdfs上读取。
RDD的依赖关系
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
RDD的宽依赖在源码中又叫Shuffle Dependency
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:窄依赖我们形象的比喻为超生
假如RDD在传输过程中造成了数据丢失,就会根据该RDD的依赖关系进行推断,如果是窄依赖,直接找该RDD的父RDD就可以,不会造成太大的冗余数据,但假如是宽依赖,该RDD的父RDD就比较多,有可能是众多父RDD中的一个造成的,但是还是会重新计算,就造成了很大的数据冗余,这也是宽依赖(Shuffle)耗费资源的原因。
这是checkPoint 的源码:
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
这是一个简单的案例,在文件中查找指定科目的出现次数最多的老师:
package day02
import java.net.URL
import day01.MySpark
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
* 减少shuffle过程
* @author WangLeiKai
* 2018/9/19 14:42
*/
object FavSubTeacher3 {
def main(args: Array[String]): Unit = {
val sc: SparkContext = MySpark(this.getClass.getSimpleName)//这是将sc的定义写到另外一个类中
val lines: RDD[String] = sc.textFile("hdfs://hadoop-master:9000/data/teacher.log")
// val subjects = Array("bigdata", "javaee", "php")
sc.setCheckpointDir("hdfs://hadoop-master:9000/ck")
val subjectAndTeacher: RDD[((String, String), Int)] = lines.map(line => {
val teacher: String = line.substring(line.lastIndexOf("/") + 1)
val host = new URL(line).getHost
val subject = host.substring(0, host.indexOf("."))
((subject, teacher), 1)
})
//取到所有的科目
val subjects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()
val sbPartitioner: SubjectPartitioner = new SubjectPartitioner(subjects)
//reduceByKey方法 参数可以是分区器,如果没有的话 使用的是默认的
val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(sbPartitioner,_+_)
//val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPartitioner)
reduced.checkpoint()
val sorted: RDD[((String, String), Int)] = reduced.mapPartitions(it => {
it.toList.sortBy(_._2).reverse.take(2).iterator
})
val tuples = sorted.collect()
tuples.foreach(println)
sc.stop()
}
}
//这是自定义分区器
class SubjectPartitioner(sbs: Array[String]) extends Partitioner{
//map里放的是科目和对应的分区号 0 1 2
private val rules: mutable.HashMap[String, Int] = new mutable.HashMap[String,Int]()
var index = 0
for(sb <- sbs){
rules.put(sb,index)
index += 1
}
//返回分区的数量 下一个RDD有多少个分区
override def numPartitions: Int = sbs.length
//这里的key是一个元组
override def getPartition(key: Any): Int = {
//获取学科名称
val subject: String = key.asInstanceOf[(String,String)]._1
//根据规则计算分区编号
rules(subject)
}
}