多处理:如何在类中定义的函数上使用Pool.map?

问题描述:

当我运行类似:多处理:如何在类中定义的函数上使用Pool.map?

from multiprocessing import Pool 

p = Pool(5) 
def f(x): 
    return x*x 

p.map(f, [1,2,3]) 

它工作正常。但是,把这个作为一类功能:

class calculate(object): 
    def run(self): 
     def f(x): 
      return x*x 

     p = Pool() 
     return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 

使我有以下错误:

Exception in thread Thread-1: 
Traceback (most recent call last): 
    File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner 
    self.run() 
    File "/sw/lib/python2.6/threading.py", line 484, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我已经看到亚历克斯·马尔泰利处理同类问题的帖子,但它不够明确。

+1

“这是一个类的功能”?你能发布实际得到实际错误的代码吗?没有实际的代码,我们只能猜测你做错了什么。 – 2010-07-20 10:05:39

+1

@ S.Lott我发布了代码 – Mermoz 2010-07-20 12:12:38

+0

作为一般性评论,存在比Python的标准pickle模块更强大的pickling模块(比如[picloud](https://pypi.python.org/pypi/cloud/2.7.2 )模块中提到[这个答案](http://stackoverflow.com/a/16626757/2292832))。 – 2013-08-20 15:50:40

我也很受限于什么样的函数pool.map可以接受。我写了以下来绕过这个。它似乎工作,即使递归使用parmap。

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    [p.join() for p in proc] 
    return [p.recv() for (p,c) in pipe] 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+1

这对我来说非常好,谢谢。我发现了一个弱点:我尝试在一些传递给defaultdict的函数上使用parmap,并再次得到PicklingError。我没有找到解决办法,我只是重写我的代码不使用defaultdict。 – sans 2011-07-08 23:41:22

+2

这并不在Python工作2.7.2(默认情况下,2011年6月12日,15点08分59秒)[MSC v.1500 32位(英特尔)]在Win32 – ubershmekel 2012-02-18 19:03:21

+3

这确实关于Python 2.7.3月1,2012工作,05:14:39。 这并不巨iterables工作 - >它会导致OSERROR:[错误24]打开的文件太多,由于它打开管道的数量。 – 2013-01-18 19:19:22

在类中定义的函数(即使在类内的函数内)也不会真正的腌制。然而,这个工程:

def f(x): 
    return x*x 

class calculate(object): 
    def run(self): 
     p = Pool() 
    return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 
+12

谢谢,但是我觉得在类之外定义函数有点脏。该类应该捆绑它实现给定任务所需的全部内容。 – Mermoz 2010-07-20 12:53:10

+3

@Memoz:“班级应该把所有需要的东西捆绑在一起”真的吗?我找不到很多这样的例子。大多数类依赖于其他类或函数。为什么调用类的依赖“脏”?依赖关系有什么问题? – 2010-07-20 12:59:40

+0

那么,函数不应该修改现有的类数据 - 因为它会修改其他进程中的版本 - 所以它可能是一个静态方法。您可以排序的腌制一个静态方法:http://stackoverflow.com/questions/1914261/pickling-a-staticmethod-in-python/1914798#1914798 或者,什么这个简单,你可以使用lambda。 – robert 2010-07-20 15:22:33

目前还没有解决您的问题,据我知道:你给map()功能必须通过模块的进口访问。这就是为什么罗伯特的代码工作:f()可以通过导入下面的代码获得功能:

def f(x): 
    return x*x 

class Calculate(object): 
    def run(self): 
     p = Pool() 
     return p.map(f, [1,2,3]) 

if __name__ == '__main__': 
    cl = Calculate() 
    print cl.run() 

其实我增加了一个“主要”部分,因为这是继recommendations for the Windows platform(“确保主模块可由新的Python解释器安全地导入,而不会造成意想不到的副作用“)。

我还在Calculate前加了一个大写字母,以便跟在PEP 8之后。 :)

我也一直在努力。我有用作类的数据成员,作为一个简化的例子:

from multiprocessing import Pool 
import itertools 
pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # Needed to do something like this (the following line won't work) 
     return pool.map(self.f,list1,list2) 

我需要在同一类内使用功能self.f在Pool.map()调用从和self.f没以一个元组为参数。由于这个函数被嵌入到一个类中,所以我不清楚如何编写包装类型的其他答案。

我解决了这个问题,它使用了一个不同的包装器,它使用一个元组/列表,其中第一个元素是该函数,其余元素是该函数的参数,称为eval_func_tuple(f_args)。使用这个,有问题的行可以被替换为返回pool.map(eval_func_tuple,itertools.izip(itertools.repeat(self.f),list1,list2))。下面是完整的代码:

文件:util.py

def add(a, b): return a+b 

def eval_func_tuple(f_args): 
    """Takes a tuple of a function and args, evaluates and returns result""" 
    return f_args[0](*f_args[1:]) 

文件:main.py

from multiprocessing import Pool 
import itertools 
import util 

pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # The following line will now work 
     return pool.map(util.eval_func_tuple, 
      itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__': 
    myExample = Example(util.add) 
    list1 = [1, 2, 3] 
    list2 = [10, 20, 30] 
    print myExample.add_lists(list1, list2) 

运行main.py会给[11,22,33]。随意改进这一点,例如eval_func_tuple也可以修改为采用关键字参数。

另一个说明,在另一个答案中,功能“parmap”可以更有效地处理比可用CPU数量更多的进程。我正在复制下面的编辑版本。这是我的第一篇文章,我不确定是否应该直接编辑原始答案。我也重命名了一些变量。

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    numProcesses = len(processes) 
    processNum = 0 
    outputList = [] 
    while processNum < numProcesses: 
     endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses) 
     for proc in processes[processNum:endProcessNum]: 
      proc.start() 
     for proc in processes[processNum:endProcessNum]: 
      proc.join() 
     for proc,c in pipe[processNum:endProcessNum]: 
      outputList.append(proc.recv()) 
     processNum = endProcessNum 
    return outputList  

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5))   

