pyspark mapPartitions函数是如何工作的?
所以我想学习使用Python(Pyspark)的Spark。我想知道功能mapPartitions
是如何工作的。这就是它所需要的输入和它给出的输出。我从互联网上找不到任何适当的例子。比方说,我有一个包含列表的RDD对象,如下所示。pyspark mapPartitions函数是如何工作的?
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
我想从所有列表中删除元素2,我将如何实现这一目标使用mapPartitions
。
mapPartition应该被认为是对分区的映射操作,而不是分区的元素。它的输入是当前分区的集合,它的输出将是另一组分区。
你通过地图一定带你去通过mapPartition必须将RDD类型的迭代您的RDD
功能的单个元素和其他一些或同一类型的返回和迭代函数。
在你的情况,你可能只想做这样的事情
def filterOut2(line):
return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)
,如果你想使用mapPartition这将是
def filterOut2FromPartion(list_of_lists):
final_iterator = []
for sub_list in list_of_lists:
final_iterator.append([x for x in sub_list if x != 2])
return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)
它更容易使用yield
使用mapPartitions与发电机功能语法:
def filter_out_2(partition):
for element in partition:
if element != 2:
yield element
filtered_lists = data.mapPartition(filter_out_2)
这比仅仅返回一个列表更快吗? – cgreen 2017-01-03 22:05:16
@cgreen该分区包含您的所有数据。我不确定你想要将所有数据加载到列表中。当您迭代数据时,生成器优先于列表。 – Narek 2017-01-03 22:40:28
@cgreen生成器使用较少的内存,因为它们根据需要生成每个项目,而不是最初必须生成整个对象列表。所以它绝对使用更少的内存,因此速度可能更快。 [这是Python中生成器的一个很好的解释](https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db)。 – 2017-11-26 22:52:28
为什么不在filterOut2FromPartition中返回任何内容f结。其次,在python中最后是一些关键字吗?我想你的意思是说final.iterator = []而不是final_iterator。 – MetallicPriest 2014-11-04 21:39:39
解决了问题 – bearrito 2014-11-05 01:30:51
我试图实现这个,但我得到错误“列表对象不是迭代器”。另外,我认为当你写[x for x in line if x!= 2]时,我认为你的意思是[x for x in list if x!= 2]。我在那里使用了列表。 – MetallicPriest 2014-11-05 10:27:55