为什么协程不能与run_in_executor一起使用?

为什么协程不能与run_in_executor一起使用?

问题描述:

我想运行一个服务,使用协程和多线程来请求URL。但是我不能把协程去传给执行者的工作人员。请参阅下面的代码针对此问题的一个小例子:为什么协程不能与run_in_executor一起使用?

import time 
import asyncio 
import concurrent.futures 

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5) 

async def async_request(loop): 
    await asyncio.sleep(3) 

def sync_request(_): 
    time.sleep(3) 

async def main(loop): 
    futures = [loop.run_in_executor(EXECUTOR, async_request,loop) 
       for x in range(10)] 

    await asyncio.wait(futures) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main(loop)) 

下面的错误而导致:

Traceback (most recent call last): 
    File "co_test.py", line 17, in <module> 
    loop.run_until_complete(main(loop)) 
    File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete 
    return future.result() 
    File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result 
    raise self._exception 
    File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step 
    result = coro.send(None) 
    File "co_test.py", line 10, in main 
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)] 
    File "co_test.py", line 10, in <listcomp> 
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)] 
    File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor 
    raise TypeError("coroutines cannot be used with run_in_executor()") 
TypeError: coroutines cannot be used with run_in_executor() 

我知道,我可以使用,而不是async_requestsync_request funcion,在这种情况下我会通过将阻塞函数发送到另一个线程来进行协程。

我也知道我可以在事件循环中调用async_request十次。就像下面的代码:

loop = asyncio.get_event_loop() 
futures = [async_request(loop) for i in range(10)] 
loop.run_until_complete(asyncio.wait(futures)) 

但在这种情况下,我会使用一个单一的线程。

我怎么能使用这两种方案,在多线程内工作的协程?正如你通过代码所看到的那样,我将(不使用)pool传递给async_request,希望我可以编写一些告诉工作者创造未来的东西,将其发送给池并异步(释放工作者)等待为结果。

我想这样做的原因是为了使应用程序可扩展。这是不必要的一步吗?我应该只是每个网址有一个线程,就是这样吗?例如:

LEN = len(list_of_urls) 
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN) 

够好吗?

+2

我真的不知道为什么你要做到这一点。实际上,你将为每个协程运行一个独特的循环,这会破坏使用事件循环的全部目的。你应该坚持asyncio,坚持线程,或者,如果你觉得由于某种原因这些都不够用,请尝试多处理。 – dirn

你必须创建和设置线程上下文的新事件循环来运行协程:

import asyncio 
from concurrent.futures import ThreadPoolExecutor 


def run(corofn, *args): 
    loop = asyncio.new_event_loop() 
    try: 
     coro = corofn(*args) 
     asyncio.set_event_loop(loop) 
     return loop.run_until_complete(coro) 
    finally: 
     loop.close() 


async def main(): 
    loop = asyncio.get_event_loop() 
    executor = ThreadPoolExecutor(max_workers=5) 
    futures = [ 
     loop.run_in_executor(executor, run, asyncio.sleep, 1, x) 
     for x in range(10)] 
    print(await asyncio.gather(*futures)) 
    # Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 
+0

我不认为这个回复做我想要的。运行此操作仍需要2秒钟。我期待所有的线程共享相同的事件池。也许我不清楚我的问题。 – zeh

+2

@zeh事件循环意味着线程特定,因为'asyncio'是关于协作式多任务处理(与抢先式多任务相反,就像线程模型一样)。循环负责在不同任务之间切换上下文,以便一次只运行其中一个任务。使用线程会挫败这个目的。 – Vincent

+0

我认为每个线程应该有一个循环,其中可以执行多个协程,如下所示: 'futures = [asyncio.run_coroutine_threadsafe(corofn(t,ix),loops [i])' 我是线程号码。也许我们可以为此得到一个工作示例。 这里还有一个有趣的话题:https://stackoverflow.com/questions/32059732/send-asyncio-tasks-to-loop-running-in-other-thread –