diff --git a/kombu/connection.py b/kombu/connection.py index 65385706..9d521417 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1,10 +1,7 @@ import socket -import threading -from collections import deque from copy import copy from itertools import count -from time import time from kombu import exceptions from kombu.transport import get_transport_cls @@ -69,8 +66,7 @@ class BrokerConnection(object): def __init__(self, hostname="localhost", userid="guest", password="guest", virtual_host="/", port=None, insist=False, - ssl=False, transport=None, connect_timeout=5, pool=None, - backend_cls=None): + ssl=False, transport=None, connect_timeout=5, backend_cls=None): self.hostname = hostname self.userid = userid self.password = password @@ -81,7 +77,6 @@ class BrokerConnection(object): self.ssl = ssl # backend_cls argument will be removed shortly. self.transport_cls = transport or backend_cls - self.pool = pool def connect(self): """Establish connection to server immediately.""" @@ -197,27 +192,6 @@ class BrokerConnection(object): _insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__ return _insured - def acquire(self): - """Acquire connection. - - Only here for API compatibility with :class:`BrokerConnectionPool`. - - """ - return self - - def release(self): - """Close the connection, or if the connection is managed by a pool - the connection will be released to the pool so it can be reused. - - **NOTE:** You must never perform operations on a connection - that has been released. - - """ - if self.pool: - self.pool.release(self) - else: - self.close() - def create_transport(self): return self.get_transport_cls()(client=self) create_backend = create_transport # FIXME @@ -236,16 +210,17 @@ class BrokerConnection(object): def info(self): """Get connection info.""" + transport_cls = self.transport_cls or "amqplib" + port = self.port or self.transport.default_port return OrderedDict((("hostname", self.hostname), ("userid", self.userid), ("password", self.password), ("virtual_host", self.virtual_host), - ("port", self.port), + ("port", port), ("insist", self.insist), ("ssl", self.ssl), - ("transport_cls", self.transport_cls), - ("connect_timeout", self.connect_timeout), - ("pool", self.pool))) + ("transport_cls", transport_cls), + ("connect_timeout", self.connect_timeout))) def SimpleQueue(self, name, no_ack=False, queue_opts=None, exchange_opts=None, channel=None): @@ -346,168 +321,3 @@ class BrokerConnection(object): def channel_errors(self): """List of exceptions that may be raised by the channel.""" return self.transport.channel_errors - - -class BrokerConnectionPool(object): - """Pool of connections. - - :param initial: Initial :class:`BrokerConnection` to take connection - parameters from. - - :keyword max: Maximum number of connections in the pool. - Default is 10. - - :keyword ensure: When ``preconnect`` on, ensure we're able to establish - a connection. Default is ``False``. - - :keyword preconnect: Number of connections at - instantiation. Default is to only establish connections when needed. - - """ - - def __init__(self, initial, max=10, ensure=False, preconnect=0): - self.initial = initial - self.max = max - self.preconnect = preconnect - self._connections = deque() - self._dirty = set() - self.mutex = threading.Lock() - self.not_empty = threading.Condition(self.mutex) - - self.grow(self.preconnect, connect=True) - self.grow(self.max - self.preconnect) - - def acquire(self, block=True, timeout=None): - """Acquire connection. - - :raises kombu.exceptions.PoolExhausted: If there are no - available connections to be acquired. - - """ - self.not_empty.acquire() - time_start = time() - try: - while 1: - try: - connection = self._connections.popleft() - self._dirty.add(connection) - return connection - except IndexError: - if not block: - raise exceptions.PoolExhausted( - "All connections acquired") - if timeout: - elapsed = time() - time_start - remaining = timeout - elapsed - if elapsed > timeout: - raise exceptions.TimeoutError( - "Timed out while acquiring connection.") - self.not_empty.wait(remaining) - finally: - self.not_empty.release() - - def release(self, connection): - """Release connection so it can be used by others. - - **NOTE:** You must never perform operations on a connection - that has been released. - - """ - self.mutex.acquire() - try: - try: - self._dirty.remove(connection) - except KeyError: - pass - self._connections.append(connection) - self.not_empty.notify() - finally: - self.mutex.release() - - def replace(self, connection): - """Clone and replace connection with a new one. - - This is useful if the connection is broken. - - """ - connection.close() - self.mutex.acquire() - try: - try: - self._dirty.remove(connection) - self._connections.remove(connection) - except (KeyError, ValueError): - pass - finally: - self.mutex.release() - self.grow(1) - - def ensure(self, fun, errback=None, max_retries=None, - interval_start=2, interval_step=2, interval_max=30): - """See :meth:`BrokerConnection.ensure`.""" - - @wraps(fun) - def _insured(*args, **kwargs): - conn = self.acquire() - try: - return conn.ensure(fun, errback, max_retries, - interval_start, - interval_step, - interval_max)(*args, **kwargs) - finally: - conn.release() - - return insured - - def grow(self, n, connect=False): - """Add ``n`` more connections to the pool. - - :keyword connect: Establish connections imemediately. - By default connections are only established when needed. - - :raises kombu.exceptions.PoolLimitExceeded: If there are already - more than :attr:`max` number of connections in the pool. - - """ - self.mutex.acquire() - try: - for _ in xrange(n): - if self.total >= self.max: - raise exceptions.PoolLimitExceeded( - "Can't add more connections to the pool.") - connection = self.initial.clone(pool=self) - connect and self._establish_connection(connection) - self._connections.append(connection) - self.not_empty.notify() - finally: - self.mutex.release() - - def close(self): - """Close all connections.""" - while self._connections: - self._connections.popleft().close() - while self._dirty: - self._dirty.pop().close() - - def _establish_connection(self, connection): - if self.ensure: - return connection.ensure_connection() - return connection.connect() - - def __repr__(self): - """``x.__repr__() <==> repr(x)``""" - info = self.initial.info() - return "" % ( - self.max, - ", ".join("%s=%r" % (item, info[item]) - for item in info.keys()[:8])) - - @property - def active(self): - """Number of acquired connections.""" - return len(self._dirty) - - @property - def total(self): - """Current total number of connections""" - return self.active + len(self._connections)