Twister: 迭代MapReduce计算框架

摘要:MapReduce编程模型已经简化了许多数据并行应用的实现。编程模型的简化和MapReduce实现提供的服务质量在分布式计算社区上吸引了很多的热情。把MapReduce应用到多种科学应用的这么多年来,我们扩展了一组编程模型,改善了其结构,以至于可以适用到更多的应用类型。这篇文章中,我们提出了一种编程模型和Twister结构,这是一种增强的MapReduce runtime,它支持高效地迭代MapReduce计算。我们也展示了在大型数据并行应用上Twister与其它类似的runtimes的性能比较,如Hadoop和DryadLINQ。
关键字:MapReduce,云技术,迭代算法

1. 介绍

  数据泛滥正在许多领域上演,在一些领域如天文学、粒子物理学和信息检索,数据的规模已经是千兆级别了。数据规模的增加也增加了原始数据转换为有意义的信息的计算量。在许多场景中,所需的处理能力远超过单台计算机的处理能力,需要强制使用并行/分布式计算策略。这些需求引导着新编程模型的发展和MapReduce和Dryad的实现。
  因为它的简单和提供服务质量的提升,MapReduce编程模型已经吸引了大量的热情。不像经典的分布式处理是靠计算资源的可用性来调度的,MapReduce更多地采用以数据为中心的方式,支持“将计算移动到数据”的概念。
  经典的并行应用程序是使用消息传递runtimes开发的(如MPI和PVM),其中使用上面两种技术开发的并行算法利用提供的丰富的通信和同步结构来创建各种通信拓扑。相反,MapReduce和类似的高水平编程模型支持简单的通信拓扑和同步结构。虽然这个限制了它们对于各类并行算法的适用性,但是在我们之前的文章中,我们已经展示了它们可以使用这些高水平编程模型来实现数据/计算密集的应用。当数据量很大时,基于简单通信拓扑的算法可能产生与具有紧密同步约束的算法相当的性能。这些观察也有利于MapReduce,因为它宽松的同步限制不会对大数据分析任务造成很大的开销。此外,这些编程模型的简单性和鲁棒性会代替额外的开销。
  当分析一定范围的应用时,而且这些应用对于MapReduce特别有效,我们注意到通过支持迭代MapReduce计算,可以扩展到更多领域的适用性,诸如数据聚类、机器学习和计算机视觉,迭代算法在这些算法里都非常常见。在这些算法中,MapReduce被用来处理并行化而其重复应用可以完成迭代。Cheng Tao等人在他们的文章中描述了MapReduce如何被应用到迭代机器学习算法中。
  目前已经存在了一些MapReduce实现,诸如Hadoop和Sphere,他们大多数采用Google提出的初始的编程模型和结构。这些结构专注于执行单步MapReduce(仅涉及MapReduce的一个应用的计算),具有较好的容错性,因此在整个计算过程中将大部分数据输出存储到某种形式的文件系统中。此外,在这些runtimes中,MapReduce的重复应用在每次重复加载或访问任何静态数据时都会创建新的map/reduce任务。虽然这些特征可以通过单步MapReduce计算完成,但会在许多迭代应用中引入大量的性能开销。
  Twister是一个增强的MapReduce runtime,具有扩展的编程模型,高效地支持迭代MapReduce计算。它使用publish/subscribe消息传递基础设施进行通信和数据传输,支持长时间的map/reduce任务,这些任务可以以“配置一次使用多次”的方法使用。另外,它还可以通过“广播”和“分散”类型的数据传输为MapReduce提供编程扩展。相比较其它MapReduce runtimes,这些改进允许Twister支持高效的MapReduce计算。
  下面几节,我们首先回顾了大多数MapReduce Runtimes使用的编程模型和结构。第三节我们介绍了Twister编程模型,并与典型的MapReduce结构(第四节)作比较。第五节讨论了一些应用,我们使用Twister开发这些应用,将Twister与其它并行/分布式runtimes(如MPI、Hadoop和DryadLINQ)作比较并提供性能分析。第6节我们讨论了Twister的相关工作,最后一节我们得出结论并概述未来的工作。

2. MapReduce

