Apache Beam核心--延迟和窗格设计

翻译自:谷歌Apache Beam项目Leader Kenneth Knowles以及Mark Shields。

 

本文中定义了Apache Beam编程模型中的延迟和数据丢弃,以及在Pipeline中如何传播。目的是为Runner开发者提供参考。

目录摘要:

入门

定义公式

Watermarks

定义和标记延迟

要求

不变性

窗格标签

对输入进行分组转换

目标

定义

什么时候输入元素迟?

什么时候输入元素可以被丢弃?

我们应该做什么?

输出分组转换的窗格

目标

什么时候窗格会被打上LATE标签?

如何为发出的窗格打标签?

OutputTimeFn: 计算输出时间戳

tin 映射到tout

组合

合并

附录Appendix: 合并和将窗格标记为ON_TIME

 

入门

数据迟到是Beam的无序处理模型所固有的。 数据迟到是什么意思? 其定义及其属性与跟踪每个事件时间域内每个计算进度的Watermark交织在一起。 从简单的直觉来说,处理迟到的数据是这样的:只有迟到的输入才能导致Pipeline中任何位置的迟到数据。

对于像ParDo这样的数据转换,这个要求很简单:输出时间戳等于输入时间戳。 如果时间戳被向后移动并且可能变迟到,则必须通过允许的偏移明确声明。 Watermark的传播同样简单。

对于GroupByKey和Combine等组合转换,稍微复杂一点, 分组变换描述了如何聚合值,但也必须为分组结果选择时间戳。 本文的其余部分探讨如何为分组的结果选择时间戳,以及Watermark应如何演变,以避免将迟到数据从未迟到数据中删除。

定义公式

Watermarks

TL;DR: 每个PCollection都有一个定义迟到的Watermark。 但是,当执行变换时,这些Watermark仅通过上下限知道。

Watermark (简称WM):对于一个时间为X的watermark,所有事件时间 < X的输入(输出)的数据都已经到达(处理/提交)。Watermark有以下几种:

  • Global WM(全局VM): 在概念上,每个PCollection在事件时间上有一个只增不减的Watermark(WM),称为全局WM。 这是一个近似值,表示在这个时间点上PCollection的数据已经全部到达。 我们不需要计算、记录这个Watarmark,甚至不能准确地知道(因为在分布式环境下)。

 

  • Global input/output WM(全局输入/输出WM):转换可能消费(或生成)一个或多个PCollection作为输入(或输出)。 其全局输入(或输出)WM是所有输入(或输出) PCollection的全局WM的最小值。 本文档的其余部分,我们忽略了具有一个或多个的输入或输出的区别。

     

  • Local input WM(本地输入WM): 全局输入WM是远程的或不可知的(因为在分布式环境下),为了应对这个局面,每个转换被推定为仅知道其全局输入WM的非递减下界,称为本地输入WM。 本地输入WM在处理一组输入时不会改变。
    • 本地输入WM <= 全局输入WM

 

  • Local output WM(本地输出WM): 同样,为了应对转换的全局输出WM是远程的或不可知的情况,假设转换(Transform)知道在其全局输出WM的非递减上限,称为本地输出WM。 本地输出WM的上限是本地输入WM。 转换也明确地持有本地输出的WM,阻止WM向前推进。
    • 全局输出WM <=本地输出WM <=本地输入WM(<=全局输入WM)
  • Garbage collection WMs(垃圾回收WM): 每个PCollection概念上还有一个垃圾回收Watermark(GC WM)。 窗口末尾(EOW)被GC Watermark 超过时,窗口过期,该窗口的所有数据可能会被丢弃。 (见下文)
  • Local input & output GC WMs(本地输入和输出垃圾回收WM):

    每个PCollection的消费者有一个本地输入GC Watermark(本地输入垃圾回收Watermark),对应于输入的垃圾回收Watermark(输入GC WM)。每个PCollection的生产者有一个本地输出GC ,Watermark(本地输出垃圾回收Watermark),对应于输出垃圾回收Watermark的上限。

下图中总结了对于转换(Transform)来说,最重要的Watermak。

Apache Beam核心--延迟和窗格设计

定义和标记延迟

  • 迟到数据:

    添加到PCollection的数据元素如果其时间戳早于全局WM就是迟到了否则就是未迟到

    • 迟到: 时间戳< 全局WM
    • 未迟到: 时间戳>= 全局 WM
  • 窗格标签:

    在本文中分组运算中每1个输出成为1个窗格,窗格的标签有EARLY, ON_TIME LATE三种

  • 可丢弃: 如果GC WM超过了窗口的末尾 ,窗口就会过期,所有到达的属于这个窗口的数据就可能被丢弃。
    • 可丢弃: EOW < GC WM

因为我们无法知道全局的Watermark,所以不能精确的知道延迟情况,下图中中介了哪些可以知道和哪些无法知道。

