Parallelizing nested Python for loops
What type of parallel Python approach would be suited to efficiently spreading the CPU-bound workload shown below? Is it feasible to parallelize the section? It looks like there is not much tight coupling between the loop iterations, i.e. portions of the loop could be processed in parallel so long as an appropriate communication to reconstruct the store variable is done at the end. I'm currently using Python 2.7, but if this problem can easily be handled in a newer version, then I would consider migrating the code base.
I've tried to capture the spirit of the computation with the example below. I believe it has the same connectedness between the loops/variables as my actual code.
import numpy as np

nx = 20
ny = 30
myList1 = [0]*100
myList2 = [1]*25
value1 = np.zeros(nx)
value2 = np.zeros(ny)
store = np.zeros((nx, ny, len(myList1), len(myList2)))
for i in range(nx):
    for j in range(ny):
        f = calc(value1[i], value2[j])  # returns a list
        for k, data1 in enumerate(myList1):
            for p, data2 in enumerate(myList2):
                meanval = np.sum(f[:]/data1)*data2
                store[i, j, k, p] = meanval
Here are two approaches you can take. Which one is wise also depends on where the bottleneck lies, something that is best measured rather than guessed.
The ideal option would be to leave all the low-level optimization to Numpy. Right now you have a mix of native Python code and Numpy code. The latter does not play well with loops. It works, of course, but by looping in Python you force operations to happen in the order you specify. It is better to give Numpy operations that it can perform on as many elements as possible at once, i.e. matrix transformations. That benefits performance, and not only because of automatic (partial) parallelization; even a single thread will be able to get more out of the CPU. A highly recommended read to learn more about this is From Python to Numpy.
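Applied to the example, here is a minimal sketch of that idea. It relies on np.sum(f[:]/data1)*data2 being equal to np.sum(f)/data1*data2, so the two inner loops collapse into a broadcast; calc is kept as given in the question, and the division assumes nonzero entries in myList1 (the toy [0]*100 above would produce infs, just as the original division would):

import numpy as np

# Only the expensive calc calls remain in an explicit loop
S = np.empty((nx, ny))
for i in range(nx):
    for j in range(ny):
        S[i, j] = np.sum(calc(value1[i], value2[j]))

inv1 = 1.0 / np.asarray(myList1, dtype=float)  # shape (len(myList1),), assumes nonzero entries
arr2 = np.asarray(myList2, dtype=float)        # shape (len(myList2),)

# Broadcast the three factors into the full 4-D result in one vectorized step
store = S[:, :, None, None] * inv1[None, None, :, None] * arr2[None, None, None, :]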
If you do need to parallelize pure Python code, you have few options other than multiple processes. For that, refer to the multiprocessing module. Rearrange the code into three steps:
- Prepare the input for every job
- Divide the jobs among a pool of workers to run in parallel (fork/map)
- Collect the results (join/reduce)
You have to strike a balance between enough processes to make parallelizing worthwhile, and not so many that they will be too short-lived. The cost of spinning up processes and communicating with them would then become significant by itself.
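As a rough sketch of one knob for that trade-off: Pool.map accepts a chunksize argument that batches several jobs per worker round-trip (worker here is a hypothetical stand-in for your job function, and the factor of 4 is just a common heuristic, not a rule):

from multiprocessing import Pool, cpu_count

jobs = [(i, j) for i in range(nx) for j in range(ny)]
pool = Pool()
# Ship jobs to workers in batches so each IPC round-trip carries several;
# aiming for roughly 4 batches per worker is a heuristic, not a hard rule
chunk = max(1, len(jobs) // (4 * cpu_count()))
results = pool.map(worker, jobs, chunksize=chunk)
pool.close()
pool.join()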
A simple solution would be to generate a list of (i, j) pairs, so that there will be nx*ny jobs. Then make a function that takes such a pair as input and returns a list of (i, j, k, p, meanval). Try to only use the inputs to the function and return a result; keep everything local, no side effects and so on. Read-only access to globals, such as myList1, is OK, but modification requires the special measures described in the docs. Pass the function and the list of inputs to a worker pool. Once it has finished producing the partial results, combine all of those into store.
Here is an example script:
from multiprocessing import Pool
import numpy as np
# Global variables are OK, as long as their contents are not modified, although
# these might just as well be moved into the worker function or an initializer
nx = 20
ny = 30
myList1 = [0]*100
myList2 = [1]*25
value1 = np.zeros(nx)
value2 = np.zeros(ny)
def calc_meanvals_for(pair):
    """Process a reasonably sized chunk of the problem"""
    i, j = pair
    f = calc(value1[i], value2[j])
    results = []
    for k, data1 in enumerate(myList1):
        for p, data2 in enumerate(myList2):
            meanval = np.sum(f[:]/data1)*data2
            results.append((i, j, k, p, meanval))
    return results

# This module will be imported by every worker - that's how they will be able
# to find the global variables and the calc function - so make sure to check
# if this is the main program, because without that, every worker will start more
# workers, each of which will start even more, and so on, in an endless loop
if __name__ == '__main__':
    # Create a pool of worker processes, each able to use a CPU core
    pool = Pool()
    # Prepare the arguments, one per function invocation (tuples to fake multiple)
    arg_pairs = [(i, j) for i in range(nx) for j in range(ny)]
    # Now comes the parallel step: given a function and a list of arguments,
    # have a worker invoke that function with one argument until all arguments
    # have been used, collecting the return values in a list
    return_values = pool.map(calc_meanvals_for, arg_pairs)
    # Since the function also returns a list, there's now a list of lists - consider
    # itertools.chain.from_iterable to flatten them - to be processed further
    store = np.zeros((nx, ny, len(myList1), len(myList2)))
    for results in return_values:
        for i, j, k, p, meanval in results:
            store[i, j, k, p] = meanval
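As the comment in the script suggests, the nested collection loop can equivalently flatten the list of lists with itertools.chain.from_iterable:

from itertools import chain

for i, j, k, p, meanval in chain.from_iterable(return_values):
    store[i, j, k, p] = meanval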
If you have Python or numpy code with no or only minor interaction with other modules, I would recommend numba. Here is an example with a comparison to standard parallelization using multiprocessing: https://stackoverflow.com/a/45359693/4045774 – max9111
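To give an idea of what that looks like, here is a minimal sketch of the numba route. It assumes calc is itself an @njit-compiled function returning a NumPy array (the question does not show calc, so both are assumptions), and that the lists have been converted to float arrays:

import numpy as np
from numba import njit, prange

# Sketch only: uses np.sum(f)/data1*data2 == np.sum(f/data1)*data2 to hoist
# the expensive call out of the two inner loops
@njit(parallel=True)
def fill_store(value1, value2, arr1, arr2, store):
    for i in prange(value1.shape[0]):  # prange spreads iterations over threads
        for j in range(value2.shape[0]):
            s = np.sum(calc(value1[i], value2[j]))
            for k in range(arr1.shape[0]):
                for p in range(arr2.shape[0]):
                    store[i, j, k, p] = s / arr1[k] * arr2[p]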