通过mrule该解决方案是正确的,但有一个错误:如果孩子发回大量数据,它可以填补管道缓冲区,阻止孩子的pipe.send(),而家长在等待孩子退出pipe.join()。解决方案是在儿童读取儿童数据之前。此外,孩子应该关闭父母的管道末端以防止死锁。下面的代码解决了这个问题。另请注意,此parmap会在X中为每个元素创建一个进程。更高级的解决方案是使用multiprocessing.cpu_count()X分成多个块,然后在返回之前合并结果。我把这个作为练习留给读者,以免破坏mrule的简洁性。 ;)

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(ppipe, cpipe,x): 
     ppipe.close() 
     cpipe.send(f(x)) 
     cpipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    ret = [p.recv() for (p,c) in pipe] 
    [p.join() for p in proc] 
    return ret 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+0

你如何选择进程数量? – 2016-04-27 13:39:52

+0

它工作!谢谢。没有其他解决方案适用于我:D – 2016-04-27 14:36:18

+0

但是,由于错误“OSError:[Errno 24]太多打开的文件”,它会很快死亡。我认为需要对进程数量进行某种限制才能正常工作...... – 2016-04-27 15:01:04

,因为使用“multiprocessing.Pool”不lambda表达式的工作代码的代码,而不是使用“multiprocessing.Pool”因为有产卵多达过程我不能使用至今发布的代码工作项目。

我改编了代码s.t.它会产生预定义数量的工作人员,并且只会在输入列表中迭代(如果存在空闲工作人员)。我还为员工提供了“守护进程”模式。 ctrl-c按预期工作。

import multiprocessing 


def fun(f, q_in, q_out): 
    while True: 
     i, x = q_in.get() 
     if i is None: 
      break 
     q_out.put((i, f(x))) 


def parmap(f, X, nprocs=multiprocessing.cpu_count()): 
    q_in = multiprocessing.Queue(1) 
    q_out = multiprocessing.Queue() 

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out)) 
      for _ in range(nprocs)] 
    for p in proc: 
     p.daemon = True 
     p.start() 

    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [q_in.put((None, None)) for _ in range(nprocs)] 
    res = [q_out.get() for _ in range(len(sent))] 

    [p.join() for p in proc] 

    return [x for i, x in sorted(res)] 


if __name__ == '__main__': 
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8])) 
+2

如何获得进度条以正确使用此parmap功能? – shockburner 2014-07-19 00:33:36

+2

