Fix KeyboardInterrupt handling logic.

When uvloop is run in the main thread we *always* want to set up a
self-pipe and a signal wakeup FD.  That's the only way how libuv
can be notified that a ^C happened and break away from selecting
on sockets.

asyncio does not need to do that, as the 'selectors' module it uses
is already aware of the way Python implements ^C handling.

This translates to a slightly different behavior between asyncio &
uvloop:

1. uvloop needs to always call signal.set_wakeup_fd() when run in the
  main thread;

2. asyncio only needs to call signal.set_wakeup_fd() when a user
  registers a signal handler.

(2) means that if the user had not set up any signals, the signal
wakeup FD stays the same between different asyncio runs.  This commit
fixes uvloop signal implementation to make sure that uvloop behaves
the same way as asyncio in regards to signal wakeup FD between the
loop runs.  It also ensures that uvloop always have a proper
self-pipe set up so that ^C is always supported when it is run in
the main thread.

Issue #295.
This commit is contained in:
Yury Selivanov 2019-10-28 18:07:27 -04:00
parent 6476aad6fd
commit c32c7039cd
5 changed files with 146 additions and 95 deletions

View File

@ -36,7 +36,7 @@ async def foo():
return 42
def main():
asyncio.set_event_loop(uvloop.new_event_loop())
uvloop.install()
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(foo())

View File

