diff --git a/tests/test_dealloc.py b/tests/test_dealloc.py index 3ae436e..0b9b2f6 100644 --- a/tests/test_dealloc.py +++ b/tests/test_dealloc.py @@ -36,7 +36,7 @@ async def foo(): return 42 def main(): - asyncio.set_event_loop(uvloop.new_event_loop()) + uvloop.install() loop = asyncio.get_event_loop() loop.set_debug(True) loop.run_until_complete(foo()) diff --git a/tests/test_regr1.py b/tests/test_regr1.py index 8c8d557..c502457 100644 --- a/tests/test_regr1.py +++ b/tests/test_regr1.py @@ -74,36 +74,33 @@ class TestIssue39Regr(tb.UVTestCase): raise FailedTestError def run_test(self): - try: - for i in range(10): - for threaded in [True, False]: - if threaded: - qin, qout = queue.Queue(), queue.Queue() - threading.Thread( - target=run_server, - args=(qin, qout), - daemon=True).start() - else: - qin = multiprocessing.Queue() - qout = multiprocessing.Queue() - multiprocessing.Process( - target=run_server, - args=(qin, qout), - daemon=True).start() + for i in range(10): + for threaded in [True, False]: + if threaded: + qin, qout = queue.Queue(), queue.Queue() + threading.Thread( + target=run_server, + args=(qin, qout), + daemon=True).start() + else: + qin = multiprocessing.Queue() + qout = multiprocessing.Queue() + multiprocessing.Process( + target=run_server, + args=(qin, qout), + daemon=True).start() - addr = qout.get() - loop = self.new_loop() - asyncio.set_event_loop(loop) - loop.create_task( - loop.create_connection( - lambda: EchoClientProtocol(loop), - host=addr[0], port=addr[1])) - loop.run_forever() - loop.close() - qin.put('stop') - qout.get() - finally: - loop.close() + addr = qout.get() + loop = self.new_loop() + asyncio.set_event_loop(loop) + loop.create_task( + loop.create_connection( + lambda: EchoClientProtocol(loop), + host=addr[0], port=addr[1])) + loop.run_forever() + loop.close() + qin.put('stop') + qout.get() @unittest.skipIf( multiprocessing.get_start_method(False) == 'spawn', diff --git a/tests/test_signals.py b/tests/test_signals.py index 96879cb..e51f569 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -117,6 +117,45 @@ async def worker(): loop = """ + self.NEW_LOOP + """ asyncio.set_event_loop(loop) loop.create_task(worker()) +try: + loop.run_forever() +finally: + srv.close() + loop.run_until_complete(srv.wait_closed()) + loop.close() +""" + + proc = await asyncio.create_subprocess_exec( + sys.executable, b'-W', b'ignore', b'-c', PROG, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + await proc.stdout.readline() + time.sleep(DELAY) + proc.send_signal(signal.SIGINT) + out, err = await proc.communicate() + self.assertIn(b'KeyboardInterrupt', err) + + self.loop.run_until_complete(runner()) + + @tb.silence_long_exec_warning() + def test_signals_sigint_uvcode_two_loop_runs(self): + async def runner(): + PROG = R"""\ +import asyncio +import uvloop + +srv = None + +async def worker(): + global srv + cb = lambda *args: None + srv = await asyncio.start_server(cb, '127.0.0.1', 0) + +loop = """ + self.NEW_LOOP + """ +asyncio.set_event_loop(loop) +loop.run_until_complete(worker()) +print('READY', flush=True) try: loop.run_forever() finally: diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 765e48e..f36d1e2 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -65,6 +65,7 @@ cdef class Loop: object _ssock object _csock bint _listening_signals + int _old_signal_wakeup_id set _timers dict _polls @@ -149,6 +150,8 @@ cdef class Loop: cdef void _handle_exception(self, object ex) + cdef inline _is_main_thread(self) + cdef inline _new_future(self) cdef inline _check_signal(self, sig) cdef inline _check_closed(self) @@ -186,10 +189,9 @@ cdef class Loop: cdef _sock_set_reuseport(self, int fd) - cdef _setup_signals(self) + cdef _setup_or_resume_signals(self) cdef _shutdown_signals(self) - cdef _recv_signals_start(self) - cdef _recv_signals_stop(self) + cdef _pause_signals(self) cdef _handle_signal(self, sig) cdef _read_from_self(self) diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index fe79165..c865fed 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -167,6 +167,7 @@ cdef class Loop: self._ssock = self._csock = None self._signal_handlers = {} self._listening_signals = False + self._old_signal_wakeup_id = -1 self._coroutine_debug_set = False @@ -183,6 +184,9 @@ cdef class Loop: self._servers = set() + cdef inline _is_main_thread(self): + return MAIN_THREAD_ID == PyThread_get_thread_ident() + def __init__(self): self.set_debug((not sys_ignore_environment and bool(os_environ.get('PYTHONASYNCIODEBUG')))) @@ -241,34 +245,40 @@ cdef class Loop: self._debug_exception_handler_cnt = 0 - cdef _setup_signals(self): - cdef int old_wakeup_fd + cdef _setup_or_resume_signals(self): + if not self._is_main_thread(): + return if self._listening_signals: - return + raise RuntimeError('signals handling has been already setup') + + if self._ssock is not None: + raise RuntimeError('self-pipe exists before loop run') + + # Create a self-pipe and call set_signal_wakeup_fd() with one + # of its ends. This is needed so that libuv knows that it needs + # to wakeup on ^C (no matter if the SIGINT handler is still the + # standard Python's one or or user set their own.) self._ssock, self._csock = socket_socketpair() - self._ssock.setblocking(False) - self._csock.setblocking(False) try: - old_wakeup_fd = _set_signal_wakeup_fd(self._csock.fileno()) - except (OSError, ValueError): - # Not the main thread + self._ssock.setblocking(False) + self._csock.setblocking(False) + + fileno = self._csock.fileno() + + self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno) + except Exception: + # Out of all statements in the try block, only the + # "_set_signal_wakeup_fd()" call can fail, but it shouldn't, + # as we ensure that the current thread is the main thread. + # Still, if something goes horribly wrong we want to clean up + # the socket pair. self._ssock.close() self._csock.close() - self._ssock = self._csock = None - return - - self._listening_signals = True - return old_wakeup_fd - - cdef _recv_signals_start(self): - cdef object old_wakeup_fd = None - if self._ssock is None: - old_wakeup_fd = self._setup_signals() - if self._ssock is None: - # Not the main thread. - return + self._ssock = None + self._csock = None + raise self._add_reader( self._ssock, @@ -277,30 +287,24 @@ cdef class Loop: "Loop._read_from_self", self._read_from_self, self)) - return old_wakeup_fd - cdef _recv_signals_stop(self): - if self._ssock is None: - return + self._listening_signals = True - self._remove_reader(self._ssock) - - cdef _shutdown_signals(self): - if not self._listening_signals: - return - - for sig in list(self._signal_handlers): - self.remove_signal_handler(sig) + cdef _pause_signals(self): + if not self._is_main_thread(): + if self._listening_signals: + raise RuntimeError( + 'cannot pause signals handling; no longer running in ' + 'the main thread') + else: + return if not self._listening_signals: - # `remove_signal_handler` will call `_shutdown_signals` when - # removing last signal handler. - return + raise RuntimeError('signals handling has not been setup') - try: - signal_set_wakeup_fd(-1) - except (ValueError, OSError) as exc: - aio_logger.info('set_wakeup_fd(-1) failed: %s', exc) + self._listening_signals = False + + _set_signal_wakeup_fd(self._old_signal_wakeup_id) self._remove_reader(self._ssock) self._ssock.close() @@ -308,7 +312,24 @@ cdef class Loop: self._ssock = None self._csock = None - self._listening_signals = False + cdef _shutdown_signals(self): + if not self._is_main_thread(): + if self._signal_handlers: + aio_logger.warning( + 'cannot cleanup signal handlers: closing the event loop ' + 'in a non-main OS thread') + return + + if self._listening_signals: + raise RuntimeError( + 'cannot shutdown signals handling as it has not been paused') + + if self._ssock: + raise RuntimeError( + 'self-pipe was not cleaned up after loop was run') + + for sig in list(self._signal_handlers): + self.remove_signal_handler(sig) def __sighandler(self, signum, frame): self._signals.add(signum) @@ -451,7 +472,6 @@ cdef class Loop: cdef _run(self, uv.uv_run_mode mode): cdef int err - cdef object old_wakeup_fd if self._closed == 1: raise RuntimeError('unable to start the loop; it was closed') @@ -474,7 +494,7 @@ cdef class Loop: self.handler_check__exec_writes.start() self.handler_idle.start() - old_wakeup_fd = self._recv_signals_start() + self._setup_or_resume_signals() if aio_set_running_loop is not None: aio_set_running_loop(self) @@ -484,13 +504,11 @@ cdef class Loop: if aio_set_running_loop is not None: aio_set_running_loop(None) - self._recv_signals_stop() - if old_wakeup_fd is not None: - signal_set_wakeup_fd(old_wakeup_fd) - self.handler_check__exec_writes.stop() self.handler_idle.stop() + self._pause_signals() + self._thread_is_main = 0 self._thread_id = 0 self._running = 0 @@ -2794,10 +2812,10 @@ cdef class Loop: cdef: Handle h - if not self._listening_signals: - self._setup_signals() - if not self._listening_signals: - raise ValueError('set_wakeup_fd only works in main thread') + if not self._is_main_thread(): + raise ValueError( + 'add_signal_handler() can only be called from ' + 'the main thread') if (aio_iscoroutine(callback) or aio_iscoroutinefunction(callback)): @@ -2829,14 +2847,6 @@ cdef class Loop: self._check_signal(sig) self._check_closed() - try: - # set_wakeup_fd() raises ValueError if this is not the - # main thread. By calling it early we ensure that an - # event loop running in another thread cannot add a signal - # handler. - _set_signal_wakeup_fd(self._csock.fileno()) - except (ValueError, OSError) as exc: - raise RuntimeError(str(exc)) h = new_Handle(self, callback, args or None, None) self._signal_handlers[sig] = h @@ -2866,6 +2876,12 @@ cdef class Loop: Return True if a signal handler was removed, False if not. """ + + if not self._is_main_thread(): + raise ValueError( + 'remove_signal_handler() can only be called from ' + 'the main thread') + self._check_signal(sig) if not self._listening_signals: @@ -2889,9 +2905,6 @@ cdef class Loop: else: raise - if not self._signal_handlers: - self._shutdown_signals() - return True @cython.iterable_coroutine