当群集大小很大时,Spark作业失败,小时成功
我有一个需要三个输入并执行两个外部连接的spark任务。数据采用键值格式(String,Array [String])。代码的最重要的部分是:当群集大小很大时,Spark作业失败,小时成功
val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")
我正在与r3.4xlarge司机节点和500个m3.xlarge工作节点上EMR工作。火花提交的参数是:
spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2 --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s
更新:使用此设置,在火花的作业UI看到执行人的人数分别为500(每个节点之一)
的例外,我在驱动程序的日志中看到以下:
17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...
有些事情我试过失败:
- 我想这个问题是因为的T这里有太多的执行者正在产生,驱动程序会追踪这些执行者。我试图通过将执行程序内存增加到4g来减少执行程序的数量。这没有帮助。
- 我试着将驱动程序的实例类型更改为r3.8xlarge,这也没有帮助。
令人惊讶的是,当我将工作节点的数量减少到300时,作业将运行文件。有没有人有任何其他假设为什么会发生?
我尝试了大量的配置,每次修改一个参数500个节点。我最终通过将HashPartitioner中的分区数从8000减少到3000来完成工作。
val partitioner = new HashPartitioner(3000)
所以可能是司机不堪重负了大量的洗牌是必须做的时候有更多的分区,因此下分区帮助。
那么理解Spark的分配是如何工作的,这是一个小问题。根据你的信息,你有500个节点,每个节点有4个核心。所以,你有4000个内核。你在处理你的请求是创建4000个执行器,每个执行器有3个内核。这意味着您正在为群集请求12000个内核,并且没有这样的事情。
RPC超时的这个错误通常与您在同一台计算机上启动多少个jvms相关联,并且由于同时发生的事件,该计算机无法在适当的时间作出响应。
您需要知道的是,--num-executors
最好与您的节点相关联,并且核心数量应该与每个节点中的核心相关联。
例如,m3.xLarge的配置是4个内核,15 Gb的RAM。在那里运行工作的最佳配置是什么?这取决于你打算做什么。看看你是否要运行只有一个工作,我建议你设置这样的:
spark-submit --deploy-mode client --master yarn-client --executor-memory 10g --executor-cores 4 --num-executors 500 --conf spark.default.parallelism=2000 --conf spark.yarn.executor.memoryOverhead=4000
这将使你的工作一切正常,如果没有问题,以适应您的数据,你的工人是更好地将default.parallelism
更改为2000,否则您将随着洗牌而浪费大量时间。
但是,我认为你可以做的最好的方法是保持默认情况下EMR启用它的动态分配,只需设置核心数量,并行度和内存,你的工作就像魅力一样运行。
- https://stackoverflow.com/questions/40474057/what-are-possible-reasons-for-receiving-timeoutexception-futures-timed-out-afte - https://stackoverflow.com/questions/45171175/错误错误清洁广播的异常 –