core: fix add_handler(respondent=..) memory leak

Closes #416.
This commit is contained in:
David Wilson 2018-11-04 14:36:44 +00:00
parent 4ac9cdce7c
commit c09780aeb0
1 changed files with 33 additions and 21 deletions

View File

@ -2087,6 +2087,8 @@ class Router(object):
self._last_handle = itertools.count(1000) self._last_handle = itertools.count(1000)
#: handle -> (persistent?, func(msg)) #: handle -> (persistent?, func(msg))
self._handle_map = {} self._handle_map = {}
#: Context -> set { handle, .. }
self._handles_by_respondent = {}
self.add_handler(self._on_del_route, DEL_ROUTE) self.add_handler(self._on_del_route, DEL_ROUTE)
def __repr__(self): def __repr__(self):
@ -2117,7 +2119,7 @@ class Router(object):
def _on_broker_exit(self): def _on_broker_exit(self):
while self._handle_map: while self._handle_map:
_, (_, func, _) = self._handle_map.popitem() _, (_, func, _, _) = self._handle_map.popitem()
func(Message.dead()) func(Message.dead())
def context_by_id(self, context_id, via_id=None, create=True, name=None): def context_by_id(self, context_id, via_id=None, create=True, name=None):
@ -2151,8 +2153,8 @@ class Router(object):
`dst_id`. If a specific route for `dst_id` is not known, a reference to `dst_id`. If a specific route for `dst_id` is not known, a reference to
the parent context's stream is returned. the parent context's stream is returned.
""" """
return self._stream_by_id.get(dst_id, parent = self._stream_by_id.get(mitogen.parent_id)
self._stream_by_id.get(mitogen.parent_id)) return self._stream_by_id.get(dst_id, parent)
def del_handler(self, handle): def del_handler(self, handle):
""" """
@ -2161,7 +2163,9 @@ class Router(object):
:raises KeyError: :raises KeyError:
The handle wasn't registered. The handle wasn't registered.
""" """
del self._handle_map[handle] _, _, _, respondent = self._handle_map.pop(handle)
if respondent:
self._handles_by_respondent[respondent].discard(handle)
def add_handler(self, fn, handle=None, persist=True, def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None): policy=None, respondent=None):
@ -2216,18 +2220,21 @@ class Router(object):
handle = handle or next(self._last_handle) handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn, policy, respondent
if respondent: if respondent:
assert policy is None if respondent not in self._handles_by_respondent:
def policy(msg, _stream): self._handles_by_respondent[respondent] = set()
return msg.is_dead or msg.src_id == respondent.context_id listen(respondent, 'disconnect',
def on_disconnect(): lambda: self._on_respondent_disconnect(respondent))
if handle in self._handle_map: self._handles_by_respondent[respondent].add(handle)
return handle
def _on_respondent_disconnect(self, context):
for handle in self._handles_by_respondent.pop(context, ()):
_, fn, _, _ = self._handle_map[handle]
fn(Message.dead()) fn(Message.dead())
del self._handle_map[handle] del self._handle_map[handle]
listen(respondent, 'disconnect', on_disconnect)
self._handle_map[handle] = persist, fn, policy
return handle
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Called during :meth:`Broker.shutdown`, informs callbacks registered """Called during :meth:`Broker.shutdown`, informs callbacks registered
@ -2238,16 +2245,25 @@ class Router(object):
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(Message.dead()) fn(Message.dead())
def _maybe_send_dead(self, msg):
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self)
refused_msg = 'Refused by policy.' refused_msg = 'Refused by policy.'
def _invoke(self, msg, stream): def _invoke(self, msg, stream):
# IOLOG.debug('%r._invoke(%r)', self, msg) # IOLOG.debug('%r._invoke(%r)', self, msg)
try: try:
persist, fn, policy = self._handle_map[msg.handle] persist, fn, policy, respondent = self._handle_map[msg.handle]
except KeyError: except KeyError:
LOG.error('%r: invalid handle: %r', self, msg) LOG.error('%r: invalid handle: %r', self, msg)
if msg.reply_to and not msg.is_dead: self._maybe_send_dead(msg)
msg.reply(Message.dead()) return
if respondent and not (msg.is_dead or
msg.src_id == respondent.context_id):
LOG.error('%r: reply from unexpected context: %r', self, msg)
self._maybe_send_dead(msg)
return return
if policy and not policy(msg, stream): if policy and not policy(msg, stream):
@ -2261,17 +2277,13 @@ class Router(object):
return return
if not persist: if not persist:
del self._handle_map[msg.handle] self.del_handler(msg.handle)
try: try:
fn(msg) fn(msg)
except Exception: except Exception:
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def _maybe_send_dead(self, msg):
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self)
def _async_route(self, msg, in_stream=None): def _async_route(self, msg, in_stream=None):
""" """
Arrange for `msg` to be forwarded towards its destination. If its Arrange for `msg` to be forwarded towards its destination. If its