2.1 编程模型

  MapReduce是一个分布式的编程技术,由Google提出并用来在分布式环境下处理大型数据。根据Jeffrey Dean和Sanjay Ghemawat,MapReudce计算的输入是一列(key,value),并且每个map函数会通过使用一个输入(key,value)产生0或多个中间(key,value)。Runtime把这些中间结果基于一些机制进行分组,如hash成代表reduce任务的桶。Reduce任务把一个中间key和一列values作为输入,并且产生0个或多个输出结果。
  此外,由于它的函数编程的继承性,MapReduce要求map和reduce任务是“side-effect-free”的。通常情况下,map任务从一个数据分区开始,reduce任务执行像“合并”或“合计”这样的操作。为了支持这些MapReduce,还要求在reduce任务中执行的操作都是“可关联的”和“可交换的”。这些都是一般reductions常见的要求。举个例子,在MPI中的MPI_Reduce或MPI_Allreduce缺省的操作或用户定义的操作也要求是可关联的,或许还要可交换。

2.2 架构

  随着MapReduce编程模型,Jeffrey Dean和Sanjay Ghemawat在他们的论文中描述了它们在Google采用的架构。他们的许多决策都是基于采用MapReduce来解决问题的规模和这些应用程序部署的大型计算基础设施的特点。Apache Hadoop和一些其它的MapReduce runtimes如Disco和Sector/Sphere也采用了大部分的这些结构决策。下面我们会列出一些最重要的runtime特性,因为这有利于我们解释和比较之后在Twister采用的结构决策。

2.2.1 处理输入和输出数据

  Google和Hadoop在他们的MapReduce runtimes中都利用了分布式容错文件系统——GFS和HDFS。这些文件系统使用计算结点的本地磁盘去创建一个分布式文件系统,该文件系统可用来共定位数据和计算。他们也提供了大型磁盘带宽去读取输入数据。两个runtimes都使用分布式文件系统去读取输入数据和存储输出结果。这些文件系统通过数据冗余策略构造,以至于他们可以从数据/计算结点的个别本地磁盘的失败中恢复。(注意:在MapReduce领域,“结点”通常是指可用来存储数据和计算的计算机。整篇文章中我们也用术语“结点”来表示这样的计算机)。

2.2.2 处理中间数据

  在大多数MapReduce runtimes,计算map阶段生成的中间数据首先存储在本地磁盘上,其中本地磁盘所在的结点为数据生成的结点。然后master调度器分配这些输出给reducer,通过传输协议如HTTP恢复数据,之后执行reudce函数。这个方法大大简化了运行时失败的处理。然而,这也对于一些应用的整个计算增加了大量的性能开销。

2.2.3 调度任务

  Google的MapReduce和Hadoop使用动态的调度机制。在这个方法中,runtime分配map/reduce任务给可用的计算资源,这样简化了异构计算资源的最优利用,而map任务最初的分配是基于数据位置的。这个方法还为具有偏斜数据或计算分布的map任务提供自动复制平衡。

2.2.4 容错

  在Google的MapReduce架构中,处理失败是值得考虑的关键。他们将每个数据产品写入持久存储的方法简化了处理逻辑。在Google和Hadoop MapReduce中,分布式文件系统使用数据冗余处理磁盘或结点的失败。Map任务失败了会重新运行,reduce失败了则要求下载map任务的输出并重新执行reduce任务。Master处理调度和跟踪整个计算的过程是运行在一个结点上的,该结点是不容易失败的。该结点上的失败要求重启整个runtime。

3. 迭代MapReduce With Twister

  有许多具有简单迭代结构的并行算法。其中的大多数可以在数据聚类、降维、链接分析、机器学习和计算机视觉中找到。比如K-means,确定性退火聚类,pagerank和降维算法如SMACOF都是这些算法的例子。当分析如上所述的算法时,我们注意到这些算法的并行部分可以轻松实现为MapReduce计算,以至于使整体计算成为迭代MapReduce计算。
  进一步的分析揭露了一些有趣的特性,如:他们利用数据产品的两个类型——静态和动态,使用很多次的迭代直到收敛,要求reduce输出作为一个整体来决定继续或停止迭代。这些特征要求一个扩展的MapReduce编程模型和一个高效的runtime实现,这些我们会在Twister中提供。下面章节我们会详细讨论我们在Twister中支持的编程扩展。图1展示了扩展的编程模型。
Twister: 迭代MapReduce计算框架

