diff --git a/appveyor.yml b/appveyor.yml index 273eb192..a6b6c6a0 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -54,7 +54,7 @@ environment: PYTHON_VERSION: "3.8.x" PYTHON_ARCH: "32" TOX_ENV: "py38" - TOX_ARGS: "tornado.test.websocket_test" + TOX_ARGS: "--fail-if-logs=false tornado.test.websocket_test" - PYTHON: "C:\\Python38-x64" PYTHON_VERSION: "3.8.x" diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index 633a35f9..012948b3 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -50,26 +50,27 @@ if typing.TYPE_CHECKING: _T = TypeVar("_T") -# Collection of sockets to write to at shutdown to wake up any selector threads. -_waker_sockets = set() # type: Set[socket.socket] +# Collection of selector thread event loops to shut down on exit. +_selector_loops = set() # type: Set[AddThreadSelectorEventLoop] def _atexit_callback() -> None: - for fd in _waker_sockets: + for loop in _selector_loops: + with loop._select_cond: + loop._closing_selector = True + loop._select_cond.notify() try: - fd.send(b"a") + loop._waker_w.send(b"a") except BlockingIOError: pass + # If we don't join our (daemon) thread here, we may get a deadlock + # during interpreter shutdown. I don't really understand why. This + # deadlock happens every time in CI (both travis and appveyor) but + # I've never been able to reproduce locally. + loop._thread.join() + _selector_loops.clear() -# atexit callbacks are run in LIFO order. Our callback must run before -# ThreadPoolExecutor's or it will deadlock (the pool's threads can't -# finish their work items until we write to their waker sockets). In -# recent versions of Python the thread pool atexit callback is -# registered in a getattr hook the first time TPE is *referenced* -# (instead of older versions of python where it was registered when -# concurrent.futures was imported). -concurrent.futures.ThreadPoolExecutor atexit.register(_atexit_callback) @@ -422,7 +423,10 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): # in this set are proxied through to the underlying loop. MY_ATTRIBUTES = { "_consume_waker", - "_executor", + "_select_cond", + "_select_args", + "_closing_selector", + "_thread", "_handle_event", "_readers", "_real_loop", @@ -447,10 +451,21 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: self._real_loop = real_loop - # Create our own executor to ensure we always have a thread - # available (we'll keep it 100% busy) instead of contending - # with the application for a thread in the default executor. - self._executor = concurrent.futures.ThreadPoolExecutor(1) + + # Create a thread to run the select system call. We manage this thread + # manually so we can trigger a clean shutdown from an atexit hook. Note + # that due to the order of operations at shutdown, only daemon threads + # can be shut down in this way (non-daemon threads would require the + # introduction of a new hook: https://bugs.python.org/issue41962) + self._select_cond = threading.Condition() + self._select_args = ( + None + ) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]] + self._closing_selector = False + self._thread = threading.Thread( + name="Tornado selector", daemon=True, target=self._run_select, + ) + self._thread.start() # Start the select loop once the loop is started. self._real_loop.call_soon(self._start_select) @@ -462,7 +477,7 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): self._waker_r, self._waker_w = socket.socketpair() self._waker_r.setblocking(False) self._waker_w.setblocking(False) - _waker_sockets.add(self._waker_w) + _selector_loops.add(self) self.add_reader(self._waker_r, self._consume_waker) def __del__(self) -> None: @@ -471,14 +486,17 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): # can get a clean shutdown notification. If we're just left to # be GC'd, we must explicitly close our sockets to avoid # logging warnings. - _waker_sockets.discard(self._waker_w) + _selector_loops.discard(self) self._waker_r.close() self._waker_w.close() def close(self) -> None: + with self._select_cond: + self._closing_selector = True + self._select_cond.notify() self._wake_selector() - self._executor.shutdown() - _waker_sockets.discard(self._waker_w) + self._thread.join() + _selector_loops.discard(self) self._waker_r.close() self._waker_w.close() self._real_loop.close() @@ -499,55 +517,64 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): # Capture reader and writer sets here in the event loop # thread to avoid any problems with concurrent # modification while the select loop uses them. - f = self.run_in_executor( - self._executor, - self._run_select, - list(self._readers.keys()), - list(self._writers.keys()), - ) - asyncio.ensure_future(f).add_done_callback(self._handle_select) + with self._select_cond: + assert self._select_args is None + self._select_args = (list(self._readers.keys()), list(self._writers.keys())) + self._select_cond.notify() - def _run_select( - self, to_read: List[int], to_write: List[int] - ) -> Tuple[List[int], List[int]]: - # We use the simpler interface of the select module instead of - # the more stateful interface in the selectors module because - # this class is only intended for use on windows, where - # select.select is the only option. The selector interface - # does not have well-documented thread-safety semantics that - # we can rely on so ensuring proper synchronization would be - # tricky. - try: - # On windows, selecting on a socket for write will not - # return the socket when there is an error (but selecting - # for reads works). Also select for errors when selecting - # for writes, and merge the results. - # - # This pattern is also used in - # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 - rs, ws, xs = select.select(to_read, to_write, to_write) - ws = ws + xs - except OSError as e: - # After remove_reader or remove_writer is called, the file - # descriptor may subsequently be closed on the event loop - # thread. It's possible that this select thread hasn't - # gotten into the select system call by the time that - # happens in which case (at least on macOS), select may - # raise a "bad file descriptor" error. If we get that - # error, check and see if we're also being woken up by - # polling the waker alone. If we are, just return to the - # event loop and we'll get the updated set of file - # descriptors on the next iteration. Otherwise, raise the - # original error. - if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): - rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) - if rs: - return rs, [] - raise - return rs, ws + def _run_select(self) -> None: + while True: + with self._select_cond: + while self._select_args is None and not self._closing_selector: + self._select_cond.wait() + if self._closing_selector: + return + assert self._select_args is not None + to_read, to_write = self._select_args + self._select_args = None - def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None: - rs, ws = f.result() + # We use the simpler interface of the select module instead of + # the more stateful interface in the selectors module because + # this class is only intended for use on windows, where + # select.select is the only option. The selector interface + # does not have well-documented thread-safety semantics that + # we can rely on so ensuring proper synchronization would be + # tricky. + try: + # On windows, selecting on a socket for write will not + # return the socket when there is an error (but selecting + # for reads works). Also select for errors when selecting + # for writes, and merge the results. + # + # This pattern is also used in + # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 + rs, ws, xs = select.select(to_read, to_write, to_write) + ws = ws + xs + except OSError as e: + # After remove_reader or remove_writer is called, the file + # descriptor may subsequently be closed on the event loop + # thread. It's possible that this select thread hasn't + # gotten into the select system call by the time that + # happens in which case (at least on macOS), select may + # raise a "bad file descriptor" error. If we get that + # error, check and see if we're also being woken up by + # polling the waker alone. If we are, just return to the + # event loop and we'll get the updated set of file + # descriptors on the next iteration. Otherwise, raise the + # original error. + if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): + rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) + if rs: + ws = [] + else: + raise + else: + raise + self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws) + + def _handle_select( + self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"] + ) -> None: for r in rs: self._handle_event(r, self._readers) for w in ws: diff --git a/tornado/test/httpclient_test.py b/tornado/test/httpclient_test.py index f87f972f..fd9a9786 100644 --- a/tornado/test/httpclient_test.py +++ b/tornado/test/httpclient_test.py @@ -836,6 +836,7 @@ class SyncHTTPClientSubprocessTest(unittest.TestCase): stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True, + timeout=5, ) if proc.stdout: print("STDOUT:")