This commit is contained in:
Ask Solem 2011-01-20 12:17:24 +01:00
parent ab9f5eec51
commit 67fc107396
3 changed files with 29 additions and 9 deletions

View File

@ -26,8 +26,8 @@ class _poll(eventio._select):
eventio.poll = _poll
from kombu.transport import pyredis # must import after poller patch
from kombu.transport import pyredis
class ResponseError(Exception):
pass
@ -90,7 +90,6 @@ class Client(object):
return item
raise Empty()
def brpop(self, keys, timeout=None):
key = keys[0]
try:
@ -135,7 +134,6 @@ class Client(object):
def setblocking(self, blocking):
self.blocking = blocking
def __init__(self, client):
self.client = client
self._sock = self._socket()

View File

@ -8,6 +8,7 @@ Redis transport.
:license: BSD, see LICENSE for more details.
"""
from itertools import imap
from Queue import Empty
@ -21,13 +22,32 @@ DEFAULT_PORT = 6379
DEFAULT_DB = 0
# This implementation may seem overly complex, but I assure you there is
# a good reason for doing it this way.
#
# Consuming from several connections enables us to emulate channels,
# which means we can have different service guarantees for individual
# channels.
#
# So we need to consume messages from multiple connections simultaneously,
# and using epoll means we don't have to do so using multiple threads.
#
# Also it means we can easily use PUBLISH/SUBSCRIBE to do fanout
# exchanges (broadcast), as an alternative to pushing messages to fanout-bound
# queues manually.
class MultiChannelPoller(object):
eventflags = eventio.POLL_READ | eventio.POLL_ERR
def __init__(self):
# active channels
self._channels = set()
# file descriptor -> channel map.
self._fd_to_chan = {}
# channel -> socket map
self._chan_to_sock = {}
# poll implementation (epoll/kqueue/select)
self._poller = eventio.poll()
def add(self, channel):
@ -39,7 +59,7 @@ class MultiChannelPoller(object):
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None:
if client.connection._sock is None: # not connected yet.
client.connection.connect(client)
sock = client.connection._sock
sock.setblocking(0)
@ -51,6 +71,7 @@ class MultiChannelPoller(object):
self._poller.unregister(self._chan_to_sock[(channel, client, type)])
def _register_BRPOP(self, channel):
"""enable BRPOP mode for channel."""
ident = channel, channel.client, "BRPOP"
if channel.client.connection._sock is None or \
ident not in self._chan_to_sock:
@ -61,6 +82,7 @@ class MultiChannelPoller(object):
channel._brpop_start()
def _register_LISTEN(self, channel):
"""enable LISTEN mode for channel."""
if channel.subclient.connection._sock is None:
channel._in_listen = False
self._register(channel, channel.subclient, "LISTEN")
@ -69,9 +91,9 @@ class MultiChannelPoller(object):
def get(self, timeout=None):
for channel in self._channels:
if channel.active_queues:
if channel.active_queues: # BRPOP mode?
self._register_BRPOP(channel)
if channel.active_fanout_queues:
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
events = self._poller.poll(timeout and timeout * 1000 or None)
@ -201,11 +223,11 @@ class Channel(virtual.Channel):
return self.client.llen(queue)
def _put(self, queue, message, **kwargs):
"""Publish message."""
"""Deliver message."""
self.client.lpush(queue, serialize(message))
def _put_fanout(self, exchange, message, **kwargs):
"""Publish fanout message."""
"""Deliver fanout message."""
self.client.publish(exchange, serialize(message))
def _queue_bind(self, exchange, routing_key, pattern, queue):
@ -297,6 +319,7 @@ class Transport(virtual.Transport):
interval = 1
default_port = DEFAULT_PORT
# poller is global for all Redis BrokerConnection's
default_cycle = MultiChannelPoller()
def __init__(self, *args, **kwargs):

View File

@ -37,7 +37,6 @@ class _kqueue(object):
events = {}
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.filter == select.KQ_FILTER_READ:
events[fd] = events.get(fd, 0) | POLL_READ
if kevent.filter == select.KQ_EV_ERROR: