将RDD转换为DataFrame时会导致重新分区的原因是什么?
问题描述:
我使用Window.sum
函数来获取RDD中的值的总和,但是当我将DataFrame转换为RDD时,我发现结果只有一个分区。重新分区何时发生? ?将RDD转换为DataFrame时会导致重新分区的原因是什么?
val rdd = sc.parallelize(List(1,3,2,4,5,6,7,8), 4)
val df = rdd.toDF("values").
withColumn("csum", sum(col("values")).over(Window.orderBy("values")))
df.show()
println(s"numPartitions ${df.rdd.getNumPartitions}")
// 1
//df is:
// +------+----+
// |values|csum|
// +------+----+
// | 1| 1|
// | 2| 3|
// | 3| 6|
// | 4| 10|
// | 5| 15|
// | 6| 21|
// | 7| 28|
// | 8| 36|
// +------+----+
我添加partitionBy在窗口,但结果是错误,我应该怎么做,这是我改变代码:
val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4)
val sqlContext = new SQLContext(m_sparkCtx)
import sqlContext.implicits._
val df = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values")))
df.show()
println(s"numPartitions ${df.rdd.getNumPartitions}")
//1
//df is:
// +------+----+
// |values|csum|
// +------+----+
// | 1| 1|
// | 6| 6|
// | 3| 3|
// | 5| 5|
// | 4| 4|
// | 8| 8|
// | 7| 7|
// | 2| 2|
// +------+----+
答
Window
功能有partitionBy
API用于分组的dataframe
和orderBy
订购按升序或降序分组rows
。
在您的第一个案例中,您尚未定义partitionBy
,因此所有值都归入一个dataframe
以进行排序,从而将数据混合到一个分区中。
但在第二种情况下,您在values
本身上定义了partitionBy
。因此,由于每个值都不相同,因此每个row
都被分组为单个组。
的在第二种情况下partition
是200,因为这是当你还没有定义分区和洗牌发生
当您第一种情况下得到同样的结果在spark
定义的默认分区,则需要添加其他column
与分组价值,以便您可以将它们按照您的第一种情况分组,即分组到一个组中。
val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4)
val df = rdd.toDF("values").withColumn("grouping", lit("group"))
df.withColumn("csum", sum(col("values")).over(Window.partitionBy("grouping").orderBy("values"))).drop("grouping").show(false)
通过这样做,我看到您的原始分区被保留。
我该怎么办? – mentongwu
“我该怎么办?”是什么意思? –
我该怎么做才能得到与分区相同的结果? – mentongwu