From 1c5f0ecf9f79e0c3f8f689057541295d3bf7a336 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 5 Sep 2011 12:45:32 +0100 Subject: [PATCH] Adds kombu.pools.[connections|producers] --- Changelog | 36 ++++++++++++++++++++++++- kombu/__init__.py | 1 + kombu/pools.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) diff --git a/Changelog b/Changelog index 75a85142..d3c60757 100644 --- a/Changelog +++ b/Changelog @@ -38,10 +38,44 @@ So the leading slash in the path component is **always required**. +* Now comes with default global connection and producer pools. + + To acquire a connection using the connection parameters + from a :class:`BrokerConnection`:: + + >>> from kombu import BrokerConnection, connections + >>> connection = BrokerConnection("amqp://guest:guest@localhost//") + >>> with connections[connection].acquire(block=True): + ... # do something with connection + + To acquire a producer using the connection parameters + from a :class:`BrokerConnection`:: + + >>> from kombu import BrokerConnection, producers + >>> connection = BrokerConnection("amqp://guest:guest@localhost//") + >>> with producers[connection].acquire(block=True) as producer: + ... producer.publish({"hello": "world"}, exchange="hello") + + Acquiring a producer will in turn also acquire a connection + from the associated pool in ``connections``, so the number + of producers is bound to the same limit as the number of connections. + + The default limit of 200 connections per connection instance + can be changed by doing:: + + >>> from kombu import pools + >>> pools.set_limit(10) + + The pool can also be forcefully closed by doing:: + + >>> from kombu import pools + >>> pools.reset() + + * SQS Transport: Persistence using SimpleDB is now disabled by default, after reports of unstable SimpleDB connections leading to errors. -* :class:`Prodcuer` can now be used as a context manager. +* :class:`Producer` can now be used as a context manager. 
* ``Producer.__exit__`` now properly calls ``release`` instead of close. diff --git a/kombu/__init__.py b/kombu/__init__.py index a0985cc2..5c0ae8f0 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -17,6 +17,7 @@ if not os.environ.get("KOMBU_NO_EVAL", False): "kombu.connection": ["BrokerConnection", "Connection"], "kombu.entity": ["Exchange", "Queue"], "kombu.messaging": ["Consumer", "Producer"], + "kombu.pools": ["connections", "producers"], } object_origins = {} diff --git a/kombu/pools.py b/kombu/pools.py index 2d9db861..2680cb0f 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -1,6 +1,11 @@ from kombu.connection import Resource from kombu.messaging import Producer +from itertools import chain + +__all__ = ["ProducerPool", "connections", "producers", "set_limit", "reset"] +_limit = [200] + class ProducerPool(Resource): Producer = Producer @@ -39,3 +44,65 @@ class ProducerPool(Resource): resource.connection.release() resource.connection = None super(ProducerPool, self).release(resource) + + +class HashingDict(dict): + + def __getitem__(self, key): + h = hash(key) + if h not in self: + return self.__missing__(key) + return dict.__getitem__(self, h) + + def __setitem__(self, key, value): + return dict.__setitem__(self, hash(key), value) + + def __delitem__(self, key): + return dict.__delitem__(self, hash(key)) + + +class _Connections(HashingDict): + + def __missing__(self, connection): + k = self[connection] = connection.Pool(limit=_limit[0]) + return k +connections = _Connections() + + +class _Producers(HashingDict): + + def __missing__(self, conn): + k = self[conn] = ProducerPool(connections[conn], limit=_limit[0]) + return k +producers = _Producers() + + +def _all_pools(): + return chain(connections.itervalues() if connections else iter([]), + producers.itervalues() if producers else iter([])) + + +def set_limit(limit): + _limit[0] = limit + for pool in _all_pools(): + pool.limit = limit + return limit + + +def reset(): + global connections + global 
producers + for pool in _all_pools(): + try: + pool.force_close_all() + except Exception: + pass + connections = _Connections() + producers = _Producers() + + +try: + from multiprocessing.util import register_after_fork + register_after_fork(connections, reset) +except ImportError: + pass