From 3cd0eb6f1b2b2e7bc919276a1b75a5c4b6082940 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 11 Nov 2011 16:26:59 +0100 Subject: [PATCH 1/2] First attempt to enable Mongo Replicasets --- kombu/connection.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index d02a63c9..4521da05 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -38,19 +38,24 @@ URI_FORMAT = """\ def parse_url(url): + port = path = None auth = userid = password = None scheme = urlparse(url).scheme parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) - netloc = parts.netloc - if '@' in netloc: - auth, _, netloc = partition(parts.netloc, '@') - userid, _, password = partition(auth, ':') - hostname, _, port = partition(netloc, ':') - path = parts.path or "" - if path and path[0] == '/': - path = path[1:] + if scheme != 'mongodb': + netloc = parts.netloc + if '@' in netloc: + auth, _, netloc = partition(parts.netloc, '@') + userid, _, password = partition(auth, ':') + hostname, _, port = partition(netloc, ':') + path = parts.path or "" + if path and path[0] == '/': + path = path[1:] + port = int(port) + else: + hostname = url[len('mongodb://'):] return dict({"hostname": hostname, - "port": port and int(port) or None, + "port": port or None, "userid": userid or None, "password": password or None, "transport": scheme, @@ -386,14 +391,15 @@ class BrokerConnection(object): port = fields["port"] userid = fields["userid"] password = fields["password"] - url = "%s://" % fields["transport"] + transport = fields["transport"] + url = "%s://" % transport if userid: url += userid if include_password and password: url += ':' + password url += '@' url += fields["hostname"] - if port: + if port and transport != "mongodb": url += ':' + str(port) url += '/' + fields["virtual_host"] return url From 4e9a5c4e181123ad28c05a4ea61fdb5385301fe4 Mon Sep 17 00:00:00 2001 From: Ivan Metzlar Date: Fri, 11 Nov 2011 16:44:47 +0100 Subject: [PATCH 2/2] Proper Mongodb Replicaset handling --- kombu/connection.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index 4521da05..7020ceab 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -38,10 +38,15 @@ URI_FORMAT = """\ def parse_url(url): - port = path = None - auth = userid = password = None + port = path = auth = userid = password = None scheme = urlparse(url).scheme parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) + + # The first pymongo.Connection() argument (host) can be + # a mongodb connection URI. If this is the case, don't + # use port but let pymongo get the port(s) from the URI instead. + # This enables the use of replica sets and sharding. + # See pymongo.Connection() for more info. if scheme != 'mongodb': netloc = parts.netloc if '@' in netloc: @@ -53,7 +58,9 @@ def parse_url(url): path = path[1:] port = int(port) else: + # strip the scheme since it is appended automatically hostname = url[len('mongodb://'):] + return dict({"hostname": hostname, "port": port or None, "userid": userid or None, @@ -399,8 +406,13 @@ class BrokerConnection(object): url += ':' + password url += '@' url += fields["hostname"] + + # If the transport equals 'mongodb' the + # hostname contains a full mongodb connection + # URI. Let pymongo retreive the port from there. if port and transport != "mongodb": url += ':' + str(port) + url += '/' + fields["virtual_host"] return url