3.1 静态与变量数据

  我们分析的许多迭代应用显示了一个共性,他们都执行在两种类型的数据上,它们是静态和变量数据。静态数据(大多数时间是两者中最大的)在每次迭代中使用并且在整个计算中保持固定,而在许多期望最大化类型的算法(EM)中,变量数据是每次计算的结果,通常会在下一次迭代中使用。举个例子,如果我们考虑K-means聚类算法,在第n次迭代期间,程序使用输入数据集和第n-1次迭代计算的聚类中心来计算下一个聚类中心集合。为了支持map/reduce任务在这两种类型的数据上运行,我们为map和reduce任务引入了一个“configure”阶段,该阶段可以为map和reduce任务加载(读取)任何静态数据。举个例子,通常计算的map阶段会使用被指定成(key,value)的变量数据和静态数据(已加载)来生成一个(key,value)输出集合。

3.2 长时间运行的Map/Reduce任务

  上述编程扩展在map/reduce任务中添加了处理静态和变量数据的功能。然而,在每个MapReduce计算的执行中读取静态数据是非常低效的。尽管一些典型的MapReduce计算如信息检索会使用非常大的数据集,但是我们遇到的许多迭代应用是在中等大小的数据集上操作,那样就可以适应计算基础设施的分布式存储器。这个发现引导我们去探索使用长时间运行map/reduce任务的想法,这些任务与在许多MPI应用中的并行处理相似,这些应用会持续整个计算的生命周期。长时间运行的map/reduce任务消除了每次迭代重新加载静态数据的必要。当前的MapReduce实现如Hadoop和DryadLINQ不支持该行为,因此它们初始化新的map/reduce任务并且在每次迭代中加载静态数据,这对于迭代的MapReduce计算会造成大量的性能开销。

3.3 任务的粒度

  Google的MapReduce文章中呈现的应用使用了细粒度的map任务。比如,在word count应用中,map任务对于每个碰到的单词简单地生成(word,1)。然而,对于许多应用我们注意到通过增加map任务的粒度可以减少中间数据的规模。在上述的例子中,map任务可以生成部分的总和如(word,n)。有了可配置的map任务选项,map任务可以访问大块数据/文件。在Twister里,我们在许多数据分析应用中采用这个方法来最小化中间数据的规模,为计算的map阶段分配更多的计算权重。

3.4 Side-effect-free编程

  第一眼看到长时间运行map/reduce任务的概念时,似乎违反了MapReduce的“Side-effect-free”性质,允许用户将状态信息存储在map/reduce任务中。然而,在map/reduce任务的调用中,Twister编程模型不保证状态信息的可用性。在失败的情况下,runtime可能使用静态配置重新初始化map/reduce任务,并清除所有存储在其中的状态信息。因此,我们并没有违反上面所说的。

3.5 Combine操作

  在Google的MapReduce架构中,reduce任务的输出以分开的文件存储在分布式文件系统(GFS)中。然而,在迭代的MapReduce计算中,应用程序经常需要决定是否要进行另一个迭代,应用程序需要访问reduce任务的“combined”输出。在Twister中,我们已经引入了新的MapReduce阶段叫做“combine”,来作为reduction的另一个水平(注意:这个跟本地的combine操作不同,在Hadoop中,本地的combine操作只是在map任务之后运行)。可以使用combine操作来产生所有reduce输出的集体输出。

3.6 编程扩展

  对于MapReduce,我们还在Twister结合了一套编程扩展。其中一个非常有用的扩展是mapReduceBCast(Value,value)。从名字可以推断出该扩展促进了传输一个单值(注意:MapReduce通常使用(key,value))给所有map任务。举个例子,“Value”可以是参数的集合、资源(文件或可执行文件)名称或甚至一个数据块。除了上述,3.1节描述的“configure”选项在Twister中被多种方式支持。Map任务通过一个“分区文件”配置——该文件包含数据分区和它们的位置的元数据。另外可以从values中配置map/reduce任务。举个例子,configureMaps(Value[]values)和configureReduce(Value[]values)是两个Twister提供的编程扩展。在接下来的一节中我们将讨论这些扩展是如何支持的。

4. Twister架构

  Twister是针对迭代MapReduce计算优化的分布式内存MapReduce runtime。它从工作结点的本地磁盘读取数据并在工作结点的分布式内存中处理中间数据。所有通信和数据传输通过一个publish/subscribe消息基础设施来处理。图2展示了Twister runtime的架构。
