diff --git a/kombu/async/__init__.py b/kombu/async/__init__.py new file mode 100644 index 00000000..1524b422 --- /dev/null +++ b/kombu/async/__init__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import + +from .hub import Hub, get_event_loop, set_event_loop + +from kombu.utils.eventio import READ, WRITE, ERR + +__all__ = ['READ', 'WRITE', 'ERR', 'Hub', 'get_event_loop', 'set_event_loop'] diff --git a/kombu/async/hub.py b/kombu/async/hub.py new file mode 100644 index 00000000..56ed7728 --- /dev/null +++ b/kombu/async/hub.py @@ -0,0 +1,195 @@ +from __future__ import absolute_import + +from kombu.five import items, range +from kombu.log import get_logger +from kombu.utils import cached_property, fileno +from kombu.utils.eventio import READ, WRITE, ERR, poll +from kombu.utils.functional import maybe_list + +__all__ = ['Hub', 'get_event_loop', 'set_event_loop'] +logger = get_logger(__name__) + +_current_loop = None + + +def get_event_loop(): + return _current_loop + + +def set_event_loop(loop): + global _current_loop + _current_loop = loop + return loop + + +def repr_flag(flag): + return '{0}{1}{2}'.format('R' if flag & READ else '', + 'W' if flag & WRITE else '', + '!' if flag & ERR else '') + + +def _rcb(obj): + if obj is None: + return '' + if isinstance(obj, str): + return obj + return obj.__name__ + + +class Hub(object): + """Event loop object. + + :keyword timer: Specify timer object. + + """ + #: Flag set if reading from an fd will not block. + READ = READ + + #: Flag set if writing to an fd will not block. + WRITE = WRITE + + #: Flag set on error, and the fd should be read from asap. + ERR = ERR + + #: List of callbacks to be called when the loop is initialized, + #: applied with the hub instance as sole argument. + on_init = None + + #: List of callbacks to be called when the loop is exiting, + #: applied with the hub instance as sole argument. + on_close = None + + #: List of callbacks to be called when a task is received. + #: Takes no arguments. + on_task = None + + def __init__(self, timer=None): + self.timer = timer + + self.readers = {} + self.writers = {} + self.on_init = [] + self.on_close = [] + self.on_task = [] + + # The eventloop (in celery.worker.loops) + # will merge fds in this set and then instead of calling + # the callback for each ready fd it will call the + # :attr:`consolidate_callback` with the list of ready_fds + # as an argument. This API is internal and is only + # used by the multiprocessing pool to find inqueues + # that are ready to write. + self.consolidate = set() + self.consolidate_callback = None + + def start(self): + self.poller = poll() + + def stop(self): + self.poller.close() + + def init(self): + for callback in self.on_init: + callback(self) + + def fire_timers(self, min_delay=1, max_delay=10, max_timers=10, + propagate=()): + timer = self.timer + delay = None + if timer and timer._queue: + for i in range(max_timers): + delay, entry = next(self.scheduler) + if entry is None: + break + try: + entry() + except propagate: + raise + except Exception as exc: + logger.error('Error in timer: %r', exc, exc_info=1) + return min(max(delay or 0, min_delay), max_delay) + + def add(self, fds, callback, flags, consolidate=False): + for fd in maybe_list(fds, None): + try: + self._add(fd, callback, flags, consolidate) + except ValueError: + self._discard(fd) + + def remove(self, fd): + fd = fileno(fd) + self._unregister(fd) + self._discard(fd) + + def add_reader(self, fds, callback): + return self.add(fds, callback, READ | ERR) + + def add_writer(self, fds, callback): + return self.add(fds, callback, WRITE) + + def update_readers(self, readers): + [self.add_reader(*x) for x in items(readers)] + + def update_writers(self, writers): + [self.add_writer(*x) for x in items(writers)] + + def _unregister(self, fd): + try: + self.poller.unregister(fd) + except (KeyError, OSError): + pass + + def close(self, *args): + [self._unregister(fd) for fd in self.readers] + self.readers.clear() + [self._unregister(fd) for fd in self.writers] + self.writers.clear() + for callback in self.on_close: + callback(self) + + def _add(self, fd, cb, flags, consolidate=False): + self.poller.register(fd, flags) + (self.readers if flags & READ else self.writers)[fileno(fd)] = cb + if consolidate: + self.consolidate.add(fd) + + def _discard(self, fd): + fd = fileno(fd) + self.readers.pop(fd, None) + self.writers.pop(fd, None) + self.consolidate.discard(fd) + + def repr_active(self): + return ', '.join(self._repr_readers() + self._repr_writers()) + + def repr_events(self, events): + return ', '.join( + '{0}->{1}'.format( + _rcb(self._callback_for(fd, fl, '{0!r}(GONE)'.format(fd))), + repr_flag(fl), + ) + for fd, fl in events + ) + + def _repr_readers(self): + return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR)) + for fd, cb in items(self.readers)] + + def _repr_writers(self): + return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE)) + for fd, cb in items(self.writers)] + + def _callback_for(self, fd, flag, *default): + try: + if flag & READ: + return self.readers[fileno(fd)] + if flag & WRITE: + return self.writers[fileno(fd)] + except KeyError: + if default: + return default[0] + raise + + @cached_property + def scheduler(self): + return iter(self.timer) diff --git a/kombu/async/semaphore.py b/kombu/async/semaphore.py new file mode 100644 index 00000000..ef65db75 --- /dev/null +++ b/kombu/async/semaphore.py @@ -0,0 +1,95 @@ +from __future__ import absolute_import + +__all__ = ['DummyLock', 'LaxBoundedSemaphore'] + + +class LaxBoundedSemaphore(object): + """Asynchronous Bounded Semaphore. + + Lax means that the value will stay within the specified + range even if released more times than it was acquired. + + Example: + + >>> x = LaxBoundedSemaphore(2) + + >>> def callback(i): + ... say('HELLO {0!r}'.format(i)) + + >>> x.acquire(callback, 1) + HELLO 1 + + >>> x.acquire(callback, 2) + HELLO 2 + + >>> x.acquire(callback, 3) + >>> x._waiters # private, do not access directly + [(callback, 3)] + + >>> x.release() + HELLO 3 + + """ + + def __init__(self, value): + self.initial_value = self.value = value + self._waiting = set() + + def acquire(self, callback, *partial_args): + """Acquire semaphore, applying ``callback`` if + the resource is available. + + :param callback: The callback to apply. + :param \*partial_args: partial arguments to callback. + + """ + if self.value <= 0: + self._waiting.append((callback, partial_args)) + return False + else: + self.value = max(self.value - 1, 0) + callback(*partial_args) + return True + + def release(self): + """Release semaphore. + + If there are any waiters this will apply the first waiter + that is waiting for the resource (FIFO order). + + """ + self.value = min(self.value + 1, self.initial_value) + if self._waiting: + waiter, args = self._waiting.pop() + waiter(*args) + + def grow(self, n=1): + """Change the size of the semaphore to accept more users.""" + self.initial_value += n + self.value += n + [self.release() for _ in range(n)] + + def shrink(self, n=1): + """Change the size of the semaphore to accept less users.""" + self.initial_value = max(self.initial_value - n, 0) + self.value = max(self.value - n, 0) + + def clear(self): + """Reset the sempahore, which also wipes out any waiting callbacks.""" + self._waiting[:] = [] + self.value = self.initial_value + + def __repr__(self): + return '<{0} at {1:#x} value:{2} waiting:{3}>'.format( + self.__class__.__name__, id(self), self.value, len(self.waiting), + ) + + +class DummyLock(object): + """Pretending to be a lock.""" + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + pass diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 7c4acbe7..3a5e35b6 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -16,7 +16,7 @@ from itertools import count, repeat from time import sleep from uuid import UUID, uuid4 as _uuid4, _uuid_generate_random -from kombu.five import items, reraise, string_t +from kombu.five import int_types, items, reraise, string_t from .encoding import default_encode, safe_repr as _safe_repr @@ -25,10 +25,18 @@ try: except: ctypes = None # noqa +try: + from io import UnsupportedOperation + FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation) +except ImportError: # pragma: no cover + # Py2 + FILENO_ERRORS = (AttributeError, ValueError) # noqa + + __all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list', 'fxrange', 'fxrangemax', 'retry_over_time', 'emergency_dump_state', 'cached_property', - 'reprkwargs', 'reprcall', 'nested'] + 'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno'] def symbol_by_name(name, aliases={}, imp=None, package=None, @@ -406,3 +414,17 @@ def escape_regex(p, white=''): return ''.join(c if c.isalnum() or c in white else ('\\000' if c == '\000' else '\\' + c) for c in p) + + +def fileno(f): + if isinstance(f, int_types): + return f + return f.fileno() + + +def maybe_fileno(f): + """Get object fileno, or :const:`None` if not defined.""" + try: + return fileno(f) + except FILENO_ERRORS: + pass diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index e2ec1afb..972c6998 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -2,7 +2,9 @@ from __future__ import absolute_import import sys -__all__ = ['lazy', 'maybe_evaluate'] +__all__ = ['lazy', 'maybe_evaluate', 'is_list', 'maybe_list'] + +from kombu.five import string_t class lazy(object): @@ -62,6 +64,16 @@ def maybe_evaluate(value): return value +def is_list(l, scalars=(dict, string_t)): + """Returns true if object is list-like, but not a dict or string.""" + return hasattr(l, '__iter__') and not isinstance(l, scalars or ()) + + +def maybe_list(l, scalars=(dict, string_t)): + """Returns list of one element if ``l`` is a scalar.""" + return l if l is None or is_list(l, scalars) else [l] + + # Compat names (before kombu 3.0) promise = lazy maybe_promise = maybe_evaluate