Get duplicate records in Spark

Problem description:

I have a DataFrame `df` as described below:

**customers**  **product**  **val_id**  **rule_name**  **rule_id**  **priority**
      1            A            1           ABC           123           1
      3            Z            r           ERF           789           2
      2            B            X           ABC           123           2
      2            B            X           DEF           456           3
      1            A            1           DEF           456           2

I want to create a new DataFrame `df2` that contains only unique customer IDs. Since the `rule_name` and `rule_id` columns differ for the same customer, I want to select, for each customer, the record with the highest priority (the smallest `priority` value). So my final result should be:

**customers**  **product**  **val_id**  **rule_name**  **rule_id**  **priority**
      1            A            1           ABC           123           1
      3            Z            r           ERF           789           2
      2            B            X           ABC           123           2

Could anyone please help me achieve this with Spark and Scala? Any help would be appreciated.

You basically want to select rows with an extreme value in a column. This is a very common problem, so there is even a dedicated tag for it. Also see the question SQL Select only rows with Max Value on a Column, which has a good answer.

Here is an example for your specific case.

Note that this can select multiple rows for a customer, if that customer has multiple rows sharing the same (minimum) priority value.

This example is in PySpark, but it should be straightforward to translate to Scala.

from pyspark.sql import functions as F

# Find the best (minimum) priority for each customer. This DF has only two columns.
cusPriDF = df.groupBy("customers").agg(F.min(df["priority"]).alias("priority"))
# Now join back to choose only those rows and get all columns back.
bestRowsDF = df.join(cusPriDF, on=["customers", "priority"], how="inner")
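To see what the groupBy-then-join approach computes, here is a plain-Python sketch of the same logic, using the row data from the question (no Spark needed; the variable names are illustrative):

```python
# Plain-Python sketch of the min-priority-then-join logic.
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

# Step 1: like groupBy("customers").agg(min("priority")) -> {customer: best priority}
best = {}
for r in rows:
    cust, pri = r[0], r[5]
    best[cust] = min(best.get(cust, pri), pri)

# Step 2: like an inner join back on (customers, priority) to recover the full rows.
best_rows = [r for r in rows if r[5] == best[r[0]]]
```

This keeps exactly the rows whose priority equals their customer's minimum, which matches the desired output in the question.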

To create DF2 by priority, you first have to order the DataFrame and then find the unique customer ids, like this:

import org.apache.spark.sql.functions.first

val columns = df.schema.map(_.name).filterNot(_ == "customers").map(name => first(name).as(name))

val df2 = df.orderBy("priority").groupBy("customers").agg(columns.head, columns.tail: _*)
df2.show
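The sort-then-first-per-group idea can be sketched in plain Python (illustrative names; note that in Spark, row order inside groups after an `orderBy` followed by a shuffle is not strictly guaranteed, so treat this as the intended logic rather than a hard guarantee):

```python
# Sketch: sort by priority, then keep the first row seen per customer.
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

first_per_customer = {}
for r in sorted(rows, key=lambda r: r[5]):   # like orderBy("priority")
    first_per_customer.setdefault(r[0], r)   # like first() per "customers" group

df2 = sorted(first_per_customer.values())
```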

It will give you the expected output:

+---------+-------+------+---------+-------+--------+
|customers|product|val_id|rule_name|rule_id|priority|
+---------+-------+------+---------+-------+--------+
|        1|      A|     1|      ABC|    123|       1|
|        3|      Z|     r|      ERF|    789|       2|
|        2|      B|     X|      ABC|    123|       2|
+---------+-------+------+---------+-------+--------+

Corey beat me to it, but here's the Scala version:

import spark.implicits._
import org.apache.spark.sql.functions.min

val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)
).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")

val priorities = df.groupBy("customers").agg(min(df.col("priority")).alias("priority"))
val top_rows = df.join(priorities, Seq("customers", "priority"), "inner")

+---------+--------+-------+------+---------+-------+
|customers|priority|product|val_id|rule_name|rule_id|
+---------+--------+-------+------+---------+-------+
|        1|       1|      A|     1|      ABC|    123|
|        3|       2|      Z|     r|      ERF|    789|
|        2|       2|      B|     X|      ABC|    123|
+---------+--------+-------+------+---------+-------+

You will have to use a min aggregation on priority, grouping the dataframe by customers, then inner join the original dataframe with the aggregated dataframe and select the required columns.

import org.apache.spark.sql.functions.min

// Highest priority = smallest priority value, so aggregate with min.
val aggregatedDF = dataframe.groupBy("customers").agg(min("priority").as("priority_1"))
  .withColumnRenamed("customers", "customers_1")

val finalDF = dataframe.join(aggregatedDF,
  dataframe("customers") === aggregatedDF("customers_1") &&
  dataframe("priority") === aggregatedDF("priority_1"))
finalDF.select("customers", "product", "val_id", "rule_name", "rule_id", "priority").show
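This aggregate-then-join-on-renamed-keys variant can also be checked in plain Python (illustrative names; tuples stand in for the dataframe rows):

```python
# Sketch of aggregate (min priority per customer) + join on renamed keys + select.
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

# Like agg(min("priority")) with keys "renamed" to (customers_1, priority_1).
aggregated = {}
for cust, *_, pri in rows:
    aggregated[cust] = min(aggregated.get(cust, pri), pri)
agg_rows = list(aggregated.items())  # (customers_1, priority_1) pairs

# Join on customers == customers_1 AND priority == priority_1,
# then "select" the original columns (rows already carry them all).
final = [r for r in rows
         for (c1, p1) in agg_rows
         if r[0] == c1 and r[5] == p1]
```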

You should get the desired result.