mirror of https://github.com/celery/kombu.git
avoid mongodb warning 'database name in URI is being ignored'
This commit is contained in:
parent
7b968eae83
commit
ff7a40e74d
|
@ -15,6 +15,7 @@ from Queue import Empty
|
|||
import pymongo
|
||||
|
||||
from pymongo import errors
|
||||
from pymongo.uri_parser import parse_uri
|
||||
from anyjson import loads, dumps
|
||||
from pymongo.connection import Connection
|
||||
|
||||
|
@ -101,57 +102,42 @@ class Channel(virtual.Channel):
|
|||
See mongodb uri documentation:
|
||||
http://www.mongodb.org/display/DOCS/Connections
|
||||
"""
|
||||
conninfo = self.connection.client
|
||||
client = self.connection.client
|
||||
hostname = client.hostname or DEFAULT_HOST
|
||||
authdb = dbname = client.virtual_host
|
||||
|
||||
dbname = None
|
||||
hostname = None
|
||||
|
||||
if not conninfo.hostname:
|
||||
conninfo.hostname = DEFAULT_HOST
|
||||
|
||||
for part in conninfo.hostname.split('/'):
|
||||
if not hostname:
|
||||
hostname = 'mongodb://' + part
|
||||
continue
|
||||
|
||||
dbname = part
|
||||
if '?' in part:
|
||||
# In case someone is passing options
|
||||
# to the mongodb connection. Right now
|
||||
# it is not permitted by kombu
|
||||
dbname, options = part.split('?')
|
||||
hostname += '/?' + options
|
||||
|
||||
hostname = "%s/%s" % (
|
||||
hostname, dbname in [None, "/"] and "admin" or dbname,
|
||||
)
|
||||
if not dbname or dbname == "/":
|
||||
if dbname in ["/", None]:
|
||||
dbname = "kombu_default"
|
||||
authdb = "admin"
|
||||
|
||||
if not client.userid:
|
||||
hostname = hostname.replace('/' + client.virtual_host, '/')
|
||||
else:
|
||||
hostname = hostname.replace('/' + client.virtual_host,
|
||||
'/' + authdb)
|
||||
|
||||
mongo_uri = 'mongodb://' + hostname
|
||||
# At this point we expect the hostname to be something like
|
||||
# (considering replica set form too):
|
||||
#
|
||||
# mongodb://[username:password@]host1[:port1][,host2[:port2],
|
||||
# ...[,hostN[:portN]]][/[?options]]
|
||||
mongoconn = Connection(host=hostname, ssl=conninfo.ssl)
|
||||
mongoconn = Connection(host=mongo_uri, ssl=client.ssl)
|
||||
database = getattr(mongoconn, dbname)
|
||||
|
||||
version = mongoconn.server_info()['version']
|
||||
if tuple(map(int, version.split('.')[:2])) < (1, 3):
|
||||
raise NotImplementedError(
|
||||
'Kombu requires MongoDB version 1.3+, but connected to %s' % (
|
||||
version, ))
|
||||
|
||||
database = getattr(mongoconn, dbname)
|
||||
|
||||
# This is done by the connection uri
|
||||
# if conninfo.userid:
|
||||
# database.authenticate(conninfo.userid, conninfo.password)
|
||||
self.db = database
|
||||
col = database.messages
|
||||
col.ensure_index([('queue', 1), ('_id', 1)], background=True)
|
||||
|
||||
if 'messages.broadcast' not in database.collection_names():
|
||||
capsize = conninfo.transport_options.get(
|
||||
'capped_queue_size') or 100000
|
||||
capsize = (client.transport_options.get('capped_queue_size')
|
||||
or 100000)
|
||||
database.create_collection('messages.broadcast',
|
||||
size=capsize, capped=True)
|
||||
|
||||
|
|
Loading…
Reference in New Issue