如何以非阻塞的方式链接期货?也就是说,如何在不阻塞的情况下将未来用作未来的输入?

如何以非阻塞的方式链接期货?也就是说,如何在不阻塞的情况下将未来用作未来的输入?

问题描述:

使用下面的示例,一旦future1完成(不阻止future3被提交),future2如何使用future1的结果?如何以非阻塞的方式链接期货?也就是说,如何在不阻塞的情况下将未来用作未来的输入?

from concurrent.futures import ProcessPoolExecutor 
import time 

def wait(seconds): 
    time.sleep(seconds) 
    return seconds 

pool = ProcessPoolExecutor() 

s = time.time() 
future1 = pool.submit(wait, 5) 
future2 = pool.submit(wait, future1.result()) 
future3 = pool.submit(wait, 10) 

time_taken = time.time() - s 
print(time_taken) 

这是可以通过仔细制定回调提交第一个完成后的第二个操作。令人遗憾的是,不可能将任意的未来传递给pool.submit,因此需要采取额外措施将两种期货结合在一起。

这里是一个可能的实现:

import concurrent.futures 

def copy_future_state(source, destination): 
    if source.cancelled(): 
     destination.cancel() 
    if not destination.set_running_or_notify_cancel(): 
     return 
    exception = source.exception() 
    if exception is not None: 
     destination.set_exception(exception) 
    else: 
     result = source.result() 
     destination.set_result(result) 


def chain(pool, future, fn): 
    result = concurrent.futures.Future() 

    def callback(_): 
     try: 
      temp = pool.submit(fn, future.result()) 
      copy = lambda _: copy_future_state(temp, result) 
      temp.add_done_callback(copy) 
     except: 
      result.cancel() 
      raise 

    future.add_done_callback(callback) 
    return result 

注意copy_future_stateasyncio.futures._set_concurrent_future_state略加修改。

用法:

from concurrent.futures import ProcessPoolExecutor 

def wait(seconds): 
    time.sleep(seconds) 
    return seconds 

pool = ProcessPoolExecutor() 
future1 = pool.submit(wait, 5) 
future2 = chain(pool, future1, wait) 
future3 = pool.submit(wait, 10)