ensure fixes + ConnectionPool

This commit is contained in:
Ask Solem 2010-10-22 15:42:04 +02:00
parent 630ba13022
commit ce1d10ee5c
1 changed files with 129 additions and 48 deletions

View File

@ -1,7 +1,8 @@
import socket
from copy import copy
from itertools import count
from Queue import Empty, Full, Queue as _Queue
from Queue import Empty, Queue as _Queue
from kombu import exceptions
from kombu.transport import get_transport_cls
@ -102,7 +103,10 @@ class BrokerConnection(object):
"""Close the connection (if open)."""
try:
if self._connection:
self.transport.close_connection(self._connection)
try:
self.transport.close_connection(self._connection)
except self.transport.connection_errors + (AttributeError, ):
pass
self._connection = None
except socket.error:
pass
@ -129,12 +133,13 @@ class BrokerConnection(object):
each retry.
"""
print("CONNECTION ERRORS: %r" % (self.connection_errors, ))
retry_over_time(self.connect, self.connection_errors, (), {},
errback, max_retries,
interval_start, interval_step, interval_max)
return self
def ensure(self, fun, errback=None, max_retries=None,
def ensure(self, obj, fun, errback=None, max_retries=None,
interval_start=1, interval_step=1, interval_max=1):
"""Ensure operation completes, regardless of any channel/connection
errors occuring.
@ -164,7 +169,7 @@ class BrokerConnection(object):
>>> def errback(exc, interval):
... print("Couldn't publish message: %r. Retry in %ds" % (
... exc, interval))
>>> publish = conn.ensure(producer.publish,
>>> publish = conn.ensure(producer, producer.publish,
... errback=errback, max_retries=3)
>>> publish(message, routing_key)
@ -174,20 +179,27 @@ class BrokerConnection(object):
@wraps(fun)
def _insured(*args, **kwargs):
for ret in count(0):
if max_retries and retries >= max_retries:
raise exceptions.EnsureExhausted()
got_connection = 0
for retries in count(0):
try:
return fun(*args, **kwargs)
except self.connection_errors + self.channel_errors, exc:
if got_connection:
raise
if max_retries and retries > max_retries:
raise exceptions.EnsureExhausted()
errback and errback(exc, 0)
self.connection.connection = None
self._connection = None
self.close()
remaining_retries = max_retries and \
max(max_retries - retries, 1)
self.ensure_connection(errback,
max(max_retries - retries, 1),
remaining_retries,
interval_start,
interval_step,
interval_max)
obj.channel = self.channel()
got_connection += 1
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
return _insured
@ -219,9 +231,36 @@ class BrokerConnection(object):
("port", port),
("insist", self.insist),
("ssl", self.ssl),
("transport_cls", transport_cls),
("transport", transport_cls),
("connect_timeout", self.connect_timeout)))
def Pool(self, limit=None, preload=None):
"""Pool of connections.
See :class:`ConnectionPool`.
:keyword limit: Maximum number of active connections.
Default is no limit.
:keyword preload: Number of connections to preload
when the pool is created. Default is 0.
*Example usage*::
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "kombu/connection.py", line 354, in acquire
raise ConnectionLimitExceeded(self.limit)
kombu.exceptions.ConnectionLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
"""
return ConnectionPool(self, limit, preload)
def ChannelPool(self, limit=None, preload=None):
"""Pool of channels.
@ -242,7 +281,7 @@ class BrokerConnection(object):
File "<stdin>", line 1, in <module>
File "kombu/connection.py", line 354, in acquire
raise ChannelLimitExceeded(self.limit)
kombu.connection.ChannelLimitExceeded: 4
kombu.connection.ChannelLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
@ -350,69 +389,111 @@ class BrokerConnection(object):
return self.transport.channel_errors
class Resource(object):
class ChannelLimitExceeded(Exception):
pass
class ChannelPool(object):
def __init__(self, connection, limit=None, preload=None):
self.connection = connection
def __init__(self, limit=None, preload=None):
self.limit = limit
self.preload = preload or 0
self._channels = _Queue()
self._resource = _Queue()
self._dirty = set()
self.setup()
channel = self.connection.channel
for i in xrange(limit):
self._channels.put_nowait(i < preload and channel() or channel)
def setup(self):
raise NotImplementedError("subclass responsibilty")
def acquire(self, block=False, timeout=None):
"""Acquire inactive channel.
"""Acquire resource.
:keyword block: If the channel limit is exceeded,
block until there is an available channel.
:keyword timeout: Timeout to wait for an available
channel if ``block`` is true.
:keyword block: If the limit is exceeded,
block until there is an available item.
:keyword timeout: Timeout to wait
if ``block`` is true. Default is :const:`None` (forever).
:raises ChannelLimitExceeded: if block is false
and there are no available channels.
:raises LimitExceeded: if block is false
and the limit has been exceeded.
"""
while True:
try:
channel = self._channels.get(block=block, timeout=timeout)
resource = self._resource.get(block=block, timeout=timeout)
except Empty:
if self.limit and len(self._dirty) >= self.limit:
raise ChannelLimitExceeded(self.limit)
# No more channels, put one on the queue and
raise self.LimitExceeded(self.limit)
# All taken, put new on the queue and
# try get again, this way the first in line
# will get the channel.
self._channels.put_nowait(self.connection.channel)
# will get the resource.
self._resource.put_nowait(self.new())
else:
return self._prepare(channel)
resource = self.prepare(resource)
self._dirty.add(resource)
def release(self, channel):
"""Release channel so it can be used by another thread.
@wraps(self.release)
def _release():
self.release(resource)
resource.release = _release
return resource
def prepare(self, resource):
return resource
def release(self, resource):
"""Release resource so it can be used by another thread.
The caller is responsible for discarding the object,
and to never use the channel again. A new channel must
and to never use the resource again. A new resource must
be acquired if so needed.
"""
self._dirty.discard(channel)
self._channels.put_nowait(channel)
self._dirty.discard(resource)
self._resource.put_nowait(resource)
def _prepare(self, channel):
class ConnectionPool(Resource):
LimitExceeded = exceptions.ConnectionLimitExceeded
def __init__(self, connection, limit=None, preload=None):
self.connection = connection
super(ConnectionPool, self).__init__(limit=limit,
preload=preload)
def new(self):
return copy(self.connection)
def setup(self):
for i in xrange(self.limit):
conn = self.new()
if i < self.preload:
conn.connect()
self._resource.put_nowait(conn)
def prepare(self, resource):
if not resource._connection:
resource.connect()
return resource
class ChannelPool(Resource):
LimitExceeded = exceptions.ChannelLimitExceeded
def __init__(self, connection, limit=None, preload=None):
self.connection = connection
super(ChannelPool, self).__init__(limit=limit,
preload=preload)
def new(self):
return self.connection.channel
def setup(self):
channel = self.new()
for i in xrange(self.limit):
self._resource.put_nowait(
i < self.preload and channel() or channel)
def prepare(self, channel):
if callable(channel):
channel = channel()
@wraps(self.release)
def _release():
self.release(channel)
channel.release = _release
self._dirty.add(channel)
return channel