Apache Spark streaming - Timeout长时间运行批处理

问题描述:

我正在设置Apache Spark长时间运行的流式作业,以使用InputDStream执行(非并行化)流式传输。Apache Spark streaming - Timeout长时间运行批处理

我想要实现的是当队列上的批处理时间过长(基于用户定义的超时时间)时,我希望能够跳过批处理并完全放弃它 - 并继续其余部分执行。

我无法在Spark API或在线上找到解决这个问题的方法 - 我使用StreamingContext awaitTerminationOrTimeout进行了研究,但是这会在超时时杀死整个StreamingContext,而我想要做的就是跳过/ kill当前批次。

我也考虑过使用mapWithState,但是这似乎并不适用于这个用例。最后,我正在考虑设置一个StreamingListener,并在批处理启动时启动计时器,然后在达到某个超时阈值时批量停止/跳过/终止,但似乎还没有办法终止批处理。

谢谢!

+0

好奇为什么mapWithState在这里不适用。像在批处理中创建会话一样?像这样? – user1452132

+0

好吧,我没有使用Pair DStreams。理论上,如果我是,我也不清楚API - 如果我确实设置了一个键的超时时间,这是否会按照我的要求做(跳过批处理中的作业)? –

+0

这可能很难实现。听众会给你监控工作的运行时间的手段,但我认为取消它会很困难。我看着(作业调度程序)[https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L47],我看不到一个API钩子在哪里解散批处理的结果。如果你真的需要这个,恐怕你需要修补代码来实现这种截止期限取消政策。 – maasg

我看过一些来自yelp的文档,但我自己没有做过。

使用UpdateStateByKey(update_func)mapWithState(stateSpec)

  1. 连接超时时,首先看到的事件和状态初始化
  2. 下降,如果它过期状态

    def update_function(new_events, current_state): 
        if current_state is None: 
         current_state = init_state() 
         attach_expire_datetime(new_events) 
         ...... 
        if is_expired(current_state): 
         return None //current_state drops? 
        if new_events: 
         apply_business_logic(new_events, current_state) 
    

这看起来像结构化的流媒体水印还会在事件超时时删除事件,如果这可能适用于您的jo bs /阶段超时下降。