diff --git a/mitogen/core.py b/mitogen/core.py index 5b1d5298..3d2fc288 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2087,6 +2087,8 @@ class Router(object): self._last_handle = itertools.count(1000) #: handle -> (persistent?, func(msg)) self._handle_map = {} + #: Context -> set { handle, .. } + self._handles_by_respondent = {} self.add_handler(self._on_del_route, DEL_ROUTE) def __repr__(self): @@ -2117,7 +2119,7 @@ class Router(object): def _on_broker_exit(self): while self._handle_map: - _, (_, func, _) = self._handle_map.popitem() + _, (_, func, _, _) = self._handle_map.popitem() func(Message.dead()) def context_by_id(self, context_id, via_id=None, create=True, name=None): @@ -2151,8 +2153,8 @@ class Router(object): `dst_id`. If a specific route for `dst_id` is not known, a reference to the parent context's stream is returned. """ - return self._stream_by_id.get(dst_id, - self._stream_by_id.get(mitogen.parent_id)) + parent = self._stream_by_id.get(mitogen.parent_id) + return self._stream_by_id.get(dst_id, parent) def del_handler(self, handle): """ @@ -2161,7 +2163,9 @@ class Router(object): :raises KeyError: The handle wasn't registered. """ - del self._handle_map[handle] + _, _, _, respondent = self._handle_map.pop(handle) + if respondent: + self._handles_by_respondent[respondent].discard(handle) def add_handler(self, fn, handle=None, persist=True, policy=None, respondent=None): @@ -2216,19 +2220,22 @@ class Router(object): handle = handle or next(self._last_handle) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + self._handle_map[handle] = persist, fn, policy, respondent if respondent: - assert policy is None - def policy(msg, _stream): - return msg.is_dead or msg.src_id == respondent.context_id - def on_disconnect(): - if handle in self._handle_map: - fn(Message.dead()) - del self._handle_map[handle] - listen(respondent, 'disconnect', on_disconnect) + if respondent not in self._handles_by_respondent: + self._handles_by_respondent[respondent] = set() + listen(respondent, 'disconnect', + lambda: self._on_respondent_disconnect(respondent)) + self._handles_by_respondent[respondent].add(handle) - self._handle_map[handle] = persist, fn, policy return handle + def _on_respondent_disconnect(self, context): + for handle in self._handles_by_respondent.pop(context, ()): + _, fn, _, _ = self._handle_map[handle] + fn(Message.dead()) + del self._handle_map[handle] + def on_shutdown(self, broker): """Called during :meth:`Broker.shutdown`, informs callbacks registered with :meth:`add_handle_cb` the connection is dead.""" @@ -2238,16 +2245,25 @@ class Router(object): _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) fn(Message.dead()) + def _maybe_send_dead(self, msg): + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead(), router=self) + refused_msg = 'Refused by policy.' def _invoke(self, msg, stream): # IOLOG.debug('%r._invoke(%r)', self, msg) try: - persist, fn, policy = self._handle_map[msg.handle] + persist, fn, policy, respondent = self._handle_map[msg.handle] except KeyError: LOG.error('%r: invalid handle: %r', self, msg) - if msg.reply_to and not msg.is_dead: - msg.reply(Message.dead()) + self._maybe_send_dead(msg) + return + + if respondent and not (msg.is_dead or + msg.src_id == respondent.context_id): + LOG.error('%r: reply from unexpected context: %r', self, msg) + self._maybe_send_dead(msg) return if policy and not policy(msg, stream): @@ -2261,17 +2277,13 @@ class Router(object): return if not persist: - del self._handle_map[msg.handle] + self.del_handler(msg.handle) try: fn(msg) except Exception: LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) - def _maybe_send_dead(self, msg): - if msg.reply_to and not msg.is_dead: - msg.reply(Message.dead(), router=self) - def _async_route(self, msg, in_stream=None): """ Arrange for `msg` to be forwarded towards its destination. If its