diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index a25dff66..df8bafbc 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -8,255 +8,111 @@ Redis transport. :license: BSD, see LICENSE for more details. """ -import select +import socket -from threading import Condition, Event, Lock, Thread from itertools import imap -from Queue import Empty, Queue as _Queue +from Queue import Empty from anyjson import serialize, deserialize from kombu.transport import virtual -from kombu.utils.finalize import Finalize +from kombu.utils import eventio DEFAULT_PORT = 6379 DEFAULT_DB = 0 -POLL_READ = 0x001 -POLL_ERR = 0x008 | 0x010 | 0x2000 - -class _kqueue(object): - - def __init__(self): - self._kqueue = select.kqueue() - self._active = {} - - def register(self, fd, events): - self._control(fd, events, select.KQ_EV_ADD) - self._active[fd] = events - - def unregister(self, fd): - events = self._active.pop(fd) - self._control(fd, events, select.KQ_EV_DELETE) - - def _control(self, fd, events, flags): - self._kqueue.control([select.kevent(fd, filter=select.KQ_FILTER_READ, - flags=flags)], 0) - - def poll(self, timeout): - kevents = self._kqueue.control(None, 1000, timeout) - events = {} - for kevent in kevents: - fd = kevent.ident - flags = 0 - if kevent.filter == select.KQ_FILTER_READ: - events[fd] = events.get(fd, 0) | POLL_READ - if kevent.filter == select.KQ_EV_ERROR: - events[fd] = events.get(fd, 0) | POLL_ERR - return events.items() - - -class _select(object): - - def __init__(self): - self.read_fds = set() - self.error_fds = set() - self.fd_sets = (self.read_fds, self.error_fds) - - def register(self, fd, events): - if events & POLL_READ: - self.read_fds.add(fd) - if events & POLL_ERR: - self.error_fds.add(fd) - self.read_fds.add(fd) - - def unregister(self, fd): - self.read_fds.discard(fd) - self.error_fds.discard(fd) - - def poll(self, timeout): - read, _write, error = select.select(self.read_fds, [], - self.error_fds, timeout) - events = {} - for fd in read: - fd = fd.fileno() - events[fd] = events.get(fd, 0) | POLL_READ - for fd in error: - fd = fd.fileno() - events[fd] = events.get(fd, 0) | POLL_ERR - return events.items() - - -if hasattr(select, "epoll"): - # Py2.6+ Linux - _poll = select.epoll -elif hasattr(select, "kqueue"): - # Py2.6+ on BSD / Darwin - _poll = _kqueue -else: - _poll = _select - - -class ChannelPoller(Thread): - - # Method used to drain_events - drain_events = None - - #: Queue to put inbound message onto. - inbound = None - - #: Condition primitive used to notify poll requests. - poll_request = None - - #: Event set when the thread should shut down. - shutdown = None - - def __init__(self, drain_events): - self.inbound = _Queue() - self.mutex = Lock() - self.poll_request = Condition(self.mutex) - self.shutdown = Event() - self.stopped = Event() - self._on_collect = Finalize(self, self.close) - Thread.__init__(self) - self.setDaemon(False) - self.started = False - - def poll(self): - # start thread on demand. - self.ensure_started() - - # notify the thread that a poll request has been initiated. - self.poll_request.acquire() - try: - self.poll_request.notify() - finally: - self.poll_request.release() - - # the thread should start to poll now, so just wait - # for it to put a message onto the inbound queue. - return self.inbound.get(timeout=0.3) - - def _can_start(self): - return not (self.started or - self.isAlive() or - self.shutdown.isSet() or - self.stopped.isSet()) - - def ensure_started(self): - if self._can_start(): - self.started = True - self.start() - - def close(self): - self.shutdown.set() - if self.isAlive(): - self.join() - - def run(self): # pragma: no cover - inbound = self.inbound - shutdown = self.shutdown - drain_events = self.drain_events - poll_request = self.poll_request - - while 1: - if shutdown.isSet(): - break - - try: - item = drain_events(timeout=1) - except Empty: - pass - else: - inbound.put_nowait(item) - - if shutdown.isSet(): - break - - # Wait for next poll request - # - # Timeout needs to be short here, otherwise it will block - # shutdown. This means that polling will continue even when - # there are no actual calls to `poll`. However, this doesn't - # cause any problems for our current use (especially with - # the active QoS manager). - poll_request.acquire() - try: - poll_request.wait(1) - finally: - poll_request.release() - - self.stopped.set() - - -class pollChannelPoller(ChannelPoller): - eventflags = POLL_READ | POLL_ERR +class MultiChannelPoller(object): + eventflags = eventio.POLL_READ | eventio.POLL_ERR def __init__(self): self._channels = set() self._fd_to_chan = {} self._chan_to_sock = {} - self._poller = _poll() - super(pollChannelPoller, self).__init__(self.drain_events) + self._poller = eventio.poll() def add(self, channel): if channel not in self._channels: self._channels.add(channel) - def _register(self, channel): - if channel in self._chan_to_sock: - self._unregister(channel) - if channel.client.connection._sock is None: - channel.client.connection.connect(channel.client) - sock = channel.client.connection._sock + def _register(self, channel, client, type): + if (channel, client, type) in self._chan_to_sock: + self._unregister(channel, client, type) + if client.connection._sock is None: + client.connection.connect(client) + sock = client.connection._sock sock.setblocking(0) - self._fd_to_chan[sock.fileno()] = channel - self._chan_to_sock[channel] = sock + self._fd_to_chan[sock.fileno()] = (channel, type) + self._chan_to_sock[(channel, client, type)] = sock self._poller.register(sock, self.eventflags) - def _unregister(self, channel): - self._poller.unregister(self._chan_to_sock[channel]) + def _unregister(self, channel, client, type): + self._poller.unregister(self._chan_to_sock[(channel, client, type)]) - def drain_events(self, timeout=None): - fdmap = self._fd_to_chan - for chan in self._channels: - # connection lost - if chan.client.connection._sock is None: - chan._in_poll = False - self._register(chan) - # start brpop command - if not chan._in_poll: - chan._brpop_start(chan._active_queues) - assert chan.client.connection._sock + def _register_BRPOP(self, channel): + ident = channel, channel.client, "BRPOP" + if channel.client.connection._sock is None or \ + ident not in self._chan_to_sock: + channel._in_poll = False + self._register(*ident) + # start BRPOP command + if not channel._in_poll: + channel._brpop_start() - for fileno, event in self._poller.poll(timeout * 1000): - if event & (select.POLLIN | select.POLLPRI): - chan = fdmap[fileno] - return chan._brpop_read() - elif event & (select.POLLHUP | select.POLLERR): - chan = fdmap[fileno] - return chan._brpop_read_error() + def _register_LISTEN(self, channel): + if channel.subclient.connection._sock is None: + channel._in_listen = False + self._register(channel, channel.subclient, "LISTEN") + if not channel._in_listen: + channel._subscribe() + + def get(self, timeout=None): + for channel in self._channels: + if channel.active_queues: + self._register_BRPOP(channel) + if channel.active_fanout_queues: + self._register_LISTEN(channel) + + events = self._poller.poll(timeout and timeout * 1000 or None) + for fileno, event in events: + if event & eventio.POLL_READ: + chan, type = self._fd_to_chan[fileno] + return chan.handlers[type](), self + elif event & eventio.POLL_HUP: + chan, type = self._fd_to_chan[fileno] + chan._poll_error(type) + break raise Empty() + class Channel(virtual.Channel): _client = None + _subclient = None supports_fanout = True - keyprefix_fanout = "_kombu.fanout.%s" keyprefix_queue = "_kombu.binding.%s" sep = '\x06\x16' _in_poll = False - _poller = pollChannelPoller() + _in_listen = False + _fanout_queues = {} + + def __repr__(self): + return "" % (id(self), + self.active_queues, + self.active_fanout_queues) def __init__(self, *args, **kwargs): super_ = super(Channel, self) super_.__init__(*args, **kwargs) - #self._poller = ChannelPoller(super_.drain_events) self.Client = self._get_client() - self._poller.add(self) + self.connection.cycle.add(self) + self.active_fanout_queues = set() + self._fanout_to_queue = {} self.ResponseError = self._get_response_error() + self.handlers = {"BRPOP": self._brpop_read, + "LISTEN": self._receive} + def _get_client(self): from redis import Redis return Redis @@ -265,18 +121,59 @@ class Channel(virtual.Channel): from redis import exceptions return exceptions.ResponseError - def drain_events(self, timeout=None): - return self._poller.poll() + def basic_consume(self, queue, *args, **kwargs): + if queue in self._fanout_queues: + exchange = self._fanout_queues[queue] + self.active_fanout_queues.add(queue) + self._fanout_to_queue[exchange] = queue + return super(Channel, self).basic_consume(queue, *args, **kwargs) - def _brpop_start(self, keys, timeout=0): - self._in_poll = True - timeout = timeout or 0 - if isinstance(keys, basestring): - keys = [keys, timeout] - else: - keys = list(keys) + [timeout] + def basic_cancel(self, consumer_tag): + try: + queue = self._tag_to_queue[consumer_tag] + except KeyError: + return + try: + self.active_fanout_queues.discard(queue) + self._fanout_to_queue.pop(self._fanout_queues[queue]) + except KeyError: + pass + return super(Channel, self).basic_cancel(consumer_tag) + + def _subscribe(self): + keys = [self._fanout_queues[queue] + for queue in self.active_fanout_queues] + if not keys: + return + c = self.subclient + if c.connection._sock is None: + c.connection.connect(c) + c.connection._sock.setblocking(0) + self.subclient.subscribe(keys) + self._in_listen = True + + def _receive(self): + c = self.subclient + response = None + try: + response = c.parse_response("LISTEN") + except self.connection.connection_errors: + self._in_listen = False + if response is not None: + payload = c._handle_message(response) + if payload["type"] == "message": + return (deserialize(payload["data"]), + self._fanout_to_queue[payload["channel"]]) + raise Empty() + + def _brpop_start(self, timeout=0): + queues = self.active_queues + if not queues: + return + keys = list(queues) + [timeout or 0] name, cmd = self._encode_command("BRPOP", *keys) self.client.connection.send(cmd, self) + self._in_poll = True def _encode_command(self, *args): encode = self.client.encode @@ -286,24 +183,20 @@ class Channel(virtual.Channel): def _brpop_read(self, **options): try: - dest__item = self.client.parse_response("BRPOP", **options) + try: + dest__item = self.client.parse_response("BRPOP", **options) + except self.connection.connection_errors: + raise Empty() dest, item = dest__item return deserialize(item), dest finally: self._in_poll = False - def _brpop_read_error(self, **options): - self.client.parse_response("BRPOP") - - def _queue_bind(self, exchange, routing_key, pattern, queue): - self.client.sadd(self.keyprefix_queue % (exchange, ), - self.sep.join([routing_key or "", - pattern or "", - queue or ""])) - - def get_table(self, exchange): - members = self.client.smembers(self.keyprefix_queue % (exchange, )) - return [tuple(val.split(self.sep)) for val in members] + def _poll_error(self, type, **options): + try: + self.client.parse_response(type) + except self.connection.connection_errors: + pass def _get(self, queue): item = self.client.rpop(queue) @@ -324,13 +217,28 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): self.client.lpush(queue, serialize(message)) + def _put_fanout(self, exchange, message, **kwargs): + self.client.publish(exchange, serialize(message)) + + def _queue_bind(self, exchange, routing_key, pattern, queue): + if self.typeof(exchange).type == "fanout": + self._fanout_queues[queue] = exchange + self.client.sadd(self.keyprefix_queue % (exchange, ), + self.sep.join([routing_key or "", + pattern or "", + queue or ""])) + + def get_table(self, exchange): + members = self.client.smembers(self.keyprefix_queue % (exchange, )) + return [tuple(val.split(self.sep)) for val in members] + + def _purge(self, queue): size = self.client.llen(queue) self.client.delete(queue) return size def close(self): - self._poller.close() if self._client is not None: try: self._client.connection.disconnect() @@ -363,16 +271,33 @@ class Channel(virtual.Channel): self._client = self._open() return self._client + @property + def subclient(self): + if self._subclient is None: + self._subclient = self._open() + return self._subclient + + @property + def active_queues(self): + return set(queue for queue in self._active_queues + if queue not in self.active_fanout_queues) + class Transport(virtual.Transport): Channel = Channel interval = 1 default_port = DEFAULT_PORT + default_cycle = MultiChannelPoller() def __init__(self, *args, **kwargs): - self.connection_errors, self.channel_errors = self._get_errors() super(Transport, self).__init__(*args, **kwargs) + self.connection_errors, self.channel_errors = self._get_errors() + self.cycle = self.default_cycle + + def close_connection(self, connection): + self.cycle.close() + super(Transport, self).close_connection(connection) def _get_errors(self): from redis import exceptions diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py new file mode 100644 index 00000000..984f319e --- /dev/null +++ b/kombu/utils/eventio.py @@ -0,0 +1,77 @@ +import select +import socket + +POLL_READ = 0x001 +POLL_ERR = 0x008 | 0x010 | 0x2000 + + +class _kqueue(object): + + def __init__(self): + self._kqueue = select.kqueue() + self._active = {} + + def register(self, fd, events): + self._control(fd, events, select.KQ_EV_ADD) + self._active[fd] = events + + def unregister(self, fd): + events = self._active.pop(fd) + try: + self._control(fd, events, select.KQ_EV_DELETE) + except socket.error: + pass + + def _control(self, fd, events, flags): + self._kqueue.control([select.kevent(fd, filter=select.KQ_FILTER_READ, + flags=flags)], 0) + + def poll(self, timeout): + kevents = self._kqueue.control(None, 1000, timeout / 1000.0) + events = {} + for kevent in kevents: + fd = kevent.ident + flags = 0 + if kevent.filter == select.KQ_FILTER_READ: + events[fd] = events.get(fd, 0) | POLL_READ + if kevent.filter == select.KQ_EV_ERROR: + events[fd] = events.get(fd, 0) | POLL_ERR + return events.items() + + +class _select(object): + + def __init__(self): + self._all = self._rfd, self._efd = set(), set() + + def register(self, fd, events): + if events & POLL_ERR: + self._efd.add(fd) + self._rfd.add(fd) + elif events & POLL_READ: + self._rfd.add(fd) + + def unregister(self, fd): + self._rfd.discard(fd) + self._efd.discard(fd) + + def poll(self, timeout): + read, _write, error = select.select(self._rfd, [], self._efd, timeout) + events = {} + for fd in read: + fd = fd.fileno() + events[fd] = events.get(fd, 0) | POLL_READ + for fd in error: + fd = fd.fileno() + events[fd] = events.get(fd, 0) | POLL_ERR + return events.items() + + +if hasattr(select, "epoll"): + # Py2.6+ Linux + poll = select.epoll +elif hasattr(select, "kqueue"): + # Py2.6+ on BSD / Darwin + poll = select.poll +else: + poll = _select