Python进程在IO完成之前终止
问题描述:
我正在使用python多进程库来处理一组进程中的信息。这些过程还包含进一步划分必须完成的工作量的过程。有一个Manager.Queue积累了所有消耗数据的进程的结果。Python进程在IO完成之前终止
在python脚本的主线程中。我试图使用连接来阻塞主线程,直到我们可以合理确定所有子进程是否完成,然后将输出写入单个文件。但是,在所有数据写入文件之前,系统会终止并关闭文件。
以下代码是上述解决方案实现的简化提取。 用于inQueues队列: queue.join()
for p in processes:
p.join()
print "At the end output has: " + str(out_queue.qsize()) + " records"
with open("results.csv", "w") as out_file:
out_file.write("Algorithm,result\n")
while not out_queue.empty():
res = out_queue.get()
out_file.write(res['algorithm'] + ","+res['result']+"\n")
out_queue.task_done()
time.sleep(0.05)
out_queue.join()
out_file.close()
的out_queue.qsize()将打印过量的500个记录可用,但是只有100将被打印到该文件。 同样在这一点上,如果500条记录是系统生成的总数,我不能100%确定,但只是此时报告的数字。
如何确保将所有结果写入results.csv文件?
答
不要等到所有的进程完成你消耗的数据之前,但处理在同一时间的数据,并记住其进程仍在运行:
processes = []
"""start processes and append them to processes"""
while True:
try:
# get an item
item = queue.get(True, 0.5)
except Queue.Empty:
# no item received in half a second
if not processes:
# there are no more processes and nothing left to process
break
else:
proc_num = 0
while proc_num < len(processes):
process = processes[proc_num]
exit_code = process.poll()
if exit_code is None:
# process is still running, proceed to next
proc_num += 1
elif exit_code == 0:
# process ended gracefully, remove it from list
processes.pop(proc_num)
else:
# process ended with an error, what now?
raise Exception('Her last words were: "%r"' % exit_code)
else:
# got an item
"""process item"""
,不测试,如果processes
是空的外Queue.Empty
或您将有races。
,不过也许你会更开心一higher level function:
pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items:
"""process item"""
[QSIZE()](http://bugs.python.org/issue17985):“返回队列的近似大小由于。多线程/多处理语义, 是不可靠的。“ – kay
我知道,由qsize方法指示的队列大小可能会发生变化,但代码段是整个程序中从队列中删除的唯一部分,因此预计打印的记录数不会小于队列的大小(这是当前发生的)。 – kyleED