diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 5c8bb018..dfe2b2d7 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -445,6 +445,23 @@ also listen on the following handles: In this way, the master need never re-send a module it has already sent to a direct descendant. +.. currentmodule:: mitogen.core +.. data:: FORWARD_MODULE + + Receives `(context, fullname)` tuples from its parent and arranges for a + :data:`LOAD_MODULE` to be sent towards `context` for the module `fullname` + and any related modules. The module must already have been delivered to the + current context by its parent in a prior :data:`LOAD_MODULE` message. + + If the receiver is the immediate parent of `context`, then only + :data:`LOAD_MODULE` is sent to the child. Otherwise :data:`LOAD_MODULE` is + sent to the next closest parent if the module has not previously been sent + on that stream, followed by a copy of the :data:`FORWARD_MODULE` message. + + This message is used to recursively preload indirect children with modules, + ensuring they are cached and deduplicated at each hop in the chain leading + to the target context. + .. currentmodule:: mitogen.core .. data:: DETACHING diff --git a/mitogen/core.py b/mitogen/core.py index 26151cbf..e947ecfe 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,7 +75,8 @@ DEL_ROUTE = 104 ALLOCATE_ID = 105 SHUTDOWN = 106 LOAD_MODULE = 107 -DETACHING = 108 +FORWARD_MODULE = 108 +DETACHING = 109 IS_DEAD = 999 try: diff --git a/mitogen/parent.py b/mitogen/parent.py index 5cdc89c9..e6d76c6b 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1106,6 +1106,12 @@ class ModuleForwarder(object): self.router = router self.parent_context = parent_context self.importer = importer + router.add_handler( + fn=self._on_forward_module, + handle=mitogen.core.FORWARD_MODULE, + persist=True, + policy=mitogen.core.has_parent_authority, + ) router.add_handler( fn=self._on_get_module, handle=mitogen.core.GET_MODULE, @@ -1116,34 +1122,64 @@ class ModuleForwarder(object): def __repr__(self): return 'ModuleForwarder(%r)' % (self.router,) + def _on_forward_module(self, msg): + if msg.is_dead: + return + + context_id_s, fullname = msg.data.partition('\x00') + context_id = int(context_id_s) + stream = self.router.stream_by_id(context_id) + if stream.remote_id == mitogen.parent_id: + LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child', + self, context_id, fullname) + return + + LOG.debug('%r._on_forward_module() sending %r to %r via %r', + self, fullname, context_id, stream.remote_id) + self._send_module_and_related(stream, fullname) + if stream.remote_id != context_id: + stream._send( + mitogen.core.Message( + dst_id=stream.remote_id, + handle=mitogen.core.FORWARD_MODULE, + data=msg.data, + ) + ) + def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) if msg.is_dead: return fullname = msg.data - callback = lambda: self._on_cache_callback(msg, fullname) - self.importer._request_module(fullname, callback) - - def _send_one_module(self, msg, tup): - self.router._async_route( - mitogen.core.Message.pickled( - tup, - dst_id=msg.src_id, - handle=mitogen.core.LOAD_MODULE, - ) + self.importer._request_module(fullname, + lambda: self._on_cache_callback(msg, fullname) ) def _on_cache_callback(self, msg, fullname): LOG.debug('%r._on_get_module(): sending %r', self, fullname) - tup = self.importer._cache[fullname] - if tup is not None: - for related in tup[4]: - rtup = self.importer._cache.get(related) - if not rtup: - LOG.debug('%r._on_get_module(): skipping absent %r', - self, related) - continue - self._send_one_module(msg, rtup) + stream = self.router.stream_by_id(msg.src_id) + self._send_module_and_related(stream, fullname) - self._send_one_module(msg, tup) + def _send_module_and_related(self, stream, fullname): + tup = self.importer._cache[fullname] + for related in tup[4]: + rtup = self.importer._cache.get(related) + if rtup: + self._send_one_module(stream, rtup) + else: + LOG.debug('%r._send_module_and_related(%r): absent: %r', + self, fullname, related) + + self._send_one_module(stream, tup) + + def _send_one_module(self, stream, tup): + if tup[0] not in stream.sent_modules: + stream.sent_modules.add(tup[0]) + self.router._async_route( + mitogen.core.Message.pickled( + tup, + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + )