Spark数据本地性

Spark中的数据本地性

分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。

分布式数据并行环境下,保持数据的本地性是非常重要的内容,事关分布式系统性能高下。

概念:

block HDFS的物理空间概念,固定大小,最小是64M,可以是128,256 。。也就是说单个文件大于block的大小,肯定会被切分,被切分的数目大概是:比如文件是250Mblock64M,就会被分为4block64+64+64+58,最后一个block没有满,一个block只能有一个文件的内容,加上每个block一般有3个副本存在,那么这个文件在HDFS集群就有12block分布,可能分布在datanode1,2,3,4  可能分布在datanode4,5,6,7 所以并不是所有的datanode都有这个文件的block

最理想的情况,我们希望一个文件的所有block在一个datanode上面都可以找到,这样可以在读数据的时候避免网络传输

partitionspark的计算数据概念,是RDD的最小单位,它的大小不是固定的,一般是根据集群的计算能力,以及block的数量来决定的,也就是说partition的个数我们是可以自己指定的

spark内部有2种分区策略,一种是hashpartition,一种是rangepartition,其实就是根据key,把k-v数据合理的划分为多个partition,然后对数据进行规划和计算

workerspark计算集群中,非master节点,它的分布也是在某些node上面

rack:数据中心由一堆堆的rack组成,一个rack由多个datanode组成,在rack中的datanode可以看做本地数据,因为网络比较好

所以有datanode不一定上面有worker,同样有workernode上面不一定有合适的数据,这样问题就抽象成为,如何让读取HDFS的时候,spark整体开销最小,这就是本地性的问题

 Spark数据本地性

为上面3block的数据不一样,所以我们在选择worker(h)的时候,要考虑哪个是最佳计算worker

考虑rack的网络本地性,h4 h3 h1 h2 h5 h6 h7 h8

不考虑rack的网络本地性,h1 h4 h5 h6 h2 h3 h7 h8

task运行在哪个机器h上面,是在DAGscheduler在进行stage的划分的时候,确定的

选出了计算节点的顺序,就可以告诉sparkpartition(RDD),然后在h上启动executor/tasker,然后读取数据,h从哪个节点获得block呢?block的几个副本离h的远近不同,网络开销不一样

spark里面数据本地化级别都有哪几种?

PROCESS_LOCAL

进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的taskexecutor执行,数据在executorBlockManager中,性能最好。

NODE_LOCAL

节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中,数据需要在进程间进行传输

NO_PREF

对于task来说,数据从哪里获取都一样,没有好坏之分,比如从数据库中获取数据

RACK_LOCAL

机架本地化,数据和task在一个机架的两个节点上,数据需要通过网络在节点之间进行传输;

ANY

数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差。

通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cachelazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中

文本匹配的实验,发现处理的数据Locality Level都是ANY级别的,从而导致数据在网络上传输,造成效率低下,发现:

Spark Worker Id和Address中都使用的IP地址作为Worker的标识,而HDFS集群中一般都以hostname作为slave的标识,这样,Spark HDFS中获取文件的保存位置对应的是hostname,而Spark自己的Worker标识为IP地址,两者不同,因此没有将任务的Locality Level标记为NODE_LOCAL,而是ANY。

解决方法:在Standalone模式下,单独启动各个Worker节点,命令如下所示:

$SPARK_HOME/sbin/start-slave.sh -h <hostname> <masterURI>

例如:start-slave.sh -h slave1 spark://master1:7077

假设我在slave1上启动Worker节点,master1是主节点

hostname是Worker所在的hostname即slave1,启动masterURL是”spark://master1:7070”

场景

        SparkDriver上,对Application的每一个stagetask,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partitionSparktask分配算法,优先,会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;但是呢,通常来说,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s(不是绝对的,还有很多种情况,对不同的本地化级别,可以设置不同的等待时长),默认重试5次,到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。

第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。

      所以,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executorBlockManager中获取数据,纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。

         如果可以从数据所在的位置拿到数据,那就是最佳情况,直接在一个executor进程内,走内存速度最佳如果数据所在的机器资源被占用,超过3,就会放到离数据近的其他机器上面去,那样Task任务会找它自己本地的BlockManager要数据,没有就会通过BlockManager来管附近的BlockManager就是数据所在机器的要数据,可能不在一个节点,要走网络传输,当然要是说俩个executor都在一个节点里面,那这种情况,也还算不错,就在一个节点,走进程间数据传输即可

        还有一种情况,最差的就是这种跨机架拉取数据的方式了。速度非常慢,对性能的影响,相当大。

我们什么时候要调节这个参数?

spark.locality.wait,默认是3s

  观察日志,spark作业的运行日志,先用client模式,在本地就直接可以看到比较全的日志。日志里面会显示,starting task。。。,PROCESS LOCALNODE LOCAL观察大部分task的数据本地化级别。

        如果大多都是PROCESS_LOCAL,那就不用调节了;如果是发现,好多的级别都是RACK_LOCALANY,那么最好就去调节一下数据本地化的等待时长调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短,别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了

怎么调节

Spark数据本地性

参考博文:

https://www.cnblogs.com/jackie2016/p/5643100.html

http://www.cnblogs.com/yourarebest/p/5122372.html

https://my.oschina.net/rosetta/blog/777829