有没有办法对ASYNCIO事件循环
我想利用事件循环监视任何数据插入我的asyncio.Queue手动切换(你可以在这里找到https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py它的源代码),但我碰到一些问题。这是下面的代码:有没有办法对ASYNCIO事件循环
import asyncio
import threading
async def recv(q):
while True:
msg = await q.get()
print(msg)
async def checking_task():
while True:
await asyncio.sleep(0.1)
def loop_in_thread(loop,q):
asyncio.set_event_loop(loop)
asyncio.ensure_future(recv(q))
asyncio.ensure_future(insert(q))
# asyncio.ensure_future(checking_task()) comment this out, and it will work as intended
loop.run_forever()
async def insert(q):
print('invoked')
await q.put('hello')
q = asyncio.Queue()
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop, q,))
t.start()
程序已经开始,我们可以看到以下结果
invoked
hello
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>}
但现在,如果我们手动添加数据到q
使用q.put_nowait('test')
,我们会得到以下结果:
q.put_nowait('test') # a non-async way to add data into queue
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future finished result=None>>}
正如你可以看到,未来已经结束,但我们还没有打印出新添加的字符串'test'
。换句话说,msg = await q.get()
仍在等待,即使将来q.get相关的()完成,并没有运行其他任务。这让我困惑,因为官方文档(https://docs.python.org/3/library/asyncio-task.html)中,它说
结果=等待将来或结果=从未来的收益率 - 暂停协程,直到将来完成,然后返回未来的结果
看来,即使未来完成后,我们还需要在其他异步功能某种await
使事件循环继续处理任务。
我发现了一个解决此问题的,这是添加checking_task()
,并且还添加协程到事件循环;那么它将按预期工作。
但加入checking_task()协同程序对CPU非常昂贵的,因为它只是运行一个while循环。我想知道是否有一些手动方式让我们触发这个await
事件,而不使用异步功能。例如,像神奇的东西
q.put_nowait('test')
loop.ok_you_can_start_running_other_pending_tasks()
帮助将不胜感激!谢谢。
所以我结束了使用
loop.call_soon_threadsafe(q.put_nowait, 'test')
,并如预期它会奏效。在弄清楚之后,我搜索了一些有关的信息。原来这个帖子(Scheduling an asyncio coroutine from another thread)有同样的问题。而@ KFX的答案也将工作,这是
loop.call_soon_threadsafe(loop.create_task, q.put('test'))
通知asyncio.Queue.put()是一个协程,但asyncio.Queue.put_nowait()是一个正常功能。