Getting de-duplicated records in Spark
Question:
I have a DataFrame DF as described below:
**customers**  **product**  **val_id**  **rule_name**  **rule_id**  **priority**
1              A            1           ABC            123          1
3              Z            r           ERF            789          2
2              B            X           ABC            123          2
2              B            X           DEF            456          3
1              A            1           DEF            456          2
I want to create a new DataFrame DF2 that has only unique customer ids. But since the rule_name and rule_id columns differ between rows for the same customer, I want to select, for each customer, the record with the highest priority (the lowest priority value). So my final result should be:
**customers**  **product**  **val_id**  **rule_name**  **rule_id**  **priority**
1              A            1           ABC            123          1
3              Z            r           ERF            789          2
2              B            X           ABC            123          2
Can anyone please help me achieve this with Spark Scala? Any help would be appreciated.
Answer
You basically want to select the rows with an extreme value in a column. This is a very common problem, so there is even a dedicated tag, greatest-n-per-group. Also see the question SQL Select only rows with Max Value on a Column, which has a good answer.
Here is an example for your specific case.
Note that this can select multiple rows for a customer if that customer has several rows sharing the same (minimum) priority value.
This example is in pyspark, but it should be straightforward to translate to Scala.
from pyspark.sql import functions as F

# find the best (minimum) priority for each customer. this DF has only two columns.
cusPriDF = df.groupBy("customers").agg(F.min(df["priority"]).alias("priority"))
# now join back to choose only those rows and get all columns back
bestRowsDF = df.join(cusPriDF, on=["customers", "priority"], how="inner")
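To see what the two DataFrame steps compute, here is the same aggregate-then-join logic sketched in plain Python over the question's rows (illustrative only; the dictionaries stand in for the grouped and joined DataFrames, they are not part of the Spark API):

```python
# Rows from the question, as plain dicts.
rows = [
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "ABC", "rule_id": 123, "priority": 1},
    {"customers": 3, "product": "Z", "val_id": "r", "rule_name": "ERF", "rule_id": 789, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "ABC", "rule_id": 123, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "DEF", "rule_id": 456, "priority": 3},
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "DEF", "rule_id": 456, "priority": 2},
]

# Step 1 (the groupBy/agg): minimum priority per customer.
best = {}
for r in rows:
    c = r["customers"]
    if c not in best or r["priority"] < best[c]:
        best[c] = r["priority"]

# Step 2 (the inner join on customers + priority): keep only the best rows,
# recovering all the other columns.
top_rows = [r for r in rows if r["priority"] == best[r["customers"]]]
```

Note that, just as in the Spark version, a customer with two rows tied at the minimum priority would keep both rows.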
Answer
To create DF2 by priority, you first have to order DF and then find the unique customer ids, like this:
val columns = df.schema.map(_.name).filterNot(_ == "customers").map(col => first(col).as(col))
val df2 = df.orderBy("priority").groupBy("customers").agg(columns.head, columns.tail: _*)
df2.show
It will give you the desired output:
+----------+--------+-------+----------+--------+---------+
| customers| product| val_id| rule_name| rule_id| priority|
+----------+--------+-------+----------+--------+---------+
| 1| A| 1| ABC| 123| 1|
| 3| Z| r| ERF| 789| 2|
| 2| B| X| ABC| 123| 2|
+----------+--------+-------+----------+--------+---------+
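The order-then-take-first idea above can be sketched in plain Python (illustrative only; a stable sort plus `setdefault` plays the role of `orderBy` + `groupBy`/`first`):

```python
# Rows from the question, as plain dicts.
rows = [
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "ABC", "rule_id": 123, "priority": 1},
    {"customers": 3, "product": "Z", "val_id": "r", "rule_name": "ERF", "rule_id": 789, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "ABC", "rule_id": 123, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "DEF", "rule_id": 456, "priority": 3},
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "DEF", "rule_id": 456, "priority": 2},
]

# Sort by priority, then keep the first row seen for each customer.
first_per_customer = {}
for r in sorted(rows, key=lambda r: r["priority"]):
    first_per_customer.setdefault(r["customers"], r)
result = list(first_per_customer.values())
```

One caveat: in plain Python this ordering is guaranteed, but Spark does not promise that `first` after a `groupBy` respects the row order of a previously sorted DataFrame, so the aggregate-and-join answers are the safer route when determinism matters.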
Answer
Corey beat me to it, but here is the Scala version:
import spark.implicits._
import org.apache.spark.sql.functions.min

val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")

val priorities = df.groupBy("customers").agg(min(df.col("priority")).alias("priority"))
val top_rows = df.join(priorities, Seq("customers", "priority"), "inner")
top_rows.show
+---------+--------+-------+------+---------+-------+
|customers|priority|product|val_id|rule_name|rule_id|
+---------+--------+-------+------+---------+-------+
| 1| 1| A| 1| ABC| 123|
| 3| 2| Z| r| ERF| 789|
| 2| 2| B| X| ABC| 123|
+---------+--------+-------+------+---------+-------+
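For completeness, the linked greatest-n-per-group question also covers a rank-within-group route (in Spark that would be a Window partitioned by customers and ordered by priority). A plain-Python sketch of that idea, keeping ties like the join-based answers do (illustrative only, not Spark API):

```python
from itertools import groupby

# Rows from the question, as plain dicts.
rows = [
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "ABC", "rule_id": 123, "priority": 1},
    {"customers": 3, "product": "Z", "val_id": "r", "rule_name": "ERF", "rule_id": 789, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "ABC", "rule_id": 123, "priority": 2},
    {"customers": 2, "product": "B", "val_id": "X", "rule_name": "DEF", "rule_id": 456, "priority": 3},
    {"customers": 1, "product": "A", "val_id": "1", "rule_name": "DEF", "rule_id": 456, "priority": 2},
]

# Partition by customer, order each partition by priority, keep rank-1 rows.
result = []
ordered = sorted(rows, key=lambda r: (r["customers"], r["priority"]))
for _, group in groupby(ordered, key=lambda r: r["customers"]):
    group = list(group)
    best = group[0]["priority"]
    result.extend(r for r in group if r["priority"] == best)  # keep ties at rank 1
```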
Answer
You will have to use a min aggregation on the priority column, grouping the dataframe by customers, then inner join the original dataframe with the aggregated dataframe and select the required columns:
val aggregatedDF = dataframe.groupBy("customers").agg(min("priority").as("priority_1"))
  .withColumnRenamed("customers", "customers_1")
val finalDF = dataframe.join(aggregatedDF, dataframe("customers") === aggregatedDF("customers_1") && dataframe("priority") === aggregatedDF("priority_1"))
finalDF.select("customers", "product", "val_id", "rule_name", "rule_id", "priority").show
and you should have the desired result.