2016-05-20 19:37:23 +00:00
|
|
|
import asyncio
|
|
|
|
|
|
|
|
from uvloop._testbase import UVTestCase
|
|
|
|
|
|
|
|
|
|
|
|
class TestCythonIntegration(UVTestCase):
    """Checks that ``uvloop.loop._test_coroutine_1`` interoperates with asyncio."""

    def test_cython_coro_is_coroutine(self):
        """Exercise ``_test_coroutine_1`` through asyncio's coroutine machinery.

        The coroutine object is formatted with asyncio's private
        ``_format_coroutine`` helper, its function's ``__name__`` and
        ``__qualname__`` are checked, and it is then wrapped with
        ``asyncio.ensure_future``, cancelled, and run on ``self.loop`` to
        confirm that the cancellation surfaces as ``CancelledError``.
        """
        # Imported locally, as in the original test.
        # NOTE(review): presumably ``_test_coroutine_1`` is a Cython-compiled
        # test helper and ``self.loop`` is supplied by UVTestCase -- confirm.
        from uvloop.loop import _test_coroutine_1
        from asyncio.coroutines import _format_coroutine

        coro = _test_coroutine_1()

        # Either state is accepted for a freshly created coroutine object.
        coro_fmt = _format_coroutine(coro)
        self.assertTrue(
            coro_fmt.startswith((
                '_test_coroutine_1() done',
                '_test_coroutine_1() running',
            )),
            coro_fmt,  # show the actual formatted string on failure
        )

        self.assertEqual(_test_coroutine_1.__qualname__, '_test_coroutine_1')
        self.assertEqual(_test_coroutine_1.__name__, '_test_coroutine_1')
        self.assertTrue(asyncio.iscoroutine(coro))

        fut = asyncio.ensure_future(coro)
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIsInstance(fut, asyncio.Task)
        fut.cancel()

        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)

        try:
            _format_coroutine(coro)  # This line checks against Cython segfault
        except TypeError:
            # TODO: Fix Cython to not reset __name__/__qualname__ to None
            pass

        coro.close()
|