diff --git a/docs/howitworks.rst b/docs/howitworks.rst index db4aafed..15043380 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -210,6 +210,12 @@ to call :py:meth:`socket.shutdown(SHUT_WR) ` on their :py:class:`IoLogger ` socket's write ends before draining any remaining data buffered on the read ends. +An alternative approach is to wait until the socket is completely closed, with +some hard timeout, but this necessitates greater discipline than is common in +infrastructure code (how often have you forgotten to redirect stderr to +``/dev/null``?), so needless irritating delays would often be experienced +during program termination. + If the main thread (responsible for function call dispatch) fails to trigger shutdown (because some user function is hanging), then the eventual force disconnection by the master will cause the IO multiplexer thread to enter diff --git a/docs/internals.rst b/docs/internals.rst index ef81a942..638a930b 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -17,17 +17,18 @@ Side Class Stream Classes -------------- -.. class:: foo - -.. class:: foo - - -:py:class:`foo` - .. autoclass:: econtext.core.BasicStream :members: +.. autoclass:: econtext.core.IoLogger + :members: + + +.. autoclass:: econtext.core.Waker + :members: + + econtext.master =============== diff --git a/econtext/core.py b/econtext/core.py index 5050d0bc..d591260c 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -517,6 +517,13 @@ class Context(object): class Waker(BasicStream): + """ + :py:class:`BasicStream` subclass implementing the + `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when + some of its state has been changed by another thread. + + .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html + """ def __init__(self, broker): self._broker = broker rfd, wfd = os.pipe() @@ -530,15 +537,26 @@ class Waker(BasicStream): return '' def wake(self): + """ + Write a byte to the self-pipe, causing the IO multiplexer to wake up. + Nothing is written if the current thread is the IO multiplexer thread. + """ if threading.currentThread() != self._broker._thread and \ self.transmit_side.fd: os.write(self.transmit_side.fd, ' ') def on_receive(self, broker): + """ + Read a byte from the self-pipe. + """ os.read(self.receive_side.fd, 1) class IoLogger(BasicStream): + """ + :py:class:`BasicStream` subclass that sets up redirection of a standard + UNIX file descriptor back into the Python :py:mod:`logging` package. + """ _buf = '' def __init__(self, broker, name, dest_fd): @@ -564,6 +582,7 @@ class IoLogger(BasicStream): self._log.info('%s', line.rstrip('\n')) def on_shutdown(self, broker): + """Shut down the write end of the logging socket.""" LOG.debug('%r.on_shutdown()', self) self._wsock.shutdown(socket.SHUT_WR) self._wsock.close()