Python Spark实现map-reduce算法来创建(列,值)元组
问题描述:
更新(04/20/17): 我正在使用Apache Spark 2.1.0,我将使用Python。Python Spark实现map-reduce算法来创建(列,值)元组
我已经缩小了问题范围,希望有人对Spark有更多的了解。我需要从values.csv文件的头部创建的元组的RDD:
values.csv(主收集的数据,非常大的):
+--------+---+---+---+---+---+----+
| ID | 1 | 2 | 3 | 4 | 9 | 11 |
+--------+---+---+---+---+---+----+
| | | | | | | |
| abc123 | 1 | 2 | 3 | 1 | 0 | 1 |
| | | | | | | |
| aewe23 | 4 | 5 | 6 | 1 | 0 | 2 |
| | | | | | | |
| ad2123 | 7 | 8 | 9 | 1 | 0 | 3 |
+--------+---+---+---+---+---+----+
输出(RDD):
+----------+----------+----------+----------+----------+----------+----------+
| abc123 | (1;1) | (2;2) | (3;3) | (4;1) | (9;0) | (11;1) |
| | | | | | | |
| aewe23 | (1;4) | (2;5) | (3;6) | (4;1) | (9;0) | (11;2) |
| | | | | | | |
| ad2123 | (1;7) | (2;8) | (3;9) | (4;1) | (9;0) | (11;3) |
+----------+----------+----------+----------+----------+----------+----------+
发生了什么事我配对的每个值与该格式值的列名:
(column_number, value)
原始格式(如果你有兴趣与它的工作):
id,1,2,3,4,9,11
abc123,1,2,3,1,0,1
aewe23,4,5,6,1,0,2
ad2123,7,8,9,1,0,3
问题:
的例子values.csv文件只包含几列,但在实际的文件有成千上万的专栏。我可以提取标题并将其广播到分布式环境中的每个节点,但我不确定这是否是解决问题的最有效方法。是否可以通过并行头来实现输出?
答
我想你也可以使用PySpark Dataframe来实现解决方案。但是,我的解决方案尚未达到最佳状态。我使用split
来获取新的列名和相应的列来执行sum
。这取决于你的key_list
有多大。如果它太大,这可能无法正常工作,因为您必须在内存上加载key_list
(使用collect
)。
import pandas as pd
import pyspark.sql.functions as func
# example data
values = spark.createDataFrame(pd.DataFrame([['abc123', 1, 2, 3, 1, 0, 1],
['aewe23', 4, 5, 6, 1, 0, 2],
['ad2123', 7, 8, 9, 1, 0, 3]],
columns=['id', '1', '2', '3','4','9','11']))
key_list = spark.createDataFrame(pd.DataFrame([['a', '1'],
['b','2;4'],
['c','3;9;11']],
columns=['key','cols']))
# use values = spark.read.csv(path_to_csv, header=True) for your data
key_list_df = key_list.select('key', func.split('cols', ';').alias('col'))
key_list_rdd = key_list_df.rdd.collect()
for row in key_list_rdd:
values = values.withColumn(row.key, sum(values[c] for c in row.col if c in values.columns))
keys = [row.key for row in key_list_rdd]
output_df = values.select(keys)
输出
output_df.show(n=3)
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 3| 4|
| 4| 6| 8|
| 7| 9| 12|
+---+---+---+
我不知道这是不是我所此刻在我的代码更新的更好,因为你的代码必须阅读大量文件到大熊猫据帧,这是没有分发。我可能是错的。我更新了我的代码,以便在RDD中提供解决方案,但是我想知道是否可以改进它,因为我是Apache Spark的新手,尤其是get_output_row()函数需要传递收集的键列表版本。 – Dobob
哦,对于阅读部分,您可以通过直接提供CSV路径来'spark.read.csv(path_to_csv)'。它会给你PySpark数据帧。 – titipata