读《A Comparision of Join Algorithms for Log Processing in MapReduce》
这周组会我讲了《A Comparision of Join Algorithms for Log Processing in MapReduce》这篇文章,是2010年发在ACM SIGMOD国际数据管理会议上的,就是设计了一些数据的连接算法,然后为每种算法作了不同的预处理,测试性能,最后还测试了一下MapReduce的scalability。最后组里的大牛说其实这些算法是早就有的,就是用MapReduce重做了一遍...
其中的per-split semi-join和directed join我还是不太懂,per-split semi-join我在网上找到了一张别人画的图,最后贴一下,另外由此发现了一个制图网站:ProcessOn,缺点是别人的图不能直接保存。
1. introduction
【MapReduce优势】
MapReduce是近年来被广泛应用于在集群环境中处理分析大型数据集的计算模型、框架和平台。它成功地实现了隐藏并行的细节、容错、和在编程框架中负载平衡。Web2.0公司和一些传统的使用关系型数据库管理系统的企业用户也开始使用MapReduce。基于对Web2.0公司使用MapReduce情况的观察,日志处理逐渐成为由其完成的一种重要的数据分析工作。在日志处理中,点击流、通话记录日志、交易的顺序等许多事件,都被不断地收集存储在平面文件中。所以MapReduce被用来计算分析各种数据并获取商业上的趋势等。
在日志处理方面,MapReduce比并行的关系型数据库管理系统RDBMS更具优势,有以下几点原因:①待处理的数据量过于庞大。例如中国移动每天收集5到8T的通话记录,Facebook每天收集约6TB新的日志,仅仅格式化并将这些数据载入RDBMS就是一个时间挑战。②日志并不总是遵从某一种格式,开发者通常需要对其增添属性,而且对日志的分析可能会随时间改变,这使得不需要固定格式的MapReduce更受欢迎 ③在一定时间内的日志通常被放在一起分析,所以MapReduce采用的全文扫描比索引扫描更好。④日志分析非常耗时,即使出现故障事件,分析工作也不能停止。虽然多数RDBMS有容错机制,但如果集群中有一个节点发生故障,请求将被重新发出。但是MapReduce在面对故障时只将其错误的部分重新计算。⑤从MapReduce延伸的Hadoop是免费并开源的,并且可以在大多数普通商用硬件上很好运行。
MapReduce相比传统的关系型数据库也有一些缺点,MapReduce没有关系模式、声明式查询语言、也没有索引。
【MAPReduce日志处理方法】
使用MapReduce有多种进行日志处理的方法,但总结在抽象层面上只有几种通用的模式。日志数据按时间戳顺序存储在底层的DFS中,通常有一个或多个包含用户信息等的引用表,这些引用表大小不一但通常小于日志,它们被存放在RDBMS中,然后复制到DFS,使得MapReduce处理日志更高效。
MapReduce的一个日志处理工作可能会扫描在一个确切的时间窗内的所有日志文件,然后按年龄或地理等属性计算出分组后的聚合数。在分析过程中,日志和引用表之间常常建立等值连接。((省略)这种等值连接与传统数据仓库中的星型连接不同,在星型连接中,维度数据由于限制被第一个规约,然后维表与事实表连接。相反的,日志处理中的)等值连接常常是在所有日志记录上建立,引用数据很少被约束,因为日志处理的其中一个目的就是对所有的日志记录根据不同维度的各种联合计算数据。还有就是不能将日志与所有的引用数据预连接,因为预连接结果的大小数量级可能远大于原来的日志。
日志与引用表数据间的等值连接会对日志处理结果产生很大影响,但MapReduce框架不适合执行连接操作。因此,将知名的连接算法应用于MapReduce并不容易,这篇文章完成了将一些连接算法应用于MapReduce并设计了全面的实验进行比较。
(本文的贡献)1. 提供了一些用MapReduce进行等值连接的详细说明,展示了将那些知名常用的连接算法应用在MapReduce上是可行的.
2. 对于每种算法都设计了许多实用的预处理操作,以提高连接在查询时的性能
3. 在有100个节点的Hadoop集群上比较了多种连接算法。开始先详细分析了从MapReduce框架继承来的连接算法的开销,然后展示了这些算法分别所做的权衡和预处理带来的好处。4. 这篇文章还发现了MapReduce所做的权衡与并行的关系型数据库做的权衡有很大不同,这是因为MapReduce为了可扩展性而牺牲了性能。这些发现对MapReduce将来发展声明式查询语言有很大帮助。
2.MapReduce Overview
使用MapReduce编程非常便捷,开发者只需要定义map和reduce函数即可,map函数的输入时键值对(K, V),输出新的键值对(K', V'),reduce函数的输入是由map输出的K和V组成的列表构成的键值对。通常一个MapReduce任务的输入输出都是存放在DFS 中的文件。
当一个MapReduce任务在一个有n个节点的集群上发布时,一个job-tracker会创建m个map任务和r个reduce任务,每个map任务都在输入文件的互不重叠的其中一个split中进行,通常一个split对应着DFS中的一个块。
map任务执行如下步骤:读自己被分配到的文件的split,然后将这些原始字节转换为一系列(K,V)键值对,最后调用使用者定义的map函数,传入(K,V)队列。输出的(K,V)首先被分组为r个块,每个块对应一个reduce任务。
在一个map任务完成时,它会通知每个reduce任务将来要获取某个对应自己的块,这个阶段叫做shuffle。不同map任务的shuffle是可以并行操作的。在所有的map任务都已经完成而且所有块都已经复制完成后,每个reduce任务合并它的块然后生成一张按照K排序的(K,V)列表,然后从这张列表中生成一系列(K, LIST_V)对,最后调用使用者定义的reduce函数生成最终输出。
3.Join Algorithms in MapReduce
这部分描述了使用MapReduce进行日志处理的几个等值连接算法,这里的等值连接是建立在日志表L和引用表R之间的某个单列上的,并且假设L、R和连接的结果都存放在DFS中,map任务中默认的分组函数根据输出key的哈希将每个输出的键值对分配给某个reducer。
①重分区连接
重分区连接是MapReduce中最常用的连接方案,分为标准重分区连接和改进后重分区连接。首先是
1.标准重分区连接
这种连接方案与RDBMS中的sort-merge连接相似,可以在一个mapreduce job中完成。在map阶段,每个map任务在R或L的一个split,就是一个划分上进行,为了识别一个输入的日志记录是来自哪张表的,每个map任务将此输入与它来源的表做标记,然后输出由提取得到的key与标记后的输入构成的新key。然后这些输出被MapReduce框架划分、排序、合并。在reduce阶段,对于每个key值,reduce首先根据标记把输入的日志记录分离并缓存到两个集合中,然后在这些记录之间做笛卡尔积计算。
标准重分区连接的一个潜在问题就是对于来自L和R的一个给定的key对应的所有记录都必须缓存。当这个key的基数较小或数据分布高度不平衡时,记录可能无法全部放入内存。
2.改进的重分区连接
改进的重分区连接解决了缓存问题。首先,在map阶段,输出的key更改为key与表标记的结合,表标记保证了来自R的记录会先于来自L的记录被排序。其次,划分函数是自定义的,所以哈希码只由key计算得到,这样具有相同key的日志记录任然被分配给同一个的reduce任务。reducer中的分组函数也是自定义的所以记录只根据key分组。最后,因为R中的记录被保证先于L,所以只有R记录被缓存。
改进的重分区连接和标准重分区连接的一个共同问题是L和R都必须先排序然后在shuffle阶段发送,这样会损害性能,但如果在连接操作之前对L和R预处理,具体方法是在日志记录生成时对L预分组并在R放入DFS时对R预分组,然后在查询时直接将L和R的分组配对连接,这样shuffle产生的开销就会减小。
与RDBMS相反的是,这种重分区连接不能保证L 和R中相应的分组被放在同一个节点上,因为DFS在放置某特定数据块的位置上做出的决定是相互独立的。因此在查询时必须使用directed join。directed join是一种只有map的连接。每个map任务在L的一个split上执行,在初始化阶段,如果R不在本地,则从DFS中取出,在R上建立一个在主存的哈希表,然后map函数扫描L的一个split中所有的日志记录同时查看它的哈希表来连接。
②广播连接
广播连接是只使用map不用reduce的连接方案,在每个节点上,所有的R从DFS中取出进行广播,从而避免L在网络上传输,由于通常情况下R表远小于L表,所以这种连接节约了网络上的开销。
在每个map任务的初始化init()函数中,广播连接检查R是否已经存放在本地文件系统,如果没有就将R从DFS中取出并分组,然后把分组放入本地文件系统。
广播连接动态地决定是否在L或R上建立哈希表,由于R表和L的split占用内存较小所以被用来建立哈希表,一个split通常小于100兆,如果R表小于L的split,init函数就把R的所有split放入内存建立哈希表。然后map函数从L表的记录中提取key,用key来探测哈希表并生成连接的结果输出。但是如果L的split小于R,连接先不会执行,map函数将以划分R的方式划分L表,在close函数中,R和L相应的分组进行连接。
广播连接的预处理:通过增加R表的复制份数可以确保集群中的大部分节点在本地都有R的拷贝。这样就能确保广播连接成功,避免在init函数中从DFS取出R表。
③半连接
当R表较大时,R中的很多记录实际上并不会被L中的记录引用,如果这时采用广播连接,R中的大部分记录通过网络发送后载入哈希表,但实际上最后并不会被连接引用,如果采用半连接就能避免发送这些记录。半连接有三个阶段,每个阶段对应一个独立的MapReduce任务。
第一阶段,在map函数中,一个哈希表决定了L的一个split中所有不重复的key,然后把这个key的集合作为map任务的输出,这样需要经过排序的数据量就减少了,reduce任务只需将这些key合并存放到一个名为L.uk的文件中,而且这个文件通常是足够小的,能够放入内存的。
第二阶段和广播连接相似,仅使用map。init函数将L.uk文件载入主存哈希表,然后map函数遍历R表中的每个记录,如果某记录的key在L.uk中能找到,就把这条记录输出。这一阶段的输出是由不同的R构成的一个列表,每个R是原来R表的一个split。
第三阶段,所有第二阶段输出的R与L进行广播连接。
虽然半连接避免了通过网络发送不会与L连接的R,但是开销多了对L的一次扫描,后面会测试这个扫描是否真正有益。
半连接的预处理:
可以将半连接的前两个阶段作为预处理部分逐步完成,然后在查询时只需执行第三部分。比如说,当L中产生新的日志记录时,这些记录的不重复的key将被累积然后与R连接以决定R中将与L进行连接的部分。
④分片半连接
半连接存在的一个问题是在过滤后的R中并不是每条记录都能与L的某个特定划分进行连接,分片半连接解决了这个问题。分片半连接也有三个阶段,第一阶段只有map,它生成L的一个split中所有互不重复的key的集合,然后把它们作为文件L.uk存入DFS中。第二阶段,map将R的一个split中所有的记录载入一个哈希表。map的close函数从各个L.uk文件中读取key然后探究它们的哈希表来与R中记录配对,每条配对后的记录被标记后输出,然后这些记录被reduce用来收集R中所有将与L的split连接的记录。第三阶段,存放已经被标记的R记录的文件和与其配对的L的split用directed join进行连接。
与半连接相比,分片半连接的第三阶段开销更小,因为它只移动了将与L的split配对的R中的记录,但第一二阶段更加复杂。
分片半连接的预处理与半连接的预处理相似,可将第一二阶段作为预处理步骤。
⑤讨论
4. Experimental evaluation
1.数据集Datasets
这篇文章做的所有的实验都是在有100个节点的集群上运行的,每个节点同时跑两个map任务和两个reduce任务,因此任何时间集群至少能同时有200个map和200个reduce任务运行。
在实验中使用虚拟数据集模拟了一个日志表L,L连接存储用户信息的R表。L的大小确定为500GB,R的大小在100K到1000M之间不等。L和R的连接为典型的多对一连接,L中多条记录都引用R中某一条记录。连接结果是一个10字节的key、L中一个10字节的列和R中一个5字节的列。为了模拟日志分析中所有用户都不活跃的场景,实验者将R中被引用到的记录占比设置为0.1%, 1%, 10%,在这种场景中,L所有的记录都会出现在连接结果中,而R只有部分数据出现。另一种场景是一部分用户比其他用户更活跃,这时使用Zipf分布测算被引用的key在L中出现的频率,参数为skew。在两种场景中,R被引用的key是随机分配给L的记录的。
2.MapReduce Time BreakDown
3.Experimental Results
这部分记录了实验结果。首先考虑没有对L和R做预处理的连接情况,再考虑对L和R做了预处理的情况,最后评估这些连接算法的可扩展性。
另外需要注意的是实验结果省略了将连接结果写入HDFS的开销,因为这样会让不同算法间的差异更明显,也更能模拟将连接后的结果立即聚合到小型数据集的情况。
①没有预处理的情况
这张图中展现了在均匀数据集,就是skew为零,所有用户都不活跃的连接结果。每张图对应R中不同的被引用的部分。下面的X轴代表R中记录的数量,上面的X轴代表日志的大小规模。Y轴代表每个连接算法的总运行时间。右下方显示了四种连接方式。
第一个,standard表示重分区连接,从右往左看,当R变小,重分区连接用时也相应减少了,但在1%到0.1%用时又增加了,这是因为R越小,L中越多的记录会用同一个key,而是这种连接必须缓冲L中所有使用同一个key的记录,这就造成大量写内存的操作。甚至在一些极端情况下重分区连接会因为耗尽内存而无法完成,比如第二张和第三张图。
第二个improved代表改进后的重分区连接。由于这种连接不会把L中的记录缓存在内存中,所以整体性能比重分区连接更好。但是当R变小时用时还是增加了,这是因为最终没有足够的key来把工作均匀的分配给Reduce。比如,当R有100K的记录,其中有0.1%被引用,L中只有100个不相同的key,既然所有共享相同key的记录会被发给同一个reduce,那么所有的reduce中将有一半没有工作可做,同时另一半reduce承担两倍于平时的工作量。
第三个是广播连接,它的性能随着R的大小和被引用key的占比减小而提高。当R有1M左右的记录时,广播连接的性能好于改进后的重分区连接,但是当R表增大时广播连接的性能迅速下降,这是因为在网络上将R传播给各个节点与将它们载入内存开始占据大部分开销。
最后一个是半连接,这种连接在任何情况下都不是性能最好的,这是因为从HDFS扫描L表的开销较高。
下面这张图展示了在倾斜数据集,就是一部分用户比其他用户更活跃的情况中的实验结果。在倾斜数据集上的结果与在均匀数据集上的结果相似,主要区别就是当R表和被引用占比更小时,标准重分区连接比改进后重分区的性能下降的更快。这是因为倾斜数据集放大了这两种算法都存在的问题(L和R都必须先排序然后在shuffle阶段通过网络发送,这样会损害性能)。
②有预处理
这部分实验以未经预处理的改进后重分区算法为基准,测试通过预处理连接算法的性能可以提升的程度。实验在预分组后的L和R上directed join,由于很难确定分组应该的数量,实验者将分组数定为200和5000,命名为direct-part200和direct-part5000,directpart200匹配了集群的核的数量,directpart5000确保了内存能承受的最大分组数量。广播连接的预处理步骤将R表复制给集群中的每个节点。对于半连接和分片半连接,我们假设他们的前两个阶段都在预处理步骤中完成了,只有最后阶段在查询时执行。
这张图展示了在均匀数据集上预处理的结果,(由于倾斜数据集的结果与其相似就省略了。)从左往右看,可以观察到随着R表增大,广播连接和性能降级最快,随后是direct-part200和半连接。分片半连接和direct-part5000效率较高。这主要是因为在R上运行的哈希表更小。总体来看,预处理相较于未经预处理的改进后重分区算法的用时减少了60%。
最后是预处理本身的开销,半连接、分片半连接的用时最少,因为它们不需要在网络上传输L表。
可扩展性:
这一部分的实验探究随着集群中节点数量增加线性地增加连接操作的输入的大小对性能造成的影响。实验者使用均匀分布的L表和被引用率为1%的R表,固定R有一千万条记录。然后将节点数量从5增加到100,可以看到每个算法的用时都是几乎成线性增长的。
4.4在Pig中比较连接算法
MapReduce上的一些声明式的语言,如pig, hive,jaql已经开始研究连接算法,这部分实验将Pig中的连接算法与实验者使用的算法对比。pig提供的连接策略有两种:重分区连接和分片复制连接,分别与实验者的改进后重分区连接和广播连接相似。表格展示了在一些checkpoint的实验数据,结果显示使用改进后的重分区连接性能大概是pig的重分区连接的2.5倍。然后实验者还将自己的广播连接与pig的分片复制连接对比,对于有R中30万条记录,在均匀数据集与倾斜数据集以及引用率分别为0.1%与1%时,广播连接都能够保持3倍于分片复制连接的速度。这是因为在广播连接中,同一个节点上的所有的map任务共用一份本地的R的拷贝,但分片复制连接是在每个map任务中从DFS中重读R。另外,广播连接可以动态的为内存中的哈希表选择更小的输入,这里的输入就是指R或L的split,而pig总是将整个R表载入内存。
4.5 Discussion
性能分析:
从实验数据可以看出,在大多数情况下不同连接算法的性能差都在3倍以内,而不是像在传统的RDBMS中那样差了几个数量级。MapReduce有更多的内部计算开销,比如输入的格式转换、校验和检验和任务初始化,由于实验环境中的这些开销和较高的网络带宽,以重分区为基础的连接的shuffle开销显得就没有那么高了,但是在其他网络带宽更低的实验环境中,以重分区为基础的连接的网络开销会更加显著,不同算法的性能差异也被放大了。
自定义split:
对于广播连接,每个map任务都有将引用数据载入内存的固定开销。如果我们对负载平衡和应对故障的表现要求不高,可以增大map任务来分摊这一开销,比如说:不是把每个map放在一个单独的DFS块上工作,而是将多个块分配给一个map任务。但是如果我们只将数个连续块分到一个split,这些块就可能遍布在不同的节点。最终当我们规划map任务时就失去了数据的本地性。因此为了保护数据在本地,我们自定义了一个功能在Hadoop生成split,这样在一个节点上的多个非连续块就能被分到一个本地split,称之为big split。在实验中,实验者将5个块分为一个big split。
图8展示了使用big split连接的性能,广播连接的总用时大约减少到原来的50%。不用big split时每个节点必须从内存加载引用数据40次左右,而用big split只需加载8次。但是对于改进后重分区连接,性能只提升了3%,这是因为使用big split只是节约了算法中的初始化开销。实验者还注意到big split的局限性,如果split太大可能反而会影响负载平衡。
选择正确的策略:
图9将上面研究的连接算法所做的权衡折中总结为决策树。这个决策树试图依据相关的数据集规模和被引用的key等数据确定对于给定条件下的正确连接策略。从决策树来看,如果没有预处理,就看要通过网络传输的数据规模,如果将R表广播到每个节点的网络开销比将R和L都进行传输的开销更小,就使用广播连接。如果有预处理,最好的选择是分好组的半连接、分片半连接或directed join。这个决策树虽然比较简单,但是为建立在MapReduce框架上高级查询语言pig hive jaql的查询优化提供了很大的帮助。
5.Conclusion and Future Work
使用MapReduce依据各种引用表对日志数据做连接已经是企业用户的分析操作中的一项重要内容。这篇文章分析比较了数据库领域30年来的工作来在MapReduce上设计一系列连接算法,并且不需要现有框架作出任何修改,然后通过实验测试了这些算法通过某些特定的预处理能优化多少性能。这些研究结果能够帮助优化器仅依据少量的数据和可操作特性选择一个合适的算法。
未来的工作有很多方向,包括将这些算法用于多表连接、研究索引方法来加速连接查询、设计一个优化模型来自动选择合适的连接算法等等。另外,这篇文章展现出了MapReduce编程模型在完成连接方面的局限性。因此未来的另一个重要工作就是设计一种新的编程模型扩展MapReduce框架。