使用asyncio事件循环运行tornado.testing.AsyncTestCase
问题描述:
我有一个基于asyncio的类,我想单元测试。使用tornado.testing.AsyncTestCase
这个工作很好,很容易。但是,我的课程的一个特定方法使用asyncio.ensure_future
来安排另一种方法的执行。这永远不会在AsyncTestCase
中完成,因为默认测试运行器使用龙卷风KQueueIOLoop
事件循环,而不是asyncio事件循环。使用asyncio事件循环运行tornado.testing.AsyncTestCase
class TestSubject:
def foo(self):
asyncio.ensure_future(self.bar())
async def bar(self):
pass
class TestSubjectTest(AsyncTestCase):
def test_foo(self):
t = TestSubject()
# here be somewhat involved setup with MagicMock and self.stop
t.foo()
self.wait()
$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited
如何使用不同的事件循环运行,以确保我的任务测试将实际执行?或者,我如何使我的实现事件独立于循环并交叉兼容?
答
可谓是非常简单......
class TestSubjectTest(AsyncTestCase):
def get_new_ioloop(self): # override this method
return tornado.platform.asyncio.AsyncIOMainLoop()
我之前尝试此,而是直接返回asyncio.get_event_loop()
,没有工作。返回Tornado的asyncio循环包装器就可以做到这一点。