diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 3eef6800..d9bccc88 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -856,6 +856,75 @@ means that Mitogen requires twice as many file descriptors as there are user threads, with a minimum of 4 required in any configuration. +Latch Internals +~~~~~~~~~~~~~~~ + +Attributes: + +* `lock` – :py:class:`threading.Lock`. +* `queue` – enqueued items. +* `wake_socks` – the write sides of the socketpairs for each currently + sleeping thread. While the lock is held, a non-empty `wake_socks` indicates + not only the presence of sleeping threads, but threads that have recently + woken but have not yet to retrieved their item from `queue`. +* `closed` – a simple boolean defaulting to :py:data:`False`. Every time `lock` + is acquired, `closed` must be tested, and if it is :py:data:`True`, + :py:class:`mitogen.core.LatchError` must be thrown. + + +Latch.put() +~~~~~~~~~~~ + +:py:meth:`mitogen.core.Latch.put` operates simply by acquiring `lock`, +appending the item on to `queue`, then if `wake_socks` is non-empty, a byte is +written to the first socket in the list before finally releasing `lock`. + + +Latch.close() +~~~~~~~~~~~ + +:py:meth:`mitogen.core.Latch.putclose` acquires `lock`, sets `closed` to +:py:data:`True`, then writes a byte to every socket in `wake_socks`. As above, +on waking from sleep, after removing itself from `wake_socks`, each sleeping +thread tests if `closed` is :py:data:`True`, and if so throws +:py:class:`mitogen.core.LatchError`. + + +Latch.get() +~~~~~~~~~~~ + +:py:meth:`mitogen.core.Latch.get` is far more fiddly, as there are a variety of +outcomes to handle. Queue ordering is strictly first-in first-out, and the +first thread to attempt to retrieve an item always receives the first available +item. + +**1. Non-empty, No Waiters, No sleep** + On entry `lock` is taken, and if `queue` is non-empty, and `wake_socks` is + empty, it is safe to return `queue`'s first item without blocking. + +**2. Non-empty, Waiters Present, Sleep** + In this case `wake_socks` is non-empty, and it is not safe to pop the item + even though we are holding `lock`, as it would bump the calling thread to + the front of the line, starving any sleeping thread of their item, since a + race exists between a thread waking from :py:func:`select.select` and its + re-acquiring of `lock`. + + This avoids the need for a retry loop for waking threads, and a sleeping + thread being continually re-woken only to discover `queue` drained by a + thread that never slept. + +**3. Sleep** + Since `queue` was empty, or `wake_socks` was non-empty, the thread adds its + socket to `wake_socks` before releasing `lock`, and sleeping in + :py:func:`select.select` waiting for a write from + :py:meth:`mitogen.core.Latch.put`. + +**4. Wake, Non-empty** + On wake it re-acquires `lock`, removes itself from `wake_socks`, throws + :py:class:`mitogen.core.TimeoutError` if no byte was written, otherwise + pops and returns the first item in `queue` that is guaranteed to exist. + + .. rubric:: Footnotes .. [#f1] Compression may seem redundant, however it is basically free and reducing IO diff --git a/mitogen/core.py b/mitogen/core.py index 97181e23..edc15f68 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -917,8 +917,8 @@ class Latch(object): self.lock.acquire() try: self.closed = True - while self.wake_socks: - self._wake(self.wake_socks.pop()) + for wsock in self.wake_socks: + self._wake(wsock) finally: self.lock.release() @@ -932,16 +932,14 @@ class Latch(object): set_cloexec(_tls.wsock.fileno()) def get(self, timeout=None, block=True): - _vv and IOLOG.debug( - '%r.get(timeout=%r, block=%r)', - self, timeout, block - ) + _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', + self, timeout, block) self.lock.acquire() try: if self.closed: raise LatchError() - if self.queue: + if self.queue and not self.wake_socks: _vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0]) return self.queue.pop(0) if not block: @@ -957,14 +955,11 @@ class Latch(object): self.lock.acquire() try: + self.wake_socks.remove(_tls.wsock) if self.closed: raise LatchError() - if _tls.wsock in self.wake_socks: - # Nothing woke us, remove stale entry. - self.wake_socks.remove(_tls.wsock) + if not rfds: raise TimeoutError() - - assert _tls.rsock in rfds assert _tls.rsock.recv(1) == '\x7f' try: _vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0]) @@ -982,14 +977,12 @@ class Latch(object): if self.closed: raise LatchError() self.queue.append(obj) - woken = len(self.wake_socks) > 0 - if woken: + if self.wake_socks: _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, self.wake_socks[0].fileno()) - self._wake(self.wake_socks.pop(0)) + self._wake(self.wake_socks[0]) finally: self.lock.release() - _v and LOG.debug('put() done. woken? %s', woken) def _wake(self, sock): try: