From d1a22cb5d44983787a8a6e884d4577004810a55e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 13 May 2018 16:58:47 +0100 Subject: [PATCH] issue #186: parent: implement FORWARD_MODULE. To support detach, we must be able to preload the target with every module it will need prior to detachment. This implements the intermediary part of the process (i.e. the Ansible fork parent) -- receiving LOAD_MODULE/FORWARD_MODULE pairs and ensuring they reach the child. --- docs/howitworks.rst | 17 ++++++++++ mitogen/core.py | 3 +- mitogen/parent.py | 76 +++++++++++++++++++++++++++++++++------------ 3 files changed, 75 insertions(+), 21 deletions(-) diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 5c8bb018..dfe2b2d7 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -445,6 +445,23 @@ also listen on the following handles: In this way, the master need never re-send a module it has already sent to a direct descendant. +.. currentmodule:: mitogen.core +.. data:: FORWARD_MODULE + + Receives `(context, fullname)` tuples from its parent and arranges for a + :data:`LOAD_MODULE` to be sent towards `context` for the module `fullname` + and any related modules. The module must already have been delivered to the + current context by its parent in a prior :data:`LOAD_MODULE` message. + + If the receiver is the immediate parent of `context`, then only + :data:`LOAD_MODULE` is sent to the child. Otherwise :data:`LOAD_MODULE` is + sent to the next closest parent if the module has not previously been sent + on that stream, followed by a copy of the :data:`FORWARD_MODULE` message. + + This message is used to recursively preload indirect children with modules, + ensuring they are cached and deduplicated at each hop in the chain leading + to the target context. + .. currentmodule:: mitogen.core .. data:: DETACHING diff --git a/mitogen/core.py b/mitogen/core.py index 26151cbf..e947ecfe 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,7 +75,8 @@ DEL_ROUTE = 104 ALLOCATE_ID = 105 SHUTDOWN = 106 LOAD_MODULE = 107 -DETACHING = 108 +FORWARD_MODULE = 108 +DETACHING = 109 IS_DEAD = 999 try: diff --git a/mitogen/parent.py b/mitogen/parent.py index 5cdc89c9..e6d76c6b 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1106,6 +1106,12 @@ class ModuleForwarder(object): self.router = router self.parent_context = parent_context self.importer = importer + router.add_handler( + fn=self._on_forward_module, + handle=mitogen.core.FORWARD_MODULE, + persist=True, + policy=mitogen.core.has_parent_authority, + ) router.add_handler( fn=self._on_get_module, handle=mitogen.core.GET_MODULE, @@ -1116,34 +1122,64 @@ class ModuleForwarder(object): def __repr__(self): return 'ModuleForwarder(%r)' % (self.router,) + def _on_forward_module(self, msg): + if msg.is_dead: + return + + context_id_s, fullname = msg.data.partition('\x00') + context_id = int(context_id_s) + stream = self.router.stream_by_id(context_id) + if stream.remote_id == mitogen.parent_id: + LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child', + self, context_id, fullname) + return + + LOG.debug('%r._on_forward_module() sending %r to %r via %r', + self, fullname, context_id, stream.remote_id) + self._send_module_and_related(stream, fullname) + if stream.remote_id != context_id: + stream._send( + mitogen.core.Message( + dst_id=stream.remote_id, + handle=mitogen.core.FORWARD_MODULE, + data=msg.data, + ) + ) + def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) if msg.is_dead: return fullname = msg.data - callback = lambda: self._on_cache_callback(msg, fullname) - self.importer._request_module(fullname, callback) - - def _send_one_module(self, msg, tup): - self.router._async_route( - mitogen.core.Message.pickled( - tup, - dst_id=msg.src_id, - handle=mitogen.core.LOAD_MODULE, - ) + self.importer._request_module(fullname, + lambda: self._on_cache_callback(msg, fullname) ) def _on_cache_callback(self, msg, fullname): LOG.debug('%r._on_get_module(): sending %r', self, fullname) - tup = self.importer._cache[fullname] - if tup is not None: - for related in tup[4]: - rtup = self.importer._cache.get(related) - if not rtup: - LOG.debug('%r._on_get_module(): skipping absent %r', - self, related) - continue - self._send_one_module(msg, rtup) + stream = self.router.stream_by_id(msg.src_id) + self._send_module_and_related(stream, fullname) - self._send_one_module(msg, tup) + def _send_module_and_related(self, stream, fullname): + tup = self.importer._cache[fullname] + for related in tup[4]: + rtup = self.importer._cache.get(related) + if rtup: + self._send_one_module(stream, rtup) + else: + LOG.debug('%r._send_module_and_related(%r): absent: %r', + self, fullname, related) + + self._send_one_module(stream, tup) + + def _send_one_module(self, stream, tup): + if tup[0] not in stream.sent_modules: + stream.sent_modules.add(tup[0]) + self.router._async_route( + mitogen.core.Message.pickled( + tup, + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, + ) + )