This commit is contained in:
Ask Solem 2011-01-18 11:09:58 +01:00
parent 2cb7c11a59
commit 1ade2c8d2b
6 changed files with 150 additions and 54 deletions

View File

@ -7,7 +7,6 @@
.. autoexception:: NotBoundError
.. autoexception:: MessageStateError
.. autoexception:: EnsureExhausted
.. autoexception:: TimeoutError
.. autoexception:: LimitExceeded
.. autoexception:: ConnectionLimitExceeded

View File

@ -38,3 +38,6 @@ class ConnectionLimitExceeded(LimitExceeded):
class ChannelLimitExceeded(LimitExceeded):
"""Maximum number of simultaenous channels exceeded."""
pass
class StdChannelError(Exception):
pass

View File

@ -8,8 +8,6 @@ Redis transport.
:license: BSD, see LICENSE for more details.
"""
import socket
from itertools import imap
from Queue import Empty
@ -17,6 +15,7 @@ from anyjson import serialize, deserialize
from kombu.transport import virtual
from kombu.utils import eventio
from kombu.utils import cached_property
DEFAULT_PORT = 6379
DEFAULT_DB = 0
@ -32,8 +31,10 @@ class MultiChannelPoller(object):
self._poller = eventio.poll()
def add(self, channel):
if channel not in self._channels:
self._channels.add(channel)
self._channels.add(channel)
def discard(self, channel):
self._channels.discard(channel)
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
@ -95,31 +96,16 @@ class Channel(virtual.Channel):
_in_listen = False
_fanout_queues = {}
def __repr__(self):
return "<Channel: %x BRPOP:%r SUB:%r>" % (id(self),
self.active_queues,
self.active_fanout_queues)
def __init__(self, *args, **kwargs):
super_ = super(Channel, self)
super_.__init__(*args, **kwargs)
self.Client = self._get_client()
self.connection.cycle.add(self)
self.ResponseError = self._get_response_error()
self.active_fanout_queues = set()
self._fanout_to_queue = {}
self.ResponseError = self._get_response_error()
self.handlers = {"BRPOP": self._brpop_read,
"LISTEN": self._receive}
def _get_client(self):
from redis import Redis
return Redis
def _get_response_error(self):
from redis import exceptions
return exceptions.ResponseError
self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive}
self.connection.cycle.add(self) # add to channel poller.
def basic_consume(self, queue, *args, **kwargs):
if queue in self._fanout_queues:
@ -199,6 +185,13 @@ class Channel(virtual.Channel):
pass
def _get(self, queue):
"""basic.get
.. note::
Implies ``no_ack=True``
"""
item = self.client.rpop(queue)
if item:
return deserialize(item)
@ -207,46 +200,50 @@ class Channel(virtual.Channel):
def _size(self, queue):
return self.client.llen(queue)
def _get_many(self, queues, timeout=None):
dest__item = self.client.brpop(queues, timeout=timeout)
if dest__item:
dest, item = dest__item
return deserialize(item), dest
raise Empty()
def _put(self, queue, message, **kwargs):
"""Publish message."""
self.client.lpush(queue, serialize(message))
def _put_fanout(self, exchange, message, **kwargs):
"""Publish fanout message."""
self.client.publish(exchange, serialize(message))
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == "fanout":
# Mark exchange as fanout.
self._fanout_queues[queue] = exchange
self.client.sadd(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or "",
pattern or "",
queue or ""]))
def get_table(self, exchange):
members = self.client.smembers(self.keyprefix_queue % (exchange, ))
return [tuple(val.split(self.sep)) for val in members]
def _has_queue(self, queue, **kwargs):
return self.client.exists(queue)
def get_table(self, exchange):
return [tuple(val.split(self.sep))
for val in self.client.smembers(
self.keyprefix_queue % exchange)]
def _purge(self, queue):
size = self.client.llen(queue)
self.client.delete(queue)
size, _ = self.client.pipeline().llen(queue) \
.delete(queue).execute()
return size
def close(self):
if self._client is not None:
# remove from channel poller.
self.connection.cycle.discard(self)
# Close connections
for attr in "client", "subclient":
try:
self._client.connection.disconnect()
delattr(self, attr)
except (AttributeError, self.ResponseError):
pass
super(Channel, self).close()
def _open(self):
def _create_client(self):
conninfo = self.connection.client
database = conninfo.virtual_host
if not isinstance(database, int):
@ -265,17 +262,29 @@ class Channel(virtual.Channel):
db=database,
password=conninfo.password)
@property
def client(self):
if self._client is None:
self._client = self._open()
return self._client
def _get_client(self):
from redis import Redis
return Redis
@property
def _get_response_error(self):
from redis import exceptions
return exceptions.ResponseError
@cached_property
def client(self):
return self._create_client()
@client.deleter
def client(self, client):
client.disconnect()
@cached_property
def subclient(self):
if self._subclient is None:
self._subclient = self._open()
return self._subclient
return self._create_client()
@subclient.deleter
def subclient(self, client):
client.disconnect()
@property
def active_queues(self):
@ -295,10 +304,6 @@ class Transport(virtual.Transport):
self.connection_errors, self.channel_errors = self._get_errors()
self.cycle = self.default_cycle
def close_connection(self, connection):
self.cycle.close()
super(Transport, self).close_connection(connection)
def _get_errors(self):
from redis import exceptions
return ((exceptions.ConnectionError,

View File

@ -16,6 +16,7 @@ from itertools import count
from time import sleep, time
from Queue import Empty
from kombu.exceptions import StdChannelError
from kombu.transport import base
from kombu.utils import emergency_dump_state, say
from kombu.utils.compat import OrderedDict
@ -207,6 +208,15 @@ class AbstractChannel(object):
"""
pass
def _has_queue(self, queue, **kwargs):
"""Verify that queue exists.
Should return :const:`True` if the queue exists or :const:`False`
otherwise.
"""
return True
def _poll(self, cycle, timeout=None):
"""Poll a list of queues for available messages."""
return cycle.get()
@ -275,9 +285,14 @@ class Channel(AbstractChannel):
self.queue_delete(queue, if_unused=True, if_empty=True)
self.state.exchanges.pop(exchange, None)
def queue_declare(self, queue, **kwargs):
def queue_declare(self, queue, passive=False, **kwargs):
"""Declare queue."""
self._new_queue(queue, **kwargs)
if passive and not self._has_queue(queue, **kwargs):
raise StdChannelError("404",
u"NOT_FOUND - no queue %r in vhost %r" % (
queue, self.connection.client.virtual_host or '/'),
(50, 10), "Channel.queue_declare")
self._new_queue(queue, **kwargs)
return queue, self._size(queue), 0
def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs):
@ -448,7 +463,9 @@ class Channel(AbstractChannel):
self.basic_cancel(consumer)
self.qos.restore_unacked_once()
self.connection.close_channel(self)
self._cycle = None
if self._cycle is not None:
self._cycle.close()
self._cycle = None
def _reset_cycle(self):
self._cycle = FairCycle(self._get, self._active_queues, Empty)
@ -531,6 +548,7 @@ class Transport(base.Transport):
return self # for drain events
def close_connection(self, connection):
self.cycle.close()
while self.channels:
try:
channel = self.channels.pop()

