This commit is contained in:
Ask Solem 2011-11-22 16:24:28 +00:00
parent cf4f46f755
commit b7b634d74c
5 changed files with 38 additions and 34 deletions

View File

@ -36,15 +36,13 @@ class Publisher(messaging.Producer):
durable = True
auto_delete = False
_closed = False
_provided_channel = None
_provided_channel = False
def __init__(self, connection, exchange=None, routing_key=None,
exchange_type=None, durable=None, auto_delete=None, channel=None,
**kwargs):
exchange_type=None, durable=None, auto_delete=None,
channel=None, **kwargs):
if channel:
self._provided_channel = self.backend = channel
else:
self.backend = connection.channel()
connection, self._provided_channel = channel, True
self.exchange = exchange or self.exchange
self.exchange_type = exchange_type or self.exchange_type
@ -61,20 +59,14 @@ class Publisher(messaging.Producer):
routing_key=self.routing_key,
auto_delete=self.auto_delete,
durable=self.durable)
super(Publisher, self).__init__(self.backend, self.exchange,
**kwargs)
super(Publisher, self).__init__(connection, self.exchange, **kwargs)
def send(self, *args, **kwargs):
return self.publish(*args, **kwargs)
def revive(self, channel):
self.backend = channel
super(Publisher, self).revive(channel)
def close(self):
if not self._provided_channel:
self.backend.close()
self.channel.close()
self._closed = True
def __enter__(self):
@ -83,6 +75,10 @@ class Publisher(messaging.Producer):
def __exit__(self, *exc_info):
self.close()
@property
def backend(self):
return self.channel
class Consumer(messaging.Consumer):
queue = ""

View File

@ -385,7 +385,7 @@ class BrokerConnection(object):
info[key] = value
return info
def __hash__(self):
def __eqhash__(self):
return hash("|".join(map(str, self.info().itervalues())))
def as_uri(self, include_password=False):
@ -634,22 +634,22 @@ class Resource(object):
if self.limit:
while 1:
try:
resource = self._resource.get(block=block, timeout=timeout)
R = self._resource.get(block=block, timeout=timeout)
except Empty:
self._add_when_empty()
else:
resource = self.prepare(resource)
self._dirty.add(resource)
R = self.prepare(R)
self._dirty.add(R)
break
else:
resource = self.prepare(self.new())
R = self.prepare(self.new())
@wraps(self.release)
def _release():
self.release(resource)
resource.release = _release
self.release(R)
R.release = _release
return resource
return R
def prepare(self, resource):
return resource
@ -665,9 +665,7 @@ class Resource(object):
of defective resources."""
if self.limit:
self._dirty.discard(resource)
self.close_resource(resource)
else:
self.close_resource(resource)
self.close_resource(resource)
def release(self, resource):
"""Release resource so it can be used by another thread.
@ -715,18 +713,21 @@ class Resource(object):
mutex.release()
if os.environ.get("KOMBU_DEBUG_POOL"):
_orig_acquire = acquire
_orig_release = release
_next_resource_id = 0
def acquire(self, *args, **kwargs): # noqa
import traceback
id = self._next_resource_id = self._next_resource_id + 1
print("+%s ACQUIRE %s" % (id, self.__class__.__name__, ))
r = self._orig_acquire(*args, **kwargs)
r._resource_id = id
print("-%s ACQUIRE %s" % (id, self.__class__.__name__, ))
if not hasattr(r, "acquired_by"):
r.acquired_by = []
r.acquired_by.append(traceback.format_stack())
return r
def release(self, resource): # noqa

View File

@ -16,7 +16,7 @@ from itertools import chain
from .connection import Resource
from .messaging import Producer
from .utils import HashingDict
from .utils import EqualityDict
__all__ = ["ProducerPool", "PoolGroup", "register_group",
"connections", "producers", "get_limit", "set_limit", "reset"]
@ -64,7 +64,7 @@ class ProducerPool(Resource):
super(ProducerPool, self).release(resource)
class PoolGroup(HashingDict):
class PoolGroup(EqualityDict):
def __init__(self, limit=None):
self.limit = limit

View File

@ -39,7 +39,7 @@ TRANSPORT_ALIASES = {
"amqp": "kombu.transport.amqplib.Transport",
"amqplib": "kombu.transport.amqplib.Transport",
"librabbitmq": "kombu.transport.librabbitmq.Transport",
"pika": "kombu.transport.pika.AsyncoreTransport",
"pika": "kombu.transport.pika2.Transport",
"syncpika": "kombu.transport.pika.SyncTransport",
"memory": "kombu.transport.memory.Transport",
"redis": "kombu.transport.redis.Transport",

View File

@ -23,25 +23,32 @@ try:
except:
ctypes = None # noqa
__all__ = ["HashingDict", "say", "uuid", "kwdict", "maybe_list",
__all__ = ["EqualityDict", "say", "uuid", "kwdict", "maybe_list",
"fxrange", "fxrangemax", "retry_over_time",
"emergency_dump_state", "cached_property",
"reprkwargs", "reprcall", "nested"]
class HashingDict(dict):
def eqhash(o):
try:
return o.__eqhash__()
except AttributeError:
return hash(o)
class EqualityDict(dict):
def __getitem__(self, key):
h = hash(key)
h = eqhash(key)
if h not in self:
return self.__missing__(key)
return dict.__getitem__(self, h)
def __setitem__(self, key, value):
return dict.__setitem__(self, hash(key), value)
return dict.__setitem__(self, eqhash(key), value)
def __delitem__(self, key):
return dict.__delitem__(self, hash(key))
return dict.__delitem__(self, eqhash(key))
class HashingDict(dict):