master: propagate routes for IDs allocated via ALLOCATE_ID

needed for inter-child rsync.
This commit is contained in:
David Wilson 2017-09-21 16:46:05 +05:30
parent f01e457d70
commit b2f13f1fa4
1 changed files with 35 additions and 17 deletions

View File

@ -739,7 +739,10 @@ class IdAllocator(object):
def on_allocate_id(self, msg):
id_ = self.allocate()
LOG.debug('%r: allocating ID %d to context %r', self, id_, msg.src_id)
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
self.router.route(
mitogen.core.Message.pickled(
id_,
@ -748,6 +751,10 @@ class IdAllocator(object):
)
)
LOG.debug('%r: publishing route to %r via %r', self,
allocated, requestee)
self.router.propagate_route(allocated, requestee)
class ChildIdAllocator(object):
def __init__(self, router):
@ -787,8 +794,14 @@ class Router(mitogen.core.Router):
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id):
return self._context_by_id.get(context_id)
def context_by_id(self, context_id, via_id=None):
context = self._context_by_id.get(context_id)
if context is None:
context = Context(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def local(self, **kwargs):
return self.connect('local', **kwargs)
@ -819,6 +832,22 @@ class Router(mitogen.core.Router):
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def propagate_route(self, target, via):
self.add_route(target.context_id, via.context_id)
child = via
parent = via.via
while parent is not None:
LOG.debug('Adding route to %r for %r via %r', parent, target, child)
parent.send(
mitogen.core.Message(
data='%s\x00%s' % (target.context_id, child.context_id),
handle=mitogen.core.ADD_ROUTE,
)
)
child = parent
parent = parent.via
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
@ -826,24 +855,13 @@ class Router(mitogen.core.Router):
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
)
# name = '%s.%s' % (via_context.name, name)
context = Context(self, context_id, name=name)
context.via = via_context
child = via_context
parent = via_context.via
while parent is not None:
LOG.debug('Adding route to %r for %r via %r', parent, context, child)
parent.send(
mitogen.core.Message(
data='%s\x00%s' % (context_id, child.context_id),
handle=mitogen.core.ADD_ROUTE,
)
)
child = parent
parent = parent.via
self._context_by_id[context.context_id] = context
self.propagate_route(context, via_context)
return context