Apache Beam核心--延迟和窗格设计
翻译自:谷歌Apache Beam项目Leader Kenneth Knowles以及Mark Shields。
本文中定义了Apache Beam编程模型中的延迟和数据丢弃,以及在Pipeline中如何传播。目的是为Runner开发者提供参考。
目录摘要:
入门
数据迟到是Beam的无序处理模型所固有的。 数据迟到是什么意思? 其定义及其属性与跟踪每个事件时间域内每个计算进度的Watermark交织在一起。 从简单的直觉来说,处理迟到的数据是这样的:只有迟到的输入才能导致Pipeline中任何位置的迟到数据。
对于像ParDo这样的数据转换,这个要求很简单:输出时间戳等于输入时间戳。 如果时间戳被向后移动并且可能变迟到,则必须通过允许的偏移明确声明。 Watermark的传播同样简单。
对于GroupByKey和Combine等组合转换,稍微复杂一点, 分组变换描述了如何聚合值,但也必须为分组结果选择时间戳。 本文的其余部分探讨如何为分组的结果选择时间戳,以及Watermark应如何演变,以避免将迟到数据从未迟到数据中删除。
定义公式
Watermarks
TL;DR: 每个PCollection都有一个定义迟到的Watermark。 但是,当执行变换时,这些Watermark仅通过上下限知道。
Watermark (简称WM):对于一个时间为X的watermark,所有事件时间 < X的输入(输出)的数据都已经到达(处理/提交)。Watermark有以下几种:
- Global WM(全局VM): 在概念上,每个PCollection在事件时间上有一个只增不减的Watermark(WM),称为全局WM。 这是一个近似值,表示在这个时间点上PCollection的数据已经全部到达。 我们不需要计算、记录这个Watarmark,甚至不能准确地知道(因为在分布式环境下)。
-
Global input/output WM(全局输入/输出WM):转换可能消费(或生成)一个或多个PCollection作为输入(或输出)。 其全局输入(或输出)WM是所有输入(或输出) PCollection的全局WM的最小值。 本文档的其余部分,我们忽略了具有一个或多个的输入或输出的区别。
-
Local input WM(本地输入WM): 全局输入WM是远程的或不可知的(因为在分布式环境下),为了应对这个局面,每个转换被推定为仅知道其全局输入WM的非递减下界,称为本地输入WM。 本地输入WM在处理一组输入时不会改变。
- 本地输入WM <= 全局输入WM
-
Local output WM(本地输出WM): 同样,为了应对转换的全局输出WM是远程的或不可知的情况,假设转换(Transform)知道在其全局输出WM的非递减上限,称为本地输出WM。 本地输出WM的上限是本地输入WM。 转换也明确地持有本地输出的WM,阻止WM向前推进。
- 全局输出WM <=本地输出WM <=本地输入WM(<=全局输入WM)
- Garbage collection WMs(垃圾回收WM): 每个PCollection概念上还有一个垃圾回收Watermark(GC WM)。 窗口末尾(EOW)被GC Watermark 超过时,窗口过期,该窗口的所有数据可能会被丢弃。 (见下文)
-
Local input & output GC WMs(本地输入和输出垃圾回收WM):
每个PCollection的消费者有一个本地输入GC Watermark(本地输入垃圾回收Watermark),对应于输入的垃圾回收Watermark(输入GC WM)。每个PCollection的生产者有一个本地输出GC ,Watermark(本地输出垃圾回收Watermark),对应于输出垃圾回收Watermark的上限。
下图中总结了对于转换(Transform)来说,最重要的Watermak。
定义和标记延迟
-
迟到数据:
添加到PCollection的数据元素,如果其时间戳早于全局WM就是迟到了,否则就是未迟到。
- 迟到: 时间戳< 全局WM
- 未迟到: 时间戳>= 全局 WM
-
窗格标签:
在本文中,分组运算中每1个输出成为1个窗格,窗格的标签有EARLY, ON_TIME和 LATE三种。
-
可丢弃: 如果GC WM超过了窗口的末尾 ,窗口就会过期,所有到达的属于这个窗口的数据就可能被丢弃。
- 可丢弃: EOW < GC WM
因为我们无法知道全局的Watermark,所以不能精确的知道延迟情况,下图中中介了哪些可以知道和哪些无法知道。
输入:
输出:
文档的剩余部分将会深入的讨论上边的不确定性。
要求
不变性
- 如果1条数据元素添加到输入PCollection的时候是未迟到的,那么从这个输入得到的输出,在添加到输出Pcollection也必须是未迟到的。
- 如果1条数据元素添加到输入PCollection的时候是不可丢弃的,那么从这个输入得到的输出在添加到输出Pcollection也是不可丢弃的。
- 如果窗格被发送出去了,那么就不能被丢弃。
窗格标签
- 输出窗格需要遵循正则表达式EARLY* ON_TIME? LATE*
- 如果一个窗格被标记为未迟到(non-LATE)标签后发出,那么此窗格必须真的是未迟到的。
- 如果一个窗格被标记为LATE标签后发出,那么它必须只包含了从迟到的输入数据中计算出来的输出。
对输入进行分组转换
这个场景下,我们运行了类似于GroupByKey or Combine这样的分组操作。值v的事件时间戳是tin ,值v到达以后缓存起来等待输出。
目标
- 满足上边的所有要求。
- 输入元素,如果在输出的时候是迟到的,不应该阻拦输出Watermark,除非为了避免变成可丢弃的。
定义
到达的元素v的输入时间戳是tin ,也包含了一个用户赋予的输出时间戳tout,限制条件是:tin <= tout <= EOW。对于ParDo这样的转换,一般tin = tout。区分tout的目的是允许输出WM在分组操作还没有准备好发送结果的时候向前推进,使其他窗口能够关闭。举例来说,可以将输出WM设置为窗口末尾或者未来的一个足够长的时间,来允许前一个滑动窗口关闭。
什么时候输入元素迟?
这里有两个问题比较有意思:
- 什么时刻输入PCollection的元素确定是迟到的? When is the element from the input PCollection known to be late?
-
什么时候输出PCollection 的结果将会迟到?
下边的图总结了tin,相对于输入PCollection的延迟和tout相对于输出PCollection的延迟。
tin 是迟到的 |
tin < 本地输入WM (<= 全局输入 WM) |
tin 可能未迟到 |
tin >= 本地输入WM(tin 可能位于全局输入WM的任意一侧,但是无法确定在哪一侧) |
tout 未迟到 |
tout >= 本地输出WM (>= 全局输出 WM) |
tout 可能迟到 |
tout < 本地输出 WM (tout 可能位于全局输出WM的任意一侧) |
如果tout 可能迟到,那么tin 一定是迟到的:
tin <= tout < 本地输出 WM <= 本地输入 WM
我们希望能够最多发出一个带有ON_TIME标签的窗格,该窗格必须包含截止到窗口末尾(EOW)时所有未迟到的输入数据(也有可能包含了一些恰巧达到的迟到数据)。既然与窗口末尾有关,那么添加到图中如下所示:
tout 是准时的 |
tout 未迟到,并且本地输入WM <= EOW。本地输入WM <= EOW这个条件用来确保带有ON_TIME 标签的窗格没有触发过。这个判断条件可以确保只有1个ON_TIME窗格被触发,这个判断是在本地做的,所以根据全局输入WM进行判断没有必要。 |
上边场景里最简单的例子是,如果tin 是可能迟到的,tout 就是准时的。
本地输入 WM <= tin <= EOW
本地输出 WM <= 本地输入 WM <= tin <= tout
如果本地输入WM超过了窗口末尾EOW呢?那么tin 毫无疑问是严重迟到的。
我们也可能已经出发了一个ON_TIME窗格。既然tin 确定迟到了,tout 被视作迟到是合理的,就算tout 有机会被转换为未迟到的。
这种情况下,tin是迟到的,tout 未迟到,并且输入WM没有超过窗口末尾EOW。
在这种情况下,因为tout 即基于迟到的输入来的,所以可以将tout 视为迟到对待,或者也可以生成非迟到的输出。将tout 视为未迟到的原因是,此时一定会有被标记为none-LATE的窗格来容纳输出。
什么时候输入元素可以被丢弃?
这里实际上有两个有趣的问题:
- 什么时候输入PCollection 的元素可以被丢弃?
- 什么时候输出结果将会被丢弃?
下表总结了tin 相对于输入PCollection 和 tout 相对于输出PCollection的可丢弃的判断条件。
tin 可丢弃 |
EOW < 本地输入GC WM (<= 全局输入GC WM) 在现在的实现中EOW < 本地WM – 最大允许延迟 (<= 全局输入WM – 最大允许延迟) |
tin 可能不可丢弃 |
EOW >= 本地输入GC WM(EOW可能处于全局输入GC WM的任意一侧,但是无法直接观察到) |
tout 不可丢弃 |
EOW >= 本地输入GC WM |
tout 可能可丢弃 |
EOW < 本地输出GC WM(tout可能在全局输出GC WM的任意一侧) |
如果tin 可能不可丢弃,那么tout 就是不可丢弃
- 本地输出 GC WM <=本地输入 GC WM <= tin <= tout
如果tout 可能可丢弃,那么tin 就是可丢弃的
- tin <= tout <= 本地输出GC WM <= 本地输入GC WM
我们应该做什么?
当数据元素到达的时候分组转换的时候,需要做以下判断:
- 是否应该丢失数据元素?
- 应该用什么来作为本地输出WM?
-
使用什么作为实际的输出时间戳temit ,temit 用来决定什么时候将分组的值发送出去?
- 这是元素的另一个时间戳,允许我们将迟到数据和非迟到数据相结合,得到非迟到的结果。
- tout <= temit <= EOW + 最大允许延迟。
我们选择输出尽可能多的非迟到数据,同时如果必须要更晚的输出数据,也要尽可能少的持有本地输出WM,并添加条件保证只有1个ON_TIME窗格。
-
如果tout 是准时的,那么就被当做未迟到来对待(tout 可以是未迟到但不是准时的,取决于本书输入WM是否超过了窗口末尾EOW)
- temit := tout
- hold := tout
-
如果tout 是不准时的,就当做迟到来对待。这是因为tout 如果是可能迟到的或者ON_TIME 窗格已经触发,这两种情况下tin 是确定迟到的。
- temit := 窗口末尾 (EOW)
- hold := EOW + 最大允许延迟 (这是一个代理持有本地输出GC WM到EOW的Watermark,防止产生可丢弃的窗格)
-
这种情况如下不等式和图片所示:
tout < 本地输出 WM (tout 可能位于全局输出WM的任意一侧)
EOW < 本地输入WM (ON_TIME窗格可能已经触发)
从图中来看,temit 通常来说会迟到,但永远不会被丢弃。temit 是位于正确窗口中的最迟的时间,而hold是防止丢弃的最迟的时间。
- 其他的处理方式,我们不选,因为当输入迟到的时候(temit := 本地输出WM)会导致输出WM的严重延迟。
- 如果本地输入GC WM 超过了窗口末尾就丢弃。在这种情况里,tin 和tout 都在窗口内,所以都是可以丢弃的。
输出分组转换的窗格
这个场景下,分组转换需要决定输出窗格,并且为输出元素打上EARLY, ON_TIME, 或LATE标签。根据OutputTimeFn(要注意的集中模式是使用最小值,最大值或EOW作为输出时间戳),窗格的输出时间戳记是从根据进入窗格中的元素"temit"计算的。 OutputTimeFn的规范确保了temit必须在最早的输入时间戳和窗口的末尾(包括两个端点,闭区间)之间。
目标
- 满足入门中所提及的所有要求Meet all the requirements from the introduction.
- 只输出一个ON_TIME 窗格Output only a single ON_TIME pane.
什么时候窗格会被打上LATE标签?
我们必须要决定窗格是否迟到(打上LATE标签),在判断的时候,不仅基于temit 和不同Watermark的状态,同时还必须满足所有的要求。
窗格是输出PCollection的1个元素,适用于LATE的标准定义。下边是回顾,待翻译!!!
-
temit 未迟到,如果:
- temit >= 本地输出 WM (>= 全局 WM)
- 设计上,这意味着对于每个进入窗格的tout 也是未迟到的。
-
temit 可能迟到:
- temit < 本地输出 WM(可能位于全局输出WM的任意一侧)
- 设计上,这意味着至少有一些tout 被视为迟到了,然后意味着对应的tin 必然是迟到的,所以可以放心的将输出标记为LATE 延迟发送。
-
temit是ON_TIME的窗格(忽略合并),如果temit是未迟到的,并且本地输入WM > EOW
- 此条件成立后的任何传入元素将被视为迟到,唯一的目的是确保ON_TIME窗格是唯一的。 所以任何进一步的水印保持将是EOW +延迟,并且EOW将发出更多的元素。如果我们想 ,也可以允许多个ON_TIME窗格,将这些元素视为未迟到的 进行处理(这没有什么明显的问题), 但是他们肯定晚于他们的输PCollection。
-
temit 可能是最后一个窗格,如果输入GC WM > EOW
- 任何超过这个条件的hold会被丢弃
如何为发出的窗格打标签?
决定如何标记发出的窗格,EARLY, ON_TIME, 或 LATE:
- 如果temit 可能迟到 (temit < 本地输出 WM),部分数据可能实际上是迟到的,所以必须要这么做。
-
如果temit 是未迟到的:
-
标记为ON_TIME ,temit 如果可以是ON_TIME 窗格(本地输入WM > EOW)。
Label it ON_TIME if temit can be the ON_TIME pane (local input WM > EOW).
- 标记为EARLY ,如果本地输入WM <= EOW (查看附录中对于EARLY的讨论)
-
- 通过一个isFinal 标识是否是最后一个窗格,当输入GC WM > EOW的时候为true。满足该条件的时候再也不会发送新的窗格,所以所有的输入将会被丢弃。
OutputTimeFn: 计算输出时间戳
对于任何一种分组转换,Pipeline必须制定如何聚合输出的值,也必须要指定如何聚合输出时间戳。Beam模型中的OutputTimeFn专门用来处理这类问题。包含了三个紧密结合的方面:
-
从tin 到tout 的映射
The mapping from tin to tout.
- 如何组合多个tout 时间戳
- 当窗口合并的时候如何组合tout
对于任何要聚合的值来说,只需要跟踪一个聚合时间戳输,这个时间戳就像是CombineFn的累加器。
将tin 映射到tout
以下是两种最常见的:
- 恒等映射,tout := tin
- tout := EOW, 忽略tin
在实际中,一些其他的映射也挺有用。例如,在滑动窗口中,将tout 移动到前一个窗口的末尾,可以让前一个窗口关闭,而不用考虑OutputTimeFn如何处理。
对于任何新的SDK或者执行引擎,推荐的默认行为是tout:= EOW。这样可以容易的推导和实现。自定义很有必要,所以这只是一个默认行为,但不是写死的。
组合
在一个单一窗口中,,所有元素的输出时间戳(tout)组合产生一个时间戳,作为输出的时间戳。时间戳的组合也要符合交换律和结合律,与值的组合类似,并且是一个合法窗口时间戳。为了确保不会将未迟到的输入变成了迟到,输出时间戳需要大于等于所有的非迟到输入时间戳。
如下是几个显而易见的组合输出时间戳的方式:
- 被进行分组处理的所有未迟到tout 时间戳的最小值。
- 某些应用于窗口计算的输入时间戳的最小值
-
被进行分组处理的所有未迟到tout 时间戳的最大值。
最小值Minimum: 迟到的时间戳必须被忽略,否则它们会导致未迟到的数据最终变成了迟到的聚合值。使用场景是允许输入时间戳被存储,然后在后续的过程中提取和使用。因为输入时间戳是最小值,所以持有了输出WM,所有的窗格内的时间戳都是未迟到的。
最大值Maximum: 在这种场景中,输出Watermark会尽可能快的向前推进。输入时间戳可能不会再次被解析出来和使用。
仅取决于窗口Depending only on window:简单、高效,且易于实现、并且不会有问题。推荐的默认行为是tout:= EOW。
合并
合并窗口时,窗口中所有数据的时间戳也需要进行聚合,生成1个新的时间戳。无论相对于待聚合的数据的输入时间戳,无论窗口何时合并,都应该产生相同的输出时间戳。特别注意,新窗口的末尾和输入时间戳之间没有任何关系。如下是两个最显而易见的合并方式:
- 与窗口合并时的时间相同。
-
基于合并的窗口重新赋予一个时间戳。
组合Combine: 这种场景下,输出时间戳只取决于输入数据元素。实际上,必须要满足的条件是,合并窗口的结果必须要与窗后先合并后添加元素是一致的。
重新赋予Re-assign: 这种情况下,还是一样tout:= EOW 。没有其他方式可以从输入时间戳系统地导出输出时间戳(存在使用合并窗口端点的最大和最小值的窗口化方案,尽管后者有点奇怪)。
附录Appendix: 合并和将窗格标记为ON_TIME
假设窗口w1和窗口末尾更靠后的w2 合并成w1+2 。窗口末尾改变,影响上边所有定义。如果窗口w1 ON_TIME窗格已经触发,而w2没有,对于窗口w1+2 这可能是一个标记为EARLY的窗格。此外,除非通过触发器禁止,否则将为w1+2发出附加的ON_TIME窗格。 简而言之,通过正则表达式EARLY* ON_TIME? LATE*保证只保留每窗口,而对于这一点,w1 and w1+2是不同的窗口。
结束!
转载需标明文章来源!http://blog.****.net/ffjl1985/article/details/78217218