diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index e105984c..3cdf85f9 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -28,12 +28,41 @@ from __future__ import absolute_import import os +import threading import ansible_mitogen.loaders import ansible_mitogen.mixins import ansible_mitogen.process +def _patch_awx_callback(): + """ + issue #400: AWX loads a display callback that suffers from thread-safety + issues. Detect the presence of older AWX versions and patch the bug. + """ + # AWX uses sitecustomize.py to force-load this package. If it exists, we're + # running under AWX. + try: + from awx_display_callback.events import EventContext + from awx_display_callback.events import event_context + except ImportError: + return + + if hasattr(EventContext(), '_local'): + # Patched version. + return + + def patch_add_local(self, **kwargs): + tls = vars(self._local) + ctx = tls.setdefault('_ctx', {}) + ctx.update(kwargs) + + EventContext._local = threading.local() + EventContext.add_local = patch_add_local + +_patch_awx_callback() + + def wrap_action_loader__get(name, *args, **kwargs): """ While the mitogen strategy is active, trap action_loader.get() calls, diff --git a/docs/api.rst b/docs/api.rst index ea20ada7..844bb900 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -84,237 +84,16 @@ Message Class ============= .. currentmodule:: mitogen.core - -.. class:: Message - - Messages are the fundamental unit of communication, comprising fields from - the :ref:`stream-protocol` header, an optional reference to the receiving - :class:`mitogen.core.Router` for ingress messages, and helper methods for - deserialization and generating replies. - - .. attribute:: router - - The :class:`mitogen.core.Router` responsible for routing the - message. This is :data:`None` for locally originated messages. - - .. attribute:: receiver - - The :class:`mitogen.core.Receiver` over which the message was last - received. Part of the :class:`mitogen.select.Select` interface. - Defaults to :data:`None`. - - .. attribute:: dst_id - - Integer target context ID. :class:`mitogen.core.Router` delivers - messages locally when their :attr:`dst_id` matches - :data:`mitogen.context_id`, otherwise they are routed up or downstream. - - .. attribute:: src_id - - Integer source context ID. Used as the target of replies if any are - generated. - - .. attribute:: auth_id - - The context ID under whose authority the message is acting. See - :ref:`source-verification`. - - .. attribute:: handle - - Integer target handle in the destination context. This is one of the - :ref:`standard-handles`, or a dynamically generated handle used to - receive a one-time reply, such as the return value of a function call. - - .. attribute:: reply_to - - Integer target handle to direct any reply to this message. Used to - receive a one-time reply, such as the return value of a function call. - :data:`IS_DEAD` has a special meaning when it appears in this field. - - .. attribute:: data - - Message data, which may be raw or pickled. - - .. attribute:: is_dead - - :data:`True` if :attr:`reply_to` is set to the magic value - :data:`mitogen.core.IS_DEAD`, indicating the sender considers the - channel dead. - - .. py:method:: __init__ (\**kwargs) - - Construct a message from from the supplied `kwargs`. :attr:`src_id` - and :attr:`auth_id` are always set to :data:`mitogen.context_id`. - - .. py:classmethod:: pickled (obj, \**kwargs) - - Construct a pickled message, setting :attr:`data` to the - serialization of `obj`, and setting remaining fields using `kwargs`. - - :returns: - The new message. - - .. method:: unpickle (throw=True) - - Unpickle :attr:`data`, optionally raising any exceptions present. - - :param bool throw: - If :data:`True`, raise exceptions, otherwise it is the caller's - responsibility. - - :raises mitogen.core.CallError: - The serialized data contained CallError exception. - :raises mitogen.core.ChannelError: - The `is_dead` field was set. - - .. method:: reply (obj, router=None, \**kwargs) - - Compose a reply to this message and send it using :attr:`router`, or - `router` is :attr:`router` is :data:`None`. - - :param obj: - Either a :class:`Message`, or an object to be serialized in order - to construct a new message. - :param router: - Optional router to use if :attr:`router` is :data:`None`. - :param kwargs: - Optional keyword parameters overriding message fields in the reply. - +.. autoclass:: Message + :members: Router Class ============ .. currentmodule:: mitogen.core - -.. class:: Router - - Route messages between parent and child contexts, and invoke handlers - defined on our parent context. :meth:`Router.route() ` straddles - the :class:`Broker ` and user threads, it is safe - to call anywhere. - - **Note:** This is the somewhat limited core version of the Router class - used by child contexts. The master subclass is documented below this one. - - .. attribute:: unidirectional - - When :data:`True`, permit children to only communicate with the current - context or a parent of the current context. Routing between siblings or - children of parents is prohibited, ensuring no communication is - possible between intentionally partitioned networks, such as when a - program simultaneously manipulates hosts spread across a corporate and - a production network, or production networks that are otherwise - air-gapped. - - Sending a prohibited message causes an error to be logged and a dead - message to be sent in reply to the errant message, if that message has - ``reply_to`` set. - - The value of :data:`unidirectional` becomes the default for the - :meth:`local() ` `unidirectional` - parameter. - - .. method:: stream_by_id (dst_id) - - Return the :class:`mitogen.core.Stream` that should be used to - communicate with `dst_id`. If a specific route for `dst_id` is not - known, a reference to the parent context's stream is returned. - - .. method:: add_route (target_id, via_id) - - Arrange for messages whose `dst_id` is `target_id` to be forwarded on - the directly connected stream for `via_id`. This method is called - automatically in response to ``ADD_ROUTE`` messages, but remains public - for now while the design has not yet settled, and situations may arise - where routing is not fully automatic. - - .. method:: register (context, stream) - - Register a new context and its associated stream, and add the stream's - receive side to the I/O multiplexer. This This method remains public - for now while hte design has not yet settled. - - .. method:: add_handler (fn, handle=None, persist=True, respondent=None, policy=None) - - Invoke `fn(msg)` for each Message sent to `handle` from this context. - Unregister after one invocation if `persist` is :data:`False`. If - `handle` is :data:`None`, a new handle is allocated and returned. - - :param int handle: - If not :data:`None`, an explicit handle to register, usually one of - the ``mitogen.core.*`` constants. If unspecified, a new unused - handle will be allocated. - - :param bool persist: - If :data:`False`, the handler will be unregistered after a single - message has been received. - - :param mitogen.core.Context respondent: - Context that messages to this handle are expected to be sent from. - If specified, arranges for a dead message to be delivered to `fn` - when disconnection of the context is detected. - - In future `respondent` will likely also be used to prevent other - contexts from sending messages to the handle. - - :param function policy: - Function invoked as `policy(msg, stream)` where `msg` is a - :class:`mitogen.core.Message` about to be delivered, and - `stream` is the :class:`mitogen.core.Stream` on which it was - received. The function must return :data:`True`, otherwise an - error is logged and delivery is refused. - - Two built-in policy functions exist: - - * :func:`mitogen.core.has_parent_authority`: requires the - message arrived from a parent context, or a context acting with a - parent context's authority (``auth_id``). - - * :func:`mitogen.parent.is_immediate_child`: requires the - message arrived from an immediately connected child, for use in - messaging patterns where either something becomes buggy or - insecure by permitting indirect upstream communication. - - In case of refusal, and the message's ``reply_to`` field is - nonzero, a :class:`mitogen.core.CallError` is delivered to the - sender indicating refusal occurred. - - :return: - `handle`, or if `handle` was :data:`None`, the newly allocated - handle. - - .. method:: del_handler (handle) - - Remove the handle registered for `handle` - - :raises KeyError: - The handle wasn't registered. - - .. method:: _async_route(msg, stream=None) - - Arrange for `msg` to be forwarded towards its destination. If its - destination is the local context, then arrange for it to be dispatched - using the local handlers. - - This is a lower overhead version of :meth:`route` that may only be - called from the I/O multiplexer thread. - - :param mitogen.core.Stream stream: - If not :data:`None`, a reference to the stream the message arrived - on. Used for performing source route verification, to ensure - sensitive messages such as ``CALL_FUNCTION`` arrive only from - trusted contexts. - - .. method:: route(msg) - - Arrange for the :class:`Message` `msg` to be delivered to its - destination using any relevant downstream context, or if none is found, - by forwarding the message upstream towards the master context. If `msg` - is destined for the local context, it is dispatched using the handles - registered with :meth:`add_handler`. - - This may be called from any thread. +.. autoclass:: Router + :members: .. currentmodule:: mitogen.master @@ -362,11 +141,6 @@ Router Class ``ALLOCATE_ID`` message that causes the master to emit matching ``ADD_ROUTE`` messages prior to replying. - .. method:: context_by_id (context_id, via_id=None) - - Messy factory/lookup function to find a context by its ID, or construct - it. In future this will be replaced by a much more sensible interface. - .. _context-factories: **Context Factories** @@ -803,53 +577,8 @@ Context Class ============= .. currentmodule:: mitogen.core - -.. class:: Context - - Represent a remote context regardless of connection method. - - **Note:** This is the somewhat limited core version of the Context class - used by child contexts. The master subclass is documented below this one. - - .. method:: send (msg) - - Arrange for `msg` to be delivered to this context. - :attr:`dst_id ` is set to the target context ID. - - :param mitogen.core.Message msg: - The message. - - .. method:: send_async (msg, persist=False) - - Arrange for `msg` to be delivered to this context, with replies - directed to a newly constructed receiver. :attr:`dst_id - ` is set to the target context ID, and :attr:`reply_to - ` is set to the newly constructed receiver's handle. - - :param bool persist: - If :data:`False`, the handler will be unregistered after a single - message has been received. - - :param mitogen.core.Message msg: - The message. - - :returns: - :class:`mitogen.core.Receiver` configured to receive any replies - sent to the message's `reply_to` handle. - - .. method:: send_await (msg, deadline=None) - - Like :meth:`send_async`, but expect a single reply (`persist=False`) - delivered within `deadline` seconds. - - :param mitogen.core.Message msg: - The message. - :param float deadline: - If not :data:`None`, seconds before timing out waiting for a reply. - :returns: - The deserialized reply. - :raises mitogen.core.TimeoutError: - No message was received and `deadline` passed. +.. autoclass:: Context + :members: .. currentmodule:: mitogen.parent @@ -857,340 +586,33 @@ Context Class .. autoclass:: CallChain :members: -.. class:: Context - - Extend :class:`mitogen.core.Context` with functionality useful to masters, - and child contexts who later become parents. Currently when this class is - required, the target context's router is upgraded at runtime. - - .. attribute:: default_call_chain - - A :class:`CallChain` instance constructed by default, with pipelining - disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` - use this instance. - - .. method:: shutdown (wait=False) - - Arrange for the context to receive a ``SHUTDOWN`` message, triggering - graceful shutdown. - - Due to a lack of support for timers, no attempt is made yet to force - terminate a hung context using this method. This will be fixed shortly. - - :param bool wait: - If :data:`True`, block the calling thread until the context has - completely terminated. - :returns: - If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` - whose :meth:`get() ` method returns - :data:`None` when shutdown completes. The `timeout` parameter may - be used to implement graceful timeouts. - - .. method:: call_async (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call_async`. - - .. method:: call (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call`. - - .. method:: call_no_reply (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call_no_reply`. +.. autoclass:: Context + :members: Receiver Class ============== .. currentmodule:: mitogen.core - -.. class:: Receiver (router, handle=None, persist=True, respondent=None) - - Receivers are used to wait for pickled responses from another context to be - sent to a handle registered in this context. A receiver may be single-use - (as in the case of :meth:`mitogen.parent.Context.call_async`) or - multiple use. - - :param mitogen.core.Router router: - Router to register the handler on. - - :param int handle: - If not :data:`None`, an explicit handle to register, otherwise an - unused handle is chosen. - - :param bool persist: - If :data:`True`, do not unregister the receiver's handler after the - first message. - - :param mitogen.core.Context respondent: - Reference to the context this receiver is receiving from. If not - :data:`None`, arranges for the receiver to receive a dead message if - messages can no longer be routed to the context, due to disconnection - or exit. - - .. attribute:: notify = None - - If not :data:`None`, a reference to a function invoked as - `notify(receiver)` when a new message is delivered to this receiver. - Used by :class:`mitogen.select.Select` to implement waiting on - multiple receivers. - - .. py:method:: to_sender () - - Return a :class:`mitogen.core.Sender` configured to deliver messages - to this receiver. Since a Sender can be serialized, this makes it - convenient to pass `(context_id, handle)` pairs around:: - - def deliver_monthly_report(sender): - for line in open('monthly_report.txt'): - sender.send(line) - sender.close() - - remote = router.ssh(hostname='mainframe') - recv = mitogen.core.Receiver(router) - remote.call(deliver_monthly_report, recv.to_sender()) - for msg in recv: - print(msg) - - .. py:method:: empty () - - Return :data:`True` if calling :meth:`get` would block. - - As with :class:`Queue.Queue`, :data:`True` may be returned even - though a subsequent call to :meth:`get` will succeed, since a - message may be posted at any moment between :meth:`empty` and - :meth:`get`. - - :meth:`empty` is only useful to avoid a race while installing - :attr:`notify`: - - .. code-block:: python - - recv.notify = _my_notify_function - if not recv.empty(): - _my_notify_function(recv) - - # It is guaranteed the receiver was empty after the notification - # function was installed, or that it was non-empty and the - # notification function was invoked at least once. - - .. py:method:: close () - - Cause :class:`mitogen.core.ChannelError` to be raised in any thread - waiting in :meth:`get` on this receiver. - - .. py:method:: get (timeout=None) - - Sleep waiting for a message to arrive on this receiver. - - :param float timeout: - If not :data:`None`, specifies a timeout in seconds. - - :raises mitogen.core.ChannelError: - The remote end indicated the channel should be closed, or - communication with its parent context was lost. - - :raises mitogen.core.TimeoutError: - Timeout was reached. - - :returns: - `(msg, data)` tuple, where `msg` is the - :class:`mitogen.core.Message` that was received, and `data` is - its unpickled data part. - - .. py:method:: get_data (timeout=None) - - Like :meth:`get`, except only return the data part. - - .. py:method:: __iter__ () - - Block and yield `(msg, data)` pairs delivered to this receiver until - :class:`mitogen.core.ChannelError` is raised. +.. autoclass:: Receiver + :members: Sender Class ============ .. currentmodule:: mitogen.core - -.. class:: Sender (context, dst_handle) - - Senders are used to send pickled messages to a handle in another context, - it is the inverse of :class:`mitogen.core.Sender`. - - Senders may be serialized, making them convenient to wire up data flows. - See :meth:`mitogen.core.Receiver.to_sender` for more information. - - :param mitogen.core.Context context: - Context to send messages to. - :param int dst_handle: - Destination handle to send messages to. - - .. py:method:: close () - - Send a dead message to the remote end, causing :meth:`ChannelError` - to be raised in any waiting thread. - - .. py:method:: send (data) - - Send `data` to the remote end. +.. autoclass:: Sender + :members: Select Class ============ .. module:: mitogen.select - .. currentmodule:: mitogen.select - -.. class:: Select (receivers=(), oneshot=True) - - Support scatter/gather asynchronous calls and waiting on multiple - receivers, channels, and sub-Selects. Accepts a sequence of - :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` - instances and returns the first value posted to any receiver or select. - - If `oneshot` is :data:`True`, then remove each receiver as it yields a - result; since :meth:`__iter__` terminates once the final receiver is - removed, this makes it convenient to respond to calls made in parallel: - - .. code-block:: python - - total = 0 - recvs = [c.call_async(long_running_operation) for c in contexts] - - for msg in mitogen.select.Select(recvs): - print('Got %s from %s' % (msg, msg.receiver)) - total += msg.unpickle() - - # Iteration ends when last Receiver yields a result. - print('Received total %s from %s receivers' % (total, len(recvs))) - - :class:`Select` may drive a long-running scheduler: - - .. code-block:: python - - with mitogen.select.Select(oneshot=False) as select: - while running(): - for msg in select: - process_result(msg.receiver.context, msg.unpickle()) - for context, workfunc in get_new_work(): - select.add(context.call_async(workfunc)) - - :class:`Select` may be nested: - - .. code-block:: python - - subselects = [ - mitogen.select.Select(get_some_work()), - mitogen.select.Select(get_some_work()), - mitogen.select.Select([ - mitogen.select.Select(get_some_work()), - mitogen.select.Select(get_some_work()) - ]) - ] - - for msg in mitogen.select.Select(selects): - print(msg.unpickle()) - - .. py:classmethod:: all (it) - - Take an iterable of receivers and retrieve a :class:`Message` from - each, returning the result of calling `msg.unpickle()` on each in turn. - Results are returned in the order they arrived. - - This is sugar for handling batch - :meth:`Context.call_async ` - invocations: - - .. code-block:: python - - print('Total disk usage: %.02fMiB' % (sum( - mitogen.select.Select.all( - context.call_async(get_disk_usage) - for context in contexts - ) / 1048576.0 - ),)) - - However, unlike in a naive comprehension such as: - - .. code-block:: python - - recvs = [c.call_async(get_disk_usage) for c in contexts] - sum(recv.get().unpickle() for recv in recvs) - - Result processing happens in the order results arrive, rather than the - order requests were issued, so :meth:`all` should always be faster. - - .. py:method:: get (timeout=None, block=True) - - Fetch the next available value from any receiver, or raise - :class:`mitogen.core.TimeoutError` if no value is available within - `timeout` seconds. - - On success, the message's :attr:`receiver - ` attribute is set to the receiver. - - :param float timeout: - Timeout in seconds. - :param bool block: - If :data:`False`, immediately raise - :class:`mitogen.core.TimeoutError` if the select is empty. - :return: - :class:`mitogen.core.Message` - :raises mitogen.core.TimeoutError: - Timeout was reached. - :raises mitogen.core.LatchError: - :meth:`close` has been called, and the underlying latch is no - longer valid. - - .. py:method:: __bool__ () - - Return :data:`True` if any receivers are registered with this select. - - .. py:method:: close () - - Remove the select's notifier function from each registered receiver, - mark the associated latch as closed, and cause any thread currently - sleeping in :meth:`get` to be woken with - :class:`mitogen.core.LatchError`. - - This is necessary to prevent memory leaks in long-running receivers. It - is called automatically when the Python :keyword:`with` statement is - used. - - .. py:method:: empty () - - Return :data:`True` if calling :meth:`get` would block. - - As with :class:`Queue.Queue`, :data:`True` may be returned even - though a subsequent call to :meth:`get` will succeed, since a - message may be posted at any moment between :meth:`empty` and - :meth:`get`. - - :meth:`empty` may return :data:`False` even when :meth:`get` - would block if another thread has drained a receiver added to this - select. This can be avoided by only consuming each receiver from a - single thread. - - .. py:method:: __iter__ (self) - - Yield the result of :meth:`get` until no receivers remain in the - select, either because `oneshot` is :data:`True`, or each receiver was - explicitly removed via :meth:`remove`. - - .. py:method:: add (recv) - - Add the :class:`mitogen.core.Receiver` or - :class:`mitogen.core.Channel` `recv` to the select. - - .. py:method:: remove (recv) - - Remove the :class:`mitogen.core.Receiver` or - :class:`mitogen.core.Channel` `recv` from the select. Note that if - the receiver has notified prior to :meth:`remove`, then it will - still be returned by a subsequent :meth:`get`. This may change in a - future version. +.. autoclass:: Select + :members: Channel Class @@ -1327,64 +749,13 @@ Utility Functions A random assortment of utility functions useful on masters and children. .. currentmodule:: mitogen.utils -.. function:: cast (obj) +.. autofunction:: cast - Many tools love to subclass built-in types in order to implement useful - functionality, such as annotating the safety of a Unicode string, or adding - additional methods to a dict. However, cPickle loves to preserve those - subtypes during serialization, resulting in CallError during :meth:`call - ` in the target when it tries to deserialize - the data. - - This function walks the object graph `obj`, producing a copy with any - custom sub-types removed. The functionality is not default since the - resulting walk may be computationally expensive given a large enough graph. - - See :ref:`serialization-rules` for a list of supported types. - - :param obj: - Object to undecorate. - :returns: - Undecorated object. .. currentmodule:: mitogen.utils -.. function:: disable_site_packages - - Remove all entries mentioning ``site-packages`` or ``Extras`` from the - system path. Used primarily for testing on OS X within a virtualenv, where - OS X bundles some ancient version of the :mod:`six` module. - -.. currentmodule:: mitogen.utils -.. function:: log_to_file (path=None, io=False, level='INFO') - - Install a new :class:`logging.Handler` writing applications logs to the - filesystem. Useful when debugging slave IO problems. - - Parameters to this function may be overridden at runtime using environment - variables. See :ref:`logging-env-vars`. - - :param str path: - If not :data:`None`, a filesystem path to write logs to. Otherwise, - logs are written to :data:`sys.stderr`. - - :param bool io: - If :data:`True`, include extremely verbose IO logs in the output. - Useful for debugging hangs, less useful for debugging application code. - - :param str level: - Name of the :mod:`logging` package constant that is the minimum - level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, - and ``ERROR``. - -.. currentmodule:: mitogen.utils -.. function:: run_with_router(func, \*args, \**kwargs) - - Arrange for `func(router, \*args, \**kwargs)` to run with a temporary - :class:`mitogen.master.Router`, ensuring the Router and Broker are - correctly shut down during normal or exceptional return. - - :returns: - `func`'s return value. +.. autofunction:: disable_site_packages +.. autofunction:: log_to_file +.. autofunction:: run_with_router(func, \*args, \**kwargs) .. currentmodule:: mitogen.utils .. decorator:: with_router diff --git a/docs/changelog.rst b/docs/changelog.rst index 7c7609de..33c4489a 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -24,6 +24,12 @@ Mitogen for Ansible Enhancements ^^^^^^^^^^^^ +* `#76 `_: disconnect propagation has + improved, allowing Ansible to cancel waits for responses from targets that + where abruptly disconnected. This increases the chance a task will fail + gracefully, rather than hanging due to the connection being severed, for + example because of network failure or EC2 instance maintenance. + * `#369 `_: :meth:`Connection.reset` is implemented, allowing `meta: reset_connection `_ to shut @@ -50,6 +56,9 @@ Fixes now print a useful hint when Python fails to start, as no useful error is normally logged to the console by these tools. +* `#400 `_: work around a threading + bug in the AWX display callback when running with high verbosity setting. + * `#409 `_: the setns method was silently broken due to missing tests. Basic coverage was added to prevent a recurrence. @@ -100,6 +109,7 @@ Mitogen would not be possible without the support of users. A huge thanks for bug reports, features and fixes in this release contributed by `Brian Candler `_, `Guy Knights `_, +`Jiří Vávra `_, `Jonathan Rosser `_, and `Mehdi `_. diff --git a/mitogen/core.py b/mitogen/core.py index d3909773..f2dece48 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -500,18 +500,53 @@ else: class Message(object): + """ + Messages are the fundamental unit of communication, comprising fields from + the :ref:`stream-protocol` header, an optional reference to the receiving + :class:`mitogen.core.Router` for ingress messages, and helper methods for + deserialization and generating replies. + """ + #: Integer target context ID. :class:`Router` delivers messages locally + #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise + #: they are routed up or downstream. dst_id = None + + #: Integer source context ID. Used as the target of replies if any are + #: generated. src_id = None + + #: Context ID under whose authority the message is acting. See + #: :ref:`source-verification`. auth_id = None + + #: Integer target handle in the destination context. This is one of the + #: :ref:`standard-handles`, or a dynamically generated handle used to + #: receive a one-time reply, such as the return value of a function call. handle = None + + #: Integer target handle to direct any reply to this message. Used to + #: receive a one-time reply, such as the return value of a function call. + #: :data:`IS_DEAD` has a special meaning when it appears in this field. reply_to = None + + #: Raw message data bytes. data = b('') + _unpickled = object() + #: The :class:`Router` responsible for routing the message. This is + #: :data:`None` for locally originated messages. router = None + + #: The :class:`Receiver` over which the message was last received. Part of + #: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`. receiver = None def __init__(self, **kwargs): + """ + Construct a message from from the supplied `kwargs`. :attr:`src_id` and + :attr:`auth_id` are always set to :data:`mitogen.context_id`. + """ self.src_id = mitogen.context_id self.auth_id = mitogen.context_id vars(self).update(kwargs) @@ -551,14 +586,28 @@ class Message(object): @property def is_dead(self): + """ + :data:`True` if :attr:`reply_to` is set to the magic value + :data:`IS_DEAD`, indicating the sender considers the channel dead. + """ return self.reply_to == IS_DEAD @classmethod def dead(cls, **kwargs): + """ + Syntax helper to construct a dead message. + """ return cls(reply_to=IS_DEAD, **kwargs) @classmethod def pickled(cls, obj, **kwargs): + """ + Construct a pickled message, setting :attr:`data` to the serialization + of `obj`, and setting remaining fields using `kwargs`. + + :returns: + The new message. + """ self = cls(**kwargs) try: self.data = pickle.dumps(obj, protocol=2) @@ -568,6 +617,18 @@ class Message(object): return self def reply(self, msg, router=None, **kwargs): + """ + Compose a reply to this message and send it using :attr:`router`, or + `router` is :attr:`router` is :data:`None`. + + :param obj: + Either a :class:`Message`, or an object to be serialized in order + to construct a new message. + :param router: + Optional router to use if :attr:`router` is :data:`None`. + :param kwargs: + Optional keyword parameters overriding message fields in the reply. + """ if not isinstance(msg, Message): msg = Message.pickled(msg) msg.dst_id = self.src_id @@ -584,7 +645,18 @@ class Message(object): UNPICKLER_KWARGS = {} def unpickle(self, throw=True, throw_dead=True): - """Deserialize `data` into an object.""" + """ + Unpickle :attr:`data`, optionally raising any exceptions present. + + :param bool throw_dead: + If :data:`True`, raise exceptions, otherwise it is the caller's + responsibility. + + :raises CallError: + The serialized data contained CallError exception. + :raises ChannelError: + The `is_dead` field was set. + """ _vv and IOLOG.debug('%r.unpickle()', self) if throw_dead and self.is_dead: raise ChannelError(ChannelError.remote_msg) @@ -616,26 +688,43 @@ class Message(object): class Sender(object): + """ + Senders are used to send pickled messages to a handle in another context, + it is the inverse of :class:`mitogen.core.Sender`. + + Senders may be serialized, making them convenient to wire up data flows. + See :meth:`mitogen.core.Receiver.to_sender` for more information. + + :param Context context: + Context to send messages to. + :param int dst_handle: + Destination handle to send messages to. + """ def __init__(self, context, dst_handle): self.context = context self.dst_handle = dst_handle + def send(self, data): + """ + Send `data` to the remote end. + """ + _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) + self.context.send(Message.pickled(data, handle=self.dst_handle)) + + def close(self): + """ + Send a dead message to the remote, causing :meth:`ChannelError` to be + raised in any waiting thread. + """ + _vv and IOLOG.debug('%r.close()', self) + self.context.send(Message.dead(handle=self.dst_handle)) + def __repr__(self): return 'Sender(%r, %r)' % (self.context, self.dst_handle) def __reduce__(self): return _unpickle_sender, (self.context.context_id, self.dst_handle) - def close(self): - """Indicate this channel is closed to the remote side.""" - _vv and IOLOG.debug('%r.close()', self) - self.context.send(Message.dead(handle=self.dst_handle)) - - def send(self, data): - """Send `data` to the remote.""" - _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) - self.context.send(Message.pickled(data, handle=self.dst_handle)) - def _unpickle_sender(router, context_id, dst_handle): if not (isinstance(router, Router) and @@ -646,12 +735,39 @@ def _unpickle_sender(router, context_id, dst_handle): class Receiver(object): + """ + Receivers maintain a thread-safe queue of messages sent to a handle of this + context from another context. + + :param mitogen.core.Router router: + Router to register the handler on. + + :param int handle: + If not :data:`None`, an explicit handle to register, otherwise an + unused handle is chosen. + + :param bool persist: + If :data:`False`, unregister the handler after one message is received. + Single-message receivers are intended for RPC-like transactions, such + as in the case of :meth:`mitogen.parent.Context.call_async`. + + :param mitogen.core.Context respondent: + Context this receiver is receiving from. If not :data:`None`, arranges + for the receiver to receive a dead message if messages can no longer be + routed to the context, due to disconnection or exit. + """ + #: If not :data:`None`, a reference to a function invoked as + #: `notify(receiver)` when a new message is delivered to this receiver. + #: Used by :class:`mitogen.select.Select` to implement waiting on multiple + #: receivers. notify = None + raise_channelerror = True def __init__(self, router, handle=None, persist=True, respondent=None, policy=None): self.router = router + #: The handle. self.handle = handle # Avoid __repr__ crash in add_handler() self._latch = Latch() # Must exist prior to .add_handler() self.handle = router.add_handler( @@ -666,26 +782,73 @@ class Receiver(object): return 'Receiver(%r, %r)' % (self.router, self.handle) def to_sender(self): + """ + Return a :class:`Sender` configured to deliver messages to this + receiver. As senders are serializable, this makes it convenient to pass + `(context_id, handle)` pairs around:: + + def deliver_monthly_report(sender): + for line in open('monthly_report.txt'): + sender.send(line) + sender.close() + + remote = router.ssh(hostname='mainframe') + recv = mitogen.core.Receiver(router) + remote.call(deliver_monthly_report, recv.to_sender()) + for msg in recv: + print(msg) + """ context = Context(self.router, mitogen.context_id) return Sender(context, self.handle) def _on_receive(self, msg): - """Callback from the Stream; appends data to the internal queue.""" + """ + Callback from the Stream; appends data to the internal queue. + """ _vv and IOLOG.debug('%r._on_receive(%r)', self, msg) self._latch.put(msg) if self.notify: self.notify(self) def close(self): + """ + Unregister the receiver's handle from its associated router, and cause + :class:`ChannelError` to be raised in any thread waiting in :meth:`get` + on this receiver. + """ if self.handle: self.router.del_handler(self.handle) self.handle = None self._latch.put(Message.dead()) def empty(self): + """ + Return :data:`True` if calling :meth:`get` would block. + + As with :class:`Queue.Queue`, :data:`True` may be returned even though + a subsequent call to :meth:`get` will succeed, since a message may be + posted at any moment between :meth:`empty` and :meth:`get`. + """ return self._latch.empty() def get(self, timeout=None, block=True, throw_dead=True): + """ + Sleep waiting for a message to arrive on this receiver. + + :param float timeout: + If not :data:`None`, specifies a timeout in seconds. + + :raises mitogen.core.ChannelError: + The remote end indicated the channel should be closed, or + communication with its parent context was lost. + + :raises mitogen.core.TimeoutError: + Timeout was reached. + + :returns: + `(msg, data)` tuple, where `msg` is the :class:`Message` that was + received, and `data` is its unpickled data part. + """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) if msg.is_dead and throw_dead: @@ -696,6 +859,10 @@ class Receiver(object): return msg def __iter__(self): + """ + Yield consecutive :class:`Message` instances delivered to this receiver + until :class:`ChannelError` is raised. + """ while True: msg = self.get(throw_dead=False) if msg.is_dead: @@ -1213,6 +1380,28 @@ class Stream(BasicStream): class Context(object): + """ + Represent a remote context regardless of the underlying connection method. + Context objects are simple facades that emit messages through an + associated router, and have :ref:`signals` raised against them in response + to various events relating to the context. + + **Note:** This is the somewhat limited core version, used by child + contexts. The master subclass is documented below this one. + + Contexts maintain no internal state and are thread-safe. + + Prefer :meth:`Router.context_by_id` over constructing context objects + explicitly, as that method is deduplicating, and returns the only context + instance :ref:`signals` will be raised on. + + :param Router router: + Router to emit messages through. + :param int context_id: + Context ID. + :param str name: + Context name. + """ remote_name = None def __init__(self, router, context_id, name=None): @@ -1231,6 +1420,23 @@ class Context(object): fire(self, 'disconnect') def send_async(self, msg, persist=False): + """ + Arrange for `msg` to be delivered to this context, with replies + directed to a newly constructed receiver. :attr:`dst_id + ` is set to the target context ID, and :attr:`reply_to + ` is set to the newly constructed receiver's handle. + + :param bool persist: + If :data:`False`, the handler will be unregistered after a single + message has been received. + + :param mitogen.core.Message msg: + The message. + + :returns: + :class:`Receiver` configured to receive any replies sent to the + message's `reply_to` handle. + """ if self.router.broker._thread == threading.currentThread(): # TODO raise SystemError('Cannot making blocking call on broker thread') @@ -1254,8 +1460,13 @@ class Context(object): return self.send_async(msg) def send(self, msg): - """send `obj` to `handle`, and tell the broker we have output. May - be called from any thread.""" + """ + Arrange for `msg` to be delivered to this context. :attr:`dst_id + ` is set to the target context ID. + + :param Message msg: + Message. + """ msg.dst_id = self.context_id self.router.route(msg) @@ -1264,7 +1475,19 @@ class Context(object): return recv.get().unpickle() def send_await(self, msg, deadline=None): - """Send `msg` and wait for a response with an optional timeout.""" + """ + Like :meth:`send_async`, but expect a single reply (`persist=False`) + delivered within `deadline` seconds. + + :param mitogen.core.Message msg: + The message. + :param float deadline: + If not :data:`None`, seconds before timing out waiting for a reply. + :returns: + Deserialized reply. + :raises TimeoutError: + No message was received and `deadline` passed. + """ receiver = self.send_async(msg) response = receiver.get(deadline) data = response.unpickle() @@ -1710,8 +1933,33 @@ class IoLogger(BasicStream): class Router(object): + """ + Route messages between contexts, and invoke local handlers for messages + addressed to this context. :meth:`Router.route() ` straddles the + :class:`Broker ` and user threads, it is safe to call + anywhere. + + **Note:** This is the somewhat limited core version of the Router class + used by child contexts. The master subclass is documented below this one. + """ context_class = Context max_message_size = 128 * 1048576 + + #: When :data:`True`, permit children to only communicate with the current + #: context or a parent of the current context. Routing between siblings or + #: children of parents is prohibited, ensuring no communication is possible + #: between intentionally partitioned networks, such as when a program + #: simultaneously manipulates hosts spread across a corporate and a + #: production network, or production networks that are otherwise + #: air-gapped. + #: + #: Sending a prohibited message causes an error to be logged and a dead + #: message to be sent in reply to the errant message, if that message has + #: ``reply_to`` set. + #: + #: The value of :data:`unidirectional` becomes the default for the + #: :meth:`local() ` `unidirectional` + #: parameter. unidirectional = False def __init__(self, broker): @@ -1751,7 +1999,7 @@ class Router(object): fire(self._context_by_id[target_id], 'disconnect') - def on_stream_disconnect(self, stream): + def _on_stream_disconnect(self, stream): for context in self._context_by_id.values(): stream_ = self._stream_by_id.get(context.context_id) if stream_ is stream: @@ -1764,6 +2012,10 @@ class Router(object): func(Message.dead()) def context_by_id(self, context_id, via_id=None, create=True, name=None): + """ + Messy factory/lookup function to find a context by its ID, or construct + it. In future this will be replaced by a much more sensible interface. + """ context = self._context_by_id.get(context_id) if create and not context: context = self.context_class(self, context_id, name=name) @@ -1773,21 +2025,85 @@ class Router(object): return context def register(self, context, stream): + """ + Register a newly constructed context and its associated stream, and add + the stream's receive side to the I/O multiplexer. This method remains + public while the design has not yet settled. + """ _v and LOG.debug('register(%r, %r)', context, stream) self._stream_by_id[context.context_id] = stream self._context_by_id[context.context_id] = context self.broker.start_receive(stream) - listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream)) + listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) def stream_by_id(self, dst_id): + """ + Return the :class:`Stream` that should be used to communicate with + `dst_id`. If a specific route for `dst_id` is not known, a reference to + the parent context's stream is returned. + """ return self._stream_by_id.get(dst_id, self._stream_by_id.get(mitogen.parent_id)) def del_handler(self, handle): + """ + Remove the handle registered for `handle` + + :raises KeyError: + The handle wasn't registered. + """ del self._handle_map[handle] def add_handler(self, fn, handle=None, persist=True, policy=None, respondent=None): + """ + Invoke `fn(msg)` for each Message sent to `handle` from this context. + Unregister after one invocation if `persist` is :data:`False`. If + `handle` is :data:`None`, a new handle is allocated and returned. + + :param int handle: + If not :data:`None`, an explicit handle to register, usually one of + the ``mitogen.core.*`` constants. If unspecified, a new unused + handle will be allocated. + + :param bool persist: + If :data:`False`, the handler will be unregistered after a single + message has been received. + + :param Context respondent: + Context that messages to this handle are expected to be sent from. + If specified, arranges for a dead message to be delivered to `fn` + when disconnection of the context is detected. + + In future `respondent` will likely also be used to prevent other + contexts from sending messages to the handle. + + :param function policy: + Function invoked as `policy(msg, stream)` where `msg` is a + :class:`mitogen.core.Message` about to be delivered, and `stream` + is the :class:`mitogen.core.Stream` on which it was received. The + function must return :data:`True`, otherwise an error is logged and + delivery is refused. + + Two built-in policy functions exist: + + * :func:`has_parent_authority`: requires the message arrived from a + parent context, or a context acting with a parent context's + authority (``auth_id``). + + * :func:`mitogen.parent.is_immediate_child`: requires the + message arrived from an immediately connected child, for use in + messaging patterns where either something becomes buggy or + insecure by permitting indirect upstream communication. + + In case of refusal, and the message's ``reply_to`` field is + nonzero, a :class:`mitogen.core.CallError` is delivered to the + sender indicating refusal occurred. + + :return: + `handle`, or if `handle` was :data:`None`, the newly allocated + handle. + """ handle = handle or next(self._last_handle) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) @@ -1848,6 +2164,19 @@ class Router(object): msg.reply(Message.dead(), router=self) def _async_route(self, msg, in_stream=None): + """ + Arrange for `msg` to be forwarded towards its destination. If its + destination is the local context, then arrange for it to be dispatched + using the local handlers. + + This is a lower overhead version of :meth:`route` that may only be + called from the I/O multiplexer thread. + + :param Stream in_stream: + If not :data:`None`, the stream the message arrived on. Used for + performing source route verification, to ensure sensitive messages + such as ``CALL_FUNCTION`` arrive only from trusted contexts. + """ _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) if len(msg.data) > self.max_message_size: @@ -1902,6 +2231,15 @@ class Router(object): out_stream._send(msg) def route(self, msg): + """ + Arrange for the :class:`Message` `msg` to be delivered to its + destination using any relevant downstream context, or if none is found, + by forwarding the message upstream towards the master context. If `msg` + is destined for the local context, it is dispatched using the handles + registered with :meth:`add_handler`. + + This may be called from any thread. + """ self.broker.defer(self._async_route, msg) diff --git a/mitogen/kubectl.py b/mitogen/kubectl.py index c2be24c1..681da3b1 100644 --- a/mitogen/kubectl.py +++ b/mitogen/kubectl.py @@ -1,4 +1,3 @@ -# coding: utf-8 # Copyright 2018, Yannig Perré # # Redistribution and use in source and binary forms, with or without diff --git a/mitogen/parent.py b/mitogen/parent.py index 78a62380..a11b362e 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -618,6 +618,7 @@ class EofError(mitogen.core.StreamError): the child process. """ # inherits from StreamError to maintain compatibility. + pass class Argv(object): @@ -1390,7 +1391,16 @@ class CallChain(object): class Context(mitogen.core.Context): + """ + Extend :class:`mitogen.core.Context` with functionality useful to masters, + and child contexts who later become parents. Currently when this class is + required, the target context's router is upgraded at runtime. + """ + #: A :class:`CallChain` instance constructed by default, with pipelining + #: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use + #: this instance. call_chain_class = CallChain + via = None def __init__(self, *args, **kwargs): @@ -1406,15 +1416,41 @@ class Context(mitogen.core.Context): return hash((self.router, self.context_id)) def call_async(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call_async`. + """ return self.default_call_chain.call_async(fn, *args, **kwargs) def call(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call`. + """ return self.default_call_chain.call(fn, *args, **kwargs) def call_no_reply(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call_no_reply`. + """ self.default_call_chain.call_no_reply(fn, *args, **kwargs) def shutdown(self, wait=False): + """ + Arrange for the context to receive a ``SHUTDOWN`` message, triggering + graceful shutdown. + + Due to a lack of support for timers, no attempt is made yet to force + terminate a hung context using this method. This will be fixed shortly. + + :param bool wait: + If :data:`True`, block the calling thread until the context has + completely terminated. + + :returns: + If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` + whose :meth:`get() ` method returns + :data:`None` when shutdown completes. The `timeout` parameter may + be used to implement graceful timeouts. + """ LOG.debug('%r.shutdown() sending SHUTDOWN', self) latch = mitogen.core.Latch() mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) @@ -1666,6 +1702,13 @@ class Router(mitogen.core.Router): msg.reply(None) def add_route(self, target_id, stream): + """ + Arrange for messages whose `dst_id` is `target_id` to be forwarded on + the directly connected stream for `via_id`. This method is called + automatically in response to :data:`mitogen.core.ADD_ROUTE` messages, + but remains public while the design has not yet settled, and situations + may arise where routing is not fully automatic. + """ LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) assert isinstance(target_id, int) assert isinstance(stream, Stream) diff --git a/mitogen/select.py b/mitogen/select.py index ce4023a9..6d46c336 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -34,11 +34,57 @@ class Error(mitogen.core.Error): class Select(object): - notify = None + """ + Support scatter/gather asynchronous calls and waiting on multiple + receivers, channels, and sub-Selects. Accepts a sequence of + :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances + and returns the first value posted to any receiver or select. - @classmethod - def all(cls, receivers): - return list(msg.unpickle() for msg in cls(receivers)) + If `oneshot` is :data:`True`, then remove each receiver as it yields a + result; since :meth:`__iter__` terminates once the final receiver is + removed, this makes it convenient to respond to calls made in parallel: + + .. code-block:: python + + total = 0 + recvs = [c.call_async(long_running_operation) for c in contexts] + + for msg in mitogen.select.Select(recvs): + print('Got %s from %s' % (msg, msg.receiver)) + total += msg.unpickle() + + # Iteration ends when last Receiver yields a result. + print('Received total %s from %s receivers' % (total, len(recvs))) + + :class:`Select` may drive a long-running scheduler: + + .. code-block:: python + + with mitogen.select.Select(oneshot=False) as select: + while running(): + for msg in select: + process_result(msg.receiver.context, msg.unpickle()) + for context, workfunc in get_new_work(): + select.add(context.call_async(workfunc)) + + :class:`Select` may be nested: + + .. code-block:: python + + subselects = [ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()), + mitogen.select.Select([ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()) + ]) + ] + + for msg in mitogen.select.Select(selects): + print(msg.unpickle()) + """ + + notify = None def __init__(self, receivers=(), oneshot=True): self._receivers = [] @@ -47,12 +93,46 @@ class Select(object): for recv in receivers: self.add(recv) + @classmethod + def all(cls, receivers): + """ + Take an iterable of receivers and retrieve a :class:`Message` from + each, returning the result of calling `msg.unpickle()` on each in turn. + Results are returned in the order they arrived. + + This is sugar for handling batch :meth:`Context.call_async + ` invocations: + + .. code-block:: python + + print('Total disk usage: %.02fMiB' % (sum( + mitogen.select.Select.all( + context.call_async(get_disk_usage) + for context in contexts + ) / 1048576.0 + ),)) + + However, unlike in a naive comprehension such as: + + .. code-block:: python + + recvs = [c.call_async(get_disk_usage) for c in contexts] + sum(recv.get().unpickle() for recv in recvs) + + Result processing happens in the order results arrive, rather than the + order requests were issued, so :meth:`all` should always be faster. + """ + return list(msg.unpickle() for msg in cls(receivers)) + def _put(self, value): self._latch.put(value) if self.notify: self.notify(self) def __bool__(self): + """ + Return :data:`True` if any receivers are registered with this select. + """ return bool(self._receivers) def __enter__(self): @@ -62,6 +142,11 @@ class Select(object): self.close() def __iter__(self): + """ + Yield the result of :meth:`get` until no receivers remain in the + select, either because `oneshot` is :data:`True`, or each receiver was + explicitly removed via :meth:`remove`. + """ while self._receivers: yield self.get() @@ -80,6 +165,14 @@ class Select(object): owned_msg = 'Cannot add: Receiver is already owned by another Select' def add(self, recv): + """ + Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the + select. + + :raises mitogen.select.Error: + An attempt was made to add a :class:`Select` to which this select + is indirectly a member of. + """ if isinstance(recv, Select): recv._check_no_loop(self) @@ -95,6 +188,12 @@ class Select(object): not_present_msg = 'Instance is not a member of this Select' def remove(self, recv): + """ + Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv` + from the select. Note that if the receiver has notified prior to + :meth:`remove`, it will still be returned by a subsequent :meth:`get`. + This may change in a future version. + """ try: if recv.notify != self._put: raise ValueError @@ -104,16 +203,59 @@ class Select(object): raise Error(self.not_present_msg) def close(self): + """ + Remove the select's notifier function from each registered receiver, + mark the associated latch as closed, and cause any thread currently + sleeping in :meth:`get` to be woken with + :class:`mitogen.core.LatchError`. + + This is necessary to prevent memory leaks in long-running receivers. It + is called automatically when the Python :keyword:`with` statement is + used. + """ for recv in self._receivers[:]: self.remove(recv) self._latch.close() def empty(self): + """ + Return :data:`True` if calling :meth:`get` would block. + + As with :class:`Queue.Queue`, :data:`True` may be returned even though + a subsequent call to :meth:`get` will succeed, since a message may be + posted at any moment between :meth:`empty` and :meth:`get`. + + :meth:`empty` may return :data:`False` even when :meth:`get` would + block if another thread has drained a receiver added to this select. + This can be avoided by only consuming each receiver from a single + thread. + """ return self._latch.empty() empty_msg = 'Cannot get(), Select instance is empty' def get(self, timeout=None, block=True): + """ + Fetch the next available value from any receiver, or raise + :class:`mitogen.core.TimeoutError` if no value is available within + `timeout` seconds. + + On success, the message's :attr:`receiver + ` attribute is set to the receiver. + + :param float timeout: + Timeout in seconds. + :param bool block: + If :data:`False`, immediately raise + :class:`mitogen.core.TimeoutError` if the select is empty. + :return: + :class:`mitogen.core.Message` + :raises mitogen.core.TimeoutError: + Timeout was reached. + :raises mitogen.core.LatchError: + :meth:`close` has been called, and the underlying latch is no + longer valid. + """ if not self._receivers: raise Error(self.empty_msg) diff --git a/mitogen/service.py b/mitogen/service.py index ffb7308e..dc90b67f 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -395,11 +395,13 @@ class Service(object): Called when a message arrives on any of :attr:`select`'s registered receivers. """ + pass def on_shutdown(self): """ Called by Pool.shutdown() once the last worker thread has exitted. """ + pass class Pool(object): diff --git a/mitogen/utils.py b/mitogen/utils.py index 4fd80aa1..31669ea9 100644 --- a/mitogen/utils.py +++ b/mitogen/utils.py @@ -46,6 +46,11 @@ else: def disable_site_packages(): + """ + Remove all entries mentioning ``site-packages`` or ``Extras`` from + :attr:sys.path. Used primarily for testing on OS X within a virtualenv, + where OS X bundles some ancient version of the :mod:`six` module. + """ for entry in sys.path[:]: if 'site-packages' in entry or 'Extras' in entry: sys.path.remove(entry) @@ -65,6 +70,26 @@ def log_get_formatter(): def log_to_file(path=None, io=False, level='INFO'): + """ + Install a new :class:`logging.Handler` writing applications logs to the + filesystem. Useful when debugging slave IO problems. + + Parameters to this function may be overridden at runtime using environment + variables. See :ref:`logging-env-vars`. + + :param str path: + If not :data:`None`, a filesystem path to write logs to. Otherwise, + logs are written to :data:`sys.stderr`. + + :param bool io: + If :data:`True`, include extremely verbose IO logs in the output. + Useful for debugging hangs, less useful for debugging application code. + + :param str level: + Name of the :mod:`logging` package constant that is the minimum level + to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, and + ``ERROR``. + """ log = logging.getLogger('') if path: fp = open(path, 'w', 1) @@ -94,6 +119,14 @@ def log_to_file(path=None, io=False, level='INFO'): def run_with_router(func, *args, **kwargs): + """ + Arrange for `func(router, \*args, \**kwargs)` to run with a temporary + :class:`mitogen.master.Router`, ensuring the Router and Broker are + correctly shut down during normal or exceptional return. + + :returns: + `func`'s return value. + """ broker = mitogen.master.Broker() router = mitogen.master.Router(broker) try: @@ -104,6 +137,17 @@ def run_with_router(func, *args, **kwargs): def with_router(func): + """ + Decorator version of :func:`run_with_router`. Example: + + .. code-block:: python + + @with_router + def do_stuff(router, arg): + pass + + do_stuff(blah, 123) + """ def wrapper(*args, **kwargs): return run_with_router(func, *args, **kwargs) if mitogen.core.PY3: @@ -122,7 +166,27 @@ PASSTHROUGH = ( mitogen.core.Secret, ) + def cast(obj): + """ + Many tools love to subclass built-in types in order to implement useful + functionality, such as annotating the safety of a Unicode string, or adding + additional methods to a dict. However, cPickle loves to preserve those + subtypes during serialization, resulting in CallError during :meth:`call + ` in the target when it tries to deserialize + the data. + + This function walks the object graph `obj`, producing a copy with any + custom sub-types removed. The functionality is not default since the + resulting walk may be computationally expensive given a large enough graph. + + See :ref:`serialization-rules` for a list of supported types. + + :param obj: + Object to undecorate. + :returns: + Undecorated object. + """ if isinstance(obj, dict): return dict((cast(k), cast(v)) for k, v in iteritems(obj)) if isinstance(obj, (list, tuple)): diff --git a/tests/ansible/integration/stub_connections/all.yml b/tests/ansible/integration/stub_connections/all.yml index 5a3f37cf..a9744ab7 100644 --- a/tests/ansible/integration/stub_connections/all.yml +++ b/tests/ansible/integration/stub_connections/all.yml @@ -1,6 +1,7 @@ - import_playbook: kubectl.yml - import_playbook: lxc.yml - import_playbook: lxd.yml +- import_playbook: mitogen_doas.yml +- import_playbook: mitogen_sudo.yml - import_playbook: setns_lxc.yml - import_playbook: setns_lxd.yml -- import_playbook: sudo.yml diff --git a/tests/ansible/integration/stub_connections/mitogen_doas.yml b/tests/ansible/integration/stub_connections/mitogen_doas.yml new file mode 100644 index 00000000..40d4f4b0 --- /dev/null +++ b/tests/ansible/integration/stub_connections/mitogen_doas.yml @@ -0,0 +1,21 @@ + +- name: integration/stub_connections/mitogen_doas.yml + hosts: test-targets + gather_facts: false + any_errors_fatal: true + tasks: + - meta: end_play + when: not is_mitogen + + - custom_python_detect_environment: + vars: + ansible_connection: mitogen_doas + ansible_become_exe: stub-doas.py + ansible_user: someuser + register: out + + - debug: var=out.env.ORIGINAL_ARGV + - assert: + that: + - out.env.THIS_IS_STUB_DOAS == '1' + - (out.env.ORIGINAL_ARGV|from_json)[1:3] == ['-u', 'someuser'] diff --git a/tests/ansible/integration/stub_connections/sudo.yml b/tests/ansible/integration/stub_connections/mitogen_sudo.yml similarity index 90% rename from tests/ansible/integration/stub_connections/sudo.yml rename to tests/ansible/integration/stub_connections/mitogen_sudo.yml index b5e6f263..b82b3ac2 100644 --- a/tests/ansible/integration/stub_connections/sudo.yml +++ b/tests/ansible/integration/stub_connections/mitogen_sudo.yml @@ -1,5 +1,5 @@ -- name: integration/stub_connections/sudo.yml +- name: integration/stub_connections/mitogen_sudo.yml hosts: test-targets gather_facts: false any_errors_fatal: true diff --git a/tests/data/stubs/stub-doas.py b/tests/data/stubs/stub-doas.py new file mode 100755 index 00000000..08caf044 --- /dev/null +++ b/tests/data/stubs/stub-doas.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +import json +import os +import sys + +os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) +os.environ['THIS_IS_STUB_DOAS'] = '1' +os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:]) diff --git a/tests/doas_test.py b/tests/doas_test.py new file mode 100644 index 00000000..0e27c2ab --- /dev/null +++ b/tests/doas_test.py @@ -0,0 +1,31 @@ + +import os + +import mitogen +import mitogen.parent + +import unittest2 + +import testlib + + +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): + doas_path = testlib.data_path('stubs/stub-doas.py') + + def test_okay(self): + context = self.router.doas( + doas_path=self.doas_path, + username='someuser', + ) + argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV')) + self.assertEquals(argv[:4], [ + self.doas_path, + '-u', + 'someuser', + '--', + ]) + self.assertEquals('1', context.call(os.getenv, 'THIS_IS_STUB_DOAS')) + + +if __name__ == '__main__': + unittest2.main() diff --git a/tests/minify_test.py b/tests/minify_test.py new file mode 100644 index 00000000..98307059 --- /dev/null +++ b/tests/minify_test.py @@ -0,0 +1,106 @@ +import codecs +import glob +import pprint + +import unittest2 + +import mitogen.minify +import testlib + + +def read_sample(fname): + sample_path = testlib.data_path('minimize_samples/' + fname) + sample_file = open(sample_path) + sample = sample_file.read() + sample_file.close() + return sample + + +class MinimizeSourceTest(unittest2.TestCase): + func = staticmethod(mitogen.minify.minimize_source) + + def test_class(self): + original = read_sample('class.py') + expected = read_sample('class_min.py') + self.assertEqual(expected, self.func(original)) + + def test_comment(self): + original = read_sample('comment.py') + expected = read_sample('comment_min.py') + self.assertEqual(expected, self.func(original)) + + def test_def(self): + original = read_sample('def.py') + expected = read_sample('def_min.py') + self.assertEqual(expected, self.func(original)) + + def test_hashbang(self): + original = read_sample('hashbang.py') + expected = read_sample('hashbang_min.py') + self.assertEqual(expected, self.func(original)) + + def test_mod(self): + original = read_sample('mod.py') + expected = read_sample('mod_min.py') + self.assertEqual(expected, self.func(original)) + + def test_pass(self): + original = read_sample('pass.py') + expected = read_sample('pass_min.py') + self.assertEqual(expected, self.func(original)) + + def test_obstacle_course(self): + original = read_sample('obstacle_course.py') + expected = read_sample('obstacle_course_min.py') + self.assertEqual(expected, self.func(original)) + + +class MitogenCoreTest(unittest2.TestCase): + # Verify minimize_source() succeeds for all built-in modules. + func = staticmethod(mitogen.minify.minimize_source) + + def read_source(self, name): + fp = codecs.open(name, encoding='utf-8') + try: + return fp.read() + finally: + fp.close() + + def _test_syntax_valid(self, minified, name): + compile(minified, name, 'exec') + + def _test_line_counts_match(self, original, minified): + self.assertEquals(original.count('\n'), + minified.count('\n')) + + def _test_non_blank_lines_match(self, name, original, minified): + # Verify first token matches. We just want to ensure line numbers make + # sense, this is good enough. + olines = original.splitlines() + mlines = minified.splitlines() + for i, (orig, mini) in enumerate(zip(olines, mlines)): + if i < 2: + assert orig == mini + continue + + owords = orig.split() + mwords = mini.split() + assert len(mwords) == 0 or (mwords[0] == owords[0]), pprint.pformat({ + 'line': i+1, + 'filename': name, + 'owords': owords, + 'mwords': mwords, + }) + + def test_minify_all(self): + for name in glob.glob('mitogen/*.py'): + original = self.read_source(name) + minified = self.func(original) + + self._test_syntax_valid(minified, name) + self._test_line_counts_match(original, minified) + self._test_non_blank_lines_match(name, original, minified) + + +if __name__ == '__main__': + unittest2.main() diff --git a/tests/minimize_source_test.py b/tests/minimize_source_test.py deleted file mode 100644 index b98cdebd..00000000 --- a/tests/minimize_source_test.py +++ /dev/null @@ -1,55 +0,0 @@ -import unittest2 - -import mitogen.minify -import testlib - - -def read_sample(fname): - sample_path = testlib.data_path('minimize_samples/' + fname) - sample_file = open(sample_path) - sample = sample_file.read() - sample_file.close() - return sample - - -class MinimizeSource(unittest2.TestCase): - func = staticmethod(mitogen.minify.minimize_source) - - def test_class(self): - original = read_sample('class.py') - expected = read_sample('class_min.py') - self.assertEqual(expected, self.func(original)) - - def test_comment(self): - original = read_sample('comment.py') - expected = read_sample('comment_min.py') - self.assertEqual(expected, self.func(original)) - - def test_def(self): - original = read_sample('def.py') - expected = read_sample('def_min.py') - self.assertEqual(expected, self.func(original)) - - def test_hashbang(self): - original = read_sample('hashbang.py') - expected = read_sample('hashbang_min.py') - self.assertEqual(expected, self.func(original)) - - def test_mod(self): - original = read_sample('mod.py') - expected = read_sample('mod_min.py') - self.assertEqual(expected, self.func(original)) - - def test_pass(self): - original = read_sample('pass.py') - expected = read_sample('pass_min.py') - self.assertEqual(expected, self.func(original)) - - def test_obstacle_course(self): - original = read_sample('obstacle_course.py') - expected = read_sample('obstacle_course_min.py') - self.assertEqual(expected, self.func(original)) - - -if __name__ == '__main__': - unittest2.main()