mirror of https://github.com/celery/kombu.git
Fanout support for MongoDB transport
This commit is contained in:
parent
75d781ef61
commit
9a621e95eb
|
@ -22,24 +22,40 @@ from . import virtual
|
|||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 27017
|
||||
|
||||
__author__ = "Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>"
|
||||
__author__ = "Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>; Scott Lyons <scottalyons@gmail.com>"
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
_client = None
|
||||
supports_fanout=True
|
||||
_fanout_queues = {}
|
||||
|
||||
def __init__(self, *vargs, **kwargs):
|
||||
super_ = super(Channel, self)
|
||||
super_.__init__(*vargs, **kwargs)
|
||||
|
||||
self._queue_cursors = {}
|
||||
|
||||
|
||||
def _new_queue(self, queue, **kwargs):
|
||||
pass
|
||||
|
||||
def _get(self, queue):
|
||||
try:
|
||||
msg = self.client.database.command("findandmodify", "messages",
|
||||
if queue in self._fanout_queues:
|
||||
msg = self._queue_cursors[queue].next()
|
||||
return loads(msg["payload"])
|
||||
else:
|
||||
msg = self.client.database.command("findandmodify", "messages",
|
||||
query={"queue": queue},
|
||||
sort={"_id": pymongo.ASCENDING}, remove=True)
|
||||
except errors.OperationFailure, exc:
|
||||
if "No matching object found" in exc.args[0]:
|
||||
raise Empty()
|
||||
raise
|
||||
except StopIteration:
|
||||
raise Empty()
|
||||
|
||||
# as of mongo 2.0 empty results won't raise an error
|
||||
if msg['value'] is None:
|
||||
raise Empty()
|
||||
|
@ -75,9 +91,54 @@ class Channel(virtual.Channel):
|
|||
database = getattr(mongoconn, dbname)
|
||||
if conninfo.userid:
|
||||
database.authenticate(conninfo.userid, conninfo.password)
|
||||
|
||||
self.db = database
|
||||
col = database.messages
|
||||
col.ensure_index([("queue", 1)])
|
||||
|
||||
if "messages.broadcast" not in database.collection_names():
|
||||
capsize = conninfo.capped_queue_size or 10000000
|
||||
database.create_collection("messages.broadcast", size=capsize, capped=True)
|
||||
|
||||
self.bcast = getattr(database, "messages.broadcast")
|
||||
self.bcast.ensure_index([("queue", 1)])
|
||||
|
||||
self.routing = getattr(database, "messages.routing")
|
||||
self.routing.ensure_index([("queue", 1), ("exchange", 1)])
|
||||
return col
|
||||
|
||||
def get_table(self, exchange):
|
||||
"""Get table of bindings for `exchange`."""
|
||||
brokerRoutes = self.routing.find({"exchange":exchange})
|
||||
|
||||
localRoutes = self.state.exchanges[exchange]["table"]
|
||||
for route in brokerRoutes:
|
||||
localRoutes.append((route["routing_key"], route["pattern"], route["queue"]))
|
||||
return set(localRoutes)
|
||||
|
||||
def _put_fanout(self, exchange, message, **kwargs):
|
||||
"""Deliver fanout message."""
|
||||
self.bcast.insert({"payload": serialize(message), "queue": exchange})
|
||||
|
||||
def _queue_bind(self, exchange, routing_key, pattern, queue):
|
||||
if self.typeof(exchange).type == "fanout":
|
||||
cursor = self.bcast.find(query={"queue":exchange}, sort=[("$natural", 1)], tailable=True)
|
||||
# Fast forward the cursor past old events
|
||||
self._queue_cursors[queue] = cursor.skip(cursor.count())
|
||||
self._fanout_queues[queue] = exchange
|
||||
|
||||
meta = dict(exchange=exchange, queue=queue, routing_key=routing_key, pattern=pattern)
|
||||
self.routing.update(meta, meta, upsert=True)
|
||||
|
||||
|
||||
def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs):
|
||||
self.routing.remove({"queue":queue})
|
||||
super(Channel, self).queue_delete(queue, if_unusued, if_empty, **kwargs)
|
||||
if queue in self._fanout_queues:
|
||||
self._queue_cursors[queue].close()
|
||||
del self._queue_cursors[queue]
|
||||
del self._fanout_queues[queue]
|
||||
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
|
|
Loading…
Reference in New Issue