Adds kombu.pools.[connections|producers]

This commit is contained in:
Ask Solem 2011-09-05 12:45:32 +01:00
parent d570d6a5e2
commit 1c5f0ecf9f
3 changed files with 103 additions and 1 deletions

View File

@ -38,10 +38,44 @@
So the leading slash in the path component is **always required**. So the leading slash in the path component is **always required**.
* Now comes with default global connection and producer pools.
The acquire a connection using the connection parameters
from a :class:`BrokerConnection`::
>>> from kombu import BrokerConnection, connections
>>> connection = BrokerConnection("amqp://guest:guest@localhost//")
>>> with connections[connection].acquire(block=True):
... # do something with connection
To acquire a producer using the connection parameters
from a :class:`BrokerConnection`::
>>> from kombu import BrokerConnection, producers
>>> connection = BrokerConnection("amqp://guest:guest@localhost//")
>>> with producers[connection].acquire(block=True):
... producer.publish({"hello": "world"}, exchange="hello")
Acquiring a producer will in turn also acquire a connection
from the associated pool in ``connections``, so you the number
of producers is bound the same limit as number of connections.
The default limit of 100 connections per connection instance
can be changed by doing::
>>> from kombu import pools
>>> pools.set_limit(10)
The pool can also be forcefully closed by doing::
>>> from kombu import pools
>>> pool.reset()
* SQS Transport: Persistence using SimpleDB is now disabled by default, * SQS Transport: Persistence using SimpleDB is now disabled by default,
after reports of unstable SimpleDB connections leading to errors. after reports of unstable SimpleDB connections leading to errors.
* :class:`Prodcuer` can now be used as a context manager. * :class:`Producer` can now be used as a context manager.
* ``Producer.__exit__`` now properly calls ``release`` instead of close. * ``Producer.__exit__`` now properly calls ``release`` instead of close.

View File

@ -17,6 +17,7 @@ if not os.environ.get("KOMBU_NO_EVAL", False):
"kombu.connection": ["BrokerConnection", "Connection"], "kombu.connection": ["BrokerConnection", "Connection"],
"kombu.entity": ["Exchange", "Queue"], "kombu.entity": ["Exchange", "Queue"],
"kombu.messaging": ["Consumer", "Producer"], "kombu.messaging": ["Consumer", "Producer"],
"kombu.pools": ["connections", "producers"],
} }
object_origins = {} object_origins = {}

View File

@ -1,6 +1,11 @@
from kombu.connection import Resource from kombu.connection import Resource
from kombu.messaging import Producer from kombu.messaging import Producer
from itertools import chain
__all__ = ["ProducerPool", "connections", "producers", "set_limit", "reset"]
_limit = [200]
class ProducerPool(Resource): class ProducerPool(Resource):
Producer = Producer Producer = Producer
@ -39,3 +44,65 @@ class ProducerPool(Resource):
resource.connection.release() resource.connection.release()
resource.connection = None resource.connection = None
super(ProducerPool, self).release(resource) super(ProducerPool, self).release(resource)
class HashingDict(dict):
def __getitem__(self, key):
h = hash(key)
if h not in self:
return self.__missing__(key)
return dict.__getitem__(self, h)
def __setitem__(self, key, value):
return dict.__setitem__(self, hash(key), value)
def __delitem__(self, key):
return dict.__delitem__(self, hash(key))
class _Connections(HashingDict):
def __missing__(self, connection):
k = self[connection] = connection.Pool(limit=_limit[0])
return k
connections = _Connections()
class _Producers(HashingDict):
def __missing__(self, conn):
k = self[conn] = ProducerPool(connections[conn], limit=_limit[0])
return k
producers = _Producers()
def _all_pools():
return chain(connections.itervalues() if connections else iter([]),
producers.itervalues() if producers else iter([]))
def set_limit(limit):
_limit[0] = limit
for pool in _all_pools():
pool.limit = limit
return limit
def reset():
global connections
global producers
for pool in _all_pools():
try:
pool.force_close_all()
except Exception:
pass
connections = _Connections()
producers._Producers()
try:
from multiprocessing.util import register_after_fork
register_after_fork(connections, reset)
except ImportError:
pass