Python多处理 - 跟踪pool.map操作的过程
我有一个函数执行一些模拟,并且 返回一个字符串格式的数组。Python多处理 - 跟踪pool.map操作的过程
我想运行 变化的输入参数值,超过10000个可能的输入值, 的仿真(功能),并将结果写入单个文件。
我正在使用多处理,特别是pool.map函数 并行运行模拟。
由于运行仿真功能10000次以上的整个过程需要很长时间,我真的很想跟踪整个操作过程。
我认为我当前代码中的问题是,pool.map运行10000次函数,在这些操作过程中没有任何进程跟踪。一旦并行处理完成10000次仿真(可能需要数小时至数天),那么当10000个仿真结果保存到文件时,我会继续跟踪。因此,这并不是真正跟踪pool.map操作的处理。
是否有一个容易修复我的代码,将允许进程跟踪?
def simFunction(input):
# Does some simulation and outputs simResult
return str(simResult)
# Parallel processing
inputs = np.arange(0,10000,1)
if __name__ == "__main__":
numCores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes = numCores)
t = pool.map(simFunction, inputs)
with open('results.txt','w') as out:
print("Starting to simulate " + str(len(inputs)) + " input values...")
counter = 0
for i in t:
out.write(i + '\n')
counter = counter + 1
if counter%100==0:
print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
print('Finished!!!!')
如果使用迭代的map
函数,可以很容易地跟踪进度。您也可以使用异步map
。在这里,我会做一些不同的事情,只是混淆。
>>> import time
>>> res = Pool().amap(simFunction, x,y)
>>> while not res.ready():
... print "waiting..."
... time.sleep(5)
...
waiting...
waiting...
waiting...
waiting...
>>> res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
请注意,我使用pathos.multiprocessing
代替multiprocessing
。这只是multiprocessing
的一个分支,它使您能够利用多个输入来执行map
函数,具有更好的序列化,并允许您在任何地方(而不仅仅是在__main__
)执行map
调用。您也可以使用multiprocessing
来完成上述操作,但代码会略有不同。
迭代或异步map
将使您能够编写任何代码,以便执行更好的过程跟踪。例如,将一个唯一的“id”传递给每个作业,并观察哪些回来,或者让每个作业返回它的进程ID。有很多方法可以跟踪进度和流程...但上面的内容应该会为您提供一个开始。
你可以pathos
这里:https://github.com/uqfoundation
没有“简单修复”。 map
是关于隐藏你的实现细节。在这种情况下,你想要细节。就是说,根据定义,事情变得更加复杂一些。你需要改变通信范式。有很多方法可以这样做。
一个是:创建一个队列来收集您的结果,并让您的工作人员将结果放入此队列中。然后,您可以在监视线程或进程内查看队列,并在进入时使用结果。在使用时,您可以分析它们并生成日志输出。这可能是跟踪进度的最常用方式:您可以实时以任何方式响应传入结果。
更简单的方法可能是稍微修改您的辅助函数,并在那里生成日志输出。通过使用外部工具仔细分析日志输出(例如grep
和wc
),您可以想出很简单的方法来跟踪。
谢谢。你能提供一些简单的例子吗? – user32147 2015-02-06 22:53:50
我想你需要的是一个日志文件。
我建议您使用日志记录模块,它是Python标准库的一部分。但不幸的是日志记录不是多处理安全的。所以你不能在你的应用程序中使用它。
因此,您将需要使用多处理安全的日志处理程序,或者使用Queue或实现您的模块或日志模块。
在Stackoverflow中有很多关于此的讨论。这比如:How should I log while using multiprocessing in Python?
如果大多数CPU的负载是在模拟功能,你不打算使用日志轮换,你也许可以用一个简单的锁定机制是这样的:
import multiprocessing
import logging
from random import random
import time
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(process)s %(levelname)s %(message)s',
filename='results.log',
filemode='a'
)
def simulation(a):
# logging
with multiprocessing.Lock():
logging.debug("Simulating with %s" % a)
# simulation
time.sleep(random())
result = a*2
# logging
with multiprocessing.Lock():
logging.debug("Finished simulation with %s. Result is %s" % (a, result))
return result
if __name__ == '__main__':
logging.debug("Starting the simulation")
inputs = [x for x in xrange(100)]
num_cores = multiprocessing.cpu_count()
print "num_cores: %d" % num_cores
pool = multiprocessing.Pool(processes=num_cores)
t = pool.map(simulation, inputs)
logging.debug("The simulation has ended")
你可以运行时“tail -f”您的日志文件。这是你应该看到的:
2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5
在Windows和Linux上试过。
希望这会有所帮助
'multiprocessing.get_logger()'返回受锁保护的特性受限记录器,请参阅https://docs.python.org/2/library/multiprocessing.html#logging – 2015-02-09 00:07:36
是的,但是这是模块记录器...所以你可以使用它,你的日志将与模块级消息混合在一起:尝试它,你会看到这样的消息:2015-02-08 23:47:10,954 9288 DEBUG创建带句柄的semlock 448 – 2015-02-09 02:48:34
哦,你对,我从来没有真正使用过它,并且太快地浏览了文档。 – 2015-02-09 13:04:49
非常感谢你! – user32147 2015-02-26 17:08:45