From 94c21a849f6044eccb1585623912f72a437259a1 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Premoli Date: Wed, 13 Apr 2011 21:21:50 +0200 Subject: [PATCH 1/2] Added sort by _id in order to support FIFO queues... fixes #34 --- kombu/transport/mongodb.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index e06428de..8c14b5bb 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -11,8 +11,9 @@ MongoDB transport. """ from Queue import Empty -from anyjson import serialize, deserialize +import pymongo from pymongo import errors +from anyjson import serialize, deserialize from pymongo.connection import Connection from kombu.transport import virtual @@ -32,7 +33,7 @@ class Channel(virtual.Channel): def _get(self, queue): try: msg = self.client.database.command("findandmodify", - "messages", query={"queue": queue}, remove=True) + "messages", query={"queue": queue}, sort={'_id' : pymongo.ASCENDING }, remove=True) except errors.OperationFailure, exc: if "No matching object found" in exc.args[0]: raise Empty() @@ -51,10 +52,12 @@ class Channel(virtual.Channel): return size def close(self): - super(Channel, self).close() self.client.database.connection.end_request() - + super(Channel, self).close() + def _open(self): + print self.connection + print self.connection.client conninfo = self.connection.client mongoconn = Connection(host=conninfo.hostname, port=conninfo.port) dbname = conninfo.virtual_host From 90c2934e0abcc336f300cf834e3130ab8cce8280 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Premoli Date: Wed, 13 Apr 2011 21:23:46 +0200 Subject: [PATCH 2/2] Removed prints. closes #34 --- kombu/transport/mongodb.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 8c14b5bb..2c49c802 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -56,8 +56,6 @@ class Channel(virtual.Channel): super(Channel, self).close() def _open(self): - print self.connection - print self.connection.client conninfo = self.connection.client mongoconn = Connection(host=conninfo.hostname, port=conninfo.port) dbname = conninfo.virtual_host