From c938ddba666980c56d863a190430d04f8b1a363a Mon Sep 17 00:00:00 2001
From: Ben Darnell <ben@bendarnell.com>
Date: Mon, 5 Oct 2020 09:39:41 -0400
Subject: [PATCH 1/2] test: Add a timeout to SyncHTTPClient test

---
 tornado/test/httpclient_test.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tornado/test/httpclient_test.py b/tornado/test/httpclient_test.py
index f87f972f..fd9a9786 100644
--- a/tornado/test/httpclient_test.py
+++ b/tornado/test/httpclient_test.py
@@ -836,6 +836,7 @@ class SyncHTTPClientSubprocessTest(unittest.TestCase):
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
             check=True,
+            timeout=5,
         )
         if proc.stdout:
             print("STDOUT:")

From 15832bc423c33c9280564770046dd6918f3a31b4 Mon Sep 17 00:00:00 2001
From: Ben Darnell <ben@bendarnell.com>
Date: Sun, 11 Oct 2020 20:54:01 -0400
Subject: [PATCH 2/2] asyncio: Manage our own thread instead of an executor

Python 3.9 changed the behavior of ThreadPoolExecutor at interpreter
shutdown (after the already-tricky import-order issues around
atexit hooks). Avoid these issues by managing the thread by hand.
---
 appveyor.yml                |   2 +-
 tornado/platform/asyncio.py | 163 +++++++++++++++++++++---------------
 2 files changed, 96 insertions(+), 69 deletions(-)

diff --git a/appveyor.yml b/appveyor.yml
index 273eb192..a6b6c6a0 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -54,7 +54,7 @@ environment:
       PYTHON_VERSION: "3.8.x"
       PYTHON_ARCH: "32"
       TOX_ENV: "py38"
-      TOX_ARGS: "tornado.test.websocket_test"
+      TOX_ARGS: "--fail-if-logs=false tornado.test.websocket_test"
 
     - PYTHON: "C:\\Python38-x64"
       PYTHON_VERSION: "3.8.x"
diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py
index 633a35f9..012948b3 100644
--- a/tornado/platform/asyncio.py
+++ b/tornado/platform/asyncio.py
@@ -50,26 +50,27 @@ if typing.TYPE_CHECKING:
 _T = TypeVar("_T")
 
 
-# Collection of sockets to write to at shutdown to wake up any selector threads.
-_waker_sockets = set()  # type: Set[socket.socket]
+# Collection of selector thread event loops to shut down on exit.
+_selector_loops = set()  # type: Set[AddThreadSelectorEventLoop]
 
 
 def _atexit_callback() -> None:
-    for fd in _waker_sockets:
+    for loop in _selector_loops:
+        with loop._select_cond:
+            loop._closing_selector = True
+            loop._select_cond.notify()
         try:
-            fd.send(b"a")
+            loop._waker_w.send(b"a")
         except BlockingIOError:
             pass
+        # If we don't join our (daemon) thread here, we may get a deadlock
+        # during interpreter shutdown. I don't really understand why. This
+        # deadlock happens every time in CI (both travis and appveyor) but
+        # I've never been able to reproduce locally.
+        loop._thread.join()
+    _selector_loops.clear()
 
 
-# atexit callbacks are run in LIFO order. Our callback must run before
-# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
-# finish their work items until we write to their waker sockets). In
-# recent versions of Python the thread pool atexit callback is
-# registered in a getattr hook the first time TPE is *referenced*
-# (instead of older versions of python where it was registered when
-# concurrent.futures was imported).
-concurrent.futures.ThreadPoolExecutor
 atexit.register(_atexit_callback)
 
 