Twister: 迭代MapReduce计算框架
(注意:在整个讨论中,我们将简单地使用“broker network”术语来指消息传递基础设施)。
  Twister架构有三个实体组成:(i)client side driver(Twister Driver),它驱动整个MapReduce计算,(ii)Twister Daemon,它运行在每个工作结点上,(iii)the broker network。Runtime初始化期间,Twister在每个工作结点上启动一个守护进程,为能接收命令和数据,该进程随后与broker network建立起连接。守护进程负责管理分配给它的map/reduce任务,维护一个工作池来执行map/reduce任务,通知状态,并最终响应控制事件。The client side driver给用户提供编程API并转换这些Twister API为控制命令和输入数据消息,该消息通过broker network发送给运行在工作结点上的守护进程。
  Twister使用publish/subscribe消息传递基础设施处理四种通信需求:(i)发送/接收控制事件,(ii)从the client side driver发送数据给Twister daemons,(iii)map与reduce任务之间的中间数据转换,(iv)发回reduce任务的输出给the client side driver来唤醒combine操作。目前,它使用NaradaBrokering作为消息传递基础设施。Twister架构清晰地从其它组成部分的实现中分离出通信逻辑,以至于它可以直接使用其它消息传递基础设施如ActiveMQ或基于持久队列。

4.1 处理输入和输出数据

  对于map任务,Twister提供两种访问输入数据的机制;(i)从工作结点的本地磁盘读取数据和(ii)直接通过the broker network接收数据。第一个选项允许Twister使用分布在计算基础架构的工作结点上的大型数据集启动MapReduce计算。Twister假定从本地磁盘读取的数据被维护为文件,因此支持基于文件的输入格式,这简化了 runtime的实现。使用本地文件允许Twister将数据文件直接传递到任何可执行文件(可能是作为map运行的脚本或reduce计算)作为命令行参数,文件系统(如HDFS)不可能使用此功能。这个方法可能的缺点是它确实要求用户把大数据文件分成多个文件。
  关于跨工作结点的输入文件分发的元数据从被称为“分区文件”的文件中读取。目前,分区文件包括一列(file_id,node_id,file_path)元组。Twister中的分区文件的概念从DryadLINQ的分区文件机制得到灵感。Twister提供一个跨工作结点执行典型的文件系统操作的工具,例如(i)创建目录,(ii)删除目录,(iii)跨工作结点分发输入文件,(iv)将一组资源/输入文件复制到所有工作结点,(v)从工作结点收集输出文件到给定的位置,(vi)为分配给工作结点的给定数据集创建分区文件。虽然这些功能不能通过分布式文件系统(如GFS或HDFS)实现的全部功能提供,但上述服务尝试使用从本地磁盘读取的数据来捕获运行MapReduce计算的关键要求,同时支持“移动计算到数据”。将分布式文件系统(如HDFS)与Twister集成是一个有趣的未来工作。而且,Twister支持通过broker network直接发送map任务的输入数据。通过broker network发送map任务的大规模输入数据将是低效的。然而,这个方法对于发送小变量数据给map任务是非常有效的。举个例子,一组参数,几行矩阵,一组聚类中心都是这样的数据项。为了提高发送单个数据对象给所有map任务的效率,Twister在每个Twister守护程序中使用内存对象缓存,以便每个守护进程需要一个副本。当在一个结点上用多核计算机运行多个map任务操作时是非常有利的。

4.2 处理中间数据

  为了实现更好的性能,Twister在工作结点的分布式内存中处理中间数据。Map任务的结果通过broker network直接推送给合适的reduce任务。因此,Twister假定在map阶段之后产生的中间数据计算将适合分布式存储器。为了支持具有大中间结果的场景,可以通过将reduce输入存储在本地磁盘内来扩展Twister runtime,而不是缓存在内存中。

4.3 使用Pub/Sub消息

  publish/subscribe消息基础设施的使用提高了Twister runtime的效率。然而,要使runtime可扩展,通信基础设施也应该是可扩展的。我们在Twister中使用的NaradaBrokering pub/sub通信基础设施可以配置为一个broker network(如图2所示),以至于Twister守护进程可以连接到网络中不同的brokers,减少给定的broker的负载。当应用对大数据集使用mapReduceBcast()时,这是特别有用的。对于20MB的消息,使用624个Twister守护进程执行的基准测试显示,通过使用5个brokers(一个根broker和4个叶子broker分层连接),而不是一个broker可以将broadcast时间提高4倍。

