diff --git a/docs/howitworks.rst b/docs/howitworks.rst index ac496a67..7d8d109b 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -859,6 +859,8 @@ threads, with a minimum of 4 required in any configuration. Latch Internals ~~~~~~~~~~~~~~~ +.. currentmodule:: mitogen.core + Attributes: * `lock` – :py:class:`threading.Lock`. @@ -868,15 +870,15 @@ Attributes: * `waking` – integer number of `sleeping` threads in the process of waking up. * `closed` – boolean defaulting to :py:data:`False`. Every time `lock` is acquired, `closed` must be tested, and if it is :py:data:`True`, - :py:class:`mitogen.core.LatchError` must be thrown. + :py:class:`LatchError` must be thrown. Latch.put() ~~~~~~~~~~~ -:py:meth:`mitogen.core.Latch.put` operates by: +:py:meth:`Latch.put` operates by: -1. Acquiring `lock` +1. Acquiring `lock`. 2. Appending the item on to `queue`. 3. If `waking` is less than the length of `sleeping`, write a byte to the socket at `sleeping[waking]` and increment `waking`. @@ -886,14 +888,13 @@ to when its socket was placed on `sleeping`. Latch.close() -~~~~~~~~~~~ +~~~~~~~~~~~~~ -:py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to -:py:data:`True`, then writes a byte to every `sleeping[waking]` socket, while -incrementing `waking`, until no more unwoken sockets exist. Per above, on -waking from sleep, after removing itself from `sleeping`, each sleeping thread -tests if `closed` is :py:data:`True`, and if so throws -:py:class:`mitogen.core.LatchError`. +:py:meth:`Latch.close` acquires `lock`, sets `closed` to :py:data:`True`, then +writes a byte to every `sleeping[waking]` socket, while incrementing `waking`, +until no more unwoken sockets exist. Per above, on waking from sleep, after +removing itself from `sleeping`, each sleeping thread tests if `closed` is +:py:data:`True`, and if so throws :py:class:`LatchError`. It is necessary to ensure at most one byte is delivered on each socket, even if the latch is being torn down, as the sockets outlive the scope of a single @@ -904,42 +905,53 @@ unexpected wakeups if future latches sleep on the same thread. Latch.get() ~~~~~~~~~~~ -:py:meth:`mitogen.core.Latch.get` is far more fiddly, as there are a variety of -outcomes to handle. Queue ordering is strictly first-in first-out, and the -first thread to attempt to retrieve an item always receives the first available -item. +:py:meth:`Latch.get` is far more intricate, as there are many outcomes to +handle. Queue ordering is strictly first-in first-out, and threads always +receive items in the order they are requested, as they become available. **1. Non-empty, No Waiters, No sleep** On entry `lock` is taken, and if `queue` is non-empty, and `sleeping` is empty, it is safe to return `queue`'s first item without blocking. -**2. Non-empty, Waiters Present, Sleep** - In this case `sleeping` is non-empty, and it is not safe to pop the item - even though we are holding `lock`, as it would bump the calling thread to - the front of the line, starving any sleeping thread of their item, since a - race exists between a thread waking from :py:func:`select.select` and - re-acquiring `lock`. +**2. Non-empty, Waiters Present, Queue > Waiters, No sleep** + When `sleeping` is non-empty but there are more items than sleeping + threads, it is safe to pop `queue[len(sleeping)]` without blocking. + +**3. Non-empty, Waiters Present, Queue <= Waiters** + In this case `sleeping` is non-empty and there are no surplus items. It is + not safe to pop any item even though we are holding `lock`, as it would + starve waking threads of their position in favour of the calling thread, + since scheduling uncertainty exists between a thread waking from + :py:func:`select.select` and re-acquiring `lock`. This avoids the need for a retry loop for waking threads, and a thread being continually re-woken to discover `queue` drained by a thread that never slept. -**3. Sleep** - Since `queue` was empty, or `sleeping` was non-empty, the thread adds its - socket to `sleeping` before releasing `lock`, and sleeping in - :py:func:`select.select` waiting for a write from - :py:meth:`mitogen.core.Latch.put`. +**4. Sleep** + Since no surplus items existed, the thread adds its socket to `sleeping` + before releasing `lock`, and sleeping in :py:func:`select.select` waiting + for timeout, or a write from :py:meth:`Latch.put` or + :py:meth:`Latch.close`. -**4. Wake, Non-empty** - On wake it re-acquires `lock`, removes itself from `sleeping`, throws - :py:class:`mitogen.core.TimeoutError` if no byte was written, decrements - `waking`, then pops and returns the first item in `queue` that is - guaranteed to exist. +**5. Wake, Non-empty** + On wake `lock` is re-acquired, the socket is removed from `sleeping` after + noting its index, and :py:class:`TimeoutError` is thrown if `waking` + indicates :py:meth:`Latch.put()` nor :py:meth:`Latch.close` have yet to + send a wake byte to that index. The byte is then read off, + :py:class:`LatchError` is thrown if `closed` is :py:data:`True`, otherwise + the queue item corresponding to the thread's index is popped and returned. - It is paramount that in every case, if :py:func:`select.select` indicates a - byte was written to the socket, that the byte is read away. The socket is - reused by subsequent latches sleeping on the same thread, and unexpected - wakeups are triggered if extraneous data remains buffered on the socket. + It is paramount that in every case, if a byte was written to the socket, + that the byte is read away. The socket is reused by subsequent latches + sleeping on the same thread, and unexpected wakeups are triggered if + extraneous data remains buffered on the socket. + + It is also necessary to favour the synchronized `waking` variable over the + return value of :py:func:`select.select`, as scheduling uncertainty + introduces a race between the select timing out, and :py:meth:`Latch.put()` + or :py:meth:`Latch.close` writing a wake byte before :py:meth:`Latch.get` + has re-acquired `lock`. .. rubric:: Footnotes diff --git a/mitogen/core.py b/mitogen/core.py index fbe0d6eb..39ef818a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -941,9 +941,10 @@ class Latch(object): try: if self.closed: raise LatchError() - if self._queue and not self._sleeping: - _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) - return self._queue.pop(0) + i = len(self._sleeping) + if len(self._queue) > i: + _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[i]) + return self._queue.pop(i) if not block: raise TimeoutError() self._tls_init() @@ -952,25 +953,21 @@ class Latch(object): self._lock.release() _vv and IOLOG.debug('%r.get() -> sleeping', self) - rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout) - assert len(rfds) or timeout is not None + restart(select.select, [_tls.rsock], [], [], timeout) self._lock.acquire() try: - self._sleeping.remove(_tls.wsock) - if not rfds: + i = self._sleeping.index(_tls.wsock) + del self._sleeping[i] + self._waking -= 1 + if i > self._waking: raise TimeoutError() if _tls.rsock.recv(2) != '\x7f': raise LatchError('internal error: received >1 wakeups') - self._waking -= 1 if self.closed: raise LatchError() - try: - _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) - return self._queue.pop(0) - except IndexError: - IOLOG.exception('%r.get() INDEX ERROR', self) - raise + _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i]) + return self._queue.pop(i) finally: self._lock.release()