core: Message.reply() helper function
This commit is contained in:
parent
6fc8fa5b22
commit
10230f62dd
|
@ -255,6 +255,12 @@ class Message(object):
|
||||||
self.data = cPickle.dumps(CallError(e), protocol=2)
|
self.data = cPickle.dumps(CallError(e), protocol=2)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def reply(self, data, **kwargs):
|
||||||
|
kwargs.setdefault('handle', self.reply_to)
|
||||||
|
self.router.route(
|
||||||
|
self.pickled(data, dst_id=self.src_id, **kwargs)
|
||||||
|
)
|
||||||
|
|
||||||
def unpickle(self, throw=True):
|
def unpickle(self, throw=True):
|
||||||
"""Deserialize `data` into an object."""
|
"""Deserialize `data` into an object."""
|
||||||
IOLOG.debug('%r.unpickle()', self)
|
IOLOG.debug('%r.unpickle()', self)
|
||||||
|
@ -1249,16 +1255,10 @@ class ExternalContext(object):
|
||||||
kwargs.setdefault('econtext', self)
|
kwargs.setdefault('econtext', self)
|
||||||
if getattr(fn, 'mitogen_takes_router', None):
|
if getattr(fn, 'mitogen_takes_router', None):
|
||||||
kwargs.setdefault('router', self.router)
|
kwargs.setdefault('router', self.router)
|
||||||
ret = fn(*args, **kwargs)
|
msg.reply(fn(*args, **kwargs))
|
||||||
self.router.route(
|
|
||||||
Message.pickled(ret, dst_id=msg.src_id, handle=msg.reply_to)
|
|
||||||
)
|
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
LOG.debug('_dispatch_calls: %s', e)
|
LOG.debug('_dispatch_calls: %s', e)
|
||||||
e = CallError(e)
|
msg.reply(CallError(e))
|
||||||
self.router.route(
|
|
||||||
Message.pickled(e, dst_id=msg.src_id, handle=msg.reply_to)
|
|
||||||
)
|
|
||||||
self.dispatch_stopped = True
|
self.dispatch_stopped = True
|
||||||
|
|
||||||
def main(self, parent_ids, context_id, debug, profiling, log_level,
|
def main(self, parent_ids, context_id, debug, profiling, log_level,
|
||||||
|
|
|
@ -489,13 +489,8 @@ class ModuleResponder(object):
|
||||||
|
|
||||||
def _send_load_module(self, stream, msg, fullname):
|
def _send_load_module(self, stream, msg, fullname):
|
||||||
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
|
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
|
||||||
self._router.route(
|
msg.reply(self._build_tuple(fullname),
|
||||||
mitogen.core.Message.pickled(
|
handle=mitogen.core.LOAD_MODULE)
|
||||||
self._build_tuple(fullname),
|
|
||||||
dst_id=msg.src_id,
|
|
||||||
handle=mitogen.core.LOAD_MODULE,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
stream.sent_modules.add(fullname)
|
stream.sent_modules.add(fullname)
|
||||||
|
|
||||||
def _on_get_module(self, msg):
|
def _on_get_module(self, msg):
|
||||||
|
@ -526,13 +521,8 @@ class ModuleResponder(object):
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.debug('While importing %r', fullname, exc_info=True)
|
LOG.debug('While importing %r', fullname, exc_info=True)
|
||||||
self._router.route(
|
msg.reply((fullname, None, None, None, []),
|
||||||
mitogen.core.Message.pickled(
|
handle=mitogen.core.LOAD_MODULE)
|
||||||
(fullname, None, None, None, []),
|
|
||||||
dst_id=msg.src_id,
|
|
||||||
handle=msg.reply_to,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Broker(mitogen.core.Broker):
|
class Broker(mitogen.core.Broker):
|
||||||
|
@ -651,13 +641,7 @@ class IdAllocator(object):
|
||||||
allocated = self.router.context_by_id(id_, msg.src_id)
|
allocated = self.router.context_by_id(id_, msg.src_id)
|
||||||
|
|
||||||
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
|
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
|
||||||
self.router.route(
|
msg.reply(id_)
|
||||||
mitogen.core.Message.pickled(
|
|
||||||
id_,
|
|
||||||
dst_id=msg.src_id,
|
|
||||||
handle=msg.reply_to,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
LOG.debug('%r: publishing route to %r via %r', self,
|
LOG.debug('%r: publishing route to %r via %r', self,
|
||||||
allocated, requestee)
|
allocated, requestee)
|
||||||
|
|
Loading…
Reference in New Issue