Apache Spark streaming - Timeout长时间运行批处理
问题描述:
我正在设置Apache Spark长时间运行的流式作业,以使用InputDStream执行(非并行化)流式传输。Apache Spark streaming - Timeout长时间运行批处理
我想要实现的是当队列上的批处理时间过长(基于用户定义的超时时间)时,我希望能够跳过批处理并完全放弃它 - 并继续其余部分执行。
我无法在Spark API或在线上找到解决这个问题的方法 - 我使用StreamingContext awaitTerminationOrTimeout进行了研究,但是这会在超时时杀死整个StreamingContext,而我想要做的就是跳过/ kill当前批次。
我也考虑过使用mapWithState,但是这似乎并不适用于这个用例。最后,我正在考虑设置一个StreamingListener,并在批处理启动时启动计时器,然后在达到某个超时阈值时批量停止/跳过/终止,但似乎还没有办法终止批处理。
谢谢!
答
我看过一些来自yelp的文档,但我自己没有做过。
使用UpdateStateByKey(update_func)
或mapWithState(stateSpec)
,
- 连接超时时,首先看到的事件和状态初始化
-
下降,如果它过期状态
def update_function(new_events, current_state): if current_state is None: current_state = init_state() attach_expire_datetime(new_events) ...... if is_expired(current_state): return None //current_state drops? if new_events: apply_business_logic(new_events, current_state)
这看起来像结构化的流媒体水印还会在事件超时时删除事件,如果这可能适用于您的jo bs /阶段超时下降。
好奇为什么mapWithState在这里不适用。像在批处理中创建会话一样?像这样? – user1452132
好吧,我没有使用Pair DStreams。理论上,如果我是,我也不清楚API - 如果我确实设置了一个键的超时时间,这是否会按照我的要求做(跳过批处理中的作业)? –
这可能很难实现。听众会给你监控工作的运行时间的手段,但我认为取消它会很困难。我看着(作业调度程序)[https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L47],我看不到一个API钩子在哪里解散批处理的结果。如果你真的需要这个,恐怕你需要修补代码来实现这种截止期限取消政策。 – maasg