4.4 调度任务

  如果缓存位置保持不变,只有在Twister中使用的缓存map/reduce任务受益。因此,Twister。因此Twister静态地调度map/reduce任务。然而,当工作结点失败时,将在不同的结点上重新调度计算。在加上偏斜的输入数据或map任务的执行时间,静态的调度可能导致资源的非最优利用。然而,我们可以通过随机分配输入数据到map任务来最小化这个影响。

4.5 容错

  我们正在使用Twister对迭代MapReduce计算进行容错支持。我们的方法是在迭代之间保存计算的状态,以至于在失败的情况下整个计算可以回滚几次迭代。支持单个map或reduce失败需要采用类似于Google的架构,对于迭代MapReduce计算,这将消除使用Twister获得的大部分效率。因此,在Twister,我们决定只对迭代MapReduce计算提供容错支持,并基于以下两个假定:(i)类似于Google和Hadoop的实现,我们也假定master结点极少失败,因此没有对master结点的失败提供支持,(ii)通信基础设施可以独立于Twister runtime来建立容错机制。基于这些假定我们尝试去处理map/reduce任务、守护进程和工作结点的失败。
  在迭代MapReduce计算中,Combine操作是一个隐含的全局障碍。这个特征简化了以防失败后恢复Twister所需要记住的状态数。为了开启容错,Twister保存了在启动MapReduce迭代前配置map/reduce任务用到的配置。然后它也保存了输入数据,这个输入数据每隔X次迭代直接从主程序送到map任务,其中X是用户配置的参数。一旦有任何的失败,Twister使用可用的资源简单地重新配置map/reduce任务,并从最近保存的状态重新计算。这个方法最多要求回滚X次迭代。

5. 应用和性能

  我们使用Twister已经实现了一系列MapReduce应用,这些应用的细节在之前的出版物中呈现了。这里我们将描述一些新的应用,这些应用使用了Twister并与其它MapReduce runtimes(如Hadoop和DryadLINQ)作性能比较。对于性能分析,我们使用768个具有32个结点的CPU核心集群。
集群中每个结点包含4个6核CPU(Intel(R)Xeon(R)E7450 2.4GHz)和48GB内存。除了使用InfiniBand的MPI应用程序,所有其他runtime都使用千兆网络。结点可以从Linux(Red Hat Enterprise Linux Server release 5.4 -64位)和Windows(Windows Server 2008 -64位)中启动。集群中所有结点我们使用DryadLINQ的学术发布、Apache Hadoop0.20.2版本,以及用来比较性能的Twister。Twister和Hadoop都使用JDK(64位)版本1.6.0_18,DryadLINQ和MPI使用Microsoft .NET版本3.5。

5.1 成对的距离计算

  计算一个数据集的每个元素和另一个数据集的每个元素之间的相似度或差异度是很常见的问题,通常被称作是一个All-pairs问题。在给定的gene集合,我们选择的应用在每个genes对之间计算Smith Waterman Gotoh(SW-G)距离(δij ——gene i和gene j之间的距离)。
  我们开发了一种Twister应用程序,通过采用粗粒任务分解方法来执行上述计算。为了阐明我们的算法,让我们考虑一个例子,其中N个基因序列产生大小为NxN的成对距离矩阵。我们通过考虑合成矩阵来分解计算任务,并将整体计算分组成大小为DxD的块矩阵。由于距离δijδji的对称性,我们只计算上三角的块的距离,如图3所示。上三角的块用作map任务的values,块的坐标用作keys。由于对称性,当maps计算出给定块的SW-G距离时,将会发射出两个结果矩阵的副本,对应当前的块(i,j)和块(j,i)。块(j,i)被标记为转置矩阵。给定块的行数用作reduce任务的输入key,这样简单地收集了一行的数据块,并以正确的顺序组织后输出到文件。计算的最后,对应于单个行块的所有块通过reduce任务写入数据文件。我们已经使用类似的算法使用Hadoop和DryadLINQ开发了相同的应用程序。在Hadoop和Twister程序中,SW-G距离的计算通过JAligner程序完成,这是一个被用在DryadLINQ的java实现。
