mirror of https://github.com/celery/kombu.git
hostname argument to BrokerConnection can now be an URL (transport://userid:password@host:port/vhost. Note that the / at the end must always be included, the vhost '/' becomes '//')
This commit is contained in:
parent
feea01bdbf
commit
148ee04578
|
@ -15,10 +15,11 @@ import socket
|
|||
from copy import copy
|
||||
from itertools import count
|
||||
from Queue import Empty
|
||||
from urlparse import urlparse, parse_qsl
|
||||
|
||||
from kombu import exceptions
|
||||
from kombu.transport import get_transport_cls
|
||||
from kombu.utils import retry_over_time
|
||||
from kombu.utils import kwdict, partition, retry_over_time
|
||||
from kombu.utils.compat import OrderedDict, LifoQueue as _LifoQueue
|
||||
from kombu.utils.functional import wraps
|
||||
|
||||
|
@ -28,9 +29,29 @@ _LOG_CHANNEL = os.environ.get("KOMBU_LOG_CHANNEL", False)
|
|||
|
||||
#: Connection info -> URI
|
||||
URI_FORMAT = """\
|
||||
%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
|
||||
%(transport)s://%(userid)s@%(hostname)s%(port)s/%(virtual_host)s\
|
||||
"""
|
||||
|
||||
def parse_url(url):
|
||||
auth = userid = password = None
|
||||
scheme = urlparse(url).scheme
|
||||
parts = urlparse(url.replace("%s://" % (scheme, ), "http://"))
|
||||
netloc = parts.netloc
|
||||
if '@' in netloc:
|
||||
auth, _, netloc = partition(parts.netloc, '@')
|
||||
userid, _, password = partition(auth, ':')
|
||||
hostname, _, port = partition(netloc, ':')
|
||||
path = parts.path or ""
|
||||
if path and path[0] == '/':
|
||||
path = path[path.index('/') + 1:]
|
||||
return dict({"hostname": hostname,
|
||||
"port": port and int(port) or None,
|
||||
"userid": userid or None,
|
||||
"password": password or None,
|
||||
"transport": scheme,
|
||||
"virtual_host": path or None},
|
||||
**kwdict(dict(parse_qsl(parts.query))))
|
||||
|
||||
|
||||
class BrokerConnection(object):
|
||||
"""A connection to the broker.
|
||||
|
@ -85,20 +106,21 @@ class BrokerConnection(object):
|
|||
_logger = None
|
||||
|
||||
def __init__(self, hostname="localhost", userid=None,
|
||||
password=None, virtual_host="/", port=None, insist=False,
|
||||
password=None, virtual_host=None, port=None, insist=False,
|
||||
ssl=False, transport=None, connect_timeout=5,
|
||||
transport_options=None, login_method=None, **kwargs):
|
||||
self.hostname = hostname
|
||||
self.userid = userid
|
||||
self.password = password
|
||||
self.login_method = login_method
|
||||
self.virtual_host = virtual_host or self.virtual_host
|
||||
self.port = port or self.port
|
||||
self.insist = insist
|
||||
self.connect_timeout = connect_timeout or self.connect_timeout
|
||||
self.ssl = ssl
|
||||
# have to spell the args out, just to get nice docstrings :(
|
||||
params = {"hostname": hostname, "userid": userid,
|
||||
"password": password, "virtual_host": virtual_host,
|
||||
"port": port, "insist": insist, "ssl": ssl,
|
||||
"transport": transport, "connect_timeout": connect_timeout,
|
||||
"login_method": login_method}
|
||||
if "://" in hostname:
|
||||
params.update(parse_url(hostname))
|
||||
self._init_params(**params)
|
||||
|
||||
# backend_cls argument will be removed shortly.
|
||||
self.transport_cls = transport or kwargs.get("backend_cls")
|
||||
self.transport_cls = self.transport_cls or kwargs.get("backend_cls")
|
||||
|
||||
if transport_options is None:
|
||||
transport_options = {}
|
||||
|
@ -108,6 +130,20 @@ class BrokerConnection(object):
|
|||
from kombu.utils.log import get_logger
|
||||
self._logger = get_logger("kombu.connection")
|
||||
|
||||
def _init_params(self, hostname, userid, password, virtual_host, port,
|
||||
insist, ssl, transport, connect_timeout, login_method):
|
||||
self.hostname = hostname
|
||||
self.userid = userid
|
||||
self.password = password
|
||||
self.login_method = login_method
|
||||
self.virtual_host = virtual_host or self.virtual_host
|
||||
self.port = port or self.port
|
||||
self.insist = insist
|
||||
self.connect_timeout = connect_timeout
|
||||
self.ssl = ssl
|
||||
self.transport_cls = transport
|
||||
|
||||
|
||||
def _debug(self, msg, ident="[Kombu connection:0x%(id)x] ", **kwargs):
|
||||
if self._logger:
|
||||
self._logger.debug((ident + unicode(msg)) % {"id": id(self)},
|
||||
|
@ -311,7 +347,8 @@ class BrokerConnection(object):
|
|||
|
||||
def info(self):
|
||||
"""Get connection info."""
|
||||
transport_cls = self.transport_cls or "amqplib"
|
||||
transport_cls = self.transport_cls or "amqp"
|
||||
transport_cls = {"amqplib": "amqp"}.get(transport_cls, transport_cls)
|
||||
defaults = self.transport.default_connection_params
|
||||
info = OrderedDict((("hostname", self.hostname),
|
||||
("userid", self.userid),
|
||||
|
@ -335,12 +372,15 @@ class BrokerConnection(object):
|
|||
def as_uri(self):
|
||||
fields = self.info()
|
||||
port = fields["port"]
|
||||
userid = fields["userid"]
|
||||
url = "%s://" % fields["transport"]
|
||||
if userid:
|
||||
url += userid + '@'
|
||||
url += fields["hostname"]
|
||||
if port:
|
||||
fields["port"] = ":%s" % (port, )
|
||||
vhost = fields["virtual_host"]
|
||||
if not vhost.startswith('/'):
|
||||
fields["virtual_host"] = '/' + vhost
|
||||
return self.URI_FORMAT % fields
|
||||
url += ':' + str(port)
|
||||
url += '/' + fields["virtual_host"]
|
||||
return url
|
||||
|
||||
def Pool(self, limit=None, preload=None):
|
||||
"""Pool of connections.
|
||||
|
@ -573,6 +613,15 @@ class Resource(object):
|
|||
def release_resource(self, resource):
|
||||
pass
|
||||
|
||||
def replace(self, resource):
|
||||
"""Replace resource with a new instance. This can be used in case
|
||||
of defective resources."""
|
||||
if self.limit:
|
||||
self._dirty.discard(resource)
|
||||
self.close_resource(resource)
|
||||
else:
|
||||
self.close_resource(resource)
|
||||
|
||||
def release(self, resource):
|
||||
"""Release resource so it can be used by another thread.
|
||||
|
||||
|
@ -604,7 +653,9 @@ class Resource(object):
|
|||
break
|
||||
self.close_resource(dres)
|
||||
|
||||
resource.mutex.acquire()
|
||||
mutex = getattr(resource, "mutex", None)
|
||||
if mutex:
|
||||
mutex.acquire()
|
||||
try:
|
||||
while 1:
|
||||
try:
|
||||
|
@ -613,7 +664,31 @@ class Resource(object):
|
|||
break
|
||||
self.close_resource(res)
|
||||
finally:
|
||||
resource.mutex.release()
|
||||
if mutex:
|
||||
mutex.release()
|
||||
|
||||
if os.environ.get("KOMBU_DEBUG_POOL"):
|
||||
|
||||
_orig_acquire = acquire
|
||||
_orig_release = release
|
||||
|
||||
_next_resource_id = 0
|
||||
|
||||
def acquire(self, *args, **kwargs):
|
||||
id = self._next_resource_id = self._next_resource_id + 1
|
||||
print("+%s ACQUIRE %s" % (id, self.__class__.__name__, ))
|
||||
r = self._orig_acquire(*args, **kwargs)
|
||||
r._resource_id = id
|
||||
print("-%s ACQUIRE %s" % (id, self.__class__.__name__, ))
|
||||
return r
|
||||
|
||||
def release(self, resource):
|
||||
id = resource._resource_id
|
||||
print("+%s RELEASE %s" % (id, self.__class__.__name__, ))
|
||||
r = self._orig_release(resource)
|
||||
print("-%s RELEASE %s" % (id, self.__class__.__name__, ))
|
||||
self._next_resource_id -= 1
|
||||
return r
|
||||
|
||||
|
||||
class PoolChannelContext(object):
|
||||
|
@ -646,6 +721,9 @@ class ConnectionPool(Resource):
|
|||
def release_resource(self, resource):
|
||||
resource._debug("released")
|
||||
|
||||
def close_resource(self, resource):
|
||||
resource._close()
|
||||
|
||||
def acquire_channel(self, block=False):
|
||||
return PoolChannelContext(self, block)
|
||||
|
||||
|
|
Loading…
Reference in New Issue