diff --git a/ansible_mitogen/strategy/mitogen.py b/ansible_mitogen/strategy/mitogen.py index 7be52f59..8eab8408 100644 --- a/ansible_mitogen/strategy/mitogen.py +++ b/ansible_mitogen/strategy/mitogen.py @@ -52,7 +52,6 @@ class ContextProxyService(mitogen.service.Service): return isinstance(args, dict) def dispatch(self, dct, msg): - print dct.get('via') key = repr(sorted(dct.items())) if key not in self._context_by_id: method = getattr(self.router, dct.pop('method')) diff --git a/mitogen/core.py b/mitogen/core.py index 1ec2bc57..ca796ab2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -318,28 +318,6 @@ class Sender(object): ) -def _queue_interruptible_get(queue, timeout=None, block=True): - # bool is subclass of int, cannot use isinstance! - assert timeout is None or type(timeout) in (int, long, float) - assert isinstance(block, bool) - - if timeout is not None: - timeout += time.time() - - msg = None - while msg is None and (timeout is None or timeout > time.time()): - try: - msg = queue.get(block, 0.5) - except Queue.Empty: - if not block: - break - - if msg is None: - raise TimeoutError('deadline exceeded.') - - return msg - - class Receiver(object): notify = None raise_channelerror = True @@ -350,6 +328,7 @@ class Receiver(object): self.handle = router.add_handler(self._on_receive, handle, persist, respondent) self._queue = Queue.Queue() + self._latch = Latch() def __repr__(self): return 'Receiver(%r, %r)' % (self.router, self.handle) @@ -358,6 +337,7 @@ class Receiver(object): """Callback from the Stream; appends data to the internal queue.""" IOLOG.debug('%r._on_receive(%r)', self, msg) self._queue.put(msg) + self._latch.wake() if self.notify: self.notify(self) @@ -369,9 +349,9 @@ class Receiver(object): def get(self, timeout=None, block=True): IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) - - msg = _queue_interruptible_get(self._queue, timeout, block=block) - IOLOG.debug('%r.get() got %r', self, msg) + self._latch.wait(timeout=timeout) + msg = self._queue.get() + #IOLOG.debug('%r.get() got %r', self, msg) if msg == _DEAD: raise ChannelError(ChannelError.local_msg) @@ -807,7 +787,44 @@ def _unpickle_context(router, context_id, name): return router.context_class(router, context_id, name) -class Waker(BasicStream): +class Latch(object): + def __init__(self): + rfd, wfd = os.pipe() + set_cloexec(rfd) + set_cloexec(wfd) + self.receive_side = Side(self, rfd) + self.transmit_side = Side(self, wfd) + + def close(self): + self.receive_side.close() + self.transmit_side.close() + + __del__ = close + + def wait(self, timeout=None): + while True: + rfds, _, _ = select.select([self.receive_side], [], [], timeout) + if not rfds: + return False + + try: + self.receive_side.read(1) + except OSError, e: + if e[0] == errno.EWOULDBLOCK: + continue + raise + return False + + def wake(self): + IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) + try: + self.transmit_side.write(' ') + except OSError, e: + if e[0] != errno.EBADF: + raise + + +class Waker(Latch, BasicStream): """ :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when @@ -816,35 +833,26 @@ class Waker(BasicStream): .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html """ def __init__(self, broker): + super(Waker, self).__init__() self._broker = broker - rfd, wfd = os.pipe() - set_cloexec(rfd) - set_cloexec(wfd) - self.receive_side = Side(self, rfd) - self.transmit_side = Side(self, wfd) def __repr__(self): return 'Waker(%r)' % (self._broker,) - def wake(self): - """ - Write a byte to the self-pipe, causing the IO multiplexer to wake up. - Nothing is written if the current thread is the IO multiplexer thread. - """ - IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) - if threading.currentThread() != self._broker._thread: - try: - self.transmit_side.write(' ') - except OSError, e: - if e[0] != errno.EBADF: - raise - def on_receive(self, broker): """ Read a byte from the self-pipe. """ self.receive_side.read(256) + def wake(self): + """ + Write a byte to the self-pipe, causing the IO multiplexer to wake up. + Nothing is written if the current thread is the IO multiplexer thread. + """ + if threading.currentThread() != self._broker._thread: + super(Waker, self).wake() + class IoLogger(BasicStream): """ diff --git a/mitogen/master.py b/mitogen/master.py index 9d1f33b2..7905ed7d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -127,11 +127,13 @@ class Select(object): self._receivers = [] self._oneshot = oneshot self._queue = Queue.Queue() + self._latch = mitogen.core.Latch() for recv in receivers: self.add(recv) def _put(self, value): self._queue.put(value) + self._latch.wake() if self.notify: self.notify(self) @@ -200,7 +202,8 @@ class Select(object): raise SelectError(self.empty_msg) while True: - recv = mitogen.core._queue_interruptible_get(self._queue, timeout) + self._latch.wait() + recv = self._queue.get() try: msg = recv.get(block=False) if self._oneshot: