spark调优的一些点
1.RDD操作的优化
不要创建数据相同的RDD两次(多次)
通常创建一个RDD(读取HDFS或者Hive中的文件),然后对这个RDD做一些算子操作,得到下一个RDD,如果同一个RDD创建了两遍(数据相同),就会从磁盘中读取两次,会浪费大量的时间和性能。
RDD要尽可能的复用
如果需要RDD中的部分值,不需要创建一个新的RDD,这样会多使用一次spark算子。
比如需要tuple中的第二个值,可以用tuple._2来代表,而不是创建一个新的RDD。
要持久化使用次数多的RDD
Spark中对于一个RDD,在每次执行一个算子操作的时候,都会重新从源头处计算一遍,然后再执行算子。如果RDD持久化后,Spark就会根据持久化策略,将RDD中的数据保存到内存或磁盘中,以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,而不会从源头处重新计算一遍。
cache()、persist()进行持久化的时候,常用的持久化策略:
MEMORY_ONLY、MEMORY_SER、MEMORY_AND_DISK、MEMORY_AND_DISK_SER。
MEMORY_ONLY会把数据存在内存中,如果内存足够大可以这样做。
MEMORY_SER内存足够但是有时候会OOM时,可以把数据序列化存储,可以节省空间。
MEMORY_AND_DISK内存不够时,这个策略会优先存到内存,内存不够再存到磁盘。
MEMORY_AND_DISK_SER把数据序列化存储,优先内存,内存不够存到磁盘。
非必要的情况下尽量避免使用shuffle类算子
要尽量避免使用shuffle类算子,因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,就是将分布在集群众多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。
shuffle过程中,各个节点上的相同key有可能会写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同的key都拉取到同一个节点进行聚合操作时,有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发上大量的磁盘文件读写的IO操作以及数据的网络传输操作。
传统的join操作会导致shuffle操作。
val rdd3 = rdd1.join(rdd2)
使用Broadcast+map的join操作,不会导致shuffle操作。
使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用,因为rdd2.collect()会把rdd2的数据返回到driver中,成为一个数组,而且每个Executor的内存中,都会有一份rdd2的全量数据
使用map-side预聚合的shuffle操作
如果一定要使用shuffle操作,无法用map类的算子代替,那么尽量使用可以map-side预聚合的算子。
map-side预聚合指的是每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常情况下,使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为他们能够预聚合,而groupByKey不能。
下图展示了部分原理:
使用高性能的算子
除了shuffle相关的算子有优化原则外,其他的算子也都有着相应的优化原则。
reduceByKey/aggregateByKey替代groupByKey
因为map-side预聚合的原因
使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是注意mapPartitions有时候会出现OOM,因为单此函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收是无法回收掉太多对象的,很可能出现OOM,所以使用要慎重。
使用foreachPartitions替代foreach
原理与mapPartitions替代map相同。使用这种整体操作的算子,比一条一条处理的算子对性能的提升有很大的帮助(比如将RDD中的所有数据写入MySQL)。
使用filter之后进行coalesce操作
通常对一个RDD执行filter算子过滤掉RDD中比较多的数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。减少了partition数量,就可以使用更少的task来处理,在某些场景下,对于性能的提升会有一定的帮助。
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
Spark官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort同时进行,比先shuffle在sort来说,性能可能是要高的。