Merge pull request #2934 from bdarnell/manual-thread

asyncio: Manage our own thread instead of an executor
This commit is contained in:
Ben Darnell 2020-10-24 14:29:23 -04:00 committed by GitHub
commit f09d584ce9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 69 deletions

View File

@ -54,7 +54,7 @@ environment:
PYTHON_VERSION: "3.8.x"
PYTHON_ARCH: "32"
TOX_ENV: "py38"
TOX_ARGS: "tornado.test.websocket_test"
TOX_ARGS: "--fail-if-logs=false tornado.test.websocket_test"
- PYTHON: "C:\\Python38-x64"
PYTHON_VERSION: "3.8.x"

View File

@ -50,26 +50,27 @@ if typing.TYPE_CHECKING:
_T = TypeVar("_T")
# Collection of sockets to write to at shutdown to wake up any selector threads.
_waker_sockets = set() # type: Set[socket.socket]
# Collection of selector thread event loops to shut down on exit.
_selector_loops = set() # type: Set[AddThreadSelectorEventLoop]
def _atexit_callback() -> None:
for fd in _waker_sockets:
for loop in _selector_loops:
with loop._select_cond:
loop._closing_selector = True
loop._select_cond.notify()
try:
fd.send(b"a")
loop._waker_w.send(b"a")
except BlockingIOError:
pass
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
_selector_loops.clear()
# atexit callbacks are run in LIFO order. Our callback must run before
# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
# finish their work items until we write to their waker sockets). In
# recent versions of Python the thread pool atexit callback is
# registered in a getattr hook the first time TPE is *referenced*
# (instead of older versions of python where it was registered when
# concurrent.futures was imported).
concurrent.futures.ThreadPoolExecutor
atexit.register(_atexit_callback)
@ -422,7 +423,10 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_consume_waker",
"_executor",
"_select_cond",
"_select_args",
"_closing_selector",
"_thread",
"_handle_event",
"_readers",
"_real_loop",
@ -447,10 +451,21 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
# Create our own executor to ensure we always have a thread
# available (we'll keep it 100% busy) instead of contending
# with the application for a thread in the default executor.
self._executor = concurrent.futures.ThreadPoolExecutor(1)
# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._select_cond = threading.Condition()
self._select_args = (
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
self._thread = threading.Thread(
name="Tornado selector", daemon=True, target=self._run_select,
)
self._thread.start()
# Start the select loop once the loop is started.
self._real_loop.call_soon(self._start_select)
@ -462,7 +477,7 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
self._waker_r, self._waker_w = socket.socketpair()
self._waker_r.setblocking(False)
self._waker_w.setblocking(False)
_waker_sockets.add(self._waker_w)
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)
def __del__(self) -> None:
@ -471,14 +486,17 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
# can get a clean shutdown notification. If we're just left to
# be GC'd, we must explicitly close our sockets to avoid
# logging warnings.
_waker_sockets.discard(self._waker_w)
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
def close(self) -> None:
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
self._wake_selector()
self._executor.shutdown()
_waker_sockets.discard(self._waker_w)
self._thread.join()
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._real_loop.close()
@ -499,55 +517,64 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
# Capture reader and writer sets here in the event loop
# thread to avoid any problems with concurrent
# modification while the select loop uses them.
f = self.run_in_executor(
self._executor,
self._run_select,
list(self._readers.keys()),
list(self._writers.keys()),
)
asyncio.ensure_future(f).add_done_callback(self._handle_select)
with self._select_cond:
assert self._select_args is None
self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
self._select_cond.notify()
def _run_select(
self, to_read: List[int], to_write: List[int]
) -> Tuple[List[int], List[int]]:
# We use the simpler interface of the select module instead of
# the more stateful interface in the selectors module because
# this class is only intended for use on windows, where
# select.select is the only option. The selector interface
# does not have well-documented thread-safety semantics that
# we can rely on so ensuring proper synchronization would be
# tricky.
try:
# On windows, selecting on a socket for write will not
# return the socket when there is an error (but selecting
# for reads works). Also select for errors when selecting
# for writes, and merge the results.
#
# This pattern is also used in
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
rs, ws, xs = select.select(to_read, to_write, to_write)
ws = ws + xs
except OSError as e:
# After remove_reader or remove_writer is called, the file
# descriptor may subsequently be closed on the event loop
# thread. It's possible that this select thread hasn't
# gotten into the select system call by the time that
# happens in which case (at least on macOS), select may
# raise a "bad file descriptor" error. If we get that
# error, check and see if we're also being woken up by
# polling the waker alone. If we are, just return to the
# event loop and we'll get the updated set of file
# descriptors on the next iteration. Otherwise, raise the
# original error.
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
if rs:
return rs, []
raise
return rs, ws
def _run_select(self) -> None:
while True:
with self._select_cond:
while self._select_args is None and not self._closing_selector:
self._select_cond.wait()
if self._closing_selector:
return
assert self._select_args is not None
to_read, to_write = self._select_args
self._select_args = None
def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
rs, ws = f.result()
# We use the simpler interface of the select module instead of
# the more stateful interface in the selectors module because
# this class is only intended for use on windows, where
# select.select is the only option. The selector interface
# does not have well-documented thread-safety semantics that
# we can rely on so ensuring proper synchronization would be
# tricky.
try:
# On windows, selecting on a socket for write will not
# return the socket when there is an error (but selecting
# for reads works). Also select for errors when selecting
# for writes, and merge the results.
#
# This pattern is also used in
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
rs, ws, xs = select.select(to_read, to_write, to_write)
ws = ws + xs
except OSError as e:
# After remove_reader or remove_writer is called, the file
# descriptor may subsequently be closed on the event loop
# thread. It's possible that this select thread hasn't
# gotten into the select system call by the time that
# happens in which case (at least on macOS), select may
# raise a "bad file descriptor" error. If we get that
# error, check and see if we're also being woken up by
# polling the waker alone. If we are, just return to the
# event loop and we'll get the updated set of file
# descriptors on the next iteration. Otherwise, raise the
# original error.
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
if rs:
ws = []
else:
raise
else:
raise
self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
def _handle_select(
self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"]
) -> None:
for r in rs:
self._handle_event(r, self._readers)
for w in ws:

View File

@ -836,6 +836,7 @@ class SyncHTTPClientSubprocessTest(unittest.TestCase):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
timeout=5,
)
if proc.stdout:
print("STDOUT:")