core: de-munge Message.unpickle() vs. Receiver.get().
This commit is contained in:
parent
c9affbaf50
commit
adc8fe3aed
|
@ -254,17 +254,27 @@ class Message(object):
|
||||||
self.data = cPickle.dumps(CallError(e), protocol=2)
|
self.data = cPickle.dumps(CallError(e), protocol=2)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def unpickle(self):
|
def unpickle(self, throw=True):
|
||||||
"""Deserialize `data` into an object."""
|
"""Deserialize `data` into an object."""
|
||||||
IOLOG.debug('%r.unpickle()', self)
|
IOLOG.debug('%r.unpickle()', self)
|
||||||
fp = cStringIO.StringIO(self.data)
|
fp = cStringIO.StringIO(self.data)
|
||||||
unpickler = cPickle.Unpickler(fp)
|
unpickler = cPickle.Unpickler(fp)
|
||||||
unpickler.find_global = self._find_global
|
unpickler.find_global = self._find_global
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return unpickler.load()
|
# Must occur off the broker thread.
|
||||||
|
obj = unpickler.load()
|
||||||
except (TypeError, ValueError), ex:
|
except (TypeError, ValueError), ex:
|
||||||
raise StreamError('invalid message: %s', ex)
|
raise StreamError('invalid message: %s', ex)
|
||||||
|
|
||||||
|
if throw:
|
||||||
|
if obj == _DEAD:
|
||||||
|
raise ChannelError(ChannelError.remote_msg)
|
||||||
|
if isinstance(obj, CallError):
|
||||||
|
raise obj
|
||||||
|
|
||||||
|
return obj
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return 'Message(%r, %r, %r, %r, %r, %r..%d)' % (
|
return 'Message(%r, %r, %r, %r, %r, %r..%d)' % (
|
||||||
self.dst_id, self.src_id, self.auth_id, self.handle,
|
self.dst_id, self.src_id, self.auth_id, self.handle,
|
||||||
|
@ -358,19 +368,7 @@ class Receiver(object):
|
||||||
|
|
||||||
if msg == _DEAD:
|
if msg == _DEAD:
|
||||||
raise ChannelError(ChannelError.local_msg)
|
raise ChannelError(ChannelError.local_msg)
|
||||||
|
return msg
|
||||||
# Must occur off the broker thread.
|
|
||||||
data = msg.unpickle()
|
|
||||||
if data == _DEAD and self.raise_channelerror:
|
|
||||||
raise ChannelError(ChannelError.remote_msg)
|
|
||||||
|
|
||||||
if isinstance(data, CallError):
|
|
||||||
raise data
|
|
||||||
|
|
||||||
return msg, data
|
|
||||||
|
|
||||||
def get_data(self, timeout=None):
|
|
||||||
return self.get(timeout)[1]
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
while True:
|
while True:
|
||||||
|
@ -1230,7 +1228,8 @@ class ExternalContext(object):
|
||||||
fp.close()
|
fp.close()
|
||||||
|
|
||||||
def _dispatch_calls(self):
|
def _dispatch_calls(self):
|
||||||
for msg, data in self.channel:
|
for msg in self.channel:
|
||||||
|
data = msg.unpickle(throw=False)
|
||||||
LOG.debug('_dispatch_calls(%r)', data)
|
LOG.debug('_dispatch_calls(%r)', data)
|
||||||
if msg.src_id not in mitogen.parent_ids:
|
if msg.src_id not in mitogen.parent_ids:
|
||||||
LOG.warning('CALL_FUNCTION from non-parent %r', msg.src_id)
|
LOG.warning('CALL_FUNCTION from non-parent %r', msg.src_id)
|
||||||
|
|
|
@ -564,17 +564,15 @@ class Context(mitogen.core.Context):
|
||||||
else:
|
else:
|
||||||
klass = None
|
klass = None
|
||||||
|
|
||||||
recv = self.send_async(
|
return self.send_async(
|
||||||
mitogen.core.Message.pickled(
|
mitogen.core.Message.pickled(
|
||||||
(fn.__module__, klass, fn.__name__, args, kwargs),
|
(fn.__module__, klass, fn.__name__, args, kwargs),
|
||||||
handle=mitogen.core.CALL_FUNCTION,
|
handle=mitogen.core.CALL_FUNCTION,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
recv.raise_channelerror = False
|
|
||||||
return recv
|
|
||||||
|
|
||||||
def call(self, fn, *args, **kwargs):
|
def call(self, fn, *args, **kwargs):
|
||||||
return self.call_async(fn, *args, **kwargs).get_data()
|
return self.call_async(fn, *args, **kwargs).get().unpickle()
|
||||||
|
|
||||||
|
|
||||||
class Router(mitogen.parent.Router):
|
class Router(mitogen.parent.Router):
|
||||||
|
|
Loading…
Reference in New Issue