View File

@ -42,5 +42,8 @@ class FairCycle(object):
if tried >= len(self.resources) - 1:
raise
def close(self):
pass
def __repr__(self):
return "<FairCycle: %r>" % (self.resources, )

View File

@ -172,3 +172,71 @@ def rpartition(S, sep):
return S.rpartition(sep)
else: # Python <= 2.4:
return _compat_rpartition(S, sep)
class cached_property(object):
"""Property descriptor that caches the return value
of the get function.
*Examples*
.. code-block:: python
@cached_property
def connection(self):
return Connection()
@connection.setter # Prepares stored value
def connection(self, value):
if value is None:
raise TypeError("Connection must be a connection")
return value
@connection.deleter
def connection(self, value):
# Additional action to do at del(self.attr)
if value is not None:
print("Connection %r deleted" % (value, ))
"""
def __init__(self, fget=None, fset=None, fdel=None, doc=None):
self.__get = fget
self.__set = fset
self.__del = fdel
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
self.__module__ = fget.__module__
def __get__(self, obj, type=None):
if obj is None:
return self
try:
return obj.__dict__[self.__name__]
except KeyError:
value = obj.__dict__[self.__name__] = self.__get(obj)
return value
def __set__(self, obj, value):
if obj is None:
return self
if self.__set is not None:
value = self.__set(obj, value)
obj.__dict__[self.__name__] = value
def __delete__(self, obj):
if obj is None:
return self
try:
value = obj.__dict__.pop(self.__name__)
except KeyError:
pass
else:
if self.__del is not None:
self.__del(obj, value)
def setter(self, fset):
return self.__class__(self.__get, fset, self.__del)
def deleter(self, fdel):
return self.__class__(self.__get, self.__set, fdel)