@@ -422,7 +423,10 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
     # in this set are proxied through to the underlying loop.
     MY_ATTRIBUTES = {
         "_consume_waker",
-        "_executor",
+        "_select_cond",
+        "_select_args",
+        "_closing_selector",
+        "_thread",
         "_handle_event",
         "_readers",
         "_real_loop",
@@ -447,10 +451,21 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
 
     def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
         self._real_loop = real_loop
-        # Create our own executor to ensure we always have a thread
-        # available (we'll keep it 100% busy) instead of contending
-        # with the application for a thread in the default executor.
-        self._executor = concurrent.futures.ThreadPoolExecutor(1)
+
+        # Create a thread to run the select system call. We manage this thread
+        # manually so we can trigger a clean shutdown from an atexit hook. Note
+        # that due to the order of operations at shutdown, only daemon threads
+        # can be shut down in this way (non-daemon threads would require the
+        # introduction of a new hook: https://bugs.python.org/issue41962)
+        self._select_cond = threading.Condition()
+        self._select_args = (
+            None
+        )  # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
+        self._closing_selector = False
+        self._thread = threading.Thread(
+            name="Tornado selector", daemon=True, target=self._run_select,
+        )
+        self._thread.start()
         # Start the select loop once the loop is started.
         self._real_loop.call_soon(self._start_select)
 
@@ -462,7 +477,7 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         self._waker_r, self._waker_w = socket.socketpair()
         self._waker_r.setblocking(False)
         self._waker_w.setblocking(False)
-        _waker_sockets.add(self._waker_w)
+        _selector_loops.add(self)
         self.add_reader(self._waker_r, self._consume_waker)
 
     def __del__(self) -> None:
@@ -471,14 +486,17 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         # can get a clean shutdown notification. If we're just left to
         # be GC'd, we must explicitly close our sockets to avoid
         # logging warnings.
-        _waker_sockets.discard(self._waker_w)
+        _selector_loops.discard(self)
         self._waker_r.close()
         self._waker_w.close()
 
     def close(self) -> None:
+        with self._select_cond:
+            self._closing_selector = True
+            self._select_cond.notify()
         self._wake_selector()
-        self._executor.shutdown()
-        _waker_sockets.discard(self._waker_w)
+        self._thread.join()
+        _selector_loops.discard(self)
         self._waker_r.close()
         self._waker_w.close()
         self._real_loop.close()
@@ -499,55 +517,64 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         # Capture reader and writer sets here in the event loop
         # thread to avoid any problems with concurrent
         # modification while the select loop uses them.
-        f = self.run_in_executor(
-            self._executor,
-            self._run_select,
-            list(self._readers.keys()),
-            list(self._writers.keys()),
-        )
-        asyncio.ensure_future(f).add_done_callback(self._handle_select)
+        with self._select_cond:
+            assert self._select_args is None
+            self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
+            self._select_cond.notify()
 
-    def _run_select(
-        self, to_read: List[int], to_write: List[int]
-    ) -> Tuple[List[int], List[int]]:
-        # We use the simpler interface of the select module instead of
-        # the more stateful interface in the selectors module because
-        # this class is only intended for use on windows, where
-        # select.select is the only option. The selector interface
-        # does not have well-documented thread-safety semantics that
-        # we can rely on so ensuring proper synchronization would be
-        # tricky.
-        try:
-            # On windows, selecting on a socket for write will not
-            # return the socket when there is an error (but selecting
-            # for reads works). Also select for errors when selecting
-            # for writes, and merge the results.
-            #
-            # This pattern is also used in
-            # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
-            rs, ws, xs = select.select(to_read, to_write, to_write)
-            ws = ws + xs
-        except OSError as e:
-            # After remove_reader or remove_writer is called, the file
-            # descriptor may subsequently be closed on the event loop
-            # thread. It's possible that this select thread hasn't
-            # gotten into the select system call by the time that
-            # happens in which case (at least on macOS), select may
-            # raise a "bad file descriptor" error. If we get that
-            # error, check and see if we're also being woken up by
-            # polling the waker alone. If we are, just return to the
-            # event loop and we'll get the updated set of file
-            # descriptors on the next iteration. Otherwise, raise the
-            # original error.
-            if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
-                rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
-                if rs:
-                    return rs, []
-            raise
-        return rs, ws
+    def _run_select(self) -> None:
+        while True:
+            with self._select_cond:
+                while self._select_args is None and not self._closing_selector:
+                    self._select_cond.wait()
+                if self._closing_selector:
+                    return
+                assert self._select_args is not None
+                to_read, to_write = self._select_args
+                self._select_args = None
 
-    def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
-        rs, ws = f.result()
+            # We use the simpler interface of the select module instead of
+            # the more stateful interface in the selectors module because
+            # this class is only intended for use on windows, where
+            # select.select is the only option. The selector interface
+            # does not have well-documented thread-safety semantics that
+            # we can rely on so ensuring proper synchronization would be
+            # tricky.
+            try:
+                # On windows, selecting on a socket for write will not
+                # return the socket when there is an error (but selecting
+                # for reads works). Also select for errors when selecting
+                # for writes, and merge the results.
+                #
+                # This pattern is also used in
+                # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+                rs, ws, xs = select.select(to_read, to_write, to_write)
+                ws = ws + xs
+            except OSError as e:
+                # After remove_reader or remove_writer is called, the file
+                # descriptor may subsequently be closed on the event loop
+                # thread. It's possible that this select thread hasn't
+                # gotten into the select system call by the time that
+                # happens in which case (at least on macOS), select may
+                # raise a "bad file descriptor" error. If we get that
+                # error, check and see if we're also being woken up by
+                # polling the waker alone. If we are, just return to the
+                # event loop and we'll get the updated set of file
+                # descriptors on the next iteration. Otherwise, raise the
+                # original error.
+                if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+                    rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
+                    if rs:
+                        ws = []
+                    else:
+                        raise
+                else:
+                    raise
+            self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
+
+    def _handle_select(
+        self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"]
+    ) -> None:
         for r in rs:
             self._handle_event(r, self._readers)
         for w in ws: