mirror of https://github.com/celery/kombu.git
Fixes
This commit is contained in:
parent
3edba18123
commit
4b7c002d46
|
@ -32,16 +32,13 @@ class Publisher(messaging.Producer):
|
|||
durable = True
|
||||
auto_delete = False
|
||||
_closed = False
|
||||
_provided_channel = None
|
||||
_provided_channel = False
|
||||
|
||||
def __init__(self, connection, exchange=None, routing_key=None,
|
||||
exchange_type=None, durable=None, auto_delete=None, channel=None,
|
||||
**kwargs):
|
||||
self.connection = connection
|
||||
exchange_type=None, durable=None, auto_delete=None,
|
||||
channel=None, **kwargs):
|
||||
if channel:
|
||||
self._provided_channel = self.backend = channel
|
||||
else:
|
||||
self.backend = connection.channel()
|
||||
connection, self._provided_channel = channel, True
|
||||
|
||||
self.exchange = exchange or self.exchange
|
||||
self.exchange_type = exchange_type or self.exchange_type
|
||||
|
@ -58,20 +55,14 @@ class Publisher(messaging.Producer):
|
|||
routing_key=self.routing_key,
|
||||
auto_delete=self.auto_delete,
|
||||
durable=self.durable)
|
||||
|
||||
super(Publisher, self).__init__(self.backend, self.exchange,
|
||||
**kwargs)
|
||||
super(Publisher, self).__init__(connection, self.exchange, **kwargs)
|
||||
|
||||
def send(self, *args, **kwargs):
|
||||
return self.publish(*args, **kwargs)
|
||||
|
||||
def revive(self, channel):
|
||||
self.backend = channel
|
||||
super(Publisher, self).revive(channel)
|
||||
|
||||
def close(self):
|
||||
if not self._provided_channel:
|
||||
self.backend.close()
|
||||
self.channel.close()
|
||||
self._closed = True
|
||||
|
||||
def __enter__(self):
|
||||
|
@ -80,6 +71,10 @@ class Publisher(messaging.Producer):
|
|||
def __exit__(self, *exc_info):
|
||||
self.close()
|
||||
|
||||
@property
|
||||
def backend(self):
|
||||
return self.channel
|
||||
|
||||
|
||||
class Consumer(messaging.Consumer):
|
||||
queue = ""
|
||||
|
|
|
@ -390,7 +390,7 @@ class BrokerConnection(object):
|
|||
info[key] = value
|
||||
return info
|
||||
|
||||
def __hash__(self):
|
||||
def __eqhash__(self):
|
||||
return hash("|".join(map(str, self.info().itervalues())))
|
||||
|
||||
def as_uri(self, include_password=False):
|
||||
|
@ -644,22 +644,22 @@ class Resource(object):
|
|||
if self.limit:
|
||||
while 1:
|
||||
try:
|
||||
resource = self._resource.get(block=block, timeout=timeout)
|
||||
R = self._resource.get(block=block, timeout=timeout)
|
||||
except Empty:
|
||||
self._add_when_empty()
|
||||
else:
|
||||
resource = self.prepare(resource)
|
||||
self._dirty.add(resource)
|
||||
R = self.prepare(R)
|
||||
self._dirty.add(R)
|
||||
break
|
||||
else:
|
||||
resource = self.prepare(self.new())
|
||||
R = self.prepare(self.new())
|
||||
|
||||
@wraps(self.release)
|
||||
def _release():
|
||||
self.release(resource)
|
||||
resource.release = _release
|
||||
self.release(R)
|
||||
R.release = _release
|
||||
|
||||
return resource
|
||||
return R
|
||||
|
||||
def prepare(self, resource):
|
||||
return resource
|
||||
|
@ -675,9 +675,7 @@ class Resource(object):
|
|||
of defective resources."""
|
||||
if self.limit:
|
||||
self._dirty.discard(resource)
|
||||
self.close_resource(resource)
|
||||
else:
|
||||
self.close_resource(resource)
|
||||
self.close_resource(resource)
|
||||
|
||||
def release(self, resource):
|
||||
"""Release resource so it can be used by another thread.
|
||||
|
@ -725,18 +723,21 @@ class Resource(object):
|
|||
mutex.release()
|
||||
|
||||
if os.environ.get("KOMBU_DEBUG_POOL"):
|
||||
|
||||
_orig_acquire = acquire
|
||||
_orig_release = release
|
||||
|
||||
_next_resource_id = 0
|
||||
|
||||
def acquire(self, *args, **kwargs): # noqa
|
||||
import traceback
|
||||
id = self._next_resource_id = self._next_resource_id + 1
|
||||
print("+%s ACQUIRE %s" % (id, self.__class__.__name__, ))
|
||||
r = self._orig_acquire(*args, **kwargs)
|
||||
r._resource_id = id
|
||||
print("-%s ACQUIRE %s" % (id, self.__class__.__name__, ))
|
||||
if not hasattr(r, "acquired_by"):
|
||||
r.acquired_by = []
|
||||
r.acquired_by.append(traceback.format_stack())
|
||||
return r
|
||||
|
||||
def release(self, resource): # noqa
|
||||
|
|
|
@ -14,7 +14,7 @@ from itertools import chain
|
|||
|
||||
from kombu.connection import Resource
|
||||
from kombu.messaging import Producer
|
||||
from kombu.utils import HashingDict
|
||||
from kombu.utils import EqualityDict
|
||||
|
||||
__all__ = ["ProducerPool", "PoolGroup", "register_group",
|
||||
"connections", "producers", "get_limit", "set_limit", "reset"]
|
||||
|
@ -60,7 +60,7 @@ class ProducerPool(Resource):
|
|||
super(ProducerPool, self).release(resource)
|
||||
|
||||
|
||||
class PoolGroup(HashingDict):
|
||||
class PoolGroup(EqualityDict):
|
||||
|
||||
def __init__(self, limit=None):
|
||||
self.limit = limit
|
||||
|
|
|
@ -21,19 +21,26 @@ except:
|
|||
ctypes = None # noqa
|
||||
|
||||
|
||||
class HashingDict(dict):
|
||||
def eqhash(o):
|
||||
try:
|
||||
return o.__eqhash__()
|
||||
except AttributeError:
|
||||
return hash(o)
|
||||
|
||||
|
||||
class EqualityDict(dict):
|
||||
|
||||
def __getitem__(self, key):
|
||||
h = hash(key)
|
||||
h = eqhash(key)
|
||||
if h not in self:
|
||||
return self.__missing__(key)
|
||||
return dict.__getitem__(self, h)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
return dict.__setitem__(self, hash(key), value)
|
||||
return dict.__setitem__(self, eqhash(key), value)
|
||||
|
||||
def __delitem__(self, key):
|
||||
return dict.__delitem__(self, hash(key))
|
||||
return dict.__delitem__(self, eqhash(key))
|
||||
|
||||
|
||||
def say(m, *s):
|
||||
|
|
Loading…
Reference in New Issue