输入:

Apache Beam核心--延迟和窗格设计

 

输出:

Apache Beam核心--延迟和窗格设计

文档的剩余部分将会深入的讨论上边的不确定性。

要求

不变性

  • 如果1条数据元素添加到输入PCollection的时候是未迟到的那么从这个输入得到的输出在添加到输出Pcollection也必须是未迟到的
  • 如果1条数据元素添加到输入PCollection的时候是不可丢弃的那么从这个输入得到的输出在添加到输出Pcollection也是不可丢弃的
  • 如果窗格被发送出去了,那么就不能被丢弃。

窗格标签

  • 输出窗格需要遵循正则表达式EARLY* ON_TIME? LATE*
  • 如果一个窗格被标记为未迟到(non-LATE)标签后发出,那么此窗格必须真的是未迟到的。
  • 如果一个窗格被标记为LATE标签后发出那么它必须只包含了从迟到的输入数据中计算出来的输出

对输入进行分组转换

这个场景下,我们运行了类似于GroupByKey or Combine这样的分组操作。值v的事件时间戳是tin ,值v到达以后缓存起来等待输出。

目标

  • 满足上边的所有要求。
  • 输入元素,如果在输出的时候是迟到的,不应该阻拦输出Watermark,除非为了避免变成可丢弃的。

定义

到达的元素v的输入时间戳是tin ,也包含了一个用户赋予的输出时间戳tout,限制条件是:tin <= tout <= EOW。对于ParDo这样的转换,一般tin = tout区分tout的目的是允许输出WM在分组操作还没有准备好发送结果的时候向前推进,使其他窗口能够关闭。举例来说,可以将输出WM设置为窗口末尾或者未来的一个足够长的时间,来允许前一个滑动窗口关闭。

什么时候输入元素迟?

这里有两个问题比较有意思:

  1. 什么时刻输入PCollection的元素确定是迟到的? When is the element from the input PCollection known to be late?
  2. 什么时候输出PCollection 的结果将会迟到?

     

下边的图总结了tin,相对于输入PCollection的延迟和tout相对于输出PCollection的延迟。

 

Apache Beam核心--延迟和窗格设计

 

tin 是迟到的

tin < 本地输入WM (<= 全局输入 WM)

tin 可能未迟到

tin >= 本地输入WM(tin 可能位于全局输入WM的任意一侧,但是无法确定在哪一侧)

tout 未迟到

tout >= 本地输出WM (>= 全局输出 WM)

tout 可能迟到

tout < 本地输出 WM (tout 可能位于全局输出WM的任意一侧)

 

如果tout 可能迟到,那么tin 一定是迟到的:

tin <= tout < 本地输出 WM <= 本地输入 WM

Apache Beam核心--延迟和窗格设计

我们希望能够最多发出一个带有ON_TIME标签的窗格该窗格必须包含截止到窗口末尾(EOW)时所有未迟到的输入数据(也有可能包含了一些恰巧达到的迟到数据)。既然与窗口末尾有关,那么添加到图中如下所示:

 

Apache Beam核心--延迟和窗格设计

 

tout 是准时的

tout 未迟到,并且本地输入WM <= EOW。本地输入WM <= EOW这个条件用来确保带有ON_TIME 标签的窗格没有触发。这个判断条件可以确保只有1个ON_TIME窗格被触发,这个判断是在本地做的,所以根据全局输入WM进行判断没有必要。

 

上边场景里最简单的例子是,如果tin 是可能迟到的,tout 就是准时的。

本地输入 WM <= tin <= EOW

本地输出 WM <= 本地输入 WM <= tin <= tout

 

Apache Beam核心--延迟和窗格设计

如果本地输入WM超过了窗口末尾EOW呢?那么tin 毫无疑问是严重迟到的。

Apache Beam核心--延迟和窗格设计

我们也可能已经出发了一个ON_TIME窗格既然tin 确定迟到了,tout 被视作迟到是合理的,就算tout 有机会被转换为未迟到的。

这种情况下,tin是迟到的tout 未迟到,并且输入WM没有超过窗口末尾EOW。

 

 

Apache Beam核心--延迟和窗格设计

在这种情况下,因为tout 即基于迟到的输入来的,所以可以将tout 视为迟到对待,或者也可以生成非迟到的输出。将tout 视为未迟到的原因是,此时一定会有被标记为none-LATE的窗格来容纳输出。

什么时候输入元素可以被丢弃?

这里实际上有两个有趣的问题:

  1. 什么时候输入PCollection 的元素可以被丢弃?
  2. 什么时候输出结果将会被丢弃?

下表总结了tin 相对于输入PCollectiontout 相对于输出PCollection的可丢弃的判断条件。

 

tin 可丢弃

EOW < 本地输入GC WM (<= 全局输入GC WM)

