Exception handling in a Spark Streaming application

Problem description:

I have developed a Spark Streaming application that checks a file stream. I need to stop the driver on any exception in my streaming application. My code is below:

val fileStream = ..
fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))

// initiate the checkpointing
fileStream.foreachRDD(r => {
  try {
    r.count()
  } catch {
    case ex: Exception => {
      ssc.stop(true, true)
    }
  }
})

However, the code above gives me:

yarn.ApplicationMaster: User class threw exception: 
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
org.apache.spark.streaming.StreamingContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...) 
    - field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext) 
    - object (class UnionStream$$anonfun$creatingFunc$3, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream) 
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, 

I am running my code in yarn-cluster mode.

Have you tried taking the try {} catch out of the foreachRDD and instead wrapping the call to foreachRDD in a try/catch, something like this:

try {
  // initiate the checkpointing
  fileStream.foreachRDD(r => {
    r.count()
  })
} catch {
  case ex: Exception => {
    ssc.stop(true, true)
  }
}

From the exception it looks like Spark is taking all of the code inside the foreachRDD block, including the exception handler, which needs the StreamingContext, and trying to serialize it so it can be sent to the node that will process the current RDD. Since StreamingContext is not serializable, it blows up.
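As a sketch of what the serialization stack is pointing at (same names as the snippet above, annotated purely for illustration): enabling checkpointing on the DStream means the DStream graph, including the function passed to foreachRDD, must itself be serialized, and the ssc.stop call inside the handler pulls the StreamingContext into that closure.

// fileStream.checkpoint(...) enables DStream checkpointing, so the DStream
// graph, including the function below, has to be serializable.
fileStream.foreachRDD { r =>
  try {
    r.count()
  } catch {
    case ex: Exception =>
      // This reference to `ssc` is what the stack trace reports as field
      // `ssc$1`: it makes the closure capture the StreamingContext, which
      // is not serializable, hence the NotSerializableException.
      ssc.stop(true, true)
  }
}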


Can I ask a question? Does r.count() always run on the driver node? I need to catch the exception from the driver. – mahdi62


Whatever you do inside the foreach goes with the RDD to the nodes that process the tasks. But what you want is to stop the context when an error happens, and the context can only be accessed from the driver. –
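To make that distinction concrete, here is a minimal sketch (the inner rdd.foreach body is illustrative and not part of the original code): the function passed to foreachRDD runs on the driver once per batch, while closures passed to RDD operations inside it are shipped to the executors.

fileStream.foreachRDD { rdd =>
  // This block runs on the driver for every batch.
  val n = rdd.count() // an action: the work is distributed to the executors,
                      // but the call itself is made (and returns) on the driver

  rdd.foreach { record =>
    // This inner closure is serialized and executed on the executors;
    // do not reference the StreamingContext in here.
  }
}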

To stop the Spark application when an exception occurs inside the foreachRDD call, do not try to catch the exception within foreachRDD. Instead, wrap the ssc.awaitTermination call in a try/catch block and call ssc.stop from there:

val ssc = createStreamingContext()
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    throw e // to exit with error condition
}
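For completeness, here is a minimal end-to-end sketch of that pattern (the application name, checkpoint directory, input path, and batch interval are assumptions, not taken from the question): a failure in a streaming job surfaces on the driver and makes awaitTermination throw, so the surrounding catch can stop the context and let the application exit with an error.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical factory; the question does not show how the context is built.
def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("file-stream-demo")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///tmp/checkpoints")                // assumed checkpoint directory

  val fileStream = ssc.textFileStream("hdfs:///tmp/input") // assumed input path
  fileStream.checkpoint(Seconds(50))

  // No try/catch here: let any failure propagate so it reaches awaitTermination.
  fileStream.foreachRDD(r => r.count())
  ssc
}

val ssc = createStreamingContext()
ssc.start()
try {
  ssc.awaitTermination()  // throws if a streaming job fails
} catch {
  case e: Exception =>
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    throw e // exit with an error condition
}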