如何更改移动中的多处理池工作人员数
问题描述:
我想更改当前使用的池中的工作人员数。 我现在的想法是如何更改移动中的多处理池工作人员数
while True:
current_connection_number = get_connection_number()
forced_break = False
with mp.Pool(current_connection_number) as p:
for data in p.imap_unordered(fun, some_infinite_generator):
yield data
if current_connection_number != get_connection_number():
forced_break = True
break
if not forced_break:
break
的问题是,它只是终止工等从some_infinite_generator得到了这一点,并没有尚未处理都失去了最后的项目。有没有一些标准的方式来做到这一点?
编辑:我已经尝试打印some_infinite_generator里面,事实证明,p.imap_unordered请求1565项目只有2池工作人员甚至在处理任何事情之前,我如何限制从发电机请求的项目数量?如果我使用上面的代码并在2个项目后更改连接数,我将丢失1563个项目
答
问题是Pool
会在单独的线程内部使用生成器。你无法控制该逻辑。
你可以做什么,正在向Pool.imap_unordered
方法提供一部分生成器,并在根据可用连接进行缩放之前消耗该部分。
CHUNKSIZE = 100
while True:
current_connection_number = get_connection_number()
with mp.Pool(current_connection_number) as p:
while current_connection_number == get_connection_number():
for data in p.imap_unordered(fun, grouper(CHUNKSIZE, some_infinite_generator)):
yield data
def grouper(n, iterable):
it = iter(iterable)
while True:
chunk = tuple(itertools.islice(it, n))
if not chunk:
return
yield chunk
这有点不太理想的比例发生的每块而不是每次迭代中却带着几分的CHUNKSIZE
值的微调,你可以很容易地得到它的权利。