Exception handling in a Spark Streaming application
Problem description:
I have developed a Spark Streaming application that monitors a file stream. I need to stop the driver on any exception in my streaming application. My code is below:
val fileStream = ..
// initiate the checkpointing
fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))
fileStream.foreachRDD(r => {
  try {
    r.count()
  } catch {
    case ex: Exception =>
      ssc.stop(true, true)
  }
})
However, the code above gives me:
yarn.ApplicationMaster: User class threw exception:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...)
- field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
- object (class UnionStream$$anonfun$creatingFunc$3, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
I am running my code on YARN in cluster mode.
Answer
Have you tried taking the try {} catch out of the foreachRDD and instead wrapping the foreachRDD call itself in a try/catch {}? Something like this:
try {
  // initiate the checkpointing
  fileStream.foreachRDD(r => {
    r.count()
  })
} catch {
  case ex: Exception =>
    ssc.stop(true, true)
}
From the exception it looks like Spark takes all the code inside the foreachRDD block, including the exception handler, which needs the StreamingContext, and tries to serialize it so it can send it to the node that will process the current RDD. Since StreamingContext is not serializable, it blows up.
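The closure-capture problem can be reproduced without Spark at all, since Spark's closure cleaner relies on plain Java serialization. The sketch below is a minimal illustration, not Spark code: FakeStreamingContext and isSerializable are made-up names, and FakeStreamingContext merely stands in for the non-serializable StreamingContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for StreamingContext: deliberately NOT Serializable.
class FakeStreamingContext {
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = ()
}

object ClosureSerializationDemo {
  // Returns true if `obj` survives Java serialization, which is what Spark
  // requires before shipping a closure to an executor.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val ssc = new FakeStreamingContext

    // Mirrors the failing code: the catch block references ssc,
    // so the whole closure captures it.
    val closureWithSsc: Int => Unit = n =>
      try { require(n >= 0) } catch { case _: Exception => ssc.stop(true, true) }

    // A closure that touches nothing outside its own parameter.
    val plainClosure: Int => Int = n => n + 1

    println(s"closure capturing ssc serializable: ${isSerializable(closureWithSsc)}")
    println(s"plain closure serializable: ${isSerializable(plainClosure)}")
  }
}
```

Serializing the first closure fails for the same reason as in the stack trace above: the captured FakeStreamingContext field is not serializable, while the second closure captures nothing and serializes fine.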
Answer
To stop the Spark application when an exception occurs in a foreachRDD call, do not try to catch the exception inside foreachRDD. Instead, wrap the ssc.awaitTermination call in a try/catch block and call ssc.stop from there:
val ssc = createStreamingContext()
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    throw e // to exit with error condition
}
May I ask a question? Does r.count() always run on the driver node? I need to catch the exception from the driver. – mahdi62
Whatever you do inside the foreach travels with the RDD to the node that processes the task. But what you want is to stop the context when an error occurs, and the context is only accessible from the driver. –
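The pattern behind that comment and the accepted answer can be sketched with a toy model, again without Spark: batch jobs run on a worker thread (the "executor"), and the first failure is rethrown to whoever calls awaitTermination, i.e. the driver, which is the only place that may stop the context. ToyStreamingContext and submitBatch are hypothetical names, not Spark's API; this only mimics the control flow.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Toy model of the driver/executor split. Jobs run on a worker thread and
// the first failure is surfaced to the caller of awaitTermination().
class ToyStreamingContext {
  private val executorPool = Executors.newSingleThreadExecutor()
  @volatile private var failure: Option[Throwable] = None

  def submitBatch(job: () => Unit): Unit =
    executorPool.submit(new Runnable {
      def run(): Unit =
        try job()
        catch { case t: Throwable => if (failure.isEmpty) failure = Some(t) }
    })

  def awaitTermination(): Unit = {
    executorPool.shutdown()
    executorPool.awaitTermination(5, TimeUnit.SECONDS)
    failure.foreach(t => throw t) // surface the executor-side error on the driver
  }

  def stop(): Unit = { executorPool.shutdownNow(); () }
}

object DriverSideStopDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new ToyStreamingContext
    ssc.submitBatch(() => println("batch 1 ok"))
    ssc.submitBatch(() => throw new RuntimeException("corrupt batch"))
    try ssc.awaitTermination()
    catch {
      case e: Exception =>
        ssc.stop() // stop from the driver, as in the answer above
        println(s"stopped after: ${e.getMessage}")
    }
  }
}
```

Note that the "executor" never touches the context; it only reports the failure, and the driver reacts to it after awaitTermination rethrows, which is exactly why the try/catch belongs around awaitTermination rather than inside the per-RDD closure.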