Twister: 迭代MapReduce计算框架
我们使用Repeatmasker与Repbase Update来确定人类和黑猩猩Alu基因序列的样品,并从原始数据中产生了50000个基因的数据集,复制了10000个基因的随机样本。我们使用这个数据集去衡量DryadLINQ、Hadoop和Twister runtimes对逐渐增加输入数据的性能。图4展示了3个runtimes的结果。
Twister: 迭代MapReduce计算框架
SW-G距离的计算是典型的MapReduce计算,结果清晰地展示了对于该应用程序三个runtimes scale都很好,尽管DryadLINQ的运行时间比Twister和Hadoop都高。(注意:对于JAligner和NAligner程序的性能差异,图4展示的DryadLINQ的结果被归一化了)这些结果也证明了Twister(最好的结果)有能力运行典型的MapReduce计算,尽管我们增加了强调迭代MapReduce计算的功能。

5.2 Pagerank

  PageRank是最有名的网页排名算法。它给万维网的每个网页计算数值,该数值反映了随机上网者将访问那个页面的可能性大小。PageRank的过程可以理解为一个Markov Chain,它需要递归计算直到收敛。通过之前计算的值,算法的一个迭代可以计算出每个网页新的访问可能性。迭代直到差别(δ)小于一个预定义的阈值才会停止,其中δ为第N次迭代的页面访问可能性到第(N+1)次迭代的页面访问可能性的距离向量。
  已经有很多发表的工作去优化PageRank算法,其中有些通过探索超链接的块结构来加速计算。在这篇文章中,我们不创造任何创新的PageRank算法,而是通过Twister系统上的MapReduce编程模型实现最普通的PageRank。在MapReduce处理之前,网页图以邻接矩阵的形式存储,并且在计算结点上配置成静态的输入数据。Map任务的变量输入是初始的页面排名分数。Reduce任务的输出是map任务在下一次迭代的输入。
  通过借助Twister的特性,我们做了几个PageRank的优化,为了能够扩展到更大的网页图:(i)在每个计算结点上配置邻接矩阵为一个静态的输入数据,(ii)将变量输入数据广播到不同的计算结点,以便同一结点上的map任务使用Twister的对象缓存访问相同的数据副本。独立于Twister的进一步优化包括:(i)通过将一定数量的URL条目包含在一起来增加map任务的粒度,(ii)把所有tangling结点合并为一个结点,这样可以节省通信和计算的开销。
Twister: 迭代MapReduce计算框架
  我们构建了一个程度分布遵循幂律的网络图。五个模拟网络图在链接的规模和密度上有所不同。表1总结了在实验中用到的数据集。我们注意到对于这五个网页图,Twister/PageRank作业的周转时间是线性的。
Twister: 迭代MapReduce计算框架
Twister: 迭代MapReduce计算框架

