diff --git a/kombu/async/semaphore.py b/kombu/async/semaphore.py index ef65db75..26b77077 100644 --- a/kombu/async/semaphore.py +++ b/kombu/async/semaphore.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +from collections import deque + __all__ = ['DummyLock', 'LaxBoundedSemaphore'] @@ -13,18 +15,15 @@ class LaxBoundedSemaphore(object): >>> x = LaxBoundedSemaphore(2) - >>> def callback(i): - ... say('HELLO {0!r}'.format(i)) - - >>> x.acquire(callback, 1) + >>> x.acquire(print, 'HELLO 1') HELLO 1 - >>> x.acquire(callback, 2) + >>> x.acquire(print, 'HELLO 2') HELLO 2 - >>> x.acquire(callback, 3) + >>> x.acquire(print, 'HELLO 3') >>> x._waiters # private, do not access directly - [(callback, 3)] + [print, ('HELLO 3', )] >>> x.release() HELLO 3 @@ -33,7 +32,9 @@ class LaxBoundedSemaphore(object): def __init__(self, value): self.initial_value = self.value = value - self._waiting = set() + self._waiting = deque() + self._add_waiter = self._waiting.append + self._pop_waiter = self._waiting.popleft def acquire(self, callback, *partial_args): """Acquire semaphore, applying ``callback`` if @@ -43,8 +44,8 @@ class LaxBoundedSemaphore(object): :param \*partial_args: partial arguments to callback. """ - if self.value <= 0: - self._waiting.append((callback, partial_args)) + if value <= 0: + self._add_waiter((callback, partial_args)) return False else: self.value = max(self.value - 1, 0) @@ -59,8 +60,11 @@ class LaxBoundedSemaphore(object): """ self.value = min(self.value + 1, self.initial_value) - if self._waiting: - waiter, args = self._waiting.pop() + try: + waiter, args = self._pop_waiter() + except IndexError: + pass + else: waiter(*args) def grow(self, n=1): @@ -76,7 +80,7 @@ class LaxBoundedSemaphore(object): def clear(self): """Reset the sempahore, which also wipes out any waiting callbacks.""" - self._waiting[:] = [] + self._waiting.clear() self.value = self.initial_value def __repr__(self):