mirror of https://github.com/celery/kombu.git
Draft ChannelPool implementation
This commit is contained in:
parent
f305b9b8a8
commit
630ba13022
|
@ -1,7 +1,7 @@
|
|||
import socket
|
||||
|
||||
from copy import copy
|
||||
from itertools import count
|
||||
from Queue import Empty, Full, Queue as _Queue
|
||||
|
||||
from kombu import exceptions
|
||||
from kombu.transport import get_transport_cls
|
||||
|
@ -222,6 +222,33 @@ class BrokerConnection(object):
|
|||
("transport_cls", transport_cls),
|
||||
("connect_timeout", self.connect_timeout)))
|
||||
|
||||
def ChannelPool(self, limit=None, preload=None):
|
||||
"""Pool of channels.
|
||||
|
||||
See :class:`ChannelPool`.
|
||||
|
||||
:keyword limit: Maximum number of active channels.
|
||||
Default is no limit.
|
||||
:keyword preload: Number of channels to preload
|
||||
when the pool is created. Default is 0.
|
||||
|
||||
*Example usage*::
|
||||
|
||||
>>> pool = connection.ChannelPool(2)
|
||||
>>> c1 = pool.acquire()
|
||||
>>> c2 = pool.acquire()
|
||||
>>> c3 = pool.acquire()
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
File "kombu/connection.py", line 354, in acquire
|
||||
raise ChannelLimitExceeded(self.limit)
|
||||
kombu.connection.ChannelLimitExceeded: 4
|
||||
>>> c1.release()
|
||||
>>> c3 = pool.acquire()
|
||||
|
||||
"""
|
||||
return ChannelPool(self, limit, preload)
|
||||
|
||||
def SimpleQueue(self, name, no_ack=False, queue_opts=None,
|
||||
exchange_opts=None, channel=None):
|
||||
"""Create new :class:`~kombu.simple.SimpleQueue`, using a channel
|
||||
|
@ -321,3 +348,71 @@ class BrokerConnection(object):
|
|||
def channel_errors(self):
|
||||
"""List of exceptions that may be raised by the channel."""
|
||||
return self.transport.channel_errors
|
||||
|
||||
|
||||
|
||||
class ChannelLimitExceeded(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ChannelPool(object):
|
||||
|
||||
def __init__(self, connection, limit=None, preload=None):
|
||||
self.connection = connection
|
||||
self.limit = limit
|
||||
self.preload = preload or 0
|
||||
|
||||
self._channels = _Queue()
|
||||
self._dirty = set()
|
||||
|
||||
channel = self.connection.channel
|
||||
for i in xrange(limit):
|
||||
self._channels.put_nowait(i < preload and channel() or channel)
|
||||
|
||||
def acquire(self, block=False, timeout=None):
|
||||
"""Acquire inactive channel.
|
||||
|
||||
:keyword block: If the channel limit is exceeded,
|
||||
block until there is an available channel.
|
||||
:keyword timeout: Timeout to wait for an available
|
||||
channel if ``block`` is true.
|
||||
|
||||
:raises ChannelLimitExceeded: if block is false
|
||||
and there are no available channels.
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
channel = self._channels.get(block=block, timeout=timeout)
|
||||
except Empty:
|
||||
if self.limit and len(self._dirty) >= self.limit:
|
||||
raise ChannelLimitExceeded(self.limit)
|
||||
# No more channels, put one on the queue and
|
||||
# try get again, this way the first in line
|
||||
# will get the channel.
|
||||
self._channels.put_nowait(self.connection.channel)
|
||||
else:
|
||||
return self._prepare(channel)
|
||||
|
||||
def release(self, channel):
|
||||
"""Release channel so it can be used by another thread.
|
||||
|
||||
The caller is responsible for discarding the object,
|
||||
and to never use the channel again. A new channel must
|
||||
be acquired if so needed.
|
||||
|
||||
"""
|
||||
self._dirty.discard(channel)
|
||||
self._channels.put_nowait(channel)
|
||||
|
||||
def _prepare(self, channel):
|
||||
if callable(channel):
|
||||
channel = channel()
|
||||
|
||||
@wraps(self.release)
|
||||
def _release():
|
||||
self.release(channel)
|
||||
channel.release = _release
|
||||
|
||||
self._dirty.add(channel)
|
||||
return channel
|
||||
|
|
Loading…
Reference in New Issue