diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index ba8d5eeb..3123b37f 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -78,16 +78,23 @@ class ContextService(mitogen.service.Service): def __init__(self, *args, **kwargs): super(ContextService, self).__init__(*args, **kwargs) - #: JoinPoint for context responses. self._lock = threading.Lock() + #: Records the :meth:`get` result dict for successful calls, returned + #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. self._response_by_key = {} + #: List of :class:`mitogen.core.Message` waiting for the result dict + #: for a particular connection config. Keyed as sbove. self._waiters_by_key = {} - #: Active context count. + #: Mapping of :class:`mitogen.core.Context` -> reference count. Each + #: call to :meth:`get` increases this by one. Calls to :meth:`put` + #: decrease it by one. self._refs_by_context = {} - #: List of contexts in creation order by via= parameter. - self._lru_by_via = {} - #: (method_name, kwargs) pairs by Conetxt - self._cfg_by_context = {} + #: List of contexts in creation order by via= parameter. When + #: :attr:`max_interpreters` is reached, the most recently used context + #: is destroyed to make room for any additional context. + self._update_lru_by_via = {} + #: :meth:`key_from_kwargs` result by Context. + self._key_by_context = {} @mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.arg_spec({ @@ -111,6 +118,17 @@ class ContextService(mitogen.service.Service): return pprint.pformat(kwargs) def _produce_response(self, key, response): + """ + Reply to every waiting request matching a configuration key with a + response dictionary, deleting the list of waiters when done. + + :param str key: + Result of :meth:`key_from_kwargs` + :param dict response: + Response dictionary + :returns: + Number of waiters that were replied to. + """ self._lock.acquire() try: waiters = self._waiters_by_key.pop(key) @@ -121,13 +139,18 @@ class ContextService(mitogen.service.Service): self._lock.release() return count - def _lru(self, new_context, **kwargs): + def _update_lru(self, new_context, **kwargs): + """ + Update the LRU ("MRU"?) list associated with the connection described + by `kwargs`, destroying the most recently created context if the list + is full. Finally add `new_context` to the list. + """ via = kwargs.get('via') if via is None: # We don't have a limit on the number of directly connections. return - lru = self._lru_by_via.setdefault(via, []) + lru = self._update_lru_by_via.setdefault(via, []) if len(lru) < self.max_interpreters: lru.append(new_context) return @@ -143,20 +166,44 @@ class ContextService(mitogen.service.Service): LOG.info('%r._discard_one(): shutting down %r', self, context) context.shutdown() - method_name, kwargs = self._cfg_by_context[context] - key = self.key_from_kwargs(method_name=method_name, **kwargs) + key = self._key_by_context[context] self._lock.acquire() try: del self._response_by_key[key] del self._refs_by_context[context] - del self._cfg_by_context[context] + del self._key_by_context[context] lru.remove(context) lru.append(new_context) finally: self._lock.release() - def _connect(self, method_name, **kwargs): + def _connect(self, key, method_name, **kwargs): + """ + Actual connect implementation. Arranges for the Mitogen connection to + be created and enqueues an asynchronous call to start the forked task + parent in the remote context. + + :param key: + Deduplication key representing the connection configuration. + :param method_name: + :class:`mitogen.parent.Router` method implementing the connection + type. + :param kwargs: + Keyword arguments passed to the router method. + :returns: + Dict like:: + + { + 'context': mitogen.core.Context or None, + 'home_dir': str or None, + 'msg': str or None + } + + Where either `msg` is an error message and the remaining fields are + :data:`None`, or `msg` is :data:`None` and the remaining fields are + set. + """ method = getattr(self.router, method_name, None) if method is None: raise Error('no such Router method: %s' % (method_name,)) @@ -171,13 +218,13 @@ class ContextService(mitogen.service.Service): } if kwargs.get('via'): - self._lru(context, method_name=method_name, **kwargs) + self._update_lru(context, method_name=method_name, **kwargs) home_dir = context.call(os.path.expanduser, '~') # We don't need to wait for the result of this. Ideally we'd check its # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) - self._cfg_by_context[context] = (method_name, kwargs) + self._key_by_context[context] = key self._refs_by_context[context] = 0 return { 'context': context, @@ -222,13 +269,14 @@ class ContextService(mitogen.service.Service): finally: self._lock.release() - # I'm the first thread to wait on a result, so I will create the - # connection. + # I'm the first thread to wait, so I will create the connection. try: - response = self._connect(**kwargs) + response = self._connect(key, **kwargs) count = self._produce_response(key, response) if response['msg'] is None: + # Only record the response for non-error results. self._response_by_key[key] = response + # Set the reference count to the number of waiters. self._refs_by_context[response['context']] += count except mitogen.core.CallError: e = sys.exc_info()[1]