为什么协程不能与run_in_executor一起使用?
我想运行一个服务,使用协程和多线程来请求URL。但是我不能把协程去传给执行者的工作人员。请参阅下面的代码针对此问题的一个小例子:为什么协程不能与run_in_executor一起使用?
import time
import asyncio
import concurrent.futures
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)
async def async_request(loop):
await asyncio.sleep(3)
def sync_request(_):
time.sleep(3)
async def main(loop):
futures = [loop.run_in_executor(EXECUTOR, async_request,loop)
for x in range(10)]
await asyncio.wait(futures)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
下面的错误而导致:
Traceback (most recent call last):
File "co_test.py", line 17, in <module>
loop.run_until_complete(main(loop))
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "co_test.py", line 10, in main
futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
File "co_test.py", line 10, in <listcomp>
futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor
raise TypeError("coroutines cannot be used with run_in_executor()")
TypeError: coroutines cannot be used with run_in_executor()
我知道,我可以使用,而不是async_request
sync_request
funcion,在这种情况下我会通过将阻塞函数发送到另一个线程来进行协程。
我也知道我可以在事件循环中调用async_request
十次。就像下面的代码:
loop = asyncio.get_event_loop()
futures = [async_request(loop) for i in range(10)]
loop.run_until_complete(asyncio.wait(futures))
但在这种情况下,我会使用一个单一的线程。
我怎么能使用这两种方案,在多线程内工作的协程?正如你通过代码所看到的那样,我将(不使用)pool
传递给async_request
,希望我可以编写一些告诉工作者创造未来的东西,将其发送给池并异步(释放工作者)等待为结果。
我想这样做的原因是为了使应用程序可扩展。这是不必要的一步吗?我应该只是每个网址有一个线程,就是这样吗?例如:
LEN = len(list_of_urls)
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN)
够好吗?
你必须创建和设置线程上下文的新事件循环来运行协程:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def run(corofn, *args):
loop = asyncio.new_event_loop()
try:
coro = corofn(*args)
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
finally:
loop.close()
async def main():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=5)
futures = [
loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
for x in range(10)]
print(await asyncio.gather(*futures))
# Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我不认为这个回复做我想要的。运行此操作仍需要2秒钟。我期待所有的线程共享相同的事件池。也许我不清楚我的问题。 – zeh
@zeh事件循环意味着线程特定,因为'asyncio'是关于协作式多任务处理(与抢先式多任务相反,就像线程模型一样)。循环负责在不同任务之间切换上下文,以便一次只运行其中一个任务。使用线程会挫败这个目的。 – Vincent
我认为每个线程应该有一个循环,其中可以执行多个协程,如下所示: 'futures = [asyncio.run_coroutine_threadsafe(corofn(t,ix),loops [i])' 我是线程号码。也许我们可以为此得到一个工作示例。 这里还有一个有趣的话题:https://stackoverflow.com/questions/32059732/send-asyncio-tasks-to-loop-running-in-other-thread –
我真的不知道为什么你要做到这一点。实际上,你将为每个协程运行一个独特的循环,这会破坏使用事件循环的全部目的。你应该坚持asyncio,坚持线程,或者,如果你觉得由于某种原因这些都不够用,请尝试多处理。 – dirn