diff --git a/kombu/compat.py b/kombu/compat.py index cb3eef53..ab6398a7 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -36,15 +36,13 @@ class Publisher(messaging.Producer): durable = True auto_delete = False _closed = False - _provided_channel = None + _provided_channel = False def __init__(self, connection, exchange=None, routing_key=None, - exchange_type=None, durable=None, auto_delete=None, channel=None, - **kwargs): + exchange_type=None, durable=None, auto_delete=None, + channel=None, **kwargs): if channel: - self._provided_channel = self.backend = channel - else: - self.backend = connection.channel() + connection, self._provided_channel = channel, True self.exchange = exchange or self.exchange self.exchange_type = exchange_type or self.exchange_type @@ -61,20 +59,14 @@ class Publisher(messaging.Producer): routing_key=self.routing_key, auto_delete=self.auto_delete, durable=self.durable) - - super(Publisher, self).__init__(self.backend, self.exchange, - **kwargs) + super(Publisher, self).__init__(connection, self.exchange, **kwargs) def send(self, *args, **kwargs): return self.publish(*args, **kwargs) - def revive(self, channel): - self.backend = channel - super(Publisher, self).revive(channel) - def close(self): if not self._provided_channel: - self.backend.close() + self.channel.close() self._closed = True def __enter__(self): @@ -83,6 +75,10 @@ class Publisher(messaging.Producer): def __exit__(self, *exc_info): self.close() + @property + def backend(self): + return self.channel + class Consumer(messaging.Consumer): queue = "" diff --git a/kombu/connection.py b/kombu/connection.py index 3a7dba15..88f9d21b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -385,7 +385,7 @@ class BrokerConnection(object): info[key] = value return info - def __hash__(self): + def __eqhash__(self): return hash("|".join(map(str, self.info().itervalues()))) def as_uri(self, include_password=False): @@ -634,22 +634,22 @@ class Resource(object): if self.limit: while 1: try: - resource = self._resource.get(block=block, timeout=timeout) + R = self._resource.get(block=block, timeout=timeout) except Empty: self._add_when_empty() else: - resource = self.prepare(resource) - self._dirty.add(resource) + R = self.prepare(R) + self._dirty.add(R) break else: - resource = self.prepare(self.new()) + R = self.prepare(self.new()) @wraps(self.release) def _release(): - self.release(resource) - resource.release = _release + self.release(R) + R.release = _release - return resource + return R def prepare(self, resource): return resource @@ -665,9 +665,7 @@ class Resource(object): of defective resources.""" if self.limit: self._dirty.discard(resource) - self.close_resource(resource) - else: - self.close_resource(resource) + self.close_resource(resource) def release(self, resource): """Release resource so it can be used by another thread. @@ -715,18 +713,21 @@ class Resource(object): mutex.release() if os.environ.get("KOMBU_DEBUG_POOL"): - _orig_acquire = acquire _orig_release = release _next_resource_id = 0 def acquire(self, *args, **kwargs): # noqa + import traceback id = self._next_resource_id = self._next_resource_id + 1 print("+%s ACQUIRE %s" % (id, self.__class__.__name__, )) r = self._orig_acquire(*args, **kwargs) r._resource_id = id print("-%s ACQUIRE %s" % (id, self.__class__.__name__, )) + if not hasattr(r, "acquired_by"): + r.acquired_by = [] + r.acquired_by.append(traceback.format_stack()) return r def release(self, resource): # noqa diff --git a/kombu/pools.py b/kombu/pools.py index 6061802b..98dd6cfa 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -16,7 +16,7 @@ from itertools import chain from .connection import Resource from .messaging import Producer -from .utils import HashingDict +from .utils import EqualityDict __all__ = ["ProducerPool", "PoolGroup", "register_group", "connections", "producers", "get_limit", "set_limit", "reset"] @@ -64,7 +64,7 @@ class ProducerPool(Resource): super(ProducerPool, self).release(resource) -class PoolGroup(HashingDict): +class PoolGroup(EqualityDict): def __init__(self, limit=None): self.limit = limit diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 4441b8d5..edf5f4e1 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -39,7 +39,7 @@ TRANSPORT_ALIASES = { "amqp": "kombu.transport.amqplib.Transport", "amqplib": "kombu.transport.amqplib.Transport", "librabbitmq": "kombu.transport.librabbitmq.Transport", - "pika": "kombu.transport.pika.AsyncoreTransport", + "pika": "kombu.transport.pika2.Transport", "syncpika": "kombu.transport.pika.SyncTransport", "memory": "kombu.transport.memory.Transport", "redis": "kombu.transport.redis.Transport", diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 170fdeef..d01e153c 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -23,25 +23,32 @@ try: except: ctypes = None # noqa -__all__ = ["HashingDict", "say", "uuid", "kwdict", "maybe_list", +__all__ = ["EqualityDict", "say", "uuid", "kwdict", "maybe_list", "fxrange", "fxrangemax", "retry_over_time", "emergency_dump_state", "cached_property", "reprkwargs", "reprcall", "nested"] -class HashingDict(dict): +def eqhash(o): + try: + return o.__eqhash__() + except AttributeError: + return hash(o) + + +class EqualityDict(dict): def __getitem__(self, key): - h = hash(key) + h = eqhash(key) if h not in self: return self.__missing__(key) return dict.__getitem__(self, h) def __setitem__(self, key, value): - return dict.__setitem__(self, hash(key), value) + return dict.__setitem__(self, eqhash(key), value) def __delitem__(self, key): - return dict.__delitem__(self, hash(key)) + return dict.__delitem__(self, eqhash(key)) class HashingDict(dict):