From 148ee045786a0f767b491fe6c24dcab46f714453 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sat, 20 Aug 2011 00:01:43 +0100 Subject: [PATCH] hostname argument to BrokerConnection can now be an URL (transport://userid:password@host:port/vhost. Note that the / at the end must always be included, the vhost '/' becomes '//') --- kombu/connection.py | 120 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 21 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index ee187092..f8f5de01 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -15,10 +15,11 @@ import socket from copy import copy from itertools import count from Queue import Empty +from urlparse import urlparse, parse_qsl from kombu import exceptions from kombu.transport import get_transport_cls -from kombu.utils import retry_over_time +from kombu.utils import kwdict, partition, retry_over_time from kombu.utils.compat import OrderedDict, LifoQueue as _LifoQueue from kombu.utils.functional import wraps @@ -28,9 +29,29 @@ _LOG_CHANNEL = os.environ.get("KOMBU_LOG_CHANNEL", False) #: Connection info -> URI URI_FORMAT = """\ -%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\ +%(transport)s://%(userid)s@%(hostname)s%(port)s/%(virtual_host)s\ """ +def parse_url(url): + auth = userid = password = None + scheme = urlparse(url).scheme + parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) + netloc = parts.netloc + if '@' in netloc: + auth, _, netloc = partition(parts.netloc, '@') + userid, _, password = partition(auth, ':') + hostname, _, port = partition(netloc, ':') + path = parts.path or "" + if path and path[0] == '/': + path = path[path.index('/') + 1:] + return dict({"hostname": hostname, + "port": port and int(port) or None, + "userid": userid or None, + "password": password or None, + "transport": scheme, + "virtual_host": path or None}, + **kwdict(dict(parse_qsl(parts.query)))) + class BrokerConnection(object): """A connection to the broker. @@ -85,20 +106,21 @@ class BrokerConnection(object): _logger = None def __init__(self, hostname="localhost", userid=None, - password=None, virtual_host="/", port=None, insist=False, + password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, **kwargs): - self.hostname = hostname - self.userid = userid - self.password = password - self.login_method = login_method - self.virtual_host = virtual_host or self.virtual_host - self.port = port or self.port - self.insist = insist - self.connect_timeout = connect_timeout or self.connect_timeout - self.ssl = ssl + # have to spell the args out, just to get nice docstrings :( + params = {"hostname": hostname, "userid": userid, + "password": password, "virtual_host": virtual_host, + "port": port, "insist": insist, "ssl": ssl, + "transport": transport, "connect_timeout": connect_timeout, + "login_method": login_method} + if "://" in hostname: + params.update(parse_url(hostname)) + self._init_params(**params) + # backend_cls argument will be removed shortly. - self.transport_cls = transport or kwargs.get("backend_cls") + self.transport_cls = self.transport_cls or kwargs.get("backend_cls") if transport_options is None: transport_options = {} @@ -108,6 +130,20 @@ class BrokerConnection(object): from kombu.utils.log import get_logger self._logger = get_logger("kombu.connection") + def _init_params(self, hostname, userid, password, virtual_host, port, + insist, ssl, transport, connect_timeout, login_method): + self.hostname = hostname + self.userid = userid + self.password = password + self.login_method = login_method + self.virtual_host = virtual_host or self.virtual_host + self.port = port or self.port + self.insist = insist + self.connect_timeout = connect_timeout + self.ssl = ssl + self.transport_cls = transport + + def _debug(self, msg, ident="[Kombu connection:0x%(id)x] ", **kwargs): if self._logger: self._logger.debug((ident + unicode(msg)) % {"id": id(self)}, @@ -311,7 +347,8 @@ class BrokerConnection(object): def info(self): """Get connection info.""" - transport_cls = self.transport_cls or "amqplib" + transport_cls = self.transport_cls or "amqp" + transport_cls = {"amqplib": "amqp"}.get(transport_cls, transport_cls) defaults = self.transport.default_connection_params info = OrderedDict((("hostname", self.hostname), ("userid", self.userid), @@ -335,12 +372,15 @@ class BrokerConnection(object): def as_uri(self): fields = self.info() port = fields["port"] + userid = fields["userid"] + url = "%s://" % fields["transport"] + if userid: + url += userid + '@' + url += fields["hostname"] if port: - fields["port"] = ":%s" % (port, ) - vhost = fields["virtual_host"] - if not vhost.startswith('/'): - fields["virtual_host"] = '/' + vhost - return self.URI_FORMAT % fields + url += ':' + str(port) + url += '/' + fields["virtual_host"] + return url def Pool(self, limit=None, preload=None): """Pool of connections. @@ -573,6 +613,15 @@ class Resource(object): def release_resource(self, resource): pass + def replace(self, resource): + """Replace resource with a new instance. This can be used in case + of defective resources.""" + if self.limit: + self._dirty.discard(resource) + self.close_resource(resource) + else: + self.close_resource(resource) + def release(self, resource): """Release resource so it can be used by another thread. @@ -604,7 +653,9 @@ class Resource(object): break self.close_resource(dres) - resource.mutex.acquire() + mutex = getattr(resource, "mutex", None) + if mutex: + mutex.acquire() try: while 1: try: @@ -613,7 +664,31 @@ class Resource(object): break self.close_resource(res) finally: - resource.mutex.release() + if mutex: + mutex.release() + + if os.environ.get("KOMBU_DEBUG_POOL"): + + _orig_acquire = acquire + _orig_release = release + + _next_resource_id = 0 + + def acquire(self, *args, **kwargs): + id = self._next_resource_id = self._next_resource_id + 1 + print("+%s ACQUIRE %s" % (id, self.__class__.__name__, )) + r = self._orig_acquire(*args, **kwargs) + r._resource_id = id + print("-%s ACQUIRE %s" % (id, self.__class__.__name__, )) + return r + + def release(self, resource): + id = resource._resource_id + print("+%s RELEASE %s" % (id, self.__class__.__name__, )) + r = self._orig_release(resource) + print("-%s RELEASE %s" % (id, self.__class__.__name__, )) + self._next_resource_id -= 1 + return r class PoolChannelContext(object): @@ -646,6 +721,9 @@ class ConnectionPool(Resource): def release_resource(self, resource): resource._debug("released") + def close_resource(self, resource): + resource._close() + def acquire_channel(self, block=False): return PoolChannelContext(self, block)