From b2f13f1fa4e9579662882eb37255d87a3921bec2 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 21 Sep 2017 16:46:05 +0530 Subject: [PATCH] master: propagate routes for IDs allocated via ALLOCATE_ID needed for inter-child rsync. --- mitogen/master.py | 52 +++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/mitogen/master.py b/mitogen/master.py index a02caece..f80047df 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -739,7 +739,10 @@ class IdAllocator(object): def on_allocate_id(self, msg): id_ = self.allocate() - LOG.debug('%r: allocating ID %d to context %r', self, id_, msg.src_id) + requestee = self.router.context_by_id(msg.src_id) + allocated = self.router.context_by_id(id_, msg.src_id) + + LOG.debug('%r: allocating %r to %r', self, allocated, requestee) self.router.route( mitogen.core.Message.pickled( id_, @@ -748,6 +751,10 @@ class IdAllocator(object): ) ) + LOG.debug('%r: publishing route to %r via %r', self, + allocated, requestee) + self.router.propagate_route(allocated, requestee) + class ChildIdAllocator(object): def __init__(self, router): @@ -787,8 +794,14 @@ class Router(mitogen.core.Router): def allocate_id(self): return self.id_allocator.allocate() - def context_by_id(self, context_id): - return self._context_by_id.get(context_id) + def context_by_id(self, context_id, via_id=None): + context = self._context_by_id.get(context_id) + if context is None: + context = Context(self, context_id) + if via_id is not None: + context.via = self.context_by_id(via_id) + self._context_by_id[context_id] = context + return context def local(self, **kwargs): return self.connect('local', **kwargs) @@ -819,6 +832,22 @@ class Router(mitogen.core.Router): context_id = self.allocate_id() return self._connect(context_id, klass, name=name, **kwargs) + def propagate_route(self, target, via): + self.add_route(target.context_id, via.context_id) + child = via + parent = via.via + + while parent is not None: + LOG.debug('Adding route to %r for %r via %r', parent, target, child) + parent.send( + mitogen.core.Message( + data='%s\x00%s' % (target.context_id, child.context_id), + handle=mitogen.core.ADD_ROUTE, + ) + ) + child = parent + parent = parent.via + def proxy_connect(self, via_context, method_name, name=None, **kwargs): context_id = self.allocate_id() # Must be added prior to _proxy_connect() to avoid a race. @@ -826,24 +855,13 @@ class Router(mitogen.core.Router): name = via_context.call(_proxy_connect, name, context_id, method_name, kwargs ) + # name = '%s.%s' % (via_context.name, name) context = Context(self, context_id, name=name) context.via = via_context - - child = via_context - parent = via_context.via - while parent is not None: - LOG.debug('Adding route to %r for %r via %r', parent, context, child) - parent.send( - mitogen.core.Message( - data='%s\x00%s' % (context_id, child.context_id), - handle=mitogen.core.ADD_ROUTE, - ) - ) - child = parent - parent = parent.via - self._context_by_id[context.context_id] = context + + self.propagate_route(context, via_context) return context