@ -74,36 +74,33 @@ class TestIssue39Regr(tb.UVTestCase):
raise FailedTestError
def run_test(self):
try:
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
target=run_server,
args=(qin, qout),
daemon=True).start()
else:
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
multiprocessing.Process(
target=run_server,
args=(qin, qout),
daemon=True).start()
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
target=run_server,
args=(qin, qout),
daemon=True).start()
else:
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
multiprocessing.Process(
target=run_server,
args=(qin, qout),
daemon=True).start()
addr = qout.get()
loop = self.new_loop()
asyncio.set_event_loop(loop)
loop.create_task(
loop.create_connection(
lambda: EchoClientProtocol(loop),
host=addr[0], port=addr[1]))
loop.run_forever()
loop.close()
qin.put('stop')
qout.get()
finally:
loop.close()
addr = qout.get()
loop = self.new_loop()
asyncio.set_event_loop(loop)
loop.create_task(
loop.create_connection(
lambda: EchoClientProtocol(loop),
host=addr[0], port=addr[1]))
loop.run_forever()
loop.close()
qin.put('stop')
qout.get()
@unittest.skipIf(
multiprocessing.get_start_method(False) == 'spawn',

View File

@ -117,6 +117,45 @@ async def worker():
loop = """ + self.NEW_LOOP + """
asyncio.set_event_loop(loop)
loop.create_task(worker())
try:
loop.run_forever()
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.close()
"""
proc = await asyncio.create_subprocess_exec(
sys.executable, b'-W', b'ignore', b'-c', PROG,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
await proc.stdout.readline()
time.sleep(DELAY)
proc.send_signal(signal.SIGINT)
out, err = await proc.communicate()
self.assertIn(b'KeyboardInterrupt', err)
self.loop.run_until_complete(runner())
@tb.silence_long_exec_warning()
def test_signals_sigint_uvcode_two_loop_runs(self):
async def runner():
PROG = R"""\
import asyncio
import uvloop
srv = None
async def worker():
global srv
cb = lambda *args: None
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
loop = """ + self.NEW_LOOP + """
asyncio.set_event_loop(loop)
loop.run_until_complete(worker())
print('READY', flush=True)
try:
loop.run_forever()
finally:

View File

@ -65,6 +65,7 @@ cdef class Loop:
object _ssock
object _csock
bint _listening_signals
int _old_signal_wakeup_id
set _timers
dict _polls
@ -149,6 +150,8 @@ cdef class Loop:
cdef void _handle_exception(self, object ex)
cdef inline _is_main_thread(self)
cdef inline _new_future(self)
cdef inline _check_signal(self, sig)
cdef inline _check_closed(self)
@ -186,10 +189,9 @@ cdef class Loop:
cdef _sock_set_reuseport(self, int fd)
cdef _setup_signals(self)
cdef _setup_or_resume_signals(self)
cdef _shutdown_signals(self)
cdef _recv_signals_start(self)
cdef _recv_signals_stop(self)
cdef _pause_signals(self)
cdef _handle_signal(self, sig)
cdef _read_from_self(self)

View File

@ -167,6 +167,7 @@ cdef class Loop:
self._ssock = self._csock = None
self._signal_handlers = {}
self._listening_signals = False
self._old_signal_wakeup_id = -1
self._coroutine_debug_set = False
@ -183,6 +184,9 @@ cdef class Loop:
self._servers = set()
cdef inline _is_main_thread(self):
return MAIN_THREAD_ID == PyThread_get_thread_ident()
def __init__(self):
self.set_debug((not sys_ignore_environment
and bool(os_environ.get('PYTHONASYNCIODEBUG'))))
@ -241,34 +245,40 @@ cdef class Loop:
self._debug_exception_handler_cnt = 0
cdef _setup_signals(self):
cdef int old_wakeup_fd
cdef _setup_or_resume_signals(self):
if not self._is_main_thread():
return
if self._listening_signals:
return
raise RuntimeError('signals handling has been already setup')
if self._ssock is not None:
raise RuntimeError('self-pipe exists before loop run')
# Create a self-pipe and call set_signal_wakeup_fd() with one
# of its ends. This is needed so that libuv knows that it needs
# to wakeup on ^C (no matter if the SIGINT handler is still the
# standard Python's one or or user set their own.)
self._ssock, self._csock = socket_socketpair()
self._ssock.setblocking(False)
self._csock.setblocking(False)
try:
old_wakeup_fd = _set_signal_wakeup_fd(self._csock.fileno())
except (OSError, ValueError):
# Not the main thread
self._ssock.setblocking(False)
self._csock.setblocking(False)
fileno = self._csock.fileno()
self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno)
except Exception:
# Out of all statements in the try block, only the
# "_set_signal_wakeup_fd()" call can fail, but it shouldn't,
# as we ensure that the current thread is the main thread.
# Still, if something goes horribly wrong we want to clean up
# the socket pair.
self._ssock.close()
self._csock.close()
self._ssock = self._csock = None
return
self._listening_signals = True
return old_wakeup_fd
cdef _recv_signals_start(self):
cdef object old_wakeup_fd = None
if self._ssock is None:
old_wakeup_fd = self._setup_signals()
if self._ssock is None:
# Not the main thread.
return
self._ssock = None
self._csock = None
raise
self._add_reader(
self._ssock,
@ -277,30 +287,24 @@ cdef class Loop:
"Loop._read_from_self",
<method_t>self._read_from_self,
self))
return old_wakeup_fd
cdef _recv_signals_stop(self):
if self._ssock is None:
return
self._listening_signals = True
self._remove_reader(self._ssock)
cdef _shutdown_signals(self):
if not self._listening_signals:
return
for sig in list(self._signal_handlers):
self.remove_signal_handler(sig)
cdef _pause_signals(self):
if not self._is_main_thread():
if self._listening_signals:
raise RuntimeError(
'cannot pause signals handling; no longer running in '
'the main thread')
else:
return
if not self._listening_signals:
# `remove_signal_handler` will call `_shutdown_signals` when
# removing last signal handler.
return
raise RuntimeError('signals handling has not been setup')
try:
signal_set_wakeup_fd(-1)
except (ValueError, OSError) as exc:
aio_logger.info('set_wakeup_fd(-1) failed: %s', exc)
self._listening_signals = False
_set_signal_wakeup_fd(self._old_signal_wakeup_id)
self._remove_reader(self._ssock)
self._ssock.close()
@ -308,7 +312,24 @@ cdef class Loop:
self._ssock = None
self._csock = None
self._listening_signals = False
cdef _shutdown_signals(self):
if not self._is_main_thread():
if self._signal_handlers:
aio_logger.warning(
'cannot cleanup signal handlers: closing the event loop '
'in a non-main OS thread')
return
if self._listening_signals:
raise RuntimeError(
'cannot shutdown signals handling as it has not been paused')
if self._ssock:
raise RuntimeError(
'self-pipe was not cleaned up after loop was run')
for sig in list(self._signal_handlers):
self.remove_signal_handler(sig)
def __sighandler(self, signum, frame):
self._signals.add(signum)
@ -451,7 +472,6 @@ cdef class Loop:
cdef _run(self, uv.uv_run_mode mode):
cdef int err
cdef object old_wakeup_fd
if self._closed == 1:
raise RuntimeError('unable to start the loop; it was closed')
@ -474,7 +494,7 @@ cdef class Loop:
self.handler_check__exec_writes.start()
self.handler_idle.start()
old_wakeup_fd = self._recv_signals_start()
self._setup_or_resume_signals()
if aio_set_running_loop is not None:
aio_set_running_loop(self)
@ -484,13 +504,11 @@ cdef class Loop:
if aio_set_running_loop is not None:
aio_set_running_loop(None)
self._recv_signals_stop()
if old_wakeup_fd is not None:
signal_set_wakeup_fd(old_wakeup_fd)
self.handler_check__exec_writes.stop()
self.handler_idle.stop()
self._pause_signals()
self._thread_is_main = 0
self._thread_id = 0
self._running = 0
@ -2794,10 +2812,10 @@ cdef class Loop:
cdef:
Handle h
if not self._listening_signals:
self._setup_signals()
if not self._listening_signals:
raise ValueError('set_wakeup_fd only works in main thread')
if not self._is_main_thread():
raise ValueError(
'add_signal_handler() can only be called from '
'the main thread')
if (aio_iscoroutine(callback)
or aio_iscoroutinefunction(callback)):
@ -2829,14 +2847,6 @@ cdef class Loop:
self._check_signal(sig)
self._check_closed()
try:
# set_wakeup_fd() raises ValueError if this is not the
# main thread. By calling it early we ensure that an
# event loop running in another thread cannot add a signal
# handler.
_set_signal_wakeup_fd(self._csock.fileno())
except (ValueError, OSError) as exc:
raise RuntimeError(str(exc))
h = new_Handle(self, callback, args or None, None)
self._signal_handlers[sig] = h
@ -2866,6 +2876,12 @@ cdef class Loop:
Return True if a signal handler was removed, False if not.
"""
if not self._is_main_thread():
raise ValueError(
'remove_signal_handler() can only be called from '
'the main thread')
self._check_signal(sig)
if not self._listening_signals:
@ -2889,9 +2905,6 @@ cdef class Loop:
else:
raise
if not self._signal_handlers:
self._shutdown_signals()
return True
@cython.iterable_coroutine