From 1ade2c8d2bf677f08fede3ec19ea08e3e165032c Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 18 Jan 2011 11:09:58 +0100 Subject: [PATCH] Cleanup --- docs/reference/kombu.exceptions.rst | 1 - kombu/exceptions.py | 3 + kombu/transport/pyredis.py | 105 ++++++++++++++------------ kombu/transport/virtual/__init__.py | 24 +++++- kombu/transport/virtual/scheduling.py | 3 + kombu/utils/__init__.py | 68 +++++++++++++++++ 6 files changed, 150 insertions(+), 54 deletions(-) diff --git a/docs/reference/kombu.exceptions.rst b/docs/reference/kombu.exceptions.rst index d0b4bd50..5a11bf92 100644 --- a/docs/reference/kombu.exceptions.rst +++ b/docs/reference/kombu.exceptions.rst @@ -7,7 +7,6 @@ .. autoexception:: NotBoundError .. autoexception:: MessageStateError - .. autoexception:: EnsureExhausted .. autoexception:: TimeoutError .. autoexception:: LimitExceeded .. autoexception:: ConnectionLimitExceeded diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 37dadc62..a6615748 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -38,3 +38,6 @@ class ConnectionLimitExceeded(LimitExceeded): class ChannelLimitExceeded(LimitExceeded): """Maximum number of simultaenous channels exceeded.""" pass + +class StdChannelError(Exception): + pass diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index df8bafbc..0a74c296 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -8,8 +8,6 @@ Redis transport. :license: BSD, see LICENSE for more details. """ -import socket - from itertools import imap from Queue import Empty @@ -17,6 +15,7 @@ from anyjson import serialize, deserialize from kombu.transport import virtual from kombu.utils import eventio +from kombu.utils import cached_property DEFAULT_PORT = 6379 DEFAULT_DB = 0 @@ -32,8 +31,10 @@ class MultiChannelPoller(object): self._poller = eventio.poll() def add(self, channel): - if channel not in self._channels: - self._channels.add(channel) + self._channels.add(channel) + + def discard(self, channel): + self._channels.discard(channel) def _register(self, channel, client, type): if (channel, client, type) in self._chan_to_sock: @@ -95,31 +96,16 @@ class Channel(virtual.Channel): _in_listen = False _fanout_queues = {} - def __repr__(self): - return "" % (id(self), - self.active_queues, - self.active_fanout_queues) - def __init__(self, *args, **kwargs): super_ = super(Channel, self) super_.__init__(*args, **kwargs) self.Client = self._get_client() - self.connection.cycle.add(self) + self.ResponseError = self._get_response_error() self.active_fanout_queues = set() self._fanout_to_queue = {} - self.ResponseError = self._get_response_error() - - self.handlers = {"BRPOP": self._brpop_read, - "LISTEN": self._receive} - - def _get_client(self): - from redis import Redis - return Redis - - def _get_response_error(self): - from redis import exceptions - return exceptions.ResponseError + self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive} + self.connection.cycle.add(self) # add to channel poller. def basic_consume(self, queue, *args, **kwargs): if queue in self._fanout_queues: @@ -199,6 +185,13 @@ class Channel(virtual.Channel): pass def _get(self, queue): + """basic.get + + .. note:: + + Implies ``no_ack=True`` + + """ item = self.client.rpop(queue) if item: return deserialize(item) @@ -207,46 +200,50 @@ class Channel(virtual.Channel): def _size(self, queue): return self.client.llen(queue) - def _get_many(self, queues, timeout=None): - dest__item = self.client.brpop(queues, timeout=timeout) - if dest__item: - dest, item = dest__item - return deserialize(item), dest - raise Empty() - def _put(self, queue, message, **kwargs): + """Publish message.""" self.client.lpush(queue, serialize(message)) def _put_fanout(self, exchange, message, **kwargs): + """Publish fanout message.""" self.client.publish(exchange, serialize(message)) def _queue_bind(self, exchange, routing_key, pattern, queue): if self.typeof(exchange).type == "fanout": + # Mark exchange as fanout. self._fanout_queues[queue] = exchange self.client.sadd(self.keyprefix_queue % (exchange, ), self.sep.join([routing_key or "", pattern or "", queue or ""])) - def get_table(self, exchange): - members = self.client.smembers(self.keyprefix_queue % (exchange, )) - return [tuple(val.split(self.sep)) for val in members] + def _has_queue(self, queue, **kwargs): + return self.client.exists(queue) + def get_table(self, exchange): + return [tuple(val.split(self.sep)) + for val in self.client.smembers( + self.keyprefix_queue % exchange)] def _purge(self, queue): - size = self.client.llen(queue) - self.client.delete(queue) + size, _ = self.client.pipeline().llen(queue) \ + .delete(queue).execute() return size def close(self): - if self._client is not None: + # remove from channel poller. + self.connection.cycle.discard(self) + + # Close connections + for attr in "client", "subclient": try: - self._client.connection.disconnect() + delattr(self, attr) except (AttributeError, self.ResponseError): pass + super(Channel, self).close() - def _open(self): + def _create_client(self): conninfo = self.connection.client database = conninfo.virtual_host if not isinstance(database, int): @@ -265,17 +262,29 @@ class Channel(virtual.Channel): db=database, password=conninfo.password) - @property - def client(self): - if self._client is None: - self._client = self._open() - return self._client + def _get_client(self): + from redis import Redis + return Redis - @property + def _get_response_error(self): + from redis import exceptions + return exceptions.ResponseError + + @cached_property + def client(self): + return self._create_client() + + @client.deleter + def client(self, client): + client.disconnect() + + @cached_property def subclient(self): - if self._subclient is None: - self._subclient = self._open() - return self._subclient + return self._create_client() + + @subclient.deleter + def subclient(self, client): + client.disconnect() @property def active_queues(self): @@ -295,10 +304,6 @@ class Transport(virtual.Transport): self.connection_errors, self.channel_errors = self._get_errors() self.cycle = self.default_cycle - def close_connection(self, connection): - self.cycle.close() - super(Transport, self).close_connection(connection) - def _get_errors(self): from redis import exceptions return ((exceptions.ConnectionError, diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 1db50954..0f5d356a 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -16,6 +16,7 @@ from itertools import count from time import sleep, time from Queue import Empty +from kombu.exceptions import StdChannelError from kombu.transport import base from kombu.utils import emergency_dump_state, say from kombu.utils.compat import OrderedDict @@ -207,6 +208,15 @@ class AbstractChannel(object): """ pass + def _has_queue(self, queue, **kwargs): + """Verify that queue exists. + + Should return :const:`True` if the queue exists or :const:`False` + otherwise. + + """ + return True + def _poll(self, cycle, timeout=None): """Poll a list of queues for available messages.""" return cycle.get() @@ -275,9 +285,14 @@ class Channel(AbstractChannel): self.queue_delete(queue, if_unused=True, if_empty=True) self.state.exchanges.pop(exchange, None) - def queue_declare(self, queue, **kwargs): + def queue_declare(self, queue, passive=False, **kwargs): """Declare queue.""" - self._new_queue(queue, **kwargs) + if passive and not self._has_queue(queue, **kwargs): + raise StdChannelError("404", + u"NOT_FOUND - no queue %r in vhost %r" % ( + queue, self.connection.client.virtual_host or '/'), + (50, 10), "Channel.queue_declare") + self._new_queue(queue, **kwargs) return queue, self._size(queue), 0 def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs): @@ -448,7 +463,9 @@ class Channel(AbstractChannel): self.basic_cancel(consumer) self.qos.restore_unacked_once() self.connection.close_channel(self) - self._cycle = None + if self._cycle is not None: + self._cycle.close() + self._cycle = None def _reset_cycle(self): self._cycle = FairCycle(self._get, self._active_queues, Empty) @@ -531,6 +548,7 @@ class Transport(base.Transport): return self # for drain events def close_connection(self, connection): + self.cycle.close() while self.channels: try: channel = self.channels.pop() diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py index ce4da3c8..2a780a0d 100644 --- a/kombu/transport/virtual/scheduling.py +++ b/kombu/transport/virtual/scheduling.py @@ -42,5 +42,8 @@ class FairCycle(object): if tried >= len(self.resources) - 1: raise + def close(self): + pass + def __repr__(self): return "" % (self.resources, ) diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index b268d763..7e831134 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -172,3 +172,71 @@ def rpartition(S, sep): return S.rpartition(sep) else: # Python <= 2.4: return _compat_rpartition(S, sep) + + +class cached_property(object): + """Property descriptor that caches the return value + of the get function. + + *Examples* + + .. code-block:: python + + @cached_property + def connection(self): + return Connection() + + @connection.setter # Prepares stored value + def connection(self, value): + if value is None: + raise TypeError("Connection must be a connection") + return value + + @connection.deleter + def connection(self, value): + # Additional action to do at del(self.attr) + if value is not None: + print("Connection %r deleted" % (value, )) + + """ + + def __init__(self, fget=None, fset=None, fdel=None, doc=None): + self.__get = fget + self.__set = fset + self.__del = fdel + self.__doc__ = doc or fget.__doc__ + self.__name__ = fget.__name__ + self.__module__ = fget.__module__ + + def __get__(self, obj, type=None): + if obj is None: + return self + try: + return obj.__dict__[self.__name__] + except KeyError: + value = obj.__dict__[self.__name__] = self.__get(obj) + return value + + def __set__(self, obj, value): + if obj is None: + return self + if self.__set is not None: + value = self.__set(obj, value) + obj.__dict__[self.__name__] = value + + def __delete__(self, obj): + if obj is None: + return self + try: + value = obj.__dict__.pop(self.__name__) + except KeyError: + pass + else: + if self.__del is not None: + self.__del(obj, value) + + def setter(self, fset): + return self.__class__(self.__get, fset, self.__del) + + def deleter(self, fdel): + return self.__class__(self.__get, self.__set, fdel)