diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index f9a1a3df..dde44c89 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -146,6 +146,25 @@ class ContextService(mitogen.service.Service): self._lru_by_via = {} #: :func:`key_from_dict` result by Context. self._key_by_context = {} + #: Mapping of Context -> parent Context + self._via_by_context = {} + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'context': mitogen.core.Context + }) + def reset(self, context): + """ + Return a reference, forcing close and discard of the underlying + connection. Used for 'meta: reset_connection' or when some other error + is detected. + """ + LOG.debug('%r.reset(%r)', self, context) + self._lock.acquire() + try: + self._shutdown_unlocked(context) + finally: + self._lock.release() @mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.arg_spec({ @@ -189,6 +208,19 @@ class ContextService(mitogen.service.Service): self._lock.release() return count + def _forget_context_unlocked(self, context): + key = self._key_by_context.get(context) + if key is None: + LOG.debug('%r: attempt to forget unknown %r', self, context) + return + + self._response_by_key.pop(key, None) + self._latches_by_key.pop(key, None) + self._key_by_context.pop(context, None) + self._refs_by_context.pop(context, None) + self._via_by_context.pop(context, None) + self._lru_by_via.pop(context, None) + def _shutdown_unlocked(self, context, lru=None, new_context=None): """ Arrange for `context` to be shut down, and optionally add `new_context` @@ -196,15 +228,15 @@ class ContextService(mitogen.service.Service): """ LOG.info('%r._shutdown_unlocked(): shutting down %r', self, context) context.shutdown() - - key = self._key_by_context[context] - del self._response_by_key[key] - del self._refs_by_context[context] - del self._key_by_context[context] - if lru and context in lru: - lru.remove(context) - if new_context: - lru.append(new_context) + via = self._via_by_context.get(context) + if via: + lru = self._lru_by_via.get(via) + if lru: + if context in lru: + lru.remove(context) + if new_context: + lru.append(new_context) + self._forget_context_unlocked(context) def _update_lru_unlocked(self, new_context, spec, via): """ @@ -225,6 +257,7 @@ class ContextService(mitogen.service.Service): 'but they are all marked as in-use.', via) return + self._via_by_context[new_context] = via self._shutdown_unlocked(context, lru=lru, new_context=new_context) def _update_lru(self, new_context, spec, via): @@ -243,7 +276,6 @@ class ContextService(mitogen.service.Service): try: for context in list(self._key_by_context): self._shutdown_unlocked(context) - self._lru_by_via = {} finally: self._lock.release() @@ -259,15 +291,11 @@ class ContextService(mitogen.service.Service): self._lock.acquire() try: routes = self.router.route_monitor.get_routes(stream) - for context, key in list(self._key_by_context.items()): + for context in list(self._key_by_context): if context.context_id in routes: LOG.info('Dropping %r due to disconnect of %r', context, stream) - self._response_by_key.pop(key, None) - self._latches_by_key.pop(key, None) - self._refs_by_context.pop(context, None) - self._lru_by_via.pop(context, None) - self._refs_by_context.pop(context, None) + self._forget_context_unlocked(context) finally: self._lock.release()