Flink的窗口算子 WindowOperator的实现原理
窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂。本文将以由面及点的方式来分析WindowOperator的实现。首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图:
上图中,左侧从左往右为事件流的方向。方框代表事件,事件流中夹杂着的竖直虚线代表水印,Flink通过水印分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水印。元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水印。
如果是普通的事件,则会调用processElement方法(上图虚线框中的三个圆圈中的一个)进行处理,在processElement方法中,首先会利用窗口分配器为当前接收到的元素分配窗口,接着会调用触发器的onElement方法进行逐元素触发。对于时间相关的触发器,通常会注册事件时间或者处理时间定时器,这些定时器会被存储在WindowOperator的处理时间定时器队列和水印定时器队列中(见图中虚线框中上下两个圆柱体),如果触发的结果是FIRE,则对窗口进行计算。
如果是水印(事件时间场景),则方法processWatermark将会被调用,它将会处理水印定时器队列中的定时器。如果时间戳满足条件,则利用触发器的onEventTime方法进行处理。
而对于处理时间的场景,WindowOperator将自身实现为一个基于处理时间的触发器,以触发trigger方法来消费处理时间定时器队列中的定时器满足条件则会调用窗口触发器的onProcessingTime,根据触发结果判断是否对窗口进行计算。
以上是WindowOperator的常规流程最简单的表述,事实上其逻辑要复杂得多。我们首先分解掉几个内部核心对象,上图中我们看到有两个队列:分别是水印定时器队列和处理时间定时器队列。这里的定时器是什么?它有什么作用呢?接下来我们就来看看它的定义——WindowOperator的内部类Timer。Timer是所有时间窗口执行的基础,它其实是一个上下文对象,封装了三个属性:
timestamp:触发器触发的时间戳; key:当前元素所归属的分组的键; window:当前元素所属窗口;
在我们讲解窗口触发器时,我们曾提及过触发器上下文对象,它作为process系列方法参数。在WindowOperator内部我们终于看到了对该上下文对象接口的实现——Context,它主要提供了三种类型的方法:
提供状态存储与访问; 定时器的注册与删除; 窗口触发器process系列方法的包装;
在注册定时器时,会新建定时器对象并将其加入到定时器队列中。等到时间相关的处理方法(processWatermark和trigger)被触发调用,则会从定时器队列中消费定时器对象并调用窗口触发器,然后根据触发结果来判断是否触动窗口的计算。我们选择事件时间的处理方法processWatermark进行分析(处理时间的处理方法trigger跟其类似):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
|
以上方法虽然冗长但流程还算清晰,其中的fire方法用于对窗口进行计算,它会调用内部窗口函数(即InternalWindowFunction,它包装了WindowFunction)的apply方法。
而isCleanupTime和cleanup这对方法主要涉及到窗口的清理。如果当前窗口是时间窗口,且窗口的时间到达了清理时间,则会进行清理窗口清理。那么清理时间如何判断呢?Flink是通过窗口的最大时间戳属性结合允许延迟的时间联合计算的:
1 2 3 4 5 6 7 8 |
|
求出清理时间后会与定时器注册的时间进行对比,如果两者相等则布尔条件为真,否则为假:
1 2 3 4 |
|
下面我们来看一下清理方法主要做了哪些事情:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
关于窗口清理,其实三大处理方法(processElement\/processWatermark\/trigger)都会进行判断,如果满足条件则清理。而真正注册清理定时器的逻辑在processElement中,它会调用registerCleanupTimer方法:
1 2 3 4 5 6 7 8 9 10 |
|
从上面的代码段可知:清理定时器跟普通定时器是一样的。
如果没有延迟,对于事件时间和处理时间而言,也许它们的窗口清理不一定是由清理定时器触发。因为在事件时间触发器和处理时间触发器中,它们注册的定时器对应的时间点就是窗口的最大时间戳。由于这些定时器在队列中一般排在清理定时器之前,所以这些定时器会优先于清理定时器得到执行(优先触发窗口的清理)。而这里的registerCleanupTimer方法,是一般化的清理机制,针对所有类型的窗口都适用,并确保窗口一定会得到清理。而对于刚刚提到的这种情况,重复的“清理”定时器并不会产生负作用。
WindowOperator还有一个继承者:EvictingWindowOperator,该算子在常规的窗口算子上支持了元素驱逐器(见上图中大虚线框内部的小虚线长方形)。EvictingWindowOperator特别的地方主要在于其fire的实现——在进行窗口计算之前会预先对符合驱逐条件的元素进行剔除,具体实现见如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
在最终调用窗口计算的apply方法之前,会先计算要驱逐的元素个数,然后跳过这些元素并且跳过的都是从首个元素开始的连续个元素(这一点在之前我们分析窗口元素驱逐器是也曾提及过)。
这里采用了Guava类库的FluentIterable帮助类,它扩展了Iterable接口并提供了非常丰富的扩展API。