Redis: Improved solution to drain_events (no longer needs threads), and using PUBLISH/SUBSCRIBE for broadcast exchanges!

Using epoll/kpoll/select to poll several Redis connections at once instead
of having one thread for each channel.

These changes drastically improves the responsitivity of both store-and-forward and
broadcast messaging using Redis.  While the previous solution could have a
latency of 1 second per message, this solution can receive broadcast command
and send reply in ~0.006 seconds (~realtime)
This commit is contained in:
Ask Solem 2011-01-17 16:33:58 +01:00
parent 08ee10243b
commit abd25e9dca
2 changed files with 233 additions and 231 deletions

View File

@ -8,255 +8,111 @@ Redis transport.
:license: BSD, see LICENSE for more details.
"""
import select
import socket
from threading import Condition, Event, Lock, Thread
from itertools import imap
from Queue import Empty, Queue as _Queue
from Queue import Empty
from anyjson import serialize, deserialize
from kombu.transport import virtual
from kombu.utils.finalize import Finalize
from kombu.utils import eventio
DEFAULT_PORT = 6379
DEFAULT_DB = 0
POLL_READ = 0x001
POLL_ERR = 0x008 | 0x010 | 0x2000
class _kqueue(object):
def __init__(self):
self._kqueue = select.kqueue()
self._active = {}
def register(self, fd, events):
self._control(fd, events, select.KQ_EV_ADD)
self._active[fd] = events
def unregister(self, fd):
events = self._active.pop(fd)
self._control(fd, events, select.KQ_EV_DELETE)
def _control(self, fd, events, flags):
self._kqueue.control([select.kevent(fd, filter=select.KQ_FILTER_READ,
flags=flags)], 0)
def poll(self, timeout):
kevents = self._kqueue.control(None, 1000, timeout)
events = {}
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.filter == select.KQ_FILTER_READ:
events[fd] = events.get(fd, 0) | POLL_READ
if kevent.filter == select.KQ_EV_ERROR:
events[fd] = events.get(fd, 0) | POLL_ERR
return events.items()
class _select(object):
def __init__(self):
self.read_fds = set()
self.error_fds = set()
self.fd_sets = (self.read_fds, self.error_fds)
def register(self, fd, events):
if events & POLL_READ:
self.read_fds.add(fd)
if events & POLL_ERR:
self.error_fds.add(fd)
self.read_fds.add(fd)
def unregister(self, fd):
self.read_fds.discard(fd)
self.error_fds.discard(fd)
def poll(self, timeout):
read, _write, error = select.select(self.read_fds, [],
self.error_fds, timeout)
events = {}
for fd in read:
fd = fd.fileno()
events[fd] = events.get(fd, 0) | POLL_READ
for fd in error:
fd = fd.fileno()
events[fd] = events.get(fd, 0) | POLL_ERR
return events.items()
if hasattr(select, "epoll"):
# Py2.6+ Linux
_poll = select.epoll
elif hasattr(select, "kqueue"):
# Py2.6+ on BSD / Darwin
_poll = _kqueue
else:
_poll = _select
class ChannelPoller(Thread):
# Method used to drain_events
drain_events = None
#: Queue to put inbound message onto.
inbound = None
#: Condition primitive used to notify poll requests.
poll_request = None
#: Event set when the thread should shut down.
shutdown = None
def __init__(self, drain_events):
self.inbound = _Queue()
self.mutex = Lock()
self.poll_request = Condition(self.mutex)
self.shutdown = Event()
self.stopped = Event()
self._on_collect = Finalize(self, self.close)
Thread.__init__(self)
self.setDaemon(False)
self.started = False
def poll(self):
# start thread on demand.
self.ensure_started()
# notify the thread that a poll request has been initiated.
self.poll_request.acquire()
try:
self.poll_request.notify()
finally:
self.poll_request.release()
# the thread should start to poll now, so just wait
# for it to put a message onto the inbound queue.
return self.inbound.get(timeout=0.3)
def _can_start(self):
return not (self.started or
self.isAlive() or
self.shutdown.isSet() or
self.stopped.isSet())
def ensure_started(self):
if self._can_start():
self.started = True
self.start()
def close(self):
self.shutdown.set()
if self.isAlive():
self.join()
def run(self): # pragma: no cover
inbound = self.inbound
shutdown = self.shutdown
drain_events = self.drain_events
poll_request = self.poll_request
while 1:
if shutdown.isSet():
break
try:
item = drain_events(timeout=1)
except Empty:
pass
else:
inbound.put_nowait(item)
if shutdown.isSet():
break
# Wait for next poll request
#
# Timeout needs to be short here, otherwise it will block
# shutdown. This means that polling will continue even when
# there are no actual calls to `poll`. However, this doesn't
# cause any problems for our current use (especially with
# the active QoS manager).
poll_request.acquire()
try:
poll_request.wait(1)
finally:
poll_request.release()
self.stopped.set()
class pollChannelPoller(ChannelPoller):
eventflags = POLL_READ | POLL_ERR
class MultiChannelPoller(object):
eventflags = eventio.POLL_READ | eventio.POLL_ERR
def __init__(self):
self._channels = set()
self._fd_to_chan = {}
self._chan_to_sock = {}
self._poller = _poll()
super(pollChannelPoller, self).__init__(self.drain_events)
self._poller = eventio.poll()
def add(self, channel):
if channel not in self._channels:
self._channels.add(channel)
def _register(self, channel):
if channel in self._chan_to_sock:
self._unregister(channel)
if channel.client.connection._sock is None:
channel.client.connection.connect(channel.client)
sock = channel.client.connection._sock
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None:
client.connection.connect(client)
sock = client.connection._sock
sock.setblocking(0)
self._fd_to_chan[sock.fileno()] = channel
self._chan_to_sock[channel] = sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self._poller.register(sock, self.eventflags)
def _unregister(self, channel):
self._poller.unregister(self._chan_to_sock[channel])
def _unregister(self, channel, client, type):
self._poller.unregister(self._chan_to_sock[(channel, client, type)])
def drain_events(self, timeout=None):
fdmap = self._fd_to_chan
for chan in self._channels:
# connection lost
if chan.client.connection._sock is None:
chan._in_poll = False
self._register(chan)
# start brpop command
if not chan._in_poll:
chan._brpop_start(chan._active_queues)
assert chan.client.connection._sock
def _register_BRPOP(self, channel):
ident = channel, channel.client, "BRPOP"
if channel.client.connection._sock is None or \
ident not in self._chan_to_sock:
channel._in_poll = False
self._register(*ident)
# start BRPOP command
if not channel._in_poll:
channel._brpop_start()
for fileno, event in self._poller.poll(timeout * 1000):
if event & (select.POLLIN | select.POLLPRI):
chan = fdmap[fileno]
return chan._brpop_read()
elif event & (select.POLLHUP | select.POLLERR):
chan = fdmap[fileno]
return chan._brpop_read_error()
def _register_LISTEN(self, channel):
if channel.subclient.connection._sock is None:
channel._in_listen = False
self._register(channel, channel.subclient, "LISTEN")
if not channel._in_listen:
channel._subscribe()
def get(self, timeout=None):
for channel in self._channels:
if channel.active_queues:
self._register_BRPOP(channel)
if channel.active_fanout_queues:
self._register_LISTEN(channel)
events = self._poller.poll(timeout and timeout * 1000 or None)
for fileno, event in events:
if event & eventio.POLL_READ:
chan, type = self._fd_to_chan[fileno]
return chan.handlers[type](), self
elif event & eventio.POLL_HUP:
chan, type = self._fd_to_chan[fileno]
chan._poll_error(type)
break
raise Empty()
class Channel(virtual.Channel):
_client = None
_subclient = None
supports_fanout = True
keyprefix_fanout = "_kombu.fanout.%s"
keyprefix_queue = "_kombu.binding.%s"
sep = '\x06\x16'
_in_poll = False
_poller = pollChannelPoller()
_in_listen = False
_fanout_queues = {}
def __repr__(self):
return "<Channel: %x BRPOP:%r SUB:%r>" % (id(self),
self.active_queues,
self.active_fanout_queues)
def __init__(self, *args, **kwargs):
super_ = super(Channel, self)
super_.__init__(*args, **kwargs)
#self._poller = ChannelPoller(super_.drain_events)
self.Client = self._get_client()
self._poller.add(self)
self.connection.cycle.add(self)
self.active_fanout_queues = set()
self._fanout_to_queue = {}
self.ResponseError = self._get_response_error()
self.handlers = {"BRPOP": self._brpop_read,
"LISTEN": self._receive}
def _get_client(self):
from redis import Redis
return Redis
@ -265,18 +121,59 @@ class Channel(virtual.Channel):
from redis import exceptions
return exceptions.ResponseError
def drain_events(self, timeout=None):
return self._poller.poll()
def basic_consume(self, queue, *args, **kwargs):
if queue in self._fanout_queues:
exchange = self._fanout_queues[queue]
self.active_fanout_queues.add(queue)
self._fanout_to_queue[exchange] = queue
return super(Channel, self).basic_consume(queue, *args, **kwargs)
def _brpop_start(self, keys, timeout=0):
self._in_poll = True
timeout = timeout or 0
if isinstance(keys, basestring):
keys = [keys, timeout]
else:
keys = list(keys) + [timeout]
def basic_cancel(self, consumer_tag):
try:
queue = self._tag_to_queue[consumer_tag]
except KeyError:
return
try:
self.active_fanout_queues.discard(queue)
self._fanout_to_queue.pop(self._fanout_queues[queue])
except KeyError:
pass
return super(Channel, self).basic_cancel(consumer_tag)
def _subscribe(self):
keys = [self._fanout_queues[queue]
for queue in self.active_fanout_queues]
if not keys:
return
c = self.subclient
if c.connection._sock is None:
c.connection.connect(c)
c.connection._sock.setblocking(0)
self.subclient.subscribe(keys)
self._in_listen = True
def _receive(self):
c = self.subclient
response = None
try:
response = c.parse_response("LISTEN")
except self.connection.connection_errors:
self._in_listen = False
if response is not None:
payload = c._handle_message(response)
if payload["type"] == "message":
return (deserialize(payload["data"]),
self._fanout_to_queue[payload["channel"]])
raise Empty()
def _brpop_start(self, timeout=0):
queues = self.active_queues
if not queues:
return
keys = list(queues) + [timeout or 0]
name, cmd = self._encode_command("BRPOP", *keys)
self.client.connection.send(cmd, self)
self._in_poll = True
def _encode_command(self, *args):
encode = self.client.encode
@ -286,24 +183,20 @@ class Channel(virtual.Channel):
def _brpop_read(self, **options):
try:
dest__item = self.client.parse_response("BRPOP", **options)
try:
dest__item = self.client.parse_response("BRPOP", **options)
except self.connection.connection_errors:
raise Empty()
dest, item = dest__item
return deserialize(item), dest
finally:
self._in_poll = False
def _brpop_read_error(self, **options):
self.client.parse_response("BRPOP")
def _queue_bind(self, exchange, routing_key, pattern, queue):
self.client.sadd(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or "",
pattern or "",
queue or ""]))
def get_table(self, exchange):
members = self.client.smembers(self.keyprefix_queue % (exchange, ))
return [tuple(val.split(self.sep)) for val in members]
def _poll_error(self, type, **options):
try:
self.client.parse_response(type)
except self.connection.connection_errors:
pass
def _get(self, queue):
item = self.client.rpop(queue)
@ -324,13 +217,28 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
self.client.lpush(queue, serialize(message))
def _put_fanout(self, exchange, message, **kwargs):
self.client.publish(exchange, serialize(message))
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == "fanout":
self._fanout_queues[queue] = exchange
self.client.sadd(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or "",
pattern or "",
queue or ""]))
def get_table(self, exchange):
members = self.client.smembers(self.keyprefix_queue % (exchange, ))
return [tuple(val.split(self.sep)) for val in members]
def _purge(self, queue):
size = self.client.llen(queue)
self.client.delete(queue)
return size
def close(self):
self._poller.close()
if self._client is not None:
try:
self._client.connection.disconnect()
@ -363,16 +271,33 @@ class Channel(virtual.Channel):
self._client = self._open()
return self._client
@property
def subclient(self):
if self._subclient is None:
self._subclient = self._open()
return self._subclient
@property
def active_queues(self):
return set(queue for queue in self._active_queues
if queue not in self.active_fanout_queues)
class Transport(virtual.Transport):
Channel = Channel
interval = 1
default_port = DEFAULT_PORT
default_cycle = MultiChannelPoller()
def __init__(self, *args, **kwargs):
self.connection_errors, self.channel_errors = self._get_errors()
super(Transport, self).__init__(*args, **kwargs)
self.connection_errors, self.channel_errors = self._get_errors()
self.cycle = self.default_cycle
def close_connection(self, connection):
self.cycle.close()
super(Transport, self).close_connection(connection)
def _get_errors(self):
from redis import exceptions

77
kombu/utils/eventio.py Normal file
View File

@ -0,0 +1,77 @@
import select
import socket
POLL_READ = 0x001
POLL_ERR = 0x008 | 0x010 | 0x2000
class _kqueue(object):
def __init__(self):
self._kqueue = select.kqueue()
self._active = {}
def register(self, fd, events):
self._control(fd, events, select.KQ_EV_ADD)
self._active[fd] = events
def unregister(self, fd):
events = self._active.pop(fd)
try:
self._control(fd, events, select.KQ_EV_DELETE)
except socket.error:
pass
def _control(self, fd, events, flags):
self._kqueue.control([select.kevent(fd, filter=select.KQ_FILTER_READ,
flags=flags)], 0)
def poll(self, timeout):
kevents = self._kqueue.control(None, 1000, timeout / 1000.0)
events = {}
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.filter == select.KQ_FILTER_READ:
events[fd] = events.get(fd, 0) | POLL_READ
if kevent.filter == select.KQ_EV_ERROR:
events[fd] = events.get(fd, 0) | POLL_ERR
return events.items()
class _select(object):
def __init__(self):
self._all = self._rfd, self._efd = set(), set()
def register(self, fd, events):
if events & POLL_ERR:
self._efd.add(fd)
self._rfd.add(fd)
elif events & POLL_READ:
self._rfd.add(fd)
def unregister(self, fd):
self._rfd.discard(fd)
self._efd.discard(fd)
def poll(self, timeout):
read, _write, error = select.select(self._rfd, [], self._efd, timeout)
events = {}
for fd in read:
fd = fd.fileno()
events[fd] = events.get(fd, 0) | POLL_READ
for fd in error:
fd = fd.fileno()
events[fd] = events.get(fd, 0) | POLL_ERR
return events.items()
if hasattr(select, "epoll"):
# Py2.6+ Linux
poll = select.epoll
elif hasattr(select, "kqueue"):
# Py2.6+ on BSD / Darwin
poll = select.poll
else:
poll = _select