Moves celery.worker.hub here as kombu.async

This is not really done yet, but it will have to live here eventually
as the async transports will want to register with a eventloop.
This commit is contained in:
Ask Solem 2013-09-19 21:27:27 +01:00
parent 0581241eba
commit 036b0dc504
5 changed files with 334 additions and 3 deletions

7
kombu/async/__init__.py Normal file
View File

@ -0,0 +1,7 @@
from __future__ import absolute_import
from .hub import Hub, get_event_loop, set_event_loop
from kombu.utils.eventio import READ, WRITE, ERR
__all__ = ['READ', 'WRITE', 'ERR', 'Hub', 'get_event_loop', 'set_event_loop']

195
kombu/async/hub.py Normal file
View File

@ -0,0 +1,195 @@
from __future__ import absolute_import
from kombu.five import items, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno
from kombu.utils.eventio import READ, WRITE, ERR, poll
from kombu.utils.functional import maybe_list
__all__ = ['Hub', 'get_event_loop', 'set_event_loop']
logger = get_logger(__name__)
_current_loop = None
def get_event_loop():
return _current_loop
def set_event_loop(loop):
global _current_loop
_current_loop = loop
return loop
def repr_flag(flag):
return '{0}{1}{2}'.format('R' if flag & READ else '',
'W' if flag & WRITE else '',
'!' if flag & ERR else '')
def _rcb(obj):
if obj is None:
return '<missing>'
if isinstance(obj, str):
return obj
return obj.__name__
class Hub(object):
"""Event loop object.
:keyword timer: Specify timer object.
"""
#: Flag set if reading from an fd will not block.
READ = READ
#: Flag set if writing to an fd will not block.
WRITE = WRITE
#: Flag set on error, and the fd should be read from asap.
ERR = ERR
#: List of callbacks to be called when the loop is initialized,
#: applied with the hub instance as sole argument.
on_init = None
#: List of callbacks to be called when the loop is exiting,
#: applied with the hub instance as sole argument.
on_close = None
#: List of callbacks to be called when a task is received.
#: Takes no arguments.
on_task = None
def __init__(self, timer=None):
self.timer = timer
self.readers = {}
self.writers = {}
self.on_init = []
self.on_close = []
self.on_task = []
# The eventloop (in celery.worker.loops)
# will merge fds in this set and then instead of calling
# the callback for each ready fd it will call the
# :attr:`consolidate_callback` with the list of ready_fds
# as an argument. This API is internal and is only
# used by the multiprocessing pool to find inqueues
# that are ready to write.
self.consolidate = set()
self.consolidate_callback = None
def start(self):
self.poller = poll()
def stop(self):
self.poller.close()
def init(self):
for callback in self.on_init:
callback(self)
def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
propagate=()):
timer = self.timer
delay = None
if timer and timer._queue:
for i in range(max_timers):
delay, entry = next(self.scheduler)
if entry is None:
break
try:
entry()
except propagate:
raise
except Exception as exc:
logger.error('Error in timer: %r', exc, exc_info=1)
return min(max(delay or 0, min_delay), max_delay)
def add(self, fds, callback, flags, consolidate=False):
for fd in maybe_list(fds, None):
try:
self._add(fd, callback, flags, consolidate)
except ValueError:
self._discard(fd)
def remove(self, fd):
fd = fileno(fd)
self._unregister(fd)
self._discard(fd)
def add_reader(self, fds, callback):
return self.add(fds, callback, READ | ERR)
def add_writer(self, fds, callback):
return self.add(fds, callback, WRITE)
def update_readers(self, readers):
[self.add_reader(*x) for x in items(readers)]
def update_writers(self, writers):
[self.add_writer(*x) for x in items(writers)]
def _unregister(self, fd):
try:
self.poller.unregister(fd)
except (KeyError, OSError):
pass
def close(self, *args):
[self._unregister(fd) for fd in self.readers]
self.readers.clear()
[self._unregister(fd) for fd in self.writers]
self.writers.clear()
for callback in self.on_close:
callback(self)
def _add(self, fd, cb, flags, consolidate=False):
self.poller.register(fd, flags)
(self.readers if flags & READ else self.writers)[fileno(fd)] = cb
if consolidate:
self.consolidate.add(fd)
def _discard(self, fd):
fd = fileno(fd)
self.readers.pop(fd, None)
self.writers.pop(fd, None)
self.consolidate.discard(fd)
def repr_active(self):
return ', '.join(self._repr_readers() + self._repr_writers())
def repr_events(self, events):
return ', '.join(
'{0}->{1}'.format(
_rcb(self._callback_for(fd, fl, '{0!r}(GONE)'.format(fd))),
repr_flag(fl),
)
for fd, fl in events
)
def _repr_readers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
for fd, cb in items(self.readers)]
def _repr_writers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
for fd, cb in items(self.writers)]
def _callback_for(self, fd, flag, *default):
try:
if flag & READ:
return self.readers[fileno(fd)]
if flag & WRITE:
return self.writers[fileno(fd)]
except KeyError:
if default:
return default[0]
raise
@cached_property
def scheduler(self):
return iter(self.timer)

