Spark:Transpose DataFrame Without Aggregating
问题描述:
我已经在线查看了一些问题,但他们似乎并没有做我想做的事情。Spark:Transpose DataFrame Without Aggregating
我在Scala上使用Apache Spark 2.0.2。
我有一个数据帧:
+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
| 1| 100| 0| 0| 0| 0| 0|
| 2| 0| 50| 0| 0| 20| 0|
| 3| 0| 0| 0| 0| 0| 0|
| 4| 0| 0| 0| 0| 0| 0|
+----------+-----+----+----+----+----+----+
,我要转置到
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+-----+----+----+----+
我一直在使用pivot()
尝试,但我无法得到正确的答案。我最终遍历了我的val{x}
列,并按照以下方式对每个列进行了旋转,但事实证明这很慢。
val d = df.select('segment_id, 'val1)
+----------+-----+
|segment_id| val1|
+----------+-----+
| 1| 100|
| 2| 0|
| 3| 0|
| 4| 0|
+----------+-----+
d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
+----+-----+----+----+----+
然后对val{x}
每次迭代给我的第一数据框中使用union()
。
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val2| 0| 50| 0| 0|
+----+-----+----+----+----+
是否有转在这里我不想汇总数据,更有效的方法是什么?
谢谢:)
答
不幸的是,在任何情况下:
- 星火
DataFrame
是有道理的考虑数据量。 - 数据转置是可行的。
您必须记住,在Spark中实现的DataFrame
是行的分布式集合,每行都在单个节点上存储和处理。
你可以表达对DataFrame
为pivot
置换:
val kv = explode(array(df.columns.tail.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
df
.withColumn("kv", kv)
.select($"segment_id", $"kv.k", $"kv.v")
.groupBy($"k")
.pivot("segment_id")
.agg(first($"v"))
.orderBy($"k")
.withColumnRenamed("k", "vals")
,但它仅仅是没有实际应用的玩具代码。在实践中,它比收集数据不是更好:既你会给你想要的结果
val df = Seq(
(1, 100, 0, 0, 0, 0, 0),
(2, 0, 50, 0, 0, 20, 0),
(3, 0, 0, 0, 0, 0, 0),
(4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
:
+----+---+---+---+---+
|vals| 1| 2| 3| 4|
+----+---+---+---+---+
|val1|100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+---+---+---+---+
话虽这么说,如果你
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
case Array(h, t @ _*) => {
(h.map(_.toString), t.map(_.collect { case x: Int => x }))
}
}
val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)
spark.createDataFrame(sc.parallelize(rows), schema)
对于DataFrame
定义为需要对分布式数据结构进行有效的转换,您必须在其他地方查找。有许多结构,包括核心CoordinateMatrix
和BlockMatrix
,它们可以跨两个维度分布数据并且可以转置。
我怎样才能做到这一点与数据框? –
您是否期望不同的答案,或者您对现有答案满意? – 2016-12-11 02:34:59