一个问题 - 我使用了这个解决方案,但注意到我产生的python进程在内存中保持活动状态。任何快速思考如何在你的parmap退出时杀死那些人? – CompEcon 2014-11-15 13:19:15

+1

@ klaus-se我知道我们不鼓励在评论中表示感谢,但您的回答对我来说太重要了,我无法抗拒。我希望我可以给你的不仅仅是一个声誉更多... – deshtop 2015-07-30 17:58:05

除非您跳出标准库,否则多处理和酸洗会受到破坏和限制。

如果使用名为pathos.multiprocesssingmultiprocessing的分支,可以在多处理的map函数中直接使用类和类方法。这是因为使用了dill而不是picklecPickledill可以序列化几乎所有的python。

pathos.multiprocessing还提供了异步映射功能......它可以map功能与多个参数(如map(math.pow, [1,2,3], [4,5,6])

见讨论: What can multiprocessing and dill do together?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至处理你最初编写的代码,不需要修改,也可以来自口译员。为什么还有其他更脆弱和更具体的案例呢?

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> class calculate(object): 
... def run(self): 
... def f(x): 
... return x*x 
... p = Pool() 
... return p.map(f, [1,2,3]) 
... 
>>> cl = calculate() 
>>> print cl.run() 
[1, 4, 9] 

获得此代码: https://github.com/uqfoundation/pathos

而且,只是为了炫耀多一点的可以做什么:

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> 
>>> p = Pool(4) 
>>> 
>>> def add(x,y): 
... return x+y 
... 
>>> x = [0,1,2,3] 
>>> y = [4,5,6,7] 
>>> 
>>> p.map(add, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> 
>>> p.map(Test.plus, [t]*4, x, y) 
[4, 6, 8, 10] 
>>> 
>>> res = p.amap(t.plus, x, y) 
>>> res.get() 
[4, 6, 8, 10] 
+0

pathos.multiprocessing也有一个异步映射('amap'),可以使用进度条和其他异步编程。 – 2014-04-15 14:05:36

+0

我喜欢pathos.multiprocessing,它可以在享受多处理的同时几乎为非平行地图提供一个直接替换。我有pathos.multiprocessing.map的一个简单的包装,使得它更内存效率时处理只读大型数据在多个内核结构,请参见[本git仓库(https://github.com/fashandge/ python_parmap)。 – Fashandge 2014-12-29 02:50:03

+0

似乎很有趣,但它不会安装。这是pip给出的信息:'无法找到满足要求的版本pp == 1.5.7-pathos(来自病态)' – xApple 2016-05-18 14:52:01

我修改克劳斯本身的方法,因为尽管这是工作对于我的小列表,当项目数量大于或等于1000时,它会挂起。我不是一次一个地用None停止条件推送一个作业,而是一次加载输入队列,只让进程在其上进行咬合,直到它为空。

from multiprocessing import cpu_count, Queue, Process 

def apply_func(f, q_in, q_out): 
    while not q_in.empty(): 
     i, x = q_in.get() 
     q_out.put((i, f(x))) 

# map a function using a pool of processes 
def parmap(f, X, nprocs = cpu_count()): 
    q_in, q_out = Queue(), Queue() 
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)] 
    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [p.start() for p in proc] 
    res = [q_out.get() for _ in sent] 
    [p.join() for p in proc] 

    return [x for i,x in sorted(res)] 

编辑:可惜现在我遇到了我的系统上这个错误:Multiprocessing Queue maxsize limit is 32767,希望变通办法将有帮助。

我采取了klaus se's和aganders3的答案,并且制作了一个更具可读性的文档化模块,并将其保存在一个文件中。您可以将其添加到您的项目中。它甚至有一个可选的进度条!

""" 
The ``processes`` module provides some convenience functions 
for using parallel processes in python. 

Adapted from http://stackoverflow.com/a/16071616/287297 

Example usage: 

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) 

Comments: 

"It spawns a predefined amount of workers and only iterates through the input list 
if there exists an idle worker. I also enabled the "daemon" mode for the workers so 
that KeyboardInterupt works as expected." 

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. 

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess 
""" 

# Modules # 
import multiprocessing 
from tqdm import tqdm 

################################################################################ 
def apply_function(func_to_apply, queue_in, queue_out): 
    while not queue_in.empty(): 
     num, obj = queue_in.get() 
     queue_out.put((num, func_to_apply(obj))) 

################################################################################ 
def prll_map(func_to_apply, items, cpus=None, verbose=False): 
    # Number of processes to use # 
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) 
    # Create queues # 
    q_in = multiprocessing.Queue() 
    q_out = multiprocessing.Queue() 
    # Process list # 
    new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) 
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] 
    # Put all the items (objects) in the queue # 
    sent = [q_in.put((i, x)) for i, x in enumerate(items)] 
    # Start them all # 
    for proc in processes: 
     proc.daemon = True 
     proc.start() 
    # Display progress bar or not # 
    if verbose: 
     results = [q_out.get() for x in tqdm(range(len(sent)))] 
    else: 
     results = [q_out.get() for x in range(len(sent))] 
    # Wait for them to finish # 
    for proc in processes: proc.join() 
    # Return results # 
    return [x for i, x in sorted(results)] 

