From 0136555208a8b14d516a3e7183dd172dcfb11c71 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 6 Mar 2014 22:46:38 +0400 Subject: [PATCH] Fixed MongoDB broadcast cursor re-initialization. Closes celery/celery#971 --- kombu/transport/mongodb.py | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index bab721f5..e9695e15 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -34,9 +34,9 @@ class BroadcastCursor(object): """Cursor for broadcast queues.""" def __init__(self, cursor): - # Fast forward the cursor past old events - self._cursor = cursor.skip(cursor.count()) - self._offset = cursor.count() + self._cursor = cursor + + self.purge(rewind=False) def get_size(self): return self._cursor.count() - self._offset @@ -44,15 +44,33 @@ class BroadcastCursor(object): def close(self): self._cursor.close() - def purge(self): - self._cursor.rewind() - self._cursor = self._cursor.skip(self._cursor.count()) + def purge(self, rewind=True): + if rewind: + self._cursor.rewind() + + # Fast forward the cursor past old events + self._offset = self._cursor.count() + self._cursor = self._cursor.skip(self._offset) def __iter__(self): return self def next(self): - msg = next(self._cursor) + while True: + try: + msg = next(self._cursor) + except pymongo.errors.OperationFailure, e: + # In some cases tailed cursor can become invalid + # and have to be reinitalized + if 'not valid at server' in e.message: + self.purge() + + continue + + raise + else: + break + self._offset += 1 return msg