diff --git a/mitogen/core.py b/mitogen/core.py index cdff71d2..1c1d2e27 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -235,10 +235,11 @@ class Sender(object): class Receiver(object): - def __init__(self, router, handle=None): + def __init__(self, router, handle=None, persist=True, respondent=None): self.router = router self.handle = handle # Avoid __repr__ crash in add_handler() - self.handle = router.add_handler(self._on_receive, handle) + self.handle = router.add_handler(self._on_receive, handle, + persist, respondent) self._queue = Queue.Queue() def __repr__(self): @@ -266,7 +267,7 @@ class Receiver(object): continue if msg is None: - return + raise TimeoutError('deadline exceeded.') IOLOG.debug('%r.on_receive() got %r', self, msg) if msg == _DEAD: @@ -282,6 +283,9 @@ class Receiver(object): return msg, data + def get_data(self, timeout=None): + return self.get(timeout)[1] + def __iter__(self): """Yield objects from this channel until it is closed.""" while True: @@ -393,7 +397,7 @@ class Importer(object): self._cache[fullname] = ret = ( self._context.send_await( Message(data=fullname, handle=GET_MODULE) - ).unpickle() + ) ) if ret is None: @@ -687,29 +691,23 @@ class Context(object): msg.src_id = mitogen.context_id self.router.route(msg) - def send_await(self, msg, deadline=None): - """Send `msg` and wait for a response with an optional timeout.""" + def send_async(self, msg, persist=False): if self.router.broker._thread == threading.currentThread(): # TODO raise SystemError('Cannot making blocking call on broker thread') - queue = Queue.Queue() - msg.reply_to = self.router.add_handler(queue.put, - persist=False, - respondent=self) - LOG.debug('%r.send_await(%r)', self, msg) + receiver = Receiver(self.router, persist=persist, respondent=self) + msg.reply_to = receiver.handle + LOG.debug('%r.send_async(%r)', self, msg) self.send(msg) - try: - msg = queue.get(True, deadline) - except Queue.Empty: - # self.broker.defer(self.stream.on_disconnect, self.broker) - raise TimeoutError('deadline exceeded.') + return receiver - if msg == _DEAD: - raise StreamError('lost connection during call.') - - IOLOG.debug('%r._send_await() -> %r', self, msg) - return msg + def send_await(self, msg, deadline=None): + """Send `msg` and wait for a response with an optional timeout.""" + receiver = self.send_async(msg) + response = receiver.get_data(deadline) + IOLOG.debug('%r._send_await() -> %r', self, response) + return response def __repr__(self): return 'Context(%s, %r)' % (self.context_id, self.name) diff --git a/mitogen/master.py b/mitogen/master.py index 7c664997..d2fe66c8 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -649,16 +649,6 @@ class Context(mitogen.core.Context): """ mitogen.core.fire(self, 'disconnect') - def _discard_result(self, msg): - data = msg.unpickle() - if isinstance(data, Exception): - try: - raise data - except Exception: - LOG.exception('_discard_result') - else: - LOG.debug('_discard_result: %r', data) - def call_async(self, fn, *args, **kwargs): LOG.debug('%r.call_async(%r, *%r, **%r)', self, fn, args, kwargs) @@ -669,46 +659,16 @@ class Context(mitogen.core.Context): else: klass = None - self.send( + return self.send_async( mitogen.core.Message.pickled( (fn.__module__, klass, fn.__name__, args, kwargs), handle=mitogen.core.CALL_FUNCTION, - reply_to=self.router.add_handler(self._discard_result), ) ) - def call_with_deadline(self, deadline, fn, *args, **kwargs): - """Invoke `fn([context,] *args, **kwargs)` in the external context. - - If `deadline` is not ``None``, expire the call after `deadline` - seconds. If `deadline` is ``None``, the invocation may block - indefinitely.""" - LOG.debug('%r.call_with_deadline(%r, %r, *%r, **%r)', - self, deadline, fn, args, kwargs) - - if isinstance(fn, types.MethodType) and \ - isinstance(fn.im_self, (type, types.ClassType)): - klass = fn.im_self.__name__ - else: - klass = None - - response = self.send_await( - mitogen.core.Message.pickled( - (fn.__module__, klass, fn.__name__, args, kwargs), - handle=mitogen.core.CALL_FUNCTION - ), - deadline - ) - - decoded = response.unpickle() - if isinstance(decoded, mitogen.core.CallError): - raise decoded - return decoded - def call(self, fn, *args, **kwargs): """Invoke `fn(*args, **kwargs)` in the external context.""" - return self.call_with_deadline(None, fn, *args, **kwargs) - + return self.call_async(fn, *args, **kwargs).get_data() def _local_method():