diff --git a/tests/test_base.py b/tests/test_base.py index 0cb8b3b..6677a61 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -347,19 +347,16 @@ class _TestBase: if self.implementation == 'asyncio' and sys.version_info < (3, 6, 2): raise unittest.SkipTest('unfixed asyncio') - class ShowStopper(BaseException): - pass - async def foo(delay): await asyncio.sleep(delay) def throw(): - raise ShowStopper + raise KeyboardInterrupt self.loop.call_soon(throw) try: self.loop.run_until_complete(foo(0.1)) - except ShowStopper: + except KeyboardInterrupt: pass # This call fails if run_until_complete does not clean up diff --git a/tests/test_executors.py b/tests/test_executors.py index 90d3075..c793bb9 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -1,5 +1,7 @@ import asyncio import concurrent.futures +import multiprocessing +import unittest from uvloop import _testbase as tb @@ -26,6 +28,9 @@ class _TestExecutors: fib10 = [fib(i) for i in range(10)] self.loop.run_until_complete(run()) + @unittest.skipIf( + multiprocessing.get_start_method(False) == 'spawn', + 'no need to test on macOS where spawn is used instead of fork') def test_executors_process_pool_01(self): self.run_pool_test(concurrent.futures.ProcessPoolExecutor) diff --git a/tests/test_regr1.py b/tests/test_regr1.py index b576474..8c8d557 100644 --- a/tests/test_regr1.py +++ b/tests/test_regr1.py @@ -3,6 +3,9 @@ import queue import multiprocessing import signal import threading +import unittest + +import uvloop from uvloop import _testbase as tb @@ -32,35 +35,40 @@ class FailedTestError(BaseException): pass +def run_server(quin, qout): + server_loop = None + + def server_thread(): + nonlocal server_loop + loop = server_loop = uvloop.new_event_loop() + asyncio.set_event_loop(loop) + coro = loop.create_server(EchoServerProtocol, '127.0.0.1', 0) + server = loop.run_until_complete(coro) + addr = server.sockets[0].getsockname() + qout.put(addr) + loop.run_forever() + server.close() + loop.run_until_complete(server.wait_closed()) + try: + loop.close() + except Exception as exc: + print(exc) + qout.put('stopped') + + thread = threading.Thread(target=server_thread, daemon=True) + thread.start() + + quin.get() + server_loop.call_soon_threadsafe(server_loop.stop) + thread.join(1) + + class TestIssue39Regr(tb.UVTestCase): """See https://github.com/MagicStack/uvloop/issues/39 for details. Original code to reproduce the bug is by Jim Fulton. """ - def run_server(self, quin, qout): - def server_thread(): - loop = self.server_loop = self.new_loop() - coro = loop.create_server(EchoServerProtocol, '127.0.0.1', 0) - server = loop.run_until_complete(coro) - addr = server.sockets[0].getsockname() - qout.put(addr) - loop.run_forever() - server.close() - loop.run_until_complete(server.wait_closed()) - try: - loop.close() - except Exception as exc: - print(exc) - qout.put('stopped') - - thread = threading.Thread(target=server_thread, daemon=True) - thread.start() - - quin.get() - self.server_loop.call_soon_threadsafe(self.server_loop.stop) - thread.join(1) - def on_alarm(self, sig, fr): if self.running: raise FailedTestError @@ -72,19 +80,20 @@ class TestIssue39Regr(tb.UVTestCase): if threaded: qin, qout = queue.Queue(), queue.Queue() threading.Thread( - target=self.run_server, + target=run_server, args=(qin, qout), daemon=True).start() else: qin = multiprocessing.Queue() qout = multiprocessing.Queue() multiprocessing.Process( - target=self.run_server, + target=run_server, args=(qin, qout), daemon=True).start() addr = qout.get() loop = self.new_loop() + asyncio.set_event_loop(loop) loop.create_task( loop.create_connection( lambda: EchoClientProtocol(loop), @@ -96,6 +105,9 @@ class TestIssue39Regr(tb.UVTestCase): finally: loop.close() + @unittest.skipIf( + multiprocessing.get_start_method(False) == 'spawn', + 'no need to test on macOS where spawn is used instead of fork') def test_issue39_regression(self): signal.signal(signal.SIGALRM, self.on_alarm) signal.alarm(5) diff --git a/tests/test_signals.py b/tests/test_signals.py index bea37f4..5c1185d 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -3,6 +3,7 @@ import signal import subprocess import sys import time +import unittest import uvloop from uvloop import _testbase as tb @@ -280,6 +281,9 @@ class Test_UV_Signals(_TestSignal, tb.UVTestCase): self.loop.add_signal_handler(signal.SIGCHLD, lambda *a: None) + @unittest.skipIf(sys.version_info[:3] >= (3, 8, 0), + 'in 3.8 a ThreadedChildWatcher is used ' + '(does not rely on SIGCHLD)') def test_asyncio_add_watcher_SIGCHLD_nop(self): asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) asyncio.get_event_loop_policy().get_child_watcher() diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 925f740..6a8a63f 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -25,6 +25,10 @@ class _TestSockets: # See https://github.com/python/asyncio/pull/366 for details. raise unittest.SkipTest() + if sys.version_info[:3] >= (3, 8, 0): + # @asyncio.coroutine is deprecated in 3.8 + raise unittest.SkipTest() + def srv_gen(sock): sock.send(b'helo') data = sock.recv_all(4 * _SIZE) @@ -190,6 +194,11 @@ class _TestSockets: self.loop.run_until_complete(asyncio.sleep(0.01)) def test_sock_cancel_add_reader_race(self): + if self.is_asyncio_loop() and sys.version_info[:3] == (3, 8, 0): + # asyncio 3.8.0 seems to have a regression; + # tracked in https://bugs.python.org/issue30064 + raise unittest.SkipTest() + srv_sock_conn = None async def server(): @@ -236,6 +245,11 @@ class _TestSockets: self.loop.run_until_complete(server()) def test_sock_send_before_cancel(self): + if self.is_asyncio_loop() and sys.version_info[:3] == (3, 8, 0): + # asyncio 3.8.0 seems to have a regression; + # tracked in https://bugs.python.org/issue30064 + raise unittest.SkipTest() + srv_sock_conn = None async def server(): diff --git a/tests/test_tasks.py b/tests/test_tasks.py deleted file mode 100644 index 54f3a52..0000000 --- a/tests/test_tasks.py +++ /dev/null @@ -1,386 +0,0 @@ -# LICENSE: PSF. - -import asyncio - -from uvloop import _testbase as tb - - -class Dummy: - - def __repr__(self): - return '' - - def __call__(self, *args): - pass - - -def format_coroutine(qualname, state, src, source_traceback, generator=False): - if generator: - state = '%s' % state - else: - state = '%s, defined' % state - if source_traceback is not None: - frame = source_traceback[-1] - return ('coro=<%s() %s at %s> created at %s:%s' - % (qualname, state, src, frame[0], frame[1])) - else: - return 'coro=<%s() %s at %s>' % (qualname, state, src) - - -try: - all_tasks = asyncio.all_tasks -except AttributeError: - all_tasks = asyncio.Task.all_tasks - - -try: - current_task = asyncio.current_task -except AttributeError: - current_task = asyncio.Task.current_task - - -class _TestTasks: - - def test_task_basics(self): - @asyncio.coroutine - def outer(): - a = yield from inner1() - b = yield from inner2() - return a + b - - @asyncio.coroutine - def inner1(): - return 42 - - @asyncio.coroutine - def inner2(): - return 1000 - - t = outer() - self.assertEqual(self.loop.run_until_complete(t), 1042) - - def test_task_cancel_yield(self): - @asyncio.coroutine - def task(): - while True: - yield - return 12 - - t = self.create_task(task()) - tb.run_briefly(self.loop) # start coro - t.cancel() - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t.cancel()) - - def test_task_cancel_inner_future(self): - f = self.create_future() - - @asyncio.coroutine - def task(): - yield from f - return 12 - - t = self.create_task(task()) - tb.run_briefly(self.loop) # start task - f.cancel() - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_task_cancel_both_task_and_inner_future(self): - f = self.create_future() - - @asyncio.coroutine - def task(): - yield from f - return 12 - - t = self.create_task(task()) - self.assertEqual(all_tasks(), {t}) - tb.run_briefly(self.loop) - - f.cancel() - t.cancel() - - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - - self.assertTrue(t.done()) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_task_cancel_task_catching(self): - fut1 = self.create_future() - fut2 = self.create_future() - - @asyncio.coroutine - def task(): - yield from fut1 - try: - yield from fut2 - except asyncio.CancelledError: - return 42 - - t = self.create_task(task()) - tb.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # White-box test. - fut1.set_result(None) - tb.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(t.cancelled()) - - def test_task_cancel_task_ignoring(self): - fut1 = self.create_future() - fut2 = self.create_future() - fut3 = self.create_future() - - @asyncio.coroutine - def task(): - yield from fut1 - try: - yield from fut2 - except asyncio.CancelledError: - pass - res = yield from fut3 - return res - - t = self.create_task(task()) - tb.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # White-box test. - fut1.set_result(None) - tb.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - tb.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut3) # White-box test. - fut3.set_result(42) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(fut3.cancelled()) - self.assertFalse(t.cancelled()) - - def test_task_cancel_current_task(self): - @asyncio.coroutine - def task(): - t.cancel() - self.assertTrue(t._must_cancel) # White-box test. - # The sleep should be canceled immediately. - yield from asyncio.sleep(100) - return 12 - - t = self.create_task(task()) - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertFalse(t._must_cancel) # White-box test. - self.assertFalse(t.cancel()) - - def test_task_step_with_baseexception(self): - @asyncio.coroutine - def notmutch(): - raise BaseException() - - task = self.create_task(notmutch()) - with self.assertRaises(BaseException): - tb.run_briefly(self.loop) - - self.assertTrue(task.done()) - self.assertIsInstance(task.exception(), BaseException) - - def test_task_step_result_future(self): - # If coroutine returns future, task waits on this future. - - class Fut(asyncio.Future): - def __init__(self, *args, **kwds): - self.cb_added = False - super().__init__(*args, **kwds) - - def add_done_callback(self, fn, context=None): - self.cb_added = True - super().add_done_callback(fn) - - fut = Fut() - result = None - - @asyncio.coroutine - def wait_for_future(): - nonlocal result - result = yield from fut - - t = self.create_task(wait_for_future()) - tb.run_briefly(self.loop) - self.assertTrue(fut.cb_added) - - res = object() - fut.set_result(res) - tb.run_briefly(self.loop) - self.assertIs(res, result) - self.assertTrue(t.done()) - self.assertIsNone(t.result()) - - def test_task_step_result(self): - @asyncio.coroutine - def notmuch(): - yield None - yield 1 - return 'ko' - - self.assertRaises( - RuntimeError, self.loop.run_until_complete, notmuch()) - - def test_task_yield_vs_yield_from(self): - fut = asyncio.Future() - - @asyncio.coroutine - def wait_for_future(): - yield fut - - task = wait_for_future() - with self.assertRaises(RuntimeError): - self.loop.run_until_complete(task) - - self.assertFalse(fut.done()) - - def test_task_current_task(self): - self.assertIsNone(current_task()) - - @asyncio.coroutine - def coro(loop): - self.assertTrue(current_task(loop=loop) is task) - - task = self.create_task(coro(self.loop)) - self.loop.run_until_complete(task) - self.assertIsNone(current_task()) - - def test_task_current_task_with_interleaving_tasks(self): - self.assertIsNone(current_task()) - - fut1 = self.create_future() - fut2 = self.create_future() - - @asyncio.coroutine - def coro1(loop): - self.assertTrue(current_task(loop=loop) is task1) - yield from fut1 - self.assertTrue(current_task(loop=loop) is task1) - fut2.set_result(True) - - @asyncio.coroutine - def coro2(loop): - self.assertTrue(current_task(loop=loop) is task2) - fut1.set_result(True) - yield from fut2 - self.assertTrue(current_task(loop=loop) is task2) - - task1 = self.create_task(coro1(self.loop)) - task2 = self.create_task(coro2(self.loop)) - - self.loop.run_until_complete(asyncio.wait((task1, task2), - )) - self.assertIsNone(current_task()) - - def test_task_yield_future_passes_cancel(self): - # Canceling outer() cancels inner() cancels waiter. - proof = 0 - waiter = self.create_future() - - @asyncio.coroutine - def inner(): - nonlocal proof - try: - yield from waiter - except asyncio.CancelledError: - proof += 1 - raise - else: - self.fail('got past sleep() in inner()') - - @asyncio.coroutine - def outer(): - nonlocal proof - try: - yield from inner() - except asyncio.CancelledError: - proof += 100 # Expect this path. - else: - proof += 10 - - f = asyncio.ensure_future(outer()) - tb.run_briefly(self.loop) - f.cancel() - self.loop.run_until_complete(f) - self.assertEqual(proof, 101) - self.assertTrue(waiter.cancelled()) - - def test_task_yield_wait_does_not_shield_cancel(self): - # Canceling outer() makes wait() return early, leaves inner() - # running. - proof = 0 - waiter = self.create_future() - - @asyncio.coroutine - def inner(): - nonlocal proof - yield from waiter - proof += 1 - - @asyncio.coroutine - def outer(): - nonlocal proof - d, p = yield from asyncio.wait([inner()]) - proof += 100 - - f = asyncio.ensure_future(outer()) - tb.run_briefly(self.loop) - f.cancel() - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, f) - waiter.set_result(None) - tb.run_briefly(self.loop) - self.assertEqual(proof, 1) - - -############################################################################### -# Tests Matrix -############################################################################### - - -class Test_UV_UV_Tasks(_TestTasks, tb.UVTestCase): - def create_future(self): - return self.loop.create_future() - - def create_task(self, coro): - return self.loop.create_task(coro) - - -class Test_UV_UV_Tasks_AIO_Future(_TestTasks, tb.UVTestCase): - def create_future(self): - return asyncio.Future() - - def create_task(self, coro): - return self.loop.create_task(coro) - - -class Test_UV_AIO_Tasks(_TestTasks, tb.UVTestCase): - def create_future(self): - return asyncio.Future() - - def create_task(self, coro): - return asyncio.Task(coro) - - -class Test_AIO_Tasks(_TestTasks, tb.AIOTestCase): - def create_future(self): - return asyncio.Future() - - def create_task(self, coro): - return asyncio.Task(coro) diff --git a/tests/test_tcp.py b/tests/test_tcp.py index 2654e9c..5b40bb2 100644 --- a/tests/test_tcp.py +++ b/tests/test_tcp.py @@ -418,6 +418,7 @@ class _TestTCP: sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)) writer.close() + await self.wait_closed(writer) self._test_create_connection_1(client) @@ -440,6 +441,7 @@ class _TestTCP: sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)) writer.close() + await self.wait_closed(writer) self._test_create_connection_1(client) @@ -486,6 +488,8 @@ class _TestTCP: async def client(): reader, writer = await asyncio.open_connection(*addr) + writer.close() + await self.wait_closed(writer) async def runner(): with self.assertRaises(ConnectionRefusedError): @@ -511,6 +515,7 @@ class _TestTCP: await reader.readexactly(10) writer.close() + await self.wait_closed(writer) nonlocal CNT CNT += 1 @@ -538,6 +543,8 @@ class _TestTCP: async def client(): reader, writer = await asyncio.open_connection(sock=sock) + writer.close() + await self.wait_closed(writer) async def runner(): with self.assertRaisesRegex(OSError, 'Bad file'): @@ -605,6 +612,7 @@ class _TestTCP: self.assertEqual(data, b'OK') writer.close() + await self.wait_closed(writer) async def start_server(): nonlocal CNT @@ -996,6 +1004,7 @@ class Test_UV_TCP(_TestTCP, tb.UVTestCase): w.close() await fut + await self.wait_closed(w) srv.close() await srv.wait_closed() @@ -1004,6 +1013,8 @@ class Test_UV_TCP(_TestTCP, tb.UVTestCase): self.loop.run_until_complete(run()) + @unittest.skipIf(sys.version_info[:3] >= (3, 8, 0), + "3.8 has a different method of GCing unclosed streams") def test_tcp_handle_unclosed_gc(self): fut = self.loop.create_future() @@ -1385,6 +1396,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) async def client_sock(addr): sock = socket.socket() @@ -1404,6 +1416,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) sock.close() def run(coro): @@ -1450,6 +1463,8 @@ class _TestSSL(tb.SSLTestCase): ssl=client_sslctx, server_hostname='', ssl_handshake_timeout=1.0) + writer.close() + await self.wait_closed(writer) with self.tcp_server(server, max_clients=1, @@ -1488,6 +1503,8 @@ class _TestSSL(tb.SSLTestCase): ssl=client_sslctx, server_hostname='', ssl_handshake_timeout=1.0) + writer.close() + await self.wait_closed(writer) with self.tcp_server(server, max_clients=1, @@ -1663,6 +1680,10 @@ class _TestSSL(tb.SSLTestCase): with self.assertRaises(ssl.SSLError): await reader.readline() writer.close() + try: + await self.wait_closed(writer) + except ssl.SSLError: + pass return 'OK' with self.tcp_server(server, @@ -2272,6 +2293,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) async def client_sock(addr): sock = socket.socket() @@ -2291,6 +2313,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) sock.close() def run(coro): @@ -2460,6 +2483,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) def run(coro): nonlocal CNT @@ -2534,6 +2558,9 @@ class _TestSSL(tb.SSLTestCase): await future + writer.close() + await self.wait_closed(writer) + def run(meth): def wrapper(sock): try: @@ -2631,6 +2658,7 @@ class _TestSSL(tb.SSLTestCase): for _ in range(SIZE): writer.write(b'x' * CHUNK) writer.close() + await self.wait_closed(writer) try: data = await reader.read() self.assertEqual(data, b'') @@ -2749,6 +2777,9 @@ class _TestSSL(tb.SSLTestCase): await future + writer.close() + await self.wait_closed(writer) + def run(meth): def wrapper(sock): try: diff --git a/tests/test_unix.py b/tests/test_unix.py index 55f31e1..be7252f 100644 --- a/tests/test_unix.py +++ b/tests/test_unix.py @@ -29,6 +29,7 @@ class _TestUnix: await writer.drain() writer.close() + await self.wait_closed(writer) CNT += 1 @@ -192,6 +193,7 @@ class _TestUnix: self.assertEqual(await reader.readexactly(4), b'SPAM') writer.close() + await self.wait_closed(writer) self._test_create_unix_connection_1(client) @@ -208,6 +210,7 @@ class _TestUnix: self.assertEqual(await reader.readexactly(4), b'SPAM') writer.close() + await self.wait_closed(writer) self._test_create_unix_connection_1(client) @@ -224,6 +227,7 @@ class _TestUnix: self.assertEqual(await reader.readexactly(4), b'SPAM') writer.close() + await self.wait_closed(writer) self._test_create_unix_connection_1(client) @@ -271,6 +275,8 @@ class _TestUnix: async def client(): reader, writer = await asyncio.open_unix_connection(path) + writer.close() + await self.wait_closed(writer) async def runner(): with self.assertRaises(FileNotFoundError): @@ -299,6 +305,7 @@ class _TestUnix: await reader.readexactly(10) writer.close() + await self.wait_closed(writer) nonlocal CNT CNT += 1 @@ -326,6 +333,8 @@ class _TestUnix: async def client(): reader, writer = await asyncio.open_unix_connection(sock=sock) + writer.close() + await self.wait_closed(writer) async def runner(): with self.assertRaisesRegex(OSError, 'Bad file'): @@ -666,6 +675,7 @@ class _TestSSL(tb.SSLTestCase): CNT += 1 writer.close() + await self.wait_closed(writer) def run(coro): nonlocal CNT diff --git a/uvloop/_testbase.py b/uvloop/_testbase.py index adb9da1..b706be5 100644 --- a/uvloop/_testbase.py +++ b/uvloop/_testbase.py @@ -70,6 +70,15 @@ class BaseTestCase(unittest.TestCase, metaclass=BaseTestCaseMeta): def mock_pattern(self, str): return MockPattern(str) + async def wait_closed(self, obj): + if not isinstance(obj, asyncio.StreamWriter): + return + if sys.version_info >= (3, 7, 0): + try: + await obj.wait_closed() + except (BrokenPipeError, ConnectionError): + pass + def has_start_serving(self): return not (self.is_asyncio_loop() and sys.version_info[:2] in [(3, 5), (3, 6)]) @@ -78,7 +87,7 @@ class BaseTestCase(unittest.TestCase, metaclass=BaseTestCaseMeta): return type(self.loop).__module__.startswith('asyncio.') def run_loop_briefly(self, *, delay=0.01): - self.loop.run_until_complete(asyncio.sleep(delay, loop=self.loop)) + self.loop.run_until_complete(asyncio.sleep(delay)) def loop_exception_handler(self, loop, context): self.__unhandled_exceptions.append(context) @@ -530,7 +539,7 @@ def run_until(loop, pred, timeout=30): timeout = deadline - time.time() if timeout <= 0: raise asyncio.futures.TimeoutError() - loop.run_until_complete(asyncio.tasks.sleep(0.001, loop=loop)) + loop.run_until_complete(asyncio.tasks.sleep(0.001)) @contextlib.contextmanager