如何更改移动中的多处理池工作人员数

问题描述:

我想更改当前使用的池中的工作人员数。 我现在的想法是如何更改移动中的多处理池工作人员数

while True: 
    current_connection_number = get_connection_number() 
    forced_break = False 
    with mp.Pool(current_connection_number) as p: 
     for data in p.imap_unordered(fun, some_infinite_generator): 
      yield data 
      if current_connection_number != get_connection_number(): 
       forced_break = True 
       break 
    if not forced_break: 
     break 

的问题是,它只是终止工等从some_infinite_generator得到了这一点,并没有尚未处理都失去了最后的项目。有没有一些标准的方式来做到这一点?

编辑:我已经尝试打印some_infinite_generator里面,事实证明,p.imap_unordered请求1565项目只有2池工作人员甚至在处理任何事情之前,我如何限制从发电机请求的项目数量?如果我使用上面的代码并在2个项目后更改连接数,我将丢失1563个项目

问题是Pool会在单独的线程内部使用生成器。你无法控制该逻辑。

你可以做什么,正在向Pool.imap_unordered方法提供一部分生成器,并在根据可用连接进行缩放之前消耗该部分。

CHUNKSIZE = 100 

while True: 
    current_connection_number = get_connection_number() 
    with mp.Pool(current_connection_number) as p: 
     while current_connection_number == get_connection_number(): 
      for data in p.imap_unordered(fun, grouper(CHUNKSIZE, some_infinite_generator)): 
       yield data 

def grouper(n, iterable): 
    it = iter(iterable) 
    while True: 
     chunk = tuple(itertools.islice(it, n)) 
     if not chunk: 
      return 
     yield chunk 

这有点不太理想的比例发生的每块而不是每次迭代中却带着几分的CHUNKSIZE值的微调,你可以很容易地得到它的权利。

grouper recipe