在现在的实现中EOW < 本地WM – 最大允许延迟 (<= 全局输入WM – 最大允许延迟)

tin 可能不可丢弃

EOW >= 本地输入GC WM(EOW可能处于全局输入GC WM的任意一侧,但是无法直接观察到)

tout 不可丢弃

EOW >= 本地输入GC WM

tout 可能可丢弃

EOW < 本地输出GC WM(tout可能在全局输出GC WM的任意一侧)

 

如果tin 可能不可丢弃,那么tout 就是不可丢弃

  • 本地输出 GC WM <=本地输入 GC WM <= tin <= tout

如果tout 可能可丢弃,那么tin 就是可丢弃的

  • tin <= tout <= 本地输出GC WM <= 本地输入GC WM

我们应该做什么?

当数据元素到达的时候分组转换的时候,需要做以下判断:

  • 是否应该丢失数据元素?
  • 应该用什么来作为本地输出WM?
  • 使用什么作为实际的输出时间戳temittemit 用来决定什么时候将分组的值发送出去?
    • 这是元素的另一个时间戳,允许我们将迟到数据和非迟到数据相结合,得到非迟到的结果。
    • tout <= temit <= EOW + 最大允许延迟。

 

我们选择输出尽可能多的非迟到数据,同时如果必须要更晚的输出数据,也要尽可能少的持有本地输出WM,并添加条件保证只有1个ON_TIME窗格

  • 如果tout 是准时的,那么就被当做未迟到来对待(tout 可以是未迟到但不是准时的,取决于本书输入WM是否超过了窗口末尾EOW)
    • temit := tout
    • hold := tout

    Apache Beam核心--延迟和窗格设计

  • 如果tout 是不准时的,就当做迟到来对待。这是因为tout 如果是可能迟到的或者ON_TIME 窗格已经触发,这两种情况下tin 是确定迟到的。
    • temit := 窗口末尾 (EOW)
    • hold := EOW + 最大允许延迟 (这是一个代理持有本地输出GC WM到EOW的Watermark,防止产生可丢弃的窗格)
    • 这种情况如下不等式和图片所示:

      tout < 本地输出 WM (tout 可能位于全局输出WM的任意一侧)

Apache Beam核心--延迟和窗格设计

 

EOW < 本地输入WM (ON_TIME窗格可能已经触发)

Apache Beam核心--延迟和窗格设计

从图中来看,temit 通常来说会迟到,但永远不会被丢弃。temit 是位于正确窗口中的最迟的时间,而hold是防止丢弃的最迟的时间。

  • 其他的处理方式,我们不选,因为当输入迟到的时候(temit := 本地输出WM)会导致输出WM的严重延迟。
  • 如果本地输入GC WM 超过了窗口末尾就丢弃。在这种情况里,tintout 都在窗口内,所以都是可以丢弃的。

输出分组转换的窗格

这个场景下,分组转换需要决定输出窗格,并且为输出元素打上EARLY, ON_TIME, 或LATE标签根据OutputTimeFn(要注意的集中模式是使用最小值,最大值或EOW作为输出时间戳),窗格的输出时间戳记是从根据进入窗格中的元素"temit"计算的。 OutputTimeFn的规范确保了temit必须在最早的输入时间戳和窗口的末尾(包括两个端点,闭区间)之间。

目标

  • 满足入门中所提及的所有要求Meet all the requirements from the introduction.
  • 只输出一个ON_TIME 窗格Output only a single ON_TIME pane.

什么时候窗格会被打上LATE标签?

我们必须要决定窗格是否迟到(打上LATE标签),在判断的时候,不仅基于temit 和不同Watermark的状态,同时还必须满足所有的要求。

窗格是输出PCollection的1个元素,适用于LATE的标准定义下边是回顾,待翻译!!!

  • temit 未迟到如果
    • temit >= 本地输出 WM (>= 全局 WM)
    • 设计上,这意味着对于每个进入窗格的tout 也是未迟到的。
  • temit 可能迟到:
    • temit < 本地输出 WM(可能位于全局输出WM的任意一侧)
    • 设计上,这意味着至少有一些tout 被视为迟到了,然后意味着对应的tin 必然是迟到的,所以可以放心的将输出标记为LATE 延迟发送。
  • temitON_TIME的窗格(忽略合并),如果temit是未迟到的并且本地输入WM > EOW

    Apache Beam核心--延迟和窗格设计

    • 此条件成立后的任何传入元素将被视为迟到,唯一的目的是确保ON_TIME窗格是唯一的。 所以任何进一步的水印保持将是EOW +延迟,并且EOW将发出更多的元素。如果我们想 ,也可以允许多个ON_TIME窗格,将这些元素视为未迟到的 进行处理(这没有什么明显的问题), 但是他们肯定晚于他们的输PCollection。
  • temit 可能是最后一个窗格,如果输入GC WM > EOW Apache Beam核心--延迟和窗格设计
    • 任何超过这个条件的hold会被丢弃

