diff --git a/kombu/connection.py b/kombu/connection.py index c99f5c52..663f3220 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1,7 +1,8 @@ import socket +from copy import copy from itertools import count -from Queue import Empty, Full, Queue as _Queue +from Queue import Empty, Queue as _Queue from kombu import exceptions from kombu.transport import get_transport_cls @@ -102,7 +103,10 @@ class BrokerConnection(object): """Close the connection (if open).""" try: if self._connection: - self.transport.close_connection(self._connection) + try: + self.transport.close_connection(self._connection) + except self.transport.connection_errors + (AttributeError, ): + pass self._connection = None except socket.error: pass @@ -129,12 +133,13 @@ class BrokerConnection(object): each retry. """ + print("CONNECTION ERRORS: %r" % (self.connection_errors, )) retry_over_time(self.connect, self.connection_errors, (), {}, errback, max_retries, interval_start, interval_step, interval_max) return self - def ensure(self, fun, errback=None, max_retries=None, + def ensure(self, obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1): """Ensure operation completes, regardless of any channel/connection errors occuring. @@ -164,7 +169,7 @@ class BrokerConnection(object): >>> def errback(exc, interval): ... print("Couldn't publish message: %r. Retry in %ds" % ( ... exc, interval)) - >>> publish = conn.ensure(producer.publish, + >>> publish = conn.ensure(producer, producer.publish, ... errback=errback, max_retries=3) >>> publish(message, routing_key) @@ -174,20 +179,27 @@ class BrokerConnection(object): @wraps(fun) def _insured(*args, **kwargs): - for ret in count(0): - if max_retries and retries >= max_retries: - raise exceptions.EnsureExhausted() + got_connection = 0 + for retries in count(0): try: return fun(*args, **kwargs) except self.connection_errors + self.channel_errors, exc: + if got_connection: + raise + if max_retries and retries > max_retries: + raise exceptions.EnsureExhausted() errback and errback(exc, 0) - self.connection.connection = None + self._connection = None self.close() + remaining_retries = max_retries and \ + max(max_retries - retries, 1) self.ensure_connection(errback, - max(max_retries - retries, 1), + remaining_retries, interval_start, interval_step, interval_max) + obj.channel = self.channel() + got_connection += 1 _insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__ return _insured @@ -219,9 +231,36 @@ class BrokerConnection(object): ("port", port), ("insist", self.insist), ("ssl", self.ssl), - ("transport_cls", transport_cls), + ("transport", transport_cls), ("connect_timeout", self.connect_timeout))) + def Pool(self, limit=None, preload=None): + """Pool of connections. + + See :class:`ConnectionPool`. + + :keyword limit: Maximum number of active connections. + Default is no limit. + :keyword preload: Number of connections to preload + when the pool is created. Default is 0. + + *Example usage*:: + + >>> pool = connection.Pool(2) + >>> c1 = pool.acquire() + >>> c2 = pool.acquire() + >>> c3 = pool.acquire() + Traceback (most recent call last): + File "", line 1, in + File "kombu/connection.py", line 354, in acquire + raise ConnectionLimitExceeded(self.limit) + kombu.exceptions.ConnectionLimitExceeded: 2 + >>> c1.release() + >>> c3 = pool.acquire() + + """ + return ConnectionPool(self, limit, preload) + def ChannelPool(self, limit=None, preload=None): """Pool of channels. @@ -242,7 +281,7 @@ class BrokerConnection(object): File "", line 1, in File "kombu/connection.py", line 354, in acquire raise ChannelLimitExceeded(self.limit) - kombu.connection.ChannelLimitExceeded: 4 + kombu.connection.ChannelLimitExceeded: 2 >>> c1.release() >>> c3 = pool.acquire() @@ -350,69 +389,111 @@ class BrokerConnection(object): return self.transport.channel_errors +class Resource(object): -class ChannelLimitExceeded(Exception): - pass - - -class ChannelPool(object): - - def __init__(self, connection, limit=None, preload=None): - self.connection = connection + def __init__(self, limit=None, preload=None): self.limit = limit self.preload = preload or 0 - self._channels = _Queue() + self._resource = _Queue() self._dirty = set() + self.setup() - channel = self.connection.channel - for i in xrange(limit): - self._channels.put_nowait(i < preload and channel() or channel) + def setup(self): + raise NotImplementedError("subclass responsibilty") def acquire(self, block=False, timeout=None): - """Acquire inactive channel. + """Acquire resource. - :keyword block: If the channel limit is exceeded, - block until there is an available channel. - :keyword timeout: Timeout to wait for an available - channel if ``block`` is true. + :keyword block: If the limit is exceeded, + block until there is an available item. + :keyword timeout: Timeout to wait + if ``block`` is true. Default is :const:`None` (forever). - :raises ChannelLimitExceeded: if block is false - and there are no available channels. + :raises LimitExceeded: if block is false + and the limit has been exceeded. """ while True: try: - channel = self._channels.get(block=block, timeout=timeout) + resource = self._resource.get(block=block, timeout=timeout) except Empty: if self.limit and len(self._dirty) >= self.limit: - raise ChannelLimitExceeded(self.limit) - # No more channels, put one on the queue and + raise self.LimitExceeded(self.limit) + # All taken, put new on the queue and # try get again, this way the first in line - # will get the channel. - self._channels.put_nowait(self.connection.channel) + # will get the resource. + self._resource.put_nowait(self.new()) else: - return self._prepare(channel) + resource = self.prepare(resource) + self._dirty.add(resource) - def release(self, channel): - """Release channel so it can be used by another thread. + @wraps(self.release) + def _release(): + self.release(resource) + resource.release = _release + + return resource + + def prepare(self, resource): + return resource + + def release(self, resource): + """Release resource so it can be used by another thread. The caller is responsible for discarding the object, - and to never use the channel again. A new channel must + and to never use the resource again. A new resource must be acquired if so needed. """ - self._dirty.discard(channel) - self._channels.put_nowait(channel) + self._dirty.discard(resource) + self._resource.put_nowait(resource) - def _prepare(self, channel): + + +class ConnectionPool(Resource): + LimitExceeded = exceptions.ConnectionLimitExceeded + + def __init__(self, connection, limit=None, preload=None): + self.connection = connection + super(ConnectionPool, self).__init__(limit=limit, + preload=preload) + + def new(self): + return copy(self.connection) + + def setup(self): + for i in xrange(self.limit): + conn = self.new() + if i < self.preload: + conn.connect() + self._resource.put_nowait(conn) + + def prepare(self, resource): + if not resource._connection: + resource.connect() + return resource + + +class ChannelPool(Resource): + LimitExceeded = exceptions.ChannelLimitExceeded + + def __init__(self, connection, limit=None, preload=None): + self.connection = connection + super(ChannelPool, self).__init__(limit=limit, + preload=preload) + + def new(self): + return self.connection.channel + + def setup(self): + channel = self.new() + for i in xrange(self.limit): + self._resource.put_nowait( + i < self.preload and channel() or channel) + + def prepare(self, channel): if callable(channel): channel = channel() - @wraps(self.release) - def _release(): - self.release(channel) - channel.release = _release - - self._dirty.add(channel) return channel