issue #369: refactor ContextService to support reset().
This commit is contained in:
parent
519faa3b3b
commit
9b7c958e2e
|
@ -146,6 +146,25 @@ class ContextService(mitogen.service.Service):
|
|||
self._lru_by_via = {}
|
||||
#: :func:`key_from_dict` result by Context.
|
||||
self._key_by_context = {}
|
||||
#: Mapping of Context -> parent Context
|
||||
self._via_by_context = {}
|
||||
|
||||
@mitogen.service.expose(mitogen.service.AllowParents())
|
||||
@mitogen.service.arg_spec({
|
||||
'context': mitogen.core.Context
|
||||
})
|
||||
def reset(self, context):
|
||||
"""
|
||||
Return a reference, forcing close and discard of the underlying
|
||||
connection. Used for 'meta: reset_connection' or when some other error
|
||||
is detected.
|
||||
"""
|
||||
LOG.debug('%r.reset(%r)', self, context)
|
||||
self._lock.acquire()
|
||||
try:
|
||||
self._shutdown_unlocked(context)
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
@mitogen.service.expose(mitogen.service.AllowParents())
|
||||
@mitogen.service.arg_spec({
|
||||
|
@ -189,6 +208,19 @@ class ContextService(mitogen.service.Service):
|
|||
self._lock.release()
|
||||
return count
|
||||
|
||||
def _forget_context_unlocked(self, context):
|
||||
key = self._key_by_context.get(context)
|
||||
if key is None:
|
||||
LOG.debug('%r: attempt to forget unknown %r', self, context)
|
||||
return
|
||||
|
||||
self._response_by_key.pop(key, None)
|
||||
self._latches_by_key.pop(key, None)
|
||||
self._key_by_context.pop(context, None)
|
||||
self._refs_by_context.pop(context, None)
|
||||
self._via_by_context.pop(context, None)
|
||||
self._lru_by_via.pop(context, None)
|
||||
|
||||
def _shutdown_unlocked(self, context, lru=None, new_context=None):
|
||||
"""
|
||||
Arrange for `context` to be shut down, and optionally add `new_context`
|
||||
|
@ -196,15 +228,15 @@ class ContextService(mitogen.service.Service):
|
|||
"""
|
||||
LOG.info('%r._shutdown_unlocked(): shutting down %r', self, context)
|
||||
context.shutdown()
|
||||
|
||||
key = self._key_by_context[context]
|
||||
del self._response_by_key[key]
|
||||
del self._refs_by_context[context]
|
||||
del self._key_by_context[context]
|
||||
if lru and context in lru:
|
||||
lru.remove(context)
|
||||
if new_context:
|
||||
lru.append(new_context)
|
||||
via = self._via_by_context.get(context)
|
||||
if via:
|
||||
lru = self._lru_by_via.get(via)
|
||||
if lru:
|
||||
if context in lru:
|
||||
lru.remove(context)
|
||||
if new_context:
|
||||
lru.append(new_context)
|
||||
self._forget_context_unlocked(context)
|
||||
|
||||
def _update_lru_unlocked(self, new_context, spec, via):
|
||||
"""
|
||||
|
@ -225,6 +257,7 @@ class ContextService(mitogen.service.Service):
|
|||
'but they are all marked as in-use.', via)
|
||||
return
|
||||
|
||||
self._via_by_context[new_context] = via
|
||||
self._shutdown_unlocked(context, lru=lru, new_context=new_context)
|
||||
|
||||
def _update_lru(self, new_context, spec, via):
|
||||
|
@ -243,7 +276,6 @@ class ContextService(mitogen.service.Service):
|
|||
try:
|
||||
for context in list(self._key_by_context):
|
||||
self._shutdown_unlocked(context)
|
||||
self._lru_by_via = {}
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
|
@ -259,15 +291,11 @@ class ContextService(mitogen.service.Service):
|
|||
self._lock.acquire()
|
||||
try:
|
||||
routes = self.router.route_monitor.get_routes(stream)
|
||||
for context, key in list(self._key_by_context.items()):
|
||||
for context in list(self._key_by_context):
|
||||
if context.context_id in routes:
|
||||
LOG.info('Dropping %r due to disconnect of %r',
|
||||
context, stream)
|
||||
self._response_by_key.pop(key, None)
|
||||
self._latches_by_key.pop(key, None)
|
||||
self._refs_by_context.pop(context, None)
|
||||
self._lru_by_via.pop(context, None)
|
||||
self._refs_by_context.pop(context, None)
|
||||
self._forget_context_unlocked(context)
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
|
|
Loading…
Reference in New Issue