diff --git a/docs/api.rst b/docs/api.rst
index 2f1f9784..51895318 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -2,11 +2,6 @@
API Reference
*************
-.. toctree::
- :hidden:
-
- signals
-
Package Layout
==============
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 9f3e3546..eb889daa 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -22,7 +22,7 @@ To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub `_.
Enhancements
-^^^^^^^^^^^^
+~~~~~~~~~~~~
* `#556 `_,
`#587 `_: Ansible 2.8 is partially
@@ -61,7 +61,7 @@ Enhancements
Mitogen for Ansible
-^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~
* `#363 `_: fix an obscure race
matching *Permission denied* errors from some versions of ``su`` running on
@@ -488,7 +488,7 @@ Enhancements
`#491 `_,
`#493 `_: the interface employed
for in-process queues changed from `kqueue
- `_ / `epoll
+ `_ / `epoll
`_ to `poll()
`_, which requires no setup
or teardown, yielding a 38% latency reduction for inter-thread communication.
@@ -1034,7 +1034,7 @@ bug reports, testing, features and fixes in this release contributed by
`Josh Smift `_,
`Luca Nunzi `_,
`Orion Poplawski `_,
-`Peter V. Saveliev `_,
+`Peter V. Saveliev `_,
`Pierre-Henry Muller `_,
`Pierre-Louis Bonicoli `_,
`Prateek Jain `_,
@@ -1092,7 +1092,7 @@ Core Library
* `#300 `_: the broker could crash on
OS X during shutdown due to scheduled `kqueue
- `_ filter changes for
+ `_ filter changes for
descriptors that were closed before the IO loop resumes. As a temporary
workaround, kqueue's bulk change feature is not used.
diff --git a/docs/internals.rst b/docs/internals.rst
index 1df1c2ad..d062b6d9 100644
--- a/docs/internals.rst
+++ b/docs/internals.rst
@@ -7,11 +7,6 @@ Internal API Reference
Internal APIs are subject to rapid change even across minor releases. This
page exists to help users modify and extend the library.
-.. toctree::
- :hidden:
-
- signals
-
Constants
=========
@@ -50,6 +45,10 @@ Logging
See also :class:`mitogen.core.IoLoggerProtocol`.
+.. currentmodule:: mitogen.core
+.. autoclass:: LogHandler
+ :members:
+
.. currentmodule:: mitogen.master
.. autoclass:: LogForwarder
:members:
@@ -270,6 +269,8 @@ Helpers
.. autofunction:: minimize_source
+.. _signals:
+
Signals
=======
@@ -312,6 +313,10 @@ These signals are used internally by Mitogen.
- ``disconnect``
- Fired on the Broker thread when disconnection is detected.
+ * - :py:class:`mitogen.core.Stream`
+ - ``shutdown``
+ - Fired on the Broker thread when broker shutdown begins.
+
* - :py:class:`mitogen.core.Context`
- ``disconnect``
- Fired on the Broker thread during shutdown (???)
diff --git a/mitogen/core.py b/mitogen/core.py
index 8b4f135e..aebd337e 100644
--- a/mitogen/core.py
+++ b/mitogen/core.py
@@ -1024,11 +1024,11 @@ class Receiver(object):
routed to the context due to disconnection, and ignores messages that
did not originate from the respondent context.
"""
- #: If not :data:`None`, a reference to a function invoked as
- #: `notify(receiver)` when a new message is delivered to this receiver. The
- #: function is invoked on the broker thread, therefore it must not block.
- #: Used by :class:`mitogen.select.Select` to implement waiting on multiple
- #: receivers.
+ #: If not :data:`None`, a function invoked as `notify(receiver)` after a
+ #: message has been received. The function is invoked on :class:`Broker`
+ #: thread, therefore it must not block. Used by
+ #: :class:`mitogen.select.Select` to efficiently implement waiting on
+ #: multiple event sources.
notify = None
raise_channelerror = True
@@ -1513,6 +1513,22 @@ class Importer(object):
class LogHandler(logging.Handler):
+ """
+ A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG`
+ messages to be sent to a parent context in response to logging messages
+ generated by the current context. This is installed by default in child
+ contexts during bootstrap, so that :mod:`logging` events can be viewed and
+ managed centrally in the master process.
+
+ The handler is initially *corked* after construction, such that it buffers
+ messages until :meth:`uncork` is called. This allows logging to be
+ installed prior to communication with the target being available, and
+ avoids any possible race where early log messages might be dropped.
+
+ :param mitogen.core.Context context:
+ The context to send log messages towards. At present this is always
+ the master process.
+ """
def __init__(self, context):
logging.Handler.__init__(self)
self.context = context
@@ -1549,6 +1565,9 @@ class LogHandler(logging.Handler):
self._buffer_lock.release()
def emit(self, rec):
+ """
+ Send a :data:`FORWARD_LOG` message towards the target context.
+ """
if rec.name == 'mitogen.io' or \
getattr(self.local, 'in_emit', False):
return
@@ -1566,6 +1585,30 @@ class LogHandler(logging.Handler):
class Stream(object):
+ """
+ A :class:`Stream` is one readable and optionally one writeable file
+ descriptor (represented by :class:`Side`) aggregated alongside an
+ associated :class:`Protocol` that knows how to respond to IO readiness
+ events for those descriptors.
+
+ Streams are registered with :class:`Broker`, and callbacks are invoked on
+ the broker thread in response to IO activity. When registered using
+ :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker
+ may call any of :meth:`on_receive`, :meth:`on_transmit`,
+ :meth:`on_shutdown` or :meth:`on_disconnect`.
+
+ It is expected that the :class:`Protocol` associated with a stream will
+ change over its life. For example during connection setup, the initial
+ protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to
+ enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to
+ the target, before handing off to :class:`MitogenProtocol` when the target
+ process is initialized.
+
+ Streams connecting to children are in turn aggregated by
+ :class:`mitogen.parent.Connection`, which contains additional logic for
+ managing any child process, and a reference to any separate ``stderr``
+ :class:`Stream` connected to that process.
+ """
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
@@ -1578,14 +1621,16 @@ class Stream(object):
#: In parents, the :class:`mitogen.parent.Connection` instance.
conn = None
+ #: The stream name. This is used in the :meth:`__repr__` output in any log
+ #: messages, it may be any descriptive string.
name = u'default'
def set_protocol(self, protocol):
"""
- Bind a protocol to this stream, by updating :attr:`Protocol.stream` to
- refer to this stream, and updating this stream's
- :attr:`Stream.protocol` to the refer to the protocol. Any prior
- protocol's :attr:`Protocol.stream` is set to :data:`None`.
+ Bind a :class:`Protocol` to this stream, by updating
+ :attr:`Protocol.stream` to refer to this stream, and updating this
+ stream's :attr:`Stream.protocol` to the refer to the protocol. Any
+ prior protocol's :attr:`Protocol.stream` is set to :data:`None`.
"""
if self.protocol:
self.protocol.stream = None
@@ -1593,6 +1638,21 @@ class Stream(object):
self.protocol.stream = self
def accept(self, rfp, wfp):
+ """
+ Attach a pair of file objects to :attr:`receive_side` and
+ :attr:`transmit_side`, after wrapping them in :class:`Side` instances.
+ :class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
+ on the underlying file descriptors during construction.
+
+ The same file object may be used for both sides. The default
+ :meth:`on_disconnect` is handles the possibility that only one
+ descriptor may need to be closed.
+
+ :param file rfp:
+ The file object to receive from.
+ :param file wfp:
+ The file object to transmit to.
+ """
self.receive_side = Side(self, rfp)
self.transmit_side = Side(self, wfp)
@@ -1601,13 +1661,17 @@ class Stream(object):
def on_receive(self, broker):
"""
- Called by :class:`Broker` when the stream's :attr:`receive_side` has
+ Invoked by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
- Subclasses must implement this if :meth:`Broker.start_receive` is ever
- called on them, and the method must call :meth:`on_disconect` if
- reading produces an empty string.
+ Subclasses must implement this if they are registered using
+ :meth:`Broker.start_receive`, and the method must invoke
+ :meth:`on_disconnect` if reading produces an empty string.
+
+ The default implementation reads :attr:`Protocol.read_size` bytes and
+ passes the resulting bytestring to :meth:`Protocol.on_receive`. If the
+ bytestring is 0 bytes, invokes :meth:`on_disconnect` instead.
"""
buf = self.receive_side.read(self.protocol.read_size)
if not buf:
@@ -1618,30 +1682,39 @@ class Stream(object):
def on_transmit(self, broker):
"""
- Called by :class:`Broker` when the stream's :attr:`transmit_side`
- has been marked writeable using :meth:`Broker._start_transmit` and
- the broker has detected the associated file descriptor is ready for
+ Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has
+ been marked writeable using :meth:`Broker._start_transmit` and the
+ broker has detected the associated file descriptor is ready for
writing.
- Subclasses must implement this if :meth:`Broker._start_transmit` is
- ever called on them.
+ Subclasses must implement they are ever registerd with
+ :meth:`Broker._start_transmit`.
+
+ The default implementation invokes :meth:`Protocol.on_transmit`.
"""
self.protocol.on_transmit(broker)
def on_shutdown(self, broker):
"""
- Called by :meth:`Broker.shutdown` to allow the stream time to
- gracefully shutdown. The base implementation simply called
- :meth:`on_disconnect`.
+ Invoked by :meth:`Broker.shutdown` to allow the stream time to
+ gracefully shutdown.
+
+ The default implementation emits a ``shutdown`` signal before
+ invoking :meth:`on_disconnect`.
"""
fire(self, 'shutdown')
self.protocol.on_shutdown(broker)
def on_disconnect(self, broker):
"""
- Called by :class:`Broker` to force disconnect the stream. The base
- implementation simply closes :attr:`receive_side` and
- :attr:`transmit_side` and unregisters the stream from the broker.
+ Invoked by :class:`Broker` to force disconnect the stream during
+ shutdown, invoked by the default :meth:`on_shutdown` implementation,
+ and usually invoked by any subclass :meth:`on_receive` implementation
+ in response to a 0-byte read.
+
+ The base implementation fires a ``disconnect`` event, then closes
+ :attr:`receive_side` and :attr:`transmit_side` after unregistering the
+ stream from the broker.
"""
fire(self, 'disconnect')
self.protocol.on_disconnect(broker)
@@ -1666,6 +1739,8 @@ class Protocol(object):
#: :data:`None`.
stream = None
+ #: The size of the read buffer used by :class:`Stream` when this is the
+ #: active protocol for the stream.
read_size = CHUNK_SIZE
@classmethod
@@ -2369,8 +2444,18 @@ class Latch(object):
See :ref:`waking-sleeping-threads` for further discussion.
"""
+ #: The :class:`Poller` implementation to use for waiting. Since the poller
+ #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
+ #: if it is available, or :class:`mitogen.core.Poller` otherwise, since
+ #: these implementations require no system calls to create, configure or
+ #: destroy.
poller_class = Poller
+ #: If not :data:`None`, a function invoked as `notify(latch)` after a
+ #: successful call to :meth:`put`. The function is invoked on the
+ #: :meth:`put` caller's thread, which may be the :class:`Broker` thread,
+ #: therefore it must not block. Used by :class:`mitogen.select.Select` to
+ #: efficiently implement waiting on multiple event sources.
notify = None
# The _cls_ prefixes here are to make it crystal clear in the code which
@@ -2725,15 +2810,22 @@ class Waker(Protocol):
class IoLoggerProtocol(DelimitedProtocol):
"""
- Handle redirection of standard IO into the :mod:`logging` package.
+ Attached to one end of a socket pair whose other end overwrites one of the
+ standard ``stdout`` or ``stderr`` file descriptors in a child context.
+ Received data is split up into lines, decoded as UTF-8 and logged to the
+ :mod:`logging` package as either the ``stdout`` or ``stderr`` logger.
+
+ Logging in child contexts is in turn forwarded to the master process using
+ :class:`LogHandler`.
"""
@classmethod
def build_stream(cls, name, dest_fd):
"""
- Even though the descriptor `dest_fd` will hold the opposite end of the
- socket open, we must keep a separate dup() of it (i.e. wsock) in case
- some code decides to overwrite `dest_fd` later, which would thus break
- :meth:`on_shutdown`.
+ Even though the file descriptor `dest_fd` will hold the opposite end of
+ the socket open, we must keep a separate dup() of it (i.e. wsock) in
+ case some code decides to overwrite `dest_fd` later, which would
+ prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
+ ` on it.
"""
rsock, wsock = socket.socketpair()
os.dup2(wsock.fileno(), dest_fd)
diff --git a/mitogen/master.py b/mitogen/master.py
index 814f7019..09da775e 100644
--- a/mitogen/master.py
+++ b/mitogen/master.py
@@ -741,7 +741,7 @@ class ModuleFinder(object):
The list is determined by retrieving the source code of
`fullname`, compiling it, and examining all IMPORT_NAME ops.
- :param fullname: Fully qualified name of an _already imported_ module
+ :param fullname: Fully qualified name of an *already imported* module
for which source code can be retrieved
:type fullname: str
"""
@@ -789,7 +789,7 @@ class ModuleFinder(object):
This method is like :py:meth:`find_related_imports`, but also
recursively searches any modules which are imported by `fullname`.
- :param fullname: Fully qualified name of an _already imported_ module
+ :param fullname: Fully qualified name of an *already imported* module
for which source code can be retrieved
:type fullname: str
"""
@@ -841,7 +841,7 @@ class ModuleResponder(object):
def add_source_override(self, fullname, path, source, is_pkg):
"""
- See :meth:`ModuleFinder.add_source_override.
+ See :meth:`ModuleFinder.add_source_override`.
"""
self._finder.add_source_override(fullname, path, source, is_pkg)
diff --git a/mitogen/parent.py b/mitogen/parent.py
index 79b484c2..ec218913 100644
--- a/mitogen/parent.py
+++ b/mitogen/parent.py
@@ -2516,7 +2516,7 @@ class Reaper(object):
:param mitogen.core.Broker broker:
The :class:`Broker` on which to install timers
- :param Process proc:
+ :param mitogen.parent.Process proc:
The process to reap.
:param bool kill:
If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
diff --git a/preamble_size.py b/preamble_size.py
index 692ad7b1..43c10029 100644
--- a/preamble_size.py
+++ b/preamble_size.py
@@ -24,10 +24,12 @@ conn = mitogen.ssh.Connection(options, router)
conn.context = context
print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),))
-print('Preamble size: %s (%.2fKiB)' % (
+print('Bootstrap (mitogen.core) size: %s (%.2fKiB)' % (
len(conn.get_preamble()),
len(conn.get_preamble()) / 1024.0,
))
+print('')
+
if '--dump' in sys.argv:
print(zlib.decompress(conn.get_preamble()))
exit()