issue #156: waking thread result dictionary with an integer.

This commit is contained in:
David Wilson 2018-03-20 12:55:55 +05:45
parent 001e0163fe
commit 07a8994ff5
2 changed files with 39 additions and 33 deletions

View File

@ -862,12 +862,11 @@ Latch Internals
Attributes: Attributes:
* `lock` :py:class:`threading.Lock`. * `lock` :py:class:`threading.Lock`.
* `queue`  enqueued items. * `queue`  items waiting to be dequeued.
* `wake_socks`  the write sides of the socketpairs for each currently * `sleeping`  write sides of the socketpairs for each sleeping thread, and
sleeping thread. While the lock is held, a non-empty `wake_socks` indicates threads in the process of waking from sleep.
not only the presence of sleeping threads, but threads that have recently * `waking` integer number of `sleeping` threads in the process of waking up.
woken but have not yet to retrieved their item from `queue`. * `closed` boolean defaulting to :py:data:`False`. Every time `lock`
* `closed` a simple boolean defaulting to :py:data:`False`. Every time `lock`
is acquired, `closed` must be tested, and if it is :py:data:`True`, is acquired, `closed` must be tested, and if it is :py:data:`True`,
:py:class:`mitogen.core.LatchError` must be thrown. :py:class:`mitogen.core.LatchError` must be thrown.
@ -875,17 +874,23 @@ Attributes:
Latch.put() Latch.put()
~~~~~~~~~~~ ~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.put` operates simply by acquiring `lock`, :py:meth:`mitogen.core.Latch.put` operates by:
appending the item on to `queue`, then if `wake_socks` is non-empty, a byte is
written to the first socket in the list before finally releasing `lock`. 1. Acquiring `lock`
2. Appending the item on to `queue`.
3. If `waking` is less than the length of `sleeping`, write a byte to the
socket at `sleeping[waking]` and increment `waking`.
In this way each thread is woken only once, and receives each element according
to when its socket was placed on `sleeping`.
Latch.close() Latch.close()
~~~~~~~~~~~ ~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to :py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to
:py:data:`True`, then writes a byte to every socket in `wake_socks`. As above, :py:data:`True`, then writes a byte to every socket in `sleeping`. Per above,
on waking from sleep, after removing itself from `wake_socks`, each sleeping on waking from sleep, after removing itself from `sleeping`, each sleeping
thread tests if `closed` is :py:data:`True`, and if so throws thread tests if `closed` is :py:data:`True`, and if so throws
:py:class:`mitogen.core.LatchError`. :py:class:`mitogen.core.LatchError`.
@ -899,30 +904,31 @@ first thread to attempt to retrieve an item always receives the first available
item. item.
**1. Non-empty, No Waiters, No sleep** **1. Non-empty, No Waiters, No sleep**
On entry `lock` is taken, and if `queue` is non-empty, and `wake_socks` is On entry `lock` is taken, and if `queue` is non-empty, and `sleeping` is
empty, it is safe to return `queue`'s first item without blocking. empty, it is safe to return `queue`'s first item without blocking.
**2. Non-empty, Waiters Present, Sleep** **2. Non-empty, Waiters Present, Sleep**
In this case `wake_socks` is non-empty, and it is not safe to pop the item In this case `sleeping` is non-empty, and it is not safe to pop the item
even though we are holding `lock`, as it would bump the calling thread to even though we are holding `lock`, as it would bump the calling thread to
the front of the line, starving any sleeping thread of their item, since a the front of the line, starving any sleeping thread of their item, since a
race exists between a thread waking from :py:func:`select.select` and its race exists between a thread waking from :py:func:`select.select` and
re-acquiring of `lock`. re-acquiring `lock`.
This avoids the need for a retry loop for waking threads, and a sleeping This avoids the need for a retry loop for waking threads, and a thread
thread being continually re-woken only to discover `queue` drained by a being continually re-woken to discover `queue` drained by a thread that
thread that never slept. never slept.
**3. Sleep** **3. Sleep**
Since `queue` was empty, or `wake_socks` was non-empty, the thread adds its Since `queue` was empty, or `sleeping` was non-empty, the thread adds its
socket to `wake_socks` before releasing `lock`, and sleeping in socket to `sleeping` before releasing `lock`, and sleeping in
:py:func:`select.select` waiting for a write from :py:func:`select.select` waiting for a write from
:py:meth:`mitogen.core.Latch.put`. :py:meth:`mitogen.core.Latch.put`.
**4. Wake, Non-empty** **4. Wake, Non-empty**
On wake it re-acquires `lock`, removes itself from `wake_socks`, throws On wake it re-acquires `lock`, removes itself from `sleeping`, decrementing
:py:class:`mitogen.core.TimeoutError` if no byte was written, otherwise `waking`, throws :py:class:`mitogen.core.TimeoutError` if no byte was
pops and returns the first item in `queue` that is guaranteed to exist. written, otherwise pops and returns the first item in `queue` that is
guaranteed to exist.
.. rubric:: Footnotes .. rubric:: Footnotes

View File

@ -907,12 +907,12 @@ def _unpickle_context(router, context_id, name):
class Latch(object): class Latch(object):
closed = False closed = False
_waking = 0
def __init__(self): def __init__(self):
self._lock = threading.Lock() self._lock = threading.Lock()
self._queue = [] self._queue = []
self._sleeping = [] self._sleeping = []
self._pending = {}
def close(self): def close(self):
self._lock.acquire() self._lock.acquire()
@ -940,7 +940,7 @@ class Latch(object):
try: try:
if self.closed: if self.closed:
raise LatchError() raise LatchError()
if self._queue: if self._queue and not self._sleeping:
_vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0])
return self._queue.pop(0) return self._queue.pop(0)
if not block: if not block:
@ -956,6 +956,8 @@ class Latch(object):
self._lock.acquire() self._lock.acquire()
try: try:
self._sleeping.remove(_tls.wsock)
self._waking -= 1
if self.closed: if self.closed:
raise LatchError() raise LatchError()
if not rfds: if not rfds:
@ -963,9 +965,8 @@ class Latch(object):
if _tls.rsock.recv(2) != '\x7f': if _tls.rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups') raise LatchError('internal error: received >1 wakeups')
try: try:
obj = self._pending.pop(_tls.wsock) _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0])
_vv and IOLOG.debug('%r.get() wake -> %r', self, obj) return self._queue.pop(0)
return obj
except IndexError: except IndexError:
IOLOG.exception('%r.get() INDEX ERROR', self) IOLOG.exception('%r.get() INDEX ERROR', self)
raise raise
@ -978,14 +979,13 @@ class Latch(object):
try: try:
if self.closed: if self.closed:
raise LatchError() raise LatchError()
if self._sleeping: self._queue.append(obj)
sock = self._sleeping.pop(0) if self._waking < len(self._sleeping):
self._pending[sock] = obj sock = self._sleeping[self._waking]
self._waking += 1
_vv and IOLOG.debug('%r.put() -> waking wfd=%r', _vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, sock.fileno()) self, sock.fileno())
self._wake(sock) self._wake(sock)
else:
self._queue.append(obj)
finally: finally:
self._lock.release() self._lock.release()