6. 相关工作

  MapReduce简化了许多并行应用程序的编程。目前已经有几种基于Google的MapReduce架构的MapReduce实现,其中有一些在Google提出的初始MapReduce模型中有改进/功能。然而,据我们所知,没有其它能够支持长时间运行map/reduce任务这样一个特性的实现了,也没有像Twister一样能够在大规模数据分析应用下高效地支持迭代MapReduce计算的MapReduce扩展了。
  这篇文章由Cheng-Tao等人提出,讨论了他们在开发多核机器的MapReduce实现的经历。他们使用MapReduce runtime来实现几个机器学习算法,显示MapReduce对于可以以某种“求和形式”表达的许多算法尤其有效。Colby Ranger等人提出的Phoenix runtime是针对多核系统和多处理器系统的MapReduce实现。Ranger等人使用的评估包括Google MapReduce论文中发现的典型用例,例如字数统计、反向索引以及迭代计算,如K-means。Twister中的一些设计决策受到的启发来自这些共享内存runtime获得的好处。举个例子,在上述的runtimes中,数据传输只需要共享内存引用,在Twister中,我们使用pub/sub消息传输分布式内存。向所有map任务发送一些数据值是一个共享内存的简单操作,在Twister中,我们引入了mapReduceBcast()来处理这些需求。
  Sphere是一个运行在Sector分布式文件系统上的并行runtime。Sector在作用上与HDFS相似,但是它期望将数据存储为文件并保留数据分割供用户管理。不像map/reduce,Sphere在这些数据分割上执行用户定义的函数。它的作者也展示了它也能够执行MapReduce风格的计算。然而,我们注意到他们的方法在管理计算时,要求更多的用户干涉。支持MapReduce的各种编程语言时许多map reduce runtime的动力,如Disco,Qizmt和Skynet。
  支持基于有向非循环图(DAG)的执行流的并行runtimes与MapReduce编程模型相比提供了更多的并行拓扑。Condor DAGMan是一个有名的并行runtime,它支持可表示为DAG的应用程序。许多工作流runtimes也支持基于执行流的DAG。在这些runtimes中,并行任务可以从几个输入源读取,产生一个或多个输出。通常情况下,执行在这些任务上的计算粒度比执行在MapReduce的map/reduce函数上的计算粒度大。举个例子,在工作流runtimes中,一个任务可以分成并行的程序运行在多个计算机上。可以通过编排多个MapReduce计算来模拟使用MapReduce的DAG。在这方面,由于Twister可以发送输入数据直接从主程序到map/reduce任务,并且收集reduce的输出到主程序,Twister将更好地支持这方面。
  Microsoft Dryad也在其分布式执行引擎中使用基于DAG的执行模型。Dryad在顶点的任务粒度与MapReduce的map/reduce的任务粒度更相似,因此作者称之为MapReduce的超集。扩展Twister runtime去执行更多的用户定义函数和基于执行流的DAG是一个有趣的未来工作。然而对这样一个runtime编程是不直截了当的,这也正是微软引入DryadLINQ的原因。
  DryadLINQ对Dryad提供了基于编程API的LINQ,因此这更适合处理结构化数据的应用程序。使用DryadLINQ执行涉及遗留应用程序或脚本的计算并不那么容易。DryadLINQ还支持“循环展开”一个功能,可用于创建组合迭代计算的几次迭代的聚合执行图。可以展开的迭代数量是依赖应用程序和机器上的可用内存。通常情况下只有一小部分的迭代。因此,正如我们之前的论文所述,对于迭代应用程序,它并没有减少编程模型的开销。我们测试的迭代应用程序甚至在我们初始的适度大小问题中执行了10000次迭代。他们在Twister长时间运行map/reduce计算任务中大大获益。此外,DryadLINQ也是用基于通信机制的文件来传输数据导致更高的开销。
  Swift是一个脚本语言,是一种用于开发并行应用程序的执行和管理runtime。它的主要重点是使用简单的并行结构来表达计算,他们更容易与数据分区相结合,并使用Grid/Cluster基础设施进行调度。一旦数据分区可用,就可以很轻松地使用MapReduce来调度“map-only”操作(Twister也支持这个)将其作为许多任务计算来处理而无需使用完整的MapReduce循环。Swift也支持并行任务的迭代执行,但不提供像Twister一样长时间运行任务或快速数据传输的优化。

7. 结论和未来工作

  在这篇文章中,我们讨论了设计和实现Twister的经验——针对迭代MapReduce计算优化的分布式内存MapReduce runtime。我们讨论了Twister的扩展编程模型及其架构,将其与典型的MapReduce及其当前架构进行比较,展示了Twister如何将MapReduce的包络扩展到更多类应用程序。我们还对大量数据集呈现了一组应用程序的结果。一些基准是使用768个核心集群测试的。包括一些复杂的迭代应用程序(如MDS)的结果显示Twister对于许多迭代MapReduce计算表现的很好,甚至可以使用经典的并行runtime(如MPI)生成可比较的结果。正如4.5节讨论的,我们当前的工作重点在增加Twister的容错能力。我们打算在两个领域扩展我们未来的研究:(i)研究可以与Twister一起使用的不同的通信基础设施,并确定减少消息传递基础设施负载的方法;(ii)探索可与Twister结合使用的可能的分布式文件系统,以提供更好的数据处理能力,同时保持Twister完整的大部分效率。有了上述的增进,Twister将为MapReduce提供有价值的工具,它可以支持数据密集型的学科,诸如物理、化学、医疗和生命科学。

8. 参考文献

本文翻译自《Twister:A Runtime for Iterative MapReduce》,具体参考文献详见原文。

Ekanayake J, Li H, Zhang B, et al. Twister: a runtime for iterative MapReduce [C] //Proc of ACM International Symposium on High PERFORMANCE Distributed Computing.2010:810-818.