diff --git a/kombu/connection.py b/kombu/connection.py index 34fe9928..c99f5c52 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1,7 +1,7 @@ import socket -from copy import copy from itertools import count +from Queue import Empty, Full, Queue as _Queue from kombu import exceptions from kombu.transport import get_transport_cls @@ -222,6 +222,33 @@ class BrokerConnection(object): ("transport_cls", transport_cls), ("connect_timeout", self.connect_timeout))) + def ChannelPool(self, limit=None, preload=None): + """Pool of channels. + + See :class:`ChannelPool`. + + :keyword limit: Maximum number of active channels. + Default is no limit. + :keyword preload: Number of channels to preload + when the pool is created. Default is 0. + + *Example usage*:: + + >>> pool = connection.ChannelPool(2) + >>> c1 = pool.acquire() + >>> c2 = pool.acquire() + >>> c3 = pool.acquire() + Traceback (most recent call last): + File "", line 1, in + File "kombu/connection.py", line 354, in acquire + raise ChannelLimitExceeded(self.limit) + kombu.connection.ChannelLimitExceeded: 4 + >>> c1.release() + >>> c3 = pool.acquire() + + """ + return ChannelPool(self, limit, preload) + def SimpleQueue(self, name, no_ack=False, queue_opts=None, exchange_opts=None, channel=None): """Create new :class:`~kombu.simple.SimpleQueue`, using a channel @@ -321,3 +348,71 @@ class BrokerConnection(object): def channel_errors(self): """List of exceptions that may be raised by the channel.""" return self.transport.channel_errors + + + +class ChannelLimitExceeded(Exception): + pass + + +class ChannelPool(object): + + def __init__(self, connection, limit=None, preload=None): + self.connection = connection + self.limit = limit + self.preload = preload or 0 + + self._channels = _Queue() + self._dirty = set() + + channel = self.connection.channel + for i in xrange(limit): + self._channels.put_nowait(i < preload and channel() or channel) + + def acquire(self, block=False, timeout=None): + """Acquire inactive channel. + + :keyword block: If the channel limit is exceeded, + block until there is an available channel. + :keyword timeout: Timeout to wait for an available + channel if ``block`` is true. + + :raises ChannelLimitExceeded: if block is false + and there are no available channels. + + """ + while True: + try: + channel = self._channels.get(block=block, timeout=timeout) + except Empty: + if self.limit and len(self._dirty) >= self.limit: + raise ChannelLimitExceeded(self.limit) + # No more channels, put one on the queue and + # try get again, this way the first in line + # will get the channel. + self._channels.put_nowait(self.connection.channel) + else: + return self._prepare(channel) + + def release(self, channel): + """Release channel so it can be used by another thread. + + The caller is responsible for discarding the object, + and to never use the channel again. A new channel must + be acquired if so needed. + + """ + self._dirty.discard(channel) + self._channels.put_nowait(channel) + + def _prepare(self, channel): + if callable(channel): + channel = channel() + + @wraps(self.release) + def _release(): + self.release(channel) + channel.release = _release + + self._dirty.add(channel) + return channel