星火犯规让我算我加入dataframes

问题描述:

新星火乔布斯和我有以下问题。星火犯规让我算我加入dataframes

当我运行在任何新加入dataframes的计数,工作的年龄和运行内存溢出到磁盘。这里有没有逻辑错误?

// pass spark configuration 
    val conf = new SparkConf() 
     .setMaster(threadMaster) 
     .setAppName(appName) 

    // Create a new spark context 
    val sc = new SparkContext(conf) 

    // Specify a SQL context and pass in the spark context we created 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 


    // Create three dataframes for sent and clicked files. Mark them as raw, since they will be renamed 
    val dfSentRaw = sqlContext.read.parquet(inputPathSent) 
    val dfClickedRaw = sqlContext.read.parquet(inputPathClicked) 
    val dfFailedRaw = sqlContext.read.parquet(inputPathFailed) 



    // Rename the columns to avoid ambiguity when accessing the fields later 
    val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id") 
     .withColumnRenamed("campaign_id", "sent__campaign_id") 
     .withColumnRenamed("ced_email", "sent__ced_email") 
     .withColumnRenamed("event_captured_dt", "sent__event_captured_dt") 
     .withColumnRenamed("riid", "sent__riid") 


    val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id") 
     .withColumnRenamed("event_captured_dt", "clicked__event_captured_dt") 
    val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id") 


    // LEFT Join with CLICKED on two fields, customer_id and campaign_id 
    val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id") 
     && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left") 
    dfSentClicked.count() //THIS WILL NOT WORK 

val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id") 
     && dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left") 

为什么不能再计算这两个/三个数据帧?我通过重命名了一些索引吗?

谢谢!

enter image description here

+0

几个问题:什么是您所看到的错误?你有没有试过对预加入数据帧进行计数?你是否尝试过在不同的数据框中合并列名并调用这个连接('join(right:DataFrame,usingColumns:Seq [String],joinType:String)')? –

+0

1.它没有真正的错误,我只是觉得在作业运行永远和泄漏(“溢出UnsafeExternalSorter内存到磁盘”) 2.是的,计数的作品 - 预连接 3.我不明白这一点,我做了什么不同? 非常感谢您的帮助! – ZedBrannigan

+0

数据的大小是多少?你是什​​么群集内存配置? – eliasah

count电话是你的星火工作的唯一实际物化在这里,所以它不是真正count这是一个问题,但目前正在为join权之前完成它的洗牌。您没有足够的内存来进行连接而不会溢出到磁盘。在shuffle中溢出到磁盘是使Spark工作永远持续=)的一种非常简单的方法。

一两件事,真的有助于防止与洗牌是有多个分区溢出。那么在任何给定的时间,通过洗牌的数据都会减少。您可以设置spark.sql.shuffle.partitions,它控制加入或聚合中Spark Sql使用的分区数。它默认为200,所以你可以尝试更高的设置。 http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

这时您可以提高当地星火分配的堆大小和/或增加内存的分数可通过增加spark.shuffle.memoryFraction(默认为0.4),并降低spark.storage.memoryFraction(默认为0.6)洗牌。例如,当您拨打.cache时,会使用存储分数,您可能不会在意。

如果您倾向于彻底避免泄漏,您可以通过将spark.shuffle.spill设置为false来关闭溢出。我相信如果您的内存不足,需要进行溢出而不是默默无闻地进行,并且可以帮助您更快地配置内存分配,这会引发异常。