diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 1a09d1f4..b31fce9b 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -15,6 +15,7 @@ from Queue import Empty import pymongo from pymongo import errors +from pymongo.uri_parser import parse_uri from anyjson import loads, dumps from pymongo.connection import Connection @@ -101,57 +102,42 @@ class Channel(virtual.Channel): See mongodb uri documentation: http://www.mongodb.org/display/DOCS/Connections """ - conninfo = self.connection.client + client = self.connection.client + hostname = client.hostname or DEFAULT_HOST + authdb = dbname = client.virtual_host - dbname = None - hostname = None - - if not conninfo.hostname: - conninfo.hostname = DEFAULT_HOST - - for part in conninfo.hostname.split('/'): - if not hostname: - hostname = 'mongodb://' + part - continue - - dbname = part - if '?' in part: - # In case someone is passing options - # to the mongodb connection. Right now - # it is not permitted by kombu - dbname, options = part.split('?') - hostname += '/?' + options - - hostname = "%s/%s" % ( - hostname, dbname in [None, "/"] and "admin" or dbname, - ) - if not dbname or dbname == "/": + if dbname in ["/", None]: dbname = "kombu_default" + authdb = "admin" + if not client.userid: + hostname = hostname.replace('/' + client.virtual_host, '/') + else: + hostname = hostname.replace('/' + client.virtual_host, + '/' + authdb) + + mongo_uri = 'mongodb://' + hostname # At this point we expect the hostname to be something like # (considering replica set form too): # # mongodb://[username:password@]host1[:port1][,host2[:port2], # ...[,hostN[:portN]]][/[?options]] - mongoconn = Connection(host=hostname, ssl=conninfo.ssl) + mongoconn = Connection(host=mongo_uri, ssl=client.ssl) + database = getattr(mongoconn, dbname) + version = mongoconn.server_info()['version'] if tuple(map(int, version.split('.')[:2])) < (1, 3): raise NotImplementedError( 'Kombu requires MongoDB version 1.3+, but connected to %s' % ( version, )) - database = getattr(mongoconn, dbname) - - # This is done by the connection uri - # if conninfo.userid: - # database.authenticate(conninfo.userid, conninfo.password) self.db = database col = database.messages col.ensure_index([('queue', 1), ('_id', 1)], background=True) if 'messages.broadcast' not in database.collection_names(): - capsize = conninfo.transport_options.get( - 'capped_queue_size') or 100000 + capsize = (client.transport_options.get('capped_queue_size') + or 100000) database.create_collection('messages.broadcast', size=capsize, capped=True)