issue #186: parent: implement FORWARD_MODULE.

To support detach, we must be able to preload the target with every
module it will need prior to detachment. This implements the
intermediary part of the process (i.e. the Ansible fork parent) --
receiving LOAD_MODULE/FORWARD_MODULE pairs and ensuring they reach the
child.
This commit is contained in:
David Wilson 2018-05-13 16:58:47 +01:00
parent 8a089e975d
commit d1a22cb5d4
3 changed files with 75 additions and 21 deletions

View File

@ -445,6 +445,23 @@ also listen on the following handles:
In this way, the master need never re-send a module it has already sent to
a direct descendant.
.. currentmodule:: mitogen.core
.. data:: FORWARD_MODULE
Receives `(context, fullname)` tuples from its parent and arranges for a
:data:`LOAD_MODULE` to be sent towards `context` for the module `fullname`
and any related modules. The module must already have been delivered to the
current context by its parent in a prior :data:`LOAD_MODULE` message.
If the receiver is the immediate parent of `context`, then only
:data:`LOAD_MODULE` is sent to the child. Otherwise :data:`LOAD_MODULE` is
sent to the next closest parent if the module has not previously been sent
on that stream, followed by a copy of the :data:`FORWARD_MODULE` message.
This message is used to recursively preload indirect children with modules,
ensuring they are cached and deduplicated at each hop in the chain leading
to the target context.
.. currentmodule:: mitogen.core
.. data:: DETACHING

View File

@ -75,7 +75,8 @@ DEL_ROUTE = 104
ALLOCATE_ID = 105
SHUTDOWN = 106
LOAD_MODULE = 107
DETACHING = 108
FORWARD_MODULE = 108
DETACHING = 109
IS_DEAD = 999
try:

View File

@ -1106,6 +1106,12 @@ class ModuleForwarder(object):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(
fn=self._on_forward_module,
handle=mitogen.core.FORWARD_MODULE,
persist=True,
policy=mitogen.core.has_parent_authority,
)
router.add_handler(
fn=self._on_get_module,
handle=mitogen.core.GET_MODULE,
@ -1116,34 +1122,64 @@ class ModuleForwarder(object):
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)
def _on_forward_module(self, msg):
if msg.is_dead:
return
context_id_s, fullname = msg.data.partition('\x00')
context_id = int(context_id_s)
stream = self.router.stream_by_id(context_id)
if stream.remote_id == mitogen.parent_id:
LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child',
self, context_id, fullname)
return
LOG.debug('%r._on_forward_module() sending %r to %r via %r',
self, fullname, context_id, stream.remote_id)
self._send_module_and_related(stream, fullname)
if stream.remote_id != context_id:
stream._send(
mitogen.core.Message(
dst_id=stream.remote_id,
handle=mitogen.core.FORWARD_MODULE,
data=msg.data,
)
)
def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg)
if msg.is_dead:
return
fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
self.importer._request_module(fullname,
lambda: self._on_cache_callback(msg, fullname)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache.get(related)
if not rtup:
LOG.debug('%r._on_get_module(): skipping absent %r',
self, related)
continue
self._send_one_module(msg, rtup)
stream = self.router.stream_by_id(msg.src_id)
self._send_module_and_related(stream, fullname)
self._send_one_module(msg, tup)
def _send_module_and_related(self, stream, fullname):
tup = self.importer._cache[fullname]
for related in tup[4]:
rtup = self.importer._cache.get(related)
if rtup:
self._send_one_module(stream, rtup)
else:
LOG.debug('%r._send_module_and_related(%r): absent: %r',
self, fullname, related)
self._send_one_module(stream, tup)
def _send_one_module(self, stream, tup):
if tup[0] not in stream.sent_modules:
stream.sent_modules.add(tup[0])
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)