CIEL : a universal execution engine for distributed data-flow computing
简介
本文介绍了CIEL,一种用于分布式数据流程序的通用执行引擎。与以前的执行引擎一样,CIEL掩盖了分布式编程的复杂性。与那些系统不同,CIEL作业可以做出与数据相关的控制流决策,这使它能够计算迭代和递归算法。
我们还开发了Skywriting,一种直接在CIEL上运行的图灵完整脚本语言。执行引擎为Skywriting脚本和用其他编程语言编写的高性能代码提供透明的容错和分发。 我们在云计算平台上部署了CIEL,并证明它可以实现迭代和非迭代算法的可扩展性能。
1 介绍
许多集群越来越需要处理大型数据集。分布式执行引擎(如MapReduce和Dryad)已成为此类集群的流行系统。这些系统公开了一个简单的编程模型,并可以自动处理分布式计算的难点:容错,调度,同步和通信。MapReduce和Dryad可用于实现大多数算法,但对于迭代算法则显得低效。迭代算法构成了许多机器学习和优化问题的基础,但需要更具表现力的编程模型和更强大的执行引擎。为了解决这些限制,并将分布式执行引擎的优势扩展到更广泛的应用程序,我们开发了Skywriting和CIEL。
Skywriting是一种脚本语言,允许使用命令式和函数式语言语法直接表达迭代和递归任务并行算法。Skywriting脚本在CIEL上运行,CIEL是一个执行引擎,为分布式数据流提供通用执行模型。与先前的系统一样,CIEL协调根据数据流DAG划分的一组任务(数据并行)的分布式执行,并受益于透明扩展和容错。不同的是,CIEL通过在执行任务时动态构建DAG来扩展以前的模型。正如我们将要展示的那样,允许任务创建更多任务,使CIEL能够支持依赖于数据的迭代或递归算法。我们在第3节介绍了CIEL的高级架构,并解释了Skywriting如何在第4节中映射到CIEL的基元。
我们的实现包含了第5节中描述的几个附加功能。与现有系统一样,CIEL为worker节点提供透明的容错功能。此外,CIEL可以容忍集群中主机和客户机程序间的故障。为了提高资源利用率并减少执行延迟,CIEL可以记录任务结果。最后,CIEL支持在同时执行的任务之间传输数据。
2 动机
一些研究人员已经确定了MapReduce和Dryad编程模型的局限性。这些系统最初是为面向批处理的工作开发的,即用于信息检索的大规模文本挖掘。它们旨在最大限度地提高吞吐量,而不是最大限度地减少单个作业延迟。这在迭代计算中尤其明显,其中多个作业被链接在一起并且作业等待时间成倍增加。
尽管如此,MapReduce - 特别是它的开源实现Hadoop仍然是一个流行的平台,用于大输入的并行迭代计算。例如,Apache Mahout机器学习库使用Hadoop作为其执行引擎。几种Mahout算法 - 例如k均值聚类和奇异值分解 - 是迭代的,包括在未收敛的循环内的数据并行内核。Mahout使用一个驱动程序,向Hadoop提交多个作业,并在客户端执行收敛测试。但是,由于驱动程序在Hadoop集群外部以逻辑方式(通常是物理上)执行,因此每次迭代都会导致作业提交开销,并且驱动程序无法从透明容错中受益。这些问题并非Hadoop独有,而是MapReduce和Dryad原始版本都有的问题。
Figure 1:现有分布式执行引擎提供的功能分析。
分布式执行引擎的计算能力由它可以表达的数据流决定。在MapReduce中,数据流限于由map和reduce任务数量参数化的二分图; Dryad允许数据流遵循更通用的有向无环图(DAG),但必须在开始作业之前完全指定。通常,为了支持单个作业中的迭代或递归算法,我们需要依赖于数据的控制流,即基于先前计算的结果,动态创建更多工作的能力。同时,我们希望保留任务级并行性的现有好处:透明容错,基于局部性的调度和透明扩展。在Figure 1中,我们根据这些目标分析了一系列现有系统。
MapReduce和Dryad已经支持透明容错,基于局部性的调度和透明扩展。此外,Dryad支持任意任务依赖,这使它能够执行比MapReduce更大的计算类。但是,它们都不支持依赖于数据的控制流,因此每个计算中的工作必须是静态预先确定的。
各种系统提供数据相关的控制流程,但牺牲了其他功能。 Google的Pregel是支持控制流的分布式执行引擎的最大规模示例。 Pregel是一个用于执行图算法(如PageRank)的批量同步并行(BSP)系统,Pregel计算分为“supersteps”,在此期间,对图中的每个顶点执行“vertex method”。至关重要的是,每个顶点都可以投票以终止计算,并且当所有顶点投票终止时计算终止。然而,与简单的MapReduce作业一样,Pregel计算仅在单个数据集上运行,并且编程模型不支持多个计算的组合。
最近的两个系统为MapReduce添加了迭代功能。CGL-MapReduce是MapReduce的一个新实现,它在几个MapReduce作业中缓存RAM中的静态(循环不变)数据。HaLoop扩展了Hadoop,能够评估减少输出的收敛功能。两个系统都不提供跨多个迭代的容错,也不支持Dryad样式的任务依赖图。
最后,Piccolo是一种新的数据并行编程模型,它使用分区的内存中键值表来代替MapReduce的reduce阶段。 Piccolo程序分为“内核”函数,它们并行应用于表分区,通常将键值对写入一个或多个其他表。“控制”功能协调内核功能,并且它可以执行任意数据相关的控制流程。Piccolo支持用户辅助检查点(基于Chandy-Lamport算法),并且仅限于固定的群集成员资格。如果单个节点出现故障,则必须从具有相同数量节点的检查点重新启动整个计算。
我们相信CIEL是第一个支持Figure 1中所有五个目标的系统。CIEL设计用于跨大数据集的粗粒度并行,MapReduce和Dryad也是如此。在整个数据集可以放入RAM的情况下,Piccolo可能更高效,因为它可以避免写入磁盘。最重要的是,实现最高性能需要大量的开发人员努力,使用显式消息传递等低级技术。
3 CIEL
CIEL是一个分布式执行引擎,可以执行具有任意数据依赖控制流的程序。在本节中,我们首先描述CIEL支持的核心抽象:动态任务图。然后,我们描述CIEL如何执行表示为动态任务图的作业。最后,我们描述了用于分布式数据流计算的CIEL集群的具体体系结构。
3.1 动态任务图
Figure 2:CIEL作业由动态任务图表示,其中包含任务和对象。在此示例中,根任务A生成任务B,C和D,并将其结果的生成委托给D。C IEL使用任务和对象表来表示图。
在本小节中,我们定义了三个CIEL原语 - 对象,引用和任务 - 并解释它们在动态任务图中的相关性(Figure 2)。
CIEL是一个以数据为中心的执行引擎:CIEL作业的目标是生成一个或多个输出对象。对象是一个非结构化的,有限长度的字节序列。每个对象都有一个唯一的名称:如果两个对象同名,则它们必须具有相同的内容。为了简化一致性和副本,对象在写入后是不可变的,但有时可以附加到一个对象中。
能够在不拥有其全部内容的情况下描述该对象是有帮助的; CIEL使用引用来达到此目的。引用包括名称和一组位置(例如主机名 - 端口对),用于存储具有该名称的对象。位置集可以是空的:在这种情况下,引用是对尚未生成的对象的未来引用。否则,它是一个具体的参考,可以被消费。
CIEL作业通过执行任务来进行。任务是一种非阻塞原子计算,完全在一台机器上执行。任务具有一个或多个依赖项,这些依赖项由引用表示,并且当所有依赖项变得具体时,任务变为可运行。依赖项包括一个特殊对象,它指定任务的行为(例如可执行二进制文件或Java类),并可能将某些结构强加给其他依赖项。为了简化容错,CIEL要求所有任务计算其依赖关系的确定性函数。任务还具有一个或多个预期输出,这些输出是该任务将创建或委派另一个任务创建的对象的名称。
任务可以有两个外部可观察的行为。首先,任务可以通过为这些对象创建具体的引用来发布一个或多个对象。特别是,任务可以为其预期输出发布对象,如果其他任务依赖于那些输出,则可能导致其他任务变为可运行。但是,为了支持依赖于数据的控制流,任务还可以生成执行额外计算的新任务。CIEL对任务行为强制执行以下条件:
-
对于每个预期输出,任务必须发布具体引用,或者将子任务的名称生成为预期输出。这确保了,只要子项最终终止,任何依赖于父项输出的任务最终都将变为可运行。
-
子任务必须仅依赖于具体的引用(即已经存在的对象)或已指定运行任务的未来输出的引用(即已经预期要发布的对象)。这可以防止死锁,因为循环不能在依赖图中形成。
动态任务图存储任务和对象之间的关系。从对象到任务的边缘意味着任务依赖于该对象。从任务到对象的边缘意味着期望任务输出该对象。当作业运行时,新任务将添加到动态任务图中,并且当预期新生成的任务产生对象时,将重写边。
动态任务图提供类似于尾递归的低级数据相关控制流:任务产生其输出(类似于返回值)或产生新任务以产生该输出(类似于尾调用)。它还提供数据并行的功能,因为可以并行分派独立任务。但是,我们不希望程序员手动构建动态任务图,而是提供Skywriting脚本语言以编程方式生成这些图。
3.2 评估对象
给定动态任务图,CIEL的作用是评估与作业输出相对应的一个或多个对象。实际上,可以将CIEL作业指定为仅具有具体依赖关系的单个根任务,以及指定计算的最终结果的预期输出。这导致了两种自然策略,即拓扑排序的变体:
Eager evaluation:由于任务依赖关系形成DAG,因此至少一个任务必须仅具有具体的依赖关系。首先执行只有具体依赖的任务;随后在所有依赖项变得具体时执行任务。
Lazy evaluation:寻求评估根任务的预期输出。要评估对象,请确定生成预期对象的任务T.如果T只有具体的依赖关系,请立即执行;否则,阻塞T并使用相同的过程递归地评估其所有未实现的依赖关系。当被阻止的任务的输入变得具体时,执行它。将所需对象的生成委托给安排的任务时,请重新评估该对象。
当我们第一次开发CIEL时,我们尝试了这两种策略,后来专门使用Lazy evaluation,因为它更自然地支持我们在§5中描述的容错和记忆功能。
3.3 系统架构
Figure 3:CIEL集群有一个master和许多worker。 master将任务
分派给worker执行。任务完成后,worker发布一组对象并可能产生更多任务。
Figure 3显示了C IEL集群的体系结构。单个master协调作业的端到端执行,并且多个worker执行单个任务。
master在对象表和任务表中维护动态任务图的当前状态(Figure 2(b))。对象表中的每一行都包含该对象的最新引用,包括其位置(如果有),以及指向期望生成它的任务的指针(如果有的话:如果一个对象通过外部工具加载到集群中,它将没有任务指针)。任务表中的每一行对应一个生成的任务,并包含指向任务所依赖的引用的指针。
master调度程序负责在CIEL计算中取得进展:它采用懒惰策略评估输出对象并将可运行任务与空闲worker配对。由于任务输入和输出可能非常大(每个任务的千兆字节数量级),所有批量数据都是存储在worker上,并且master处理之间的引用。master使用基于多队列的调度程序(派生自Hadoop)将任务分派给离数据最近的worker。如果worker需要获取远程对象,它将直接从另一个worker读取该对象。
worker执行任务并存储对象。在启动时,worker向master注册,并定期发送心跳以证明其持续可用性。将任务分派给worker时,将调用相应的执行程序。 executor是一个通用组件,它准备输入数据以供使用,并在其上调用一些计算,通常是执行外部进程。我们已经为Java,.NET,基于shell和本地代码实现了简单的执行程序,并为Skywriting实现了更复杂的执行程序。
假设一个worker成功执行一个任务,它将希望发布的一组引用回复给master,并为它希望生成的任何新任务回复一个任务描述符列表。然后,master将更新对象表和任务表,并重新评估现在可运行的任务集。
除了master和worker之外,还会有一个或多个客户端(未显示)。客户端的角色很小:它将作业提交给master,并轮询master以发现作业状态或阻塞直到作业完成。
作业提交消息包含根任务,该任务必须只具有具体的依赖关系。master将根任务添加到任务表,并通过懒惰策略评估其输出来启动作业。
请注意,为简单起见,CIEL当前使用单个master。尽管如此,我们的实现可以从主故障中恢复,并且在我们的评估期间不会导致性能瓶颈。但是,如果它在将来成为一个问题,那么就可以对主状态(任务表和对象表)进行分片。在多个主机之间,逻辑上保留单个master的功能。
4 Skywriting
Figure 4:在Skywriting中实现的迭代计算。 输入数据是n个输入块的列表,并且curr被初始化为n个部分结果的列表。
Skywriting是一种用于表达在CIEL之上运行的任务级并行性的语言。Skywriting是图灵完整脚本语言,可以使用while循环和递归函数等构造表达任意数据相关的控制流。Figure 4显示了一个计算迭代算法的Skywriting脚本示例;我们在k-means实验中使用了类似的结构。
我们在之前的一篇论文中介绍过Skywriting,但在这里简要介绍一下主要特征:
•ref(url)返回对存储在给定URL的数据的引用。该函数支持常见的URL方案和自定义ciel方案,后者访问CIEL对象表中的条目。如果URL是外部的,则CIEL将数据作为对象下载到集群中,并为该对象指定名称。
•spawn(f,[arg,...])产生并行任务来评估f(arg,...)。Skywriting函数不会有副效果,所有参数都按值传递。返回值是对f(arg,...)结果的引用。
•exec(executor,args,n)使用给定的args同步运行指定的executor。executor将产生n个输出。返回值是对这些输出的n个引用的列表。
•spawn_exec(executor,args,n)生成并行任务以使用给定的args运行指定的executor。与exec()一样,返回值是对这些输出的n个引用的列表。
•间接引用(一元 - *)运算符可以应用于任何引用;它将引用的数据加载到Skywriting执行上下文中,并评估结果数据的结构。
在下文中,我们将描述Skywriting如何映射到CIEL基元。我们描述了如何创建任务,如何使用引用来促进依赖于数据的控制流,以及Skywriting与其他框架之间的关系。
4.1 创建任务
Figure 5:Skywriting中的任务创建。可以使用(a)spawn(),(b)spawn_exec()和(c)间接引用(*)运算符创建任务。
Skywriting的独特之处在于它能够在执行作业的过程中产生新任务。该语言提供了两个显式机制来生成新任务(spawn()和spawn_exec()函数)和一个隐式机制(* -operator)。Figure 5总结了这些机制。
spawn()函数创建一个新任务来运行给定的Skywriting函数。为此,Skywriting运行时首先创建一个包含新任务环境的数据对象,包括要执行的函数的内容以及传递给函数的任何参数的值。此对象称为Skywriting后续,因为它封装了计算的状态。然后,运行时为新任务创建任务描述符,其中包括新任务的依赖。最后,它为任务结果分配一个引用,并将其返回给调用脚本。Figure 5(a)显示了创建任务的结构。
spawn_exec()函数是一种较低级别的任务创建机制,允许调用者调用不同语言编写的代码。通常,不直接调用此函数,而是通过相关executor的包装器(例如内置的java()库函数)。当调用spawn_exec()时,运行时将参数序列化为数据对象并创建依赖于该对象的任务(Figure 5(b))。如果spawn_exec()的参数包含引用,则运行时会将这些引用添加到新任务的依赖项中,以确保CIEL在其所有参数都可用之前不会调度任务。同样,运行时为任务输出创建引用,并将它们返回给调用脚本。
如果任务尝试间接引用尚未创建的对象,例如,调用spawn()的结果,当前任务必须阻塞。但是,CIEL任务是非阻塞的:所有同步(和数据流)必须在动态任务图(第3.1节)中显式化。为了解决这个矛盾,运行时隐式地创建一个用于继续执行的任务,该任务依赖于间接引用的对象和当前任务的后续操作(即当前的Skywriting执行堆栈)。因此,新任务仅在生成间接引用的对象时运行,这提供了必要的同步。Figure 5(c)显示了当任务间接引用spawn()的结果时产生的依赖图。
任务在到达return语句时终止(或者在future对象的引用上阻塞)。 Skywriting任务有一个输出,它是return语句中表达式的值。 在终止时,运行时将输出存储在本地对象库中,发布对该对象的具体引用,并按创建顺序将生成的任务列表发送给master。
Skywriting确保动态任务图保持非循环。在评估由任务创建的函数时,任务的依赖关系是固定的,这意味着在评估函数之前,它们只能包含存储在本地Skywriting范围中的引用。因此,任务不能依赖于自身或其任何后续任务。请注意,Skywriting任务可以在其返回值或随后的任务创建函数调用中传递引用。这使脚本能够创建任意非循环依赖图,例如MapReduce依赖图(第4.3节)。
4.2 数据相关控制流
Skywriting旨在协调以数据为中心的计算,这意味着计算中的对象可以分为两个空间:
数据空间。包含大小可能高达几千兆字节的大型数据对象。
协调空间。包含确定控制流的小对象(如整数,布尔值,字符串,列表和词典)。
通常,数据空间中的对象由编程语言处理,以实现比Skywriting提供的更好的I/O或计算性能。在现有的分布式执行引擎(例如MapReduce和Dryad)中,数据空间和协调空间是不相交的,这阻止了这些系统支持依赖于数据的控制流。
为了支持依赖于数据的控制流,数据必须能够从数据空间传递到协调空间,以便它可以帮助确定控制流。在Skywriting中,*操作符将引用从数据空间对象转换为协调空间的值。
生成的任务也许会由任何executor运行,必须以Sky-writing可识别的格式编写引用的对象; 为此,我们使用JavaScript Object Notation(JSON)。此序列化格式仅用于传递给Skywriting的引用,并且大多数executor使用适当的二进制格式作为其数据。
4.3 其他语言与框架
Figure 6:Skywriting中实现MapReduce编程模型。用户提供输入列表,mapper函数,reducer函数和要使用的reduce个数。
像MapReduce这样的系统至少在某些方面变得流行,因为它们的界面简单:开发人员只需要一对map()和reduce()函数就可以指定整个分布式计算。为了证明Skywriting接近这种简洁程度,Figure 6展示了MapReduce执行模型的实现,它取自于Skywriting标准库。
mapreduce()函数首先将mapper应用于输入的每个元素。mapper是一个Skywriting函数,它返回r个元素的列表。然后对map操作后的输出进行shuffle,使得每个map的第i个输出变为第i个reduce的输入。最后,将收集到的reduce的输入传到reducer函数运行r次。在典型的使用中,mapreduce()的输入是包含已拆分的数据对象,mapper和reducer函数调用spawn_exec()来执行另一种语言的计算。
请注意,mapper函数负责在reducer之间对数据进行分区,reducer函数必须合并它接收的输入。如果需要,mapper的实现还可以包含组合器。为了简化开发,我们已将Hadoop MapReduce框架的一部分移植到CIEL任务中,并提供辅助函数来分区,合并和处理Hadoop文件格式。
任何编译为DAG任务的高级语言也可以编译为Skywriting程序,并在CIEL集群上执行。例如,可以为Pig和DryadLINQ开发Skywriting后端,从而提高了扩展这些语言的可能性,并支持无限迭代。
5 实现所遇问题
CIEL和Skywriting的当前实现包含大约9,500行Python代码,以及执行器绑定中的几百行C,Java和其他语言。本节的其余部分描述了我们实现的三个有趣特性:记忆化,master容错和流。
5.1 确定性命名和记忆化
回想一下,CIEL集群中的所有对象都具有唯一的名称。在本小节中,我们将展示如何通过适当的名称选择来实现记忆化功能。
我们最初的C IEL实现使用全局唯一标识符(UUID)来标识所有数据对象。虽然这是一个概念上简单的方案,但它使容错变得复杂,因为master必须记录生成的UUID以支持任务失败后重新运行。
这促使我们重新考虑名称选择。为了支持容错,现有系统假设单个任务是确定的,并且CIEL做出相同的假设(§3.1)。因此,具有相同依赖关系的两个任务(包括作为依赖关系的可执行代码)将具有相同的行为。因此,使用以下Skywriting语句创建n个输出的任务
result = spawn_exec(executor,args,n);
将完全由executor,args,n及其标志决定。因此,我们可以通过将executor,args,n和i通过适当的分隔符连接来构造第i个输出的名称。但是,由于args本身可能包含引用,因此名称可能会变得难以管理。因此,我们使用抗冲突哈希函数H来计算args和n的digest,从而得到结果名称:
我们目前使用160位SHA-1哈希函数来生成digest。
回想一下§3.2中的惰性评估算法:任务的预期输出可以解决另一个阻塞任务的依赖,只有这样任务才会被执行。如果先前任务已经生成了新任务的输出,则根本不需要执行新任务。因此,由于确定性命名,CIEL会记住任务的结果,这可以提高执行重复任务的作业的性能。
我们的记忆化的目标与最近的Nectar系统类似。 Nectar对DryadLINQ查询执行静态分析,以识别先前在同一数据上计算的子查询.Nectar在DryadLINQ级别实现,这使得它能够对每个任务的语义和缓存中间结果的成本/收益比做出假设。例如,如果先前的查询在当前查询的输入的前缀上操作,则Nectar可以重复使用来自先前查询的交换和关联聚合的结果。 CIEL工作的表现力使得运行这些分析变得更具挑战性,我们正在研究Skywriting程序中如何通过简单注释在我们的系统中提供类似的功能。
5.2 容错
分布式执行引擎必须在面对网络和计算机故障时继续运行。随着工作时间变得更长(并且,由于CIEL允许无限次迭代,它们可能变得非常长)经历故障的概率增加。因此,CIEL必须容忍计算中涉及的任何机器的故障:客户端,worker和master。
客户端容错是微不足道的,因为CIEL本身支持迭代作业并从头到尾管理作业执行。客户端唯一的角色是提交作业:如果客户端随后失败,则作业将继续而不会中断。相比之下,为了在非支持迭代框架上执行迭代作业,客户端必须运行一个驱动程序,该程序执行所有依赖于数据的控制流(例如收敛测试)。由于驱动程序在框架外执行,它不会从透明容错中受益,开发人员必须手动提供此功能,例如通过检查执行状态。在我们的系统中,一个Skywriting脚本取代了驱动程序,CIEL可靠地执行整个脚本。
CIEL中的worker容错类似于Dryad。master接收来自每个worker的定期心跳消息,并且如果(i)它在指定的超时之后没有发送心跳,并且(ii)它不响应来自master的反馈消息,则认为worker已经故障。此时,如果已为worker分配任务,则认为该任务已失败。
当任务失败时,CIEL会自动重新执行它。但是,如果由于其输入存储在失败的worker中而失败,则该任务不可再运行。在这种情况下,CIEL递归地重新执行先前的任务,直到满足失败任务的所有依赖关系。为实现此目的,master使对象表中每个丢失输入的位置无效,并采取懒惰策略重新评估丢失的输入。依赖于来自故障worker中数据的其他任务也将失败,并且这些任务同样由master重新执行。
CIEL还支持master容错。在MapReduce和Dryad中,如果一个作业的master进程失败,它就会完全失败;在Hadoop中,如果JobTracker失败,所有作业都会失败;并且master故障通常会导致多次重复提交作业的驱动程序失败。但是在CIEL中,所有master状态都可以从活动中的作业集中派生出。至少,持久化存储每个存活作业的根任务允许创建新的master并立即恢复执行。CIEL提供了三种扩展master容错的互补机制:持久日志记录,从master和对象表重构。
创建新作业时,master会为作业创建日志文件,并将其根任务描述符同步写入日志。默认情况下,它将日志写入本地辅助存储上的日志目录,但它也可以写入网络文件系统或分布式存储服务。创建新任务时,其描述符将异步附加到日志文件,并定期写入到磁盘。作业完成后,将对其结果的具体引用写入日志目录。重新启动后,master会在其日志目录中扫描没有匹配结果的作业。对于这些作业,它会重新生成日志,重建动态任务图。处理完所有日志后,master会通过懒惰策略评估其输出来重新启动作业。
或者,master可以将状态更新记录到从master。在从master向主master注册后,主master将所有任务表和对象表更新转发到从master。每个新作业都是同步发送的,以确保在客户端收到确认之前将其记录在从master中。此外,从master记录了向主master注册的每个worker的地址,以便它可以在故障转移方案中联系到worker。从master定期向主master发送心跳;当它检测到主master发生故障时,从master指示所有worker重新向其注册。我们在§6.5中评估这种情况。
如果主master失败并随后重新启动,worker可以使用其本地对象存储库的内容帮助重建对象表。如果主master没有响应请求,则认为主master故障了。此时,worker切换到重新注册模式,并且不再发送心跳消息,而是定期发送注册请求到同一网络位置。当worker最终联系新的主master时,主master使用基于GFS主master恢复的协议来提取worker中数据对象的列表。
5.3 流
我们之前对任务的定义(第3.1节)声明任务生成的数据对象是作为其结果的一部分。这个定义意味着对象生成是原子性的:对象要么完全存在要么根本不存在。但是,由于数据对象可能非常大,因此通常有机会在任务之间流式传输部分写入的对象,这可能导致流式并行。
如果生成的任务具有流式输出,则它向主master发送预发布的消息,其中包含每个流式输出的流引用。这些引用用于更新对象表,并可能阻塞其他任务:如流对象消费者。流对象消费者像以前一样执行,但执行的代码从命名管道中而不是本地文件读取其输入。担任消费者的worker进程会启动单独的线程从担任生产者的worker进程中取得输入信息块,并将它们写入管道。当生产者成功终止时,它会提交其输出,该输出向消费者发出信号,告知不再有数据需要读取。
在当前实现中,流消息生产者还将其输出数据写入本地磁盘,因此,如果流消费者失败,则流生产者不受影响。如果生产者在拥有消费者时故障,则生产者回滚任何部分写入的输出。在这种情况下,消费者将因缺少输入而失败,并触发生产者的重新执行(第5.2节)。我们正在研究更复杂的容错和调度策略,这些策略允许生产者和消费者通过直接TCP流进行通信,如Dryad和Hadoop Online Prototype。
6 结论
我们设计CIEL,以提供现有分布式执行引擎提供的功能集合。使用Skywriting,可以以命令式方式编写迭代算法,并通过透明的容错和自动分发来执行它们。但是,CIEL还可以执行任何MapReduce作业或Dryad图,并且对迭代的支持允许它执行Pregel和Piccolo样式的计算。
我们的下一步是将CIEL原语与现有的编程语言集成。目前,只有Skywriting脚本可以创建新任务。这并不限制普遍性,但它要求开发人员在Skywriting中重写他们的驱动程序。所有与调度相关的控制流决策必须最终通过解释代码,这可能会对Skywriting运行时施加压力。 Skywriting的主要好处是它掩盖了间接引用算子背后的连续传递风格的复杂性(§4.2)。我们现在寻求一种方法将这种抽象扩展到主流编程语言。
CIEL可以扩展到数百台机器,但其他扩展挑战仍然存在。例如,目前还不清楚如何在一台机器中最好地利用多个内核,我们目前将此问题传递给执行器,后者可以充分利用单个机器。这使应用程序开发人员能够以更高的复杂性为代价,精确控制程序的执行方式。但是,如果任务本质上是顺序的并且有多个核可用,则会限制效率。此外,通过在单个主机上并置流生产者及其使用者而节省的I/O可能会超过CPU争用的成本。找到最佳调度时间是一个难题,我们正在研究简单的注释方案和启发式方法,以提高常见情况下的性能。最近关于集群操作系统和调度算法的工作使人们希望这个问题能够得到一个优雅的解决方案。
参考资料:
1. 论文下载。
2. 参考网址(包括源代码,语言参考和教程)。