################################################################################ 
def test(): 
    def slow_square(x): 
     import time 
     time.sleep(2) 
     return x**2 
    objs = range(20) 
    squares = prll_map(slow_square, objs, 4, verbose=True) 
    print "Result: %s" % squares 

编辑:添加@亚历山大 - 麦克法兰的建议和测试功能

+0

您的进度条有一个问题...该条只能衡量工作负载在处理器中的分配效率如何。如果工作负载完全分离,那么所有处理器将同时加入(),并且只会在'tqdm'显示中完成'100%'的闪光。只有每个处理器有偏差的工作负载 – 2016-06-30 21:17:23

+1

移动'tqdm()'来包装行:'result = [q_out.get()for _ in tqdm(sent)]'并且它工作很多更好 - 很大的努力,虽然真的很感谢这个+1 – 2016-06-30 21:22:01

+0

感谢您的建议,我会尝试一下,然后更新答案! – xApple 2016-07-02 10:49:56

我不知道,如果这种方法被征用,但一个工作,我周围正在使用的是:

from multiprocessing import Pool 

t = None 

def run(n): 
    return t.f(n) 

class Test(object): 
    def __init__(self, number): 
     self.number = number 

    def f(self, x): 
     print x * self.number 

    def pool(self): 
     pool = Pool(2) 
     pool.map(run, range(10)) 

if __name__ == '__main__': 
    t = Test(9) 
    t.pool() 
    pool = Pool(2) 
    pool.map(run, range(10)) 

输出应该是:

0 
9 
18 
27 
36 
45 
54 
63 
72 
81 
0 
9 
18 
27 
36 
45 
54 
63 
72 
81 

class Calculate(object): 
    # Your instance method to be executed 
    def f(self, x, y): 
    return x*y 

if __name__ == '__main__': 
    inp_list = [1,2,3] 
    y = 2 
    cal_obj = Calculate() 
    pool = Pool(2) 
    results = pool.map(lambda x: cal_obj.f(x, y), inp_list) 

您可能希望将这个函数应用于每个不同类的实例。然后这里是也

class Calculate(object): 
    # Your instance method to be executed 
    def __init__(self, x): 
    self.x = x 

    def f(self, y): 
    return self.x*y 

if __name__ == '__main__': 
    inp_list = [Calculate(i) for i in range(3)] 
    y = 2 
    pool = Pool(2) 
    results = pool.map(lambda x: x.f(y), inp_list) 

我知道这是在6年前问了,但只是想补充我的解决方案,因为一些上述建议看起来可怕复杂的解决方案,但我的解决办法其实很简单。

我只需要将pool.map()调用包装到一个辅助函数中。将类对象与args一起作为元组传递给方法,看起来有点像这样。

def run_in_parallel(args): 
    return args[0].method(args[1]) 

myclass = MyClass() 
method_args = [1,2,3,4,5,6] 
args_map = [ (myclass, arg) for arg in method_args ] 
pool = Pool() 
pool.map(run_in_parallel, args_map) 

这是我的解决方案,我觉得这里的解决方案比其他大多数人都少。这与睡衣的答案类似。

someclasses = [MyClass(), MyClass(), MyClass()] 

def method_caller(some_object, some_method='the method'): 
    return getattr(some_object, some_method)() 

othermethod = partial(method_caller, some_method='othermethod') 

with Pool(6) as pool: 
    result = pool.map(othermethod, someclasses)