95
kombu/async/semaphore.py Normal file
View File

@ -0,0 +1,95 @@
from __future__ import absolute_import
__all__ = ['DummyLock', 'LaxBoundedSemaphore']
class LaxBoundedSemaphore(object):
"""Asynchronous Bounded Semaphore.
Lax means that the value will stay within the specified
range even if released more times than it was acquired.
Example:
>>> x = LaxBoundedSemaphore(2)
>>> def callback(i):
... say('HELLO {0!r}'.format(i))
>>> x.acquire(callback, 1)
HELLO 1
>>> x.acquire(callback, 2)
HELLO 2
>>> x.acquire(callback, 3)
>>> x._waiters # private, do not access directly
[(callback, 3)]
>>> x.release()
HELLO 3
"""
def __init__(self, value):
self.initial_value = self.value = value
self._waiting = set()
def acquire(self, callback, *partial_args):
"""Acquire semaphore, applying ``callback`` if
the resource is available.
:param callback: The callback to apply.
:param \*partial_args: partial arguments to callback.
"""
if self.value <= 0:
self._waiting.append((callback, partial_args))
return False
else:
self.value = max(self.value - 1, 0)
callback(*partial_args)
return True
def release(self):
"""Release semaphore.
If there are any waiters this will apply the first waiter
that is waiting for the resource (FIFO order).
"""
self.value = min(self.value + 1, self.initial_value)
if self._waiting:
waiter, args = self._waiting.pop()
waiter(*args)
def grow(self, n=1):
"""Change the size of the semaphore to accept more users."""
self.initial_value += n
self.value += n
[self.release() for _ in range(n)]
def shrink(self, n=1):
"""Change the size of the semaphore to accept less users."""
self.initial_value = max(self.initial_value - n, 0)
self.value = max(self.value - n, 0)
def clear(self):
"""Reset the sempahore, which also wipes out any waiting callbacks."""
self._waiting[:] = []
self.value = self.initial_value
def __repr__(self):
return '<{0} at {1:#x} value:{2} waiting:{3}>'.format(
self.__class__.__name__, id(self), self.value, len(self.waiting),
)
class DummyLock(object):
"""Pretending to be a lock."""
def __enter__(self):
return self
def __exit__(self, *exc_info):
pass

View File

@ -16,7 +16,7 @@ from itertools import count, repeat
from time import sleep
from uuid import UUID, uuid4 as _uuid4, _uuid_generate_random
from kombu.five import items, reraise, string_t
from kombu.five import int_types, items, reraise, string_t
from .encoding import default_encode, safe_repr as _safe_repr
@ -25,10 +25,18 @@ try:
except:
ctypes = None # noqa
try:
from io import UnsupportedOperation
FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
except ImportError: # pragma: no cover
# Py2
FILENO_ERRORS = (AttributeError, ValueError) # noqa
__all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list',
'fxrange', 'fxrangemax', 'retry_over_time',
'emergency_dump_state', 'cached_property',
'reprkwargs', 'reprcall', 'nested']
'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno']
def symbol_by_name(name, aliases={}, imp=None, package=None,
@ -406,3 +414,17 @@ def escape_regex(p, white=''):
return ''.join(c if c.isalnum() or c in white
else ('\\000' if c == '\000' else '\\' + c)
for c in p)
def fileno(f):
if isinstance(f, int_types):
return f
return f.fileno()
def maybe_fileno(f):
"""Get object fileno, or :const:`None` if not defined."""
try:
return fileno(f)
except FILENO_ERRORS:
pass

View File

@ -2,7 +2,9 @@ from __future__ import absolute_import
import sys
__all__ = ['lazy', 'maybe_evaluate']
__all__ = ['lazy', 'maybe_evaluate', 'is_list', 'maybe_list']
from kombu.five import string_t
class lazy(object):
@ -62,6 +64,16 @@ def maybe_evaluate(value):
return value
def is_list(l, scalars=(dict, string_t)):
"""Returns true if object is list-like, but not a dict or string."""
return hasattr(l, '__iter__') and not isinstance(l, scalars or ())
def maybe_list(l, scalars=(dict, string_t)):
"""Returns list of one element if ``l`` is a scalar."""
return l if l is None or is_list(l, scalars) else [l]
# Compat names (before kombu 3.0)
promise = lazy
maybe_promise = maybe_evaluate