如何从DataFrame获取最后一行?
我有一个DataFrame,DataFrame hava两列'value'和'timestamp','timestmp'是有序的,我想得到DataFrame的最后一行,我该怎么办?如何从DataFrame获取最后一行?
这是我输入:
+-----+---------+
|value|timestamp|
+-----+---------+
| 1| 1|
| 4| 2|
| 3| 3|
| 2| 4|
| 5| 5|
| 7| 6|
| 3| 7|
| 5| 8|
| 4| 9|
| 18| 10|
+-----+---------+
这是我的代码:
val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10))
var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp")
这是我预期的结果:
+-----+---------+
|value|timestamp|
+-----+---------+
| 18| 10|
+-----+---------+
我想简单地reduce
:
df.reduce { (x, y) =>
if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y
}
如果timestamp列是独一无二的,是递增顺序然后有以下方法得到最后一行
println(df.sort($"timestamp", $"timestamp".desc).first())
// Output [1,1]
df.sort($"timestamp", $"timestamp".desc).take(1).foreach(println)
// Output [1,1]
df.where($"timestamp" === df.count()).show
输出:
+-----+---------+
|value|timestamp|
+-----+---------+
| 18| 10|
+-----+---------+
如果没有创建索引的新列并选择最后一个指标如下
val df1 = spark.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map {
case (row, index) => Row.fromSeq(row.toSeq :+ index)
},
StructType(df.schema.fields :+ StructField("index", LongType, false)))
df1.where($"timestamp" === df.count()).drop("index").show
输出:
+-----+---------+
|value|timestamp|
+-----+---------+
| 18| 10|
+-----+---------+
排序功能效率低下,我不想使用排序功能 – mentongwu
比你可以使用df.where($“timestamp”=== df.count()) –
最有效的方法是到你的DataFrame中reduce
。这给你一个你可以转换回DataFrame的单行,但由于它只包含1条记录,所以这没什么意义。
sparkContext.parallelize(
Seq(
df.reduce {
(a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b
} match {case Row(value:Int,timestamp:Int) => (value,timestamp)}
)
)
.toDF("value","timestamp")
.show
+-----+---------+
|value|timestamp|
+-----+---------+
| 18| 10|
+-----+---------+
效率较低(因为它需要改组)虽然短是这样的解决方案:
df
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head)
是 我会简单地使用查询 - 订单表格由降序排列 - 来自这需要1个值为了
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
请问'df.where($ “时间戳” === MAX($ “时间戳”)'工作? –
它亘古不变的工作交流rangepartitioning(TS# 7 ASC NULLS FIRST,200) – mentongwu