pyspark mapPartitions函数是如何工作的?

问题描述:

所以我想学习使用Python(Pyspark)的Spark。我想知道功能mapPartitions是如何工作的。这就是它所需要的输入和它给出的输出。我从互联网上找不到任何适当的例子。比方说,我有一个包含列表的RDD对象,如下所示。pyspark mapPartitions函数是如何工作的?

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

我想从所有列表中删除元素2,我将如何实现这一目标使用mapPartitions

mapPartition应该被认为是对分区的映射操作,而不是分区的元素。它的输入是当前分区的集合,它的输出将是另一组分区。

你通过地图一定带你去通过mapPartition必须将RDD类型的迭代您的RDD

功能的单个元素和其他一些或同一类型的返回和迭代函数。

在你的情况,你可能只想做这样的事情

def filterOut2(line): 
    return [x for x in line if x != 2] 

filtered_lists = data.map(filterOut2) 

,如果你想使用mapPartition这将是

def filterOut2FromPartion(list_of_lists): 
    final_iterator = [] 
    for sub_list in list_of_lists: 
    final_iterator.append([x for x in sub_list if x != 2]) 
    return iter(final_iterator) 

filtered_lists = data.mapPartition(filterOut2FromPartion) 
+0

为什么不在filterOut2FromPartition中返回任何内容f结。其次,在python中最后是一些关键字吗?我想你的意思是说final.iterator = []而不是final_iterator。 – MetallicPriest 2014-11-04 21:39:39

+0

解决了问题 – bearrito 2014-11-05 01:30:51

+0

我试图实现这个,但我得到错误“列表对象不是迭代器”。另外,我认为当你写[x for x in line if x!= 2]时,我认为你的意思是[x for x in list if x!= 2]。我在那里使用了列表。 – MetallicPriest 2014-11-05 10:27:55

它更容易使用yield使用mapPartitions与发电机功能语法:

def filter_out_2(partition): 
    for element in partition: 
     if element != 2: 
      yield element 

filtered_lists = data.mapPartition(filter_out_2) 
+0

这比仅仅返回一个列表更快吗? – cgreen 2017-01-03 22:05:16

+1

@cgreen该分区包含您的所有数据。我不确定你想要将所有数据加载到列表中。当您迭代数据时,生成器优先于列表。 – Narek 2017-01-03 22:40:28

+0

@cgreen生成器使用较少的内存,因为它们根据需要生成每个项目,而不是最初必须生成整个对象列表。所以它绝对使用更少的内存,因此速度可能更快。 [这是Python中生成器的一个很好的解释](https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db)。 – 2017-11-26 22:52:28