多处理:如何在类中定义的函数上使用Pool.map?
当我运行类似:多处理:如何在类中定义的函数上使用Pool.map?
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
它工作正常。但是,把这个作为一类功能:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
使我有以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我已经看到亚历克斯·马尔泰利处理同类问题的帖子,但它不够明确。
我也很受限于什么样的函数pool.map可以接受。我写了以下来绕过这个。它似乎工作,即使递归使用parmap。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p,c) in pipe]
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
这对我来说非常好,谢谢。我发现了一个弱点:我尝试在一些传递给defaultdict的函数上使用parmap,并再次得到PicklingError。我没有找到解决办法,我只是重写我的代码不使用defaultdict。 – sans 2011-07-08 23:41:22
这并不在Python工作2.7.2(默认情况下,2011年6月12日,15点08分59秒)[MSC v.1500 32位(英特尔)]在Win32 – ubershmekel 2012-02-18 19:03:21
这确实关于Python 2.7.3月1,2012工作,05:14:39。 这并不巨iterables工作 - >它会导致OSERROR:[错误24]打开的文件太多,由于它打开管道的数量。 – 2013-01-18 19:19:22
在类中定义的函数(即使在类内的函数内)也不会真正的腌制。然而,这个工程:
def f(x):
return x*x
class calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
谢谢,但是我觉得在类之外定义函数有点脏。该类应该捆绑它实现给定任务所需的全部内容。 – Mermoz 2010-07-20 12:53:10
@Memoz:“班级应该把所有需要的东西捆绑在一起”真的吗?我找不到很多这样的例子。大多数类依赖于其他类或函数。为什么调用类的依赖“脏”?依赖关系有什么问题? – 2010-07-20 12:59:40
那么,函数不应该修改现有的类数据 - 因为它会修改其他进程中的版本 - 所以它可能是一个静态方法。您可以排序的腌制一个静态方法:http://stackoverflow.com/questions/1914261/pickling-a-staticmethod-in-python/1914798#1914798 或者,什么这个简单,你可以使用lambda。 – robert 2010-07-20 15:22:33
目前还没有解决您的问题,据我知道:你给map()
功能必须通过模块的进口访问。这就是为什么罗伯特的代码工作:f()
可以通过导入下面的代码获得功能:
def f(x):
return x*x
class Calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
if __name__ == '__main__':
cl = Calculate()
print cl.run()
其实我增加了一个“主要”部分,因为这是继recommendations for the Windows platform(“确保主模块可由新的Python解释器安全地导入,而不会造成意想不到的副作用“)。
我还在Calculate
前加了一个大写字母,以便跟在PEP 8之后。 :)
我也一直在努力。我有用作类的数据成员,作为一个简化的例子:
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# Needed to do something like this (the following line won't work)
return pool.map(self.f,list1,list2)
我需要在同一类内使用功能self.f在Pool.map()调用从和self.f没以一个元组为参数。由于这个函数被嵌入到一个类中,所以我不清楚如何编写包装类型的其他答案。
我解决了这个问题,它使用了一个不同的包装器,它使用一个元组/列表,其中第一个元素是该函数,其余元素是该函数的参数,称为eval_func_tuple(f_args)。使用这个,有问题的行可以被替换为返回pool.map(eval_func_tuple,itertools.izip(itertools.repeat(self.f),list1,list2))。下面是完整的代码:
文件:util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
"""Takes a tuple of a function and args, evaluates and returns result"""
return f_args[0](*f_args[1:])
文件:main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# The following line will now work
return pool.map(util.eval_func_tuple,
itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
myExample = Example(util.add)
list1 = [1, 2, 3]
list2 = [10, 20, 30]
print myExample.add_lists(list1, list2)
运行main.py会给[11,22,33]。随意改进这一点,例如eval_func_tuple也可以修改为采用关键字参数。
另一个说明,在另一个答案中,功能“parmap”可以更有效地处理比可用CPU数量更多的进程。我正在复制下面的编辑版本。这是我的第一篇文章,我不确定是否应该直接编辑原始答案。我也重命名了一些变量。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
numProcesses = len(processes)
processNum = 0
outputList = []
while processNum < numProcesses:
endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
for proc in processes[processNum:endProcessNum]:
proc.start()
for proc in processes[processNum:endProcessNum]:
proc.join()
for proc,c in pipe[processNum:endProcessNum]:
outputList.append(proc.recv())
processNum = endProcessNum
return outputList
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
通过mrule该解决方案是正确的,但有一个错误:如果孩子发回大量数据,它可以填补管道缓冲区,阻止孩子的pipe.send()
,而家长在等待孩子退出pipe.join()
。解决方案是在儿童读取儿童数据之前。此外,孩子应该关闭父母的管道末端以防止死锁。下面的代码解决了这个问题。另请注意,此parmap
会在X
中为每个元素创建一个进程。更高级的解决方案是使用multiprocessing.cpu_count()
将X
分成多个块,然后在返回之前合并结果。我把这个作为练习留给读者,以免破坏mrule的简洁性。 ;)
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(ppipe, cpipe,x):
ppipe.close()
cpipe.send(f(x))
cpipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
[p.start() for p in proc]
ret = [p.recv() for (p,c) in pipe]
[p.join() for p in proc]
return ret
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
你如何选择进程数量? – 2016-04-27 13:39:52
它工作!谢谢。没有其他解决方案适用于我:D – 2016-04-27 14:36:18
但是,由于错误“OSError:[Errno 24]太多打开的文件”,它会很快死亡。我认为需要对进程数量进行某种限制才能正常工作...... – 2016-04-27 15:01:04
,因为使用“multiprocessing.Pool”不lambda表达式的工作代码的代码,而不是使用“multiprocessing.Pool”因为有产卵多达过程我不能使用至今发布的代码工作项目。
我改编了代码s.t.它会产生预定义数量的工作人员,并且只会在输入列表中迭代(如果存在空闲工作人员)。我还为员工提供了“守护进程”模式。 ctrl-c按预期工作。
import multiprocessing
def fun(f, q_in, q_out):
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
if __name__ == '__main__':
print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
如何获得进度条以正确使用此parmap功能? – shockburner 2014-07-19 00:33:36
一个问题 - 我使用了这个解决方案,但注意到我产生的python进程在内存中保持活动状态。任何快速思考如何在你的parmap退出时杀死那些人? – CompEcon 2014-11-15 13:19:15
@ klaus-se我知道我们不鼓励在评论中表示感谢,但您的回答对我来说太重要了,我无法抗拒。我希望我可以给你的不仅仅是一个声誉更多... – deshtop 2015-07-30 17:58:05
除非您跳出标准库,否则多处理和酸洗会受到破坏和限制。
如果使用名为pathos.multiprocesssing
的multiprocessing
的分支,可以在多处理的map
函数中直接使用类和类方法。这是因为使用了dill
而不是pickle
或cPickle
和dill
可以序列化几乎所有的python。
pathos.multiprocessing
还提供了异步映射功能......它可以map
功能与多个参数(如map(math.pow, [1,2,3], [4,5,6])
)
见讨论: What can multiprocessing and dill do together?
和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
它甚至处理你最初编写的代码,不需要修改,也可以来自口译员。为什么还有其他更脆弱和更具体的案例呢?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
... def run(self):
... def f(x):
... return x*x
... p = Pool()
... return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
获得此代码: https://github.com/uqfoundation/pathos
而且,只是为了炫耀多一点的可以做什么:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
pathos.multiprocessing也有一个异步映射('amap'),可以使用进度条和其他异步编程。 – 2014-04-15 14:05:36
我喜欢pathos.multiprocessing,它可以在享受多处理的同时几乎为非平行地图提供一个直接替换。我有pathos.multiprocessing.map的一个简单的包装,使得它更内存效率时处理只读大型数据在多个内核结构,请参见[本git仓库(https://github.com/fashandge/ python_parmap)。 – Fashandge 2014-12-29 02:50:03
似乎很有趣,但它不会安装。这是pip给出的信息:'无法找到满足要求的版本pp == 1.5.7-pathos(来自病态)' – xApple 2016-05-18 14:52:01
我修改克劳斯本身的方法,因为尽管这是工作对于我的小列表,当项目数量大于或等于1000时,它会挂起。我不是一次一个地用None
停止条件推送一个作业,而是一次加载输入队列,只让进程在其上进行咬合,直到它为空。
from multiprocessing import cpu_count, Queue, Process
def apply_func(f, q_in, q_out):
while not q_in.empty():
i, x = q_in.get()
q_out.put((i, f(x)))
# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
q_in, q_out = Queue(), Queue()
proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[p.start() for p in proc]
res = [q_out.get() for _ in sent]
[p.join() for p in proc]
return [x for i,x in sorted(res)]
编辑:可惜现在我遇到了我的系统上这个错误:Multiprocessing Queue maxsize limit is 32767,希望变通办法将有帮助。
我采取了klaus se's和aganders3的答案,并且制作了一个更具可读性的文档化模块,并将其保存在一个文件中。您可以将其添加到您的项目中。它甚至有一个可选的进度条!
"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.
Adapted from http://stackoverflow.com/a/16071616/287297
Example usage:
print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)
Comments:
"It spawns a predefined amount of workers and only iterates through the input list
if there exists an idle worker. I also enabled the "daemon" mode for the workers so
that KeyboardInterupt works as expected."
Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.
Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""
# Modules #
import multiprocessing
from tqdm import tqdm
################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
while not queue_in.empty():
num, obj = queue_in.get()
queue_out.put((num, func_to_apply(obj)))
################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
# Number of processes to use #
if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
# Create queues #
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
# Process list #
new_proc = lambda t,a: multiprocessing.Process(target=t, args=a)
processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
# Put all the items (objects) in the queue #
sent = [q_in.put((i, x)) for i, x in enumerate(items)]
# Start them all #
for proc in processes:
proc.daemon = True
proc.start()
# Display progress bar or not #
if verbose:
results = [q_out.get() for x in tqdm(range(len(sent)))]
else:
results = [q_out.get() for x in range(len(sent))]
# Wait for them to finish #
for proc in processes: proc.join()
# Return results #
return [x for i, x in sorted(results)]
################################################################################
def test():
def slow_square(x):
import time
time.sleep(2)
return x**2
objs = range(20)
squares = prll_map(slow_square, objs, 4, verbose=True)
print "Result: %s" % squares
编辑:添加@亚历山大 - 麦克法兰的建议和测试功能
您的进度条有一个问题...该条只能衡量工作负载在处理器中的分配效率如何。如果工作负载完全分离,那么所有处理器将同时加入(),并且只会在'tqdm'显示中完成'100%'的闪光。只有每个处理器有偏差的工作负载 – 2016-06-30 21:17:23
移动'tqdm()'来包装行:'result = [q_out.get()for _ in tqdm(sent)]'并且它工作很多更好 - 很大的努力,虽然真的很感谢这个+1 – 2016-06-30 21:22:01
感谢您的建议,我会尝试一下,然后更新答案! – xApple 2016-07-02 10:49:56
我不知道,如果这种方法被征用,但一个工作,我周围正在使用的是:
from multiprocessing import Pool
t = None
def run(n):
return t.f(n)
class Test(object):
def __init__(self, number):
self.number = number
def f(self, x):
print x * self.number
def pool(self):
pool = Pool(2)
pool.map(run, range(10))
if __name__ == '__main__':
t = Test(9)
t.pool()
pool = Pool(2)
pool.map(run, range(10))
输出应该是:
0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
class Calculate(object):
# Your instance method to be executed
def f(self, x, y):
return x*y
if __name__ == '__main__':
inp_list = [1,2,3]
y = 2
cal_obj = Calculate()
pool = Pool(2)
results = pool.map(lambda x: cal_obj.f(x, y), inp_list)
您可能希望将这个函数应用于每个不同类的实例。然后这里是也
class Calculate(object):
# Your instance method to be executed
def __init__(self, x):
self.x = x
def f(self, y):
return self.x*y
if __name__ == '__main__':
inp_list = [Calculate(i) for i in range(3)]
y = 2
pool = Pool(2)
results = pool.map(lambda x: x.f(y), inp_list)
我知道这是在6年前问了,但只是想补充我的解决方案,因为一些上述建议看起来可怕复杂的解决方案,但我的解决办法其实很简单。
我只需要将pool.map()调用包装到一个辅助函数中。将类对象与args一起作为元组传递给方法,看起来有点像这样。
def run_in_parallel(args):
return args[0].method(args[1])
myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
这是我的解决方案,我觉得这里的解决方案比其他大多数人都少。这与睡衣的答案类似。
someclasses = [MyClass(), MyClass(), MyClass()]
def method_caller(some_object, some_method='the method'):
return getattr(some_object, some_method)()
othermethod = partial(method_caller, some_method='othermethod')
with Pool(6) as pool:
result = pool.map(othermethod, someclasses)
“这是一个类的功能”?你能发布实际得到实际错误的代码吗?没有实际的代码,我们只能猜测你做错了什么。 – 2010-07-20 10:05:39
@ S.Lott我发布了代码 – Mermoz 2010-07-20 12:12:38
作为一般性评论,存在比Python的标准pickle模块更强大的pickling模块(比如[picloud](https://pypi.python.org/pypi/cloud/2.7.2 )模块中提到[这个答案](http://stackoverflow.com/a/16626757/2292832))。 – 2013-08-20 15:50:40