python multiprocessing TypeError: 'int' object is not iterable
The code below throws an error once in a while. If I start only one process, it works fine, but as I keep increasing the number of processes (at around 11), it starts throwing an error.
import itertools
import sys
from multiprocessing import Manager, Process

try:
    num_workers = int(sys.argv[1])
except:
    num_workers = 1
someval = 10

def do_work(in_queue, x):
    i = 0
    while True:
        item = in_queue.get()
        line_no, line = item
        # exit signal
        if line == None:
            if i > 0:
                work.put(i,)
                # work.put(i)
            return
        else:
            print "value from work " + line.rstrip('\n')
            i = i + 1

if __name__ == "__main__":
    manager = Manager()
    work = manager.Queue(num_workers)
    someval = 20
    print " Number of workers is " + str(num_workers)
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, someval))
        p.start()
        pool.append(p)
    with open("/home/jay/scripts/a.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
    x = 0
    for p in pool:
        p.join()
The file /home/jay/scripts/a.txt has 10 lines.
If I run:
./x.py 7
Number of workers is 7
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
x is 0
all done
./x.py 11
Number of workers is 11
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
Process Process-11:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./x.py", line 18, in do_work
    line_no, line = item
TypeError: 'int' object is not iterable
x is 0
all done
The offending line in do_work is
work.put(i,)
You put an int onto the queue, and that int is then read and unpacked by another worker.
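A minimal single-process sketch of the mix-up, assuming Python 3 and using the standard-library queue.Queue in place of the Manager queue (its put() takes the item the same way). Once a bare int lands on a queue whose consumers expect (line_no, line) tuples, the unpacking fails. Python 3 words this TypeError differently from the 2.7 message above.

```python
import queue

work = queue.Queue()
work.put((0, "first line\n"))  # what the main process enqueues: (line_no, line)
work.put(5)                    # what a finishing worker enqueues: a bare int count

results = []
while not work.empty():
    item = work.get()
    try:
        line_no, line = item   # same unpacking as in do_work
        results.append("ok")
    except TypeError:
        results.append("TypeError")  # an int cannot be unpacked into two names

print(results)  # ['ok', 'TypeError']
```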
Also, I agree with dano that using multiprocessing.Pool is easier and shorter.
import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool(num_workers)
    with open("/home/jay/scripts/a.txt") as f:
        # do_work must take a single line here and return its result
        mapped = pool.map(do_work, f)
If you need i from the workers, just return it; the return values are stored in mapped.
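A minimal Python 3 sketch of that idea; the body of do_work and the in-memory lines list are placeholders standing in for the real per-line work and for a.txt. Each worker returns a value and Pool.map collects the results in input order. Note that Pool.map turns its input into a list before dispatching any work, so the whole input has to fit in memory.

```python
import multiprocessing


def do_work(line):
    # Placeholder per-line work: return a result instead of putting it on a queue.
    return len(line.rstrip("\n"))


if __name__ == "__main__":
    lines = ["a\n", "bb\n", "ccc\n"]  # stand-in for the lines of a.txt
    with multiprocessing.Pool(4) as pool:
        mapped = pool.map(do_work, lines)
    print(mapped)  # [1, 2, 3]
```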
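My real file is huge, more than 100 GB. According to another thread, "map will consume your file all at once before starting work." So I decided to take this approach. http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python – Jayadevan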
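The problem is that work.put(i,) doesn't do what you think it does. You intend to put the 1-tuple (i,) on the queue, but you are actually putting just i, because the trailing comma belongs to the argument list of put(). Writing work.put((i,)) does put a tuple on the queue, but a 1-tuple still cannot be unpacked by line_no, line = item (that raises ValueError instead), so the value needs to be a two-element tuple or go onto a separate queue.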
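A quick Python 3 check of both points; received_args is a hypothetical helper that just returns the positional arguments it was called with.

```python
def received_args(*args):
    """Hypothetical helper: return the positional arguments exactly as received."""
    return args


print(received_args(7,))    # (7,): one argument, the int 7; the comma is call syntax
print(received_args((7,)))  # ((7,),): one argument, the tuple (7,)

try:
    line_no, line = (7,)    # a 1-tuple has too few values for two names
except ValueError as exc:
    print("ValueError:", exc)

line_no, line = (None, 7)   # a two-element tuple unpacks cleanly
print(line_no, line)        # None 7
```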
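There is a race condition with larger values of num_workers: a child process can add i to the Queue before the for loop in the main process has finished loading the Queue with the None sentinel values, and another worker then gets that int instead of a (line_no, line) tuple. With smaller values of num_workers, the for loop finishes before any worker process adds i to the queue.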
Also, have you considered using multiprocessing.Pool instead of building a pool by hand with Process and Queue? It would simplify your code considerably.
Add print(repr(item)) before line 18 so you can see what the value actually is. –