core: Generalize/duplicate the call/send_await code using Receiver.

This commit is contained in:
David Wilson 2017-09-21 13:41:12 +05:30
parent 76d35df889
commit 7a60b20dc6
2 changed files with 21 additions and 63 deletions

View File

@ -235,10 +235,11 @@ class Sender(object):
class Receiver(object): class Receiver(object):
def __init__(self, router, handle=None): def __init__(self, router, handle=None, persist=True, respondent=None):
self.router = router self.router = router
self.handle = handle # Avoid __repr__ crash in add_handler() self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = router.add_handler(self._on_receive, handle) self.handle = router.add_handler(self._on_receive, handle,
persist, respondent)
self._queue = Queue.Queue() self._queue = Queue.Queue()
def __repr__(self): def __repr__(self):
@ -266,7 +267,7 @@ class Receiver(object):
continue continue
if msg is None: if msg is None:
return raise TimeoutError('deadline exceeded.')
IOLOG.debug('%r.on_receive() got %r', self, msg) IOLOG.debug('%r.on_receive() got %r', self, msg)
if msg == _DEAD: if msg == _DEAD:
@ -282,6 +283,9 @@ class Receiver(object):
return msg, data return msg, data
def get_data(self, timeout=None):
return self.get(timeout)[1]
def __iter__(self): def __iter__(self):
"""Yield objects from this channel until it is closed.""" """Yield objects from this channel until it is closed."""
while True: while True:
@ -393,7 +397,7 @@ class Importer(object):
self._cache[fullname] = ret = ( self._cache[fullname] = ret = (
self._context.send_await( self._context.send_await(
Message(data=fullname, handle=GET_MODULE) Message(data=fullname, handle=GET_MODULE)
).unpickle() )
) )
if ret is None: if ret is None:
@ -687,29 +691,23 @@ class Context(object):
msg.src_id = mitogen.context_id msg.src_id = mitogen.context_id
self.router.route(msg) self.router.route(msg)
def send_await(self, msg, deadline=None): def send_async(self, msg, persist=False):
"""Send `msg` and wait for a response with an optional timeout."""
if self.router.broker._thread == threading.currentThread(): # TODO if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot making blocking call on broker thread') raise SystemError('Cannot making blocking call on broker thread')
queue = Queue.Queue() receiver = Receiver(self.router, persist=persist, respondent=self)
msg.reply_to = self.router.add_handler(queue.put, msg.reply_to = receiver.handle
persist=False,
respondent=self)
LOG.debug('%r.send_await(%r)', self, msg)
LOG.debug('%r.send_async(%r)', self, msg)
self.send(msg) self.send(msg)
try: return receiver
msg = queue.get(True, deadline)
except Queue.Empty:
# self.broker.defer(self.stream.on_disconnect, self.broker)
raise TimeoutError('deadline exceeded.')
if msg == _DEAD: def send_await(self, msg, deadline=None):
raise StreamError('lost connection during call.') """Send `msg` and wait for a response with an optional timeout."""
receiver = self.send_async(msg)
IOLOG.debug('%r._send_await() -> %r', self, msg) response = receiver.get_data(deadline)
return msg IOLOG.debug('%r._send_await() -> %r', self, response)
return response
def __repr__(self): def __repr__(self):
return 'Context(%s, %r)' % (self.context_id, self.name) return 'Context(%s, %r)' % (self.context_id, self.name)

View File

@ -649,16 +649,6 @@ class Context(mitogen.core.Context):
""" """
mitogen.core.fire(self, 'disconnect') mitogen.core.fire(self, 'disconnect')
def _discard_result(self, msg):
data = msg.unpickle()
if isinstance(data, Exception):
try:
raise data
except Exception:
LOG.exception('_discard_result')
else:
LOG.debug('_discard_result: %r', data)
def call_async(self, fn, *args, **kwargs): def call_async(self, fn, *args, **kwargs):
LOG.debug('%r.call_async(%r, *%r, **%r)', LOG.debug('%r.call_async(%r, *%r, **%r)',
self, fn, args, kwargs) self, fn, args, kwargs)
@ -669,46 +659,16 @@ class Context(mitogen.core.Context):
else: else:
klass = None klass = None
self.send( return self.send_async(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
(fn.__module__, klass, fn.__name__, args, kwargs), (fn.__module__, klass, fn.__name__, args, kwargs),
handle=mitogen.core.CALL_FUNCTION, handle=mitogen.core.CALL_FUNCTION,
reply_to=self.router.add_handler(self._discard_result),
) )
) )
def call_with_deadline(self, deadline, fn, *args, **kwargs):
"""Invoke `fn([context,] *args, **kwargs)` in the external context.
If `deadline` is not ``None``, expire the call after `deadline`
seconds. If `deadline` is ``None``, the invocation may block
indefinitely."""
LOG.debug('%r.call_with_deadline(%r, %r, *%r, **%r)',
self, deadline, fn, args, kwargs)
if isinstance(fn, types.MethodType) and \
isinstance(fn.im_self, (type, types.ClassType)):
klass = fn.im_self.__name__
else:
klass = None
response = self.send_await(
mitogen.core.Message.pickled(
(fn.__module__, klass, fn.__name__, args, kwargs),
handle=mitogen.core.CALL_FUNCTION
),
deadline
)
decoded = response.unpickle()
if isinstance(decoded, mitogen.core.CallError):
raise decoded
return decoded
def call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
"""Invoke `fn(*args, **kwargs)` in the external context.""" """Invoke `fn(*args, **kwargs)` in the external context."""
return self.call_with_deadline(None, fn, *args, **kwargs) return self.call_async(fn, *args, **kwargs).get_data()
def _local_method(): def _local_method():