认沽值从一行到另一阶dataaframe
我有一个的大数据帧有222列我想要做像下面的例子认沽值从一行到另一阶dataaframe
|id |day |col1 |col2 | col3 ....................
+----------+----------------+-------+-----+
| 329| 0| null|2.0
| 329| 42| null|null
| 329| 72| 5.55|null
| 329| 106| null|null
| 329| 135| null|3.0
| 329| 168| null|4.0
| 329| 189| 4.995|null
| 329| 212| null|6.0
| 329| 247| null|null
| 329| 274| null|8.0
|id | day |col1 |col2 |.......................
+----------+----------------+-------+-----+
| 329| 0| null|2.0
| 329| 42| null|2.0
| 329| 72| 5.55|2.0
| 329| 106| 5.55|2.0
| 329| 135| 5.55|3.0
| 329| 168| 5.55|4.0
| 329| 189| 4.995|4.0
| 329| 212| 4.995|6.0
| 329| 247| 4.995|6.0
| 329| 274| 4.995|8.0
.
.
.
.
.
1.read行1 2.我有85K的唯一ID的和每个ID有10个结果(只有一个ID的示出的示例)3.如果 在第2行的数据不存在,则把它从ID的前一行
我得到导致这样
id | day |original_col1 |Result_col1|prevValue|
+----------+----------------+--------------+-----------+---------+
| 329| 0| null | null | null|
| 329| 42| null | null | null|
| 329| 72| 5.55 | 5.55 | null|
| 329| 106| null | 5.55 | 5.55|
| 329| 135| null | null | null|
| 329| 168| null | null | null|
| 329| 189| 4.995 | 4.995 | null|
| 329| 212| null | 4.995 | 4.995|
| 330|....................................................
| 330|.....................................................
.
你不能做到这一点与现有的窗函数(例如落后)。您需要使用类似的概念进行分区和排序,但需要使用定制逻辑来滚动非空值。
case class MyRec(id: Integer, day: Integer, col1: Option[Double], col2: Option[Double])
defined class MyRec
scala> :paste
// Entering paste mode (ctrl-D to finish)
val ds = Seq(
MyRec(329, 0, None, Some(2.0)),
MyRec(329, 42, None, None),
MyRec(329, 72, Some(5.55), None),
MyRec(329, 106, None, None),
MyRec(329, 135, None, Some(3.0)),
MyRec(329, 168, None, Some(4.0)),
MyRec(329, 189, Some(4.995), None),
MyRec(329, 212, None, Some(6.0)),
MyRec(329, 247, None, None),
MyRec(329, 274, None, Some(8.0))
).toDS()
ds.printSchema()
ds.show(false)
val updated_ds = ds.repartition('id).sortWithinPartitions('id, 'day)
.mapPartitions(iter => {
var crtId: Integer = null
var prevId: Integer = null
var rollingVals = collection.mutable.Map[String, Option[Double]]()
for (rec <- iter) yield {
crtId = rec.id
// 1st record for new id
if (prevId == null || crtId != prevId) {
rollingVals = collection.mutable.Map[String, Option[Double]]()
prevId = crtId
}
rollingVals("col1") = if (rec.col1.isDefined) rec.col1 else rollingVals.getOrElse("col1", None)
rollingVals("col2") = if (rec.col2.isDefined) rec.col2 else rollingVals.getOrElse("col2", None)
MyRec(rec.id, rec.day, rollingVals("col1"), rollingVals("col2"))
}
})
updated_ds.printSchema()
updated_ds.show(false)
// Exiting paste mode, now interpreting.
root
|-- id: integer (nullable = true)
|-- day: integer (nullable = true)
|-- col1: double (nullable = true)
|-- col2: double (nullable = true)
+---+---+-----+----+
|id |day|col1 |col2|
+---+---+-----+----+
|329|0 |null |2.0 |
|329|42 |null |null|
|329|72 |5.55 |null|
|329|106|null |null|
|329|135|null |3.0 |
|329|168|null |4.0 |
|329|189|4.995|null|
|329|212|null |6.0 |
|329|247|null |null|
|329|274|null |8.0 |
+---+---+-----+----+
root
|-- id: integer (nullable = true)
|-- day: integer (nullable = true)
|-- col1: double (nullable = true)
|-- col2: double (nullable = true)
+---+---+-----+----+
|id |day|col1 |col2|
+---+---+-----+----+
|329|0 |null |2.0 |
|329|42 |null |2.0 |
|329|72 |5.55 |2.0 |
|329|106|5.55 |2.0 |
|329|135|5.55 |3.0 |
|329|168|5.55 |4.0 |
|329|189|4.995|4.0 |
|329|212|4.995|6.0 |
|329|247|4.995|6.0 |
|329|274|4.995|8.0 |
+---+---+-----+----+
ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields]
updated_ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields]
它的工作谢谢你 –
使用窗函数,然后案例时:
val df2 = df
.withColumn("prevValue", lag('col1, 1).over(Window.partitionBy('id).orderBy('day)))
.withColumn("col1", when('col1.isNull, 'prevValue).otherwise('col1))
进口也spark.implicits._
我是新来的,所以请理解它可能很容易错误:重载的方法值滞后与替代品: (电子邮件:org.apache.spark.sql.Column,偏移量:Int,defaultValue:任何)org.apache.spark.sql .COLUMN
@RahulNirdhar对不起,我忘了一个参数。现在它应该工作:) –
谢谢,但它不工作,因为我想,它给出了错误的结果,请参阅结果的问题 –
是否有确定性的方式来排序数据,以便能够使用窗口函数(滞后)?我知道你希望在由col“id”定义的分区中应用上面的逻辑,但是除非你有办法定义一些排序(假定顺序很重要),对于id为“1”的分区,对于“col1”你可能会在第1/2/3行得到空值,结果会有所不同。如果数据没有排序,您可以尝试使用monotonically_increasing_id()函数在从文件/源读取数据后立即生成order_id。 – Traian
我忘了添加一列请现在检查,ID是唯一的,强制性ID不过是用户而且每个ID都少于6条记录。 –