mirror of https://github.com/celery/kombu.git
Remove incomplete connection pool implementation that was accidentally committed
This commit is contained in:
parent
d5fe88f007
commit
f9ea964f2e
|
@ -1,10 +1,7 @@
|
|||
import socket
|
||||
import threading
|
||||
|
||||
from collections import deque
|
||||
from copy import copy
|
||||
from itertools import count
|
||||
from time import time
|
||||
|
||||
from kombu import exceptions
|
||||
from kombu.transport import get_transport_cls
|
||||
|
@ -69,8 +66,7 @@ class BrokerConnection(object):
|
|||
|
||||
def __init__(self, hostname="localhost", userid="guest",
|
||||
password="guest", virtual_host="/", port=None, insist=False,
|
||||
ssl=False, transport=None, connect_timeout=5, pool=None,
|
||||
backend_cls=None):
|
||||
ssl=False, transport=None, connect_timeout=5, backend_cls=None):
|
||||
self.hostname = hostname
|
||||
self.userid = userid
|
||||
self.password = password
|
||||
|
@ -81,7 +77,6 @@ class BrokerConnection(object):
|
|||
self.ssl = ssl
|
||||
# backend_cls argument will be removed shortly.
|
||||
self.transport_cls = transport or backend_cls
|
||||
self.pool = pool
|
||||
|
||||
def connect(self):
|
||||
"""Establish connection to server immediately."""
|
||||
|
@ -197,27 +192,6 @@ class BrokerConnection(object):
|
|||
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
|
||||
return _insured
|
||||
|
||||
def acquire(self):
|
||||
"""Acquire connection.
|
||||
|
||||
Only here for API compatibility with :class:`BrokerConnectionPool`.
|
||||
|
||||
"""
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
"""Close the connection, or if the connection is managed by a pool
|
||||
the connection will be released to the pool so it can be reused.
|
||||
|
||||
**NOTE:** You must never perform operations on a connection
|
||||
that has been released.
|
||||
|
||||
"""
|
||||
if self.pool:
|
||||
self.pool.release(self)
|
||||
else:
|
||||
self.close()
|
||||
|
||||
def create_transport(self):
|
||||
return self.get_transport_cls()(client=self)
|
||||
create_backend = create_transport # FIXME
|
||||
|
@ -236,16 +210,17 @@ class BrokerConnection(object):
|
|||
|
||||
def info(self):
|
||||
"""Get connection info."""
|
||||
transport_cls = self.transport_cls or "amqplib"
|
||||
port = self.port or self.transport.default_port
|
||||
return OrderedDict((("hostname", self.hostname),
|
||||
("userid", self.userid),
|
||||
("password", self.password),
|
||||
("virtual_host", self.virtual_host),
|
||||
("port", self.port),
|
||||
("port", port),
|
||||
("insist", self.insist),
|
||||
("ssl", self.ssl),
|
||||
("transport_cls", self.transport_cls),
|
||||
("connect_timeout", self.connect_timeout),
|
||||
("pool", self.pool)))
|
||||
("transport_cls", transport_cls),
|
||||
("connect_timeout", self.connect_timeout)))
|
||||
|
||||
def SimpleQueue(self, name, no_ack=False, queue_opts=None,
|
||||
exchange_opts=None, channel=None):
|
||||
|
@ -346,168 +321,3 @@ class BrokerConnection(object):
|
|||
def channel_errors(self):
|
||||
"""List of exceptions that may be raised by the channel."""
|
||||
return self.transport.channel_errors
|
||||
|
||||
|
||||
class BrokerConnectionPool(object):
|
||||
"""Pool of connections.
|
||||
|
||||
:param initial: Initial :class:`BrokerConnection` to take connection
|
||||
parameters from.
|
||||
|
||||
:keyword max: Maximum number of connections in the pool.
|
||||
Default is 10.
|
||||
|
||||
:keyword ensure: When ``preconnect`` on, ensure we're able to establish
|
||||
a connection. Default is ``False``.
|
||||
|
||||
:keyword preconnect: Number of connections at
|
||||
instantiation. Default is to only establish connections when needed.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, initial, max=10, ensure=False, preconnect=0):
|
||||
self.initial = initial
|
||||
self.max = max
|
||||
self.preconnect = preconnect
|
||||
self._connections = deque()
|
||||
self._dirty = set()
|
||||
self.mutex = threading.Lock()
|
||||
self.not_empty = threading.Condition(self.mutex)
|
||||
|
||||
self.grow(self.preconnect, connect=True)
|
||||
self.grow(self.max - self.preconnect)
|
||||
|
||||
def acquire(self, block=True, timeout=None):
|
||||
"""Acquire connection.
|
||||
|
||||
:raises kombu.exceptions.PoolExhausted: If there are no
|
||||
available connections to be acquired.
|
||||
|
||||
"""
|
||||
self.not_empty.acquire()
|
||||
time_start = time()
|
||||
try:
|
||||
while 1:
|
||||
try:
|
||||
connection = self._connections.popleft()
|
||||
self._dirty.add(connection)
|
||||
return connection
|
||||
except IndexError:
|
||||
if not block:
|
||||
raise exceptions.PoolExhausted(
|
||||
"All connections acquired")
|
||||
if timeout:
|
||||
elapsed = time() - time_start
|
||||
remaining = timeout - elapsed
|
||||
if elapsed > timeout:
|
||||
raise exceptions.TimeoutError(
|
||||
"Timed out while acquiring connection.")
|
||||
self.not_empty.wait(remaining)
|
||||
finally:
|
||||
self.not_empty.release()
|
||||
|
||||
def release(self, connection):
|
||||
"""Release connection so it can be used by others.
|
||||
|
||||
**NOTE:** You must never perform operations on a connection
|
||||
that has been released.
|
||||
|
||||
"""
|
||||
self.mutex.acquire()
|
||||
try:
|
||||
try:
|
||||
self._dirty.remove(connection)
|
||||
except KeyError:
|
||||
pass
|
||||
self._connections.append(connection)
|
||||
self.not_empty.notify()
|
||||
finally:
|
||||
self.mutex.release()
|
||||
|
||||
def replace(self, connection):
|
||||
"""Clone and replace connection with a new one.
|
||||
|
||||
This is useful if the connection is broken.
|
||||
|
||||
"""
|
||||
connection.close()
|
||||
self.mutex.acquire()
|
||||
try:
|
||||
try:
|
||||
self._dirty.remove(connection)
|
||||
self._connections.remove(connection)
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
finally:
|
||||
self.mutex.release()
|
||||
self.grow(1)
|
||||
|
||||
def ensure(self, fun, errback=None, max_retries=None,
|
||||
interval_start=2, interval_step=2, interval_max=30):
|
||||
"""See :meth:`BrokerConnection.ensure`."""
|
||||
|
||||
@wraps(fun)
|
||||
def _insured(*args, **kwargs):
|
||||
conn = self.acquire()
|
||||
try:
|
||||
return conn.ensure(fun, errback, max_retries,
|
||||
interval_start,
|
||||
interval_step,
|
||||
interval_max)(*args, **kwargs)
|
||||
finally:
|
||||
conn.release()
|
||||
|
||||
return insured
|
||||
|
||||
def grow(self, n, connect=False):
|
||||
"""Add ``n`` more connections to the pool.
|
||||
|
||||
:keyword connect: Establish connections imemediately.
|
||||
By default connections are only established when needed.
|
||||
|
||||
:raises kombu.exceptions.PoolLimitExceeded: If there are already
|
||||
more than :attr:`max` number of connections in the pool.
|
||||
|
||||
"""
|
||||
self.mutex.acquire()
|
||||
try:
|
||||
for _ in xrange(n):
|
||||
if self.total >= self.max:
|
||||
raise exceptions.PoolLimitExceeded(
|
||||
"Can't add more connections to the pool.")
|
||||
connection = self.initial.clone(pool=self)
|
||||
connect and self._establish_connection(connection)
|
||||
self._connections.append(connection)
|
||||
self.not_empty.notify()
|
||||
finally:
|
||||
self.mutex.release()
|
||||
|
||||
def close(self):
|
||||
"""Close all connections."""
|
||||
while self._connections:
|
||||
self._connections.popleft().close()
|
||||
while self._dirty:
|
||||
self._dirty.pop().close()
|
||||
|
||||
def _establish_connection(self, connection):
|
||||
if self.ensure:
|
||||
return connection.ensure_connection()
|
||||
return connection.connect()
|
||||
|
||||
def __repr__(self):
|
||||
"""``x.__repr__() <==> repr(x)``"""
|
||||
info = self.initial.info()
|
||||
return "<BrokerConnectionPool(%s): %s>" % (
|
||||
self.max,
|
||||
", ".join("%s=%r" % (item, info[item])
|
||||
for item in info.keys()[:8]))
|
||||
|
||||
@property
|
||||
def active(self):
|
||||
"""Number of acquired connections."""
|
||||
return len(self._dirty)
|
||||
|
||||
@property
|
||||
def total(self):
|
||||
"""Current total number of connections"""
|
||||
return self.active + len(self._connections)
|
||||
|
|
Loading…
Reference in New Issue