如何为发出的窗格打标签?

决定如何标记发出的窗格,EARLY, ON_TIME, 或 LATE:

  • 如果temit 可能迟到 (temit < 本地输出 WM),部分数据可能实际上是迟到的,所以必须要这么做。
  • 如果temit 是未迟到的:
    • 标记为ON_TIMEtemit 如果可以是ON_TIME 窗格(本地输入WM > EOW)。

      Label it ON_TIME if temit can be the ON_TIME pane (local input WM > EOW).

    • 标记为EARLY ,如果本地输入WM <= EOW (查看附录中对于EARLY的讨论
  • 通过一个isFinal 标识是否是最后一个窗格,当输入GC WM > EOW的时候为true。满足该条件的时候再也不会发送新的窗格,所以所有的输入将会被丢弃。

OutputTimeFn: 计算输出时间戳

对于任何一种分组转换,Pipeline必须制定如何聚合输出的值,也必须要指定如何聚合输出时间戳。Beam模型中的OutputTimeFn专门用来处理这类问题。包含了三个紧密结合的方面:

  1. tintout 的映射

    The mapping from tin to tout.

  2. 如何组合多个tout 时间戳
  3. 当窗口合并的时候如何组合tout

对于任何要聚合的值来说,只需要跟踪一个聚合时间戳输,这个时间戳就像是CombineFn的累加器

tin 映射到tout

以下是两种最常见的:

  1. 恒等映射,tout := tin
  2. tout := EOW, 忽略tin

在实际中,一些其他的映射也挺有用。例如,在滑动窗口中,将tout 移动到前一个窗口的末尾,可以让前一个窗口关闭,而不用考虑OutputTimeFn如何处理

对于任何新的SDK或者执行引擎,推荐的默认行为是tout:= EOW。这样可以容易的推导和实现。自定义很有必要,所以这只是一个默认行为,但不是写死的。

组合

在一个单一窗口中,,所有元素的输出时间戳(tout)组合产生一个时间戳,作为输出的时间戳。时间戳的组合也要符合交换律和结合律,与值的组合类似,并且是一个合法窗口时间戳。为了确保不会将未迟到的输入变成了迟到,输出时间戳需要大于等于所有的非迟到输入时间戳。

如下是几个显而易见的组合输出时间戳的方式:

  1. 被进行分组处理的所有未迟到tout 时间戳的最小值。
  2. 某些应用于窗口计算的输入时间戳的最小值
  3. 被进行分组处理的所有未迟到tout 时间戳的最大值。

     

最小值Minimum: 迟到的时间戳必须被忽略,否则它们会导致未迟到的数据最终变成了迟到的聚合值。使用场景是允许输入时间戳被存储,然后在后续的过程中提取和使用。因为输入时间戳是最小值,所以持有了输出WM,所有的窗格内的时间戳都是未迟到的。

最大值Maximum: 在这种场景中,输出Watermark会尽可能快的向前推进。输入时间戳可能不会再次被解析出来和使用。

仅取决于窗口Depending only on window:简单、高效,且易于实现、并且不会有问题。推荐的默认行为是tout:= EOW。

合并

合并窗口时,窗口中所有数据的时间戳也需要进行聚合,生成1个新的时间戳。无论相对于待聚合的数据的输入时间戳,无论窗口何时合并,都应该产生相同的输出时间戳。特别注意,新窗口的末尾和输入时间戳之间没有任何关系。如下是两个最显而易见的合并方式:

  1. 与窗口合并时的时间相同。
  2. 基于合并的窗口重新赋予一个时间戳。

     

组合Combine: 这种场景下,输出时间戳只取决于输入数据元素。实际上,必须要满足的条件是,合并窗口的结果必须要与窗后先合并后添加元素是一致的。

重新赋予Re-assign: 这种情况下,还是一样tout:= EOW 。没有其他方式可以从输入时间戳系统地导出输出时间戳(存在使用合并窗口端点的最大和最小值的窗口化方案,尽管后者有点奇怪)。

附录Appendix: 合并和将窗格标记为ON_TIME

假设窗口w1和窗口末尾更靠后的w2 合并成w1+2 。窗口末尾改变,影响上边所有定义。如果窗口w1 ON_TIME窗格已经触发w2没有对于窗口w1+2 这可能是一个标记为EARLY的窗格此外,除非通过触发器禁止,否则将为w1+2发出附加的ON_TIME窗格。 简而言之,通过正则表达式EARLY* ON_TIME? LATE*保证只保留每窗口,而对于这一点,w1 and w1+2是不同的窗口。

结束!

转载需标明文章来源!http://blog.****.net/ffjl1985/article/details/78217218