mirror of https://github.com/celery/kombu.git
Fixed MongoDB broadcast cursor re-initialization. Closes celery/celery#971
This commit is contained in:
parent
874d5e6dea
commit
0136555208
|
@ -34,9 +34,9 @@ class BroadcastCursor(object):
|
|||
"""Cursor for broadcast queues."""
|
||||
|
||||
def __init__(self, cursor):
|
||||
# Fast forward the cursor past old events
|
||||
self._cursor = cursor.skip(cursor.count())
|
||||
self._offset = cursor.count()
|
||||
self._cursor = cursor
|
||||
|
||||
self.purge(rewind=False)
|
||||
|
||||
def get_size(self):
|
||||
return self._cursor.count() - self._offset
|
||||
|
@ -44,15 +44,33 @@ class BroadcastCursor(object):
|
|||
def close(self):
|
||||
self._cursor.close()
|
||||
|
||||
def purge(self):
|
||||
self._cursor.rewind()
|
||||
self._cursor = self._cursor.skip(self._cursor.count())
|
||||
def purge(self, rewind=True):
|
||||
if rewind:
|
||||
self._cursor.rewind()
|
||||
|
||||
# Fast forward the cursor past old events
|
||||
self._offset = self._cursor.count()
|
||||
self._cursor = self._cursor.skip(self._offset)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
msg = next(self._cursor)
|
||||
while True:
|
||||
try:
|
||||
msg = next(self._cursor)
|
||||
except pymongo.errors.OperationFailure, e:
|
||||
# In some cases tailed cursor can become invalid
|
||||
# and have to be reinitalized
|
||||
if 'not valid at server' in e.message:
|
||||
self.purge()
|
||||
|
||||
continue
|
||||
|
||||
raise
|
||||
else:
|
||||
break
|
||||
|
||||
self._offset += 1
|
||||
|
||||
return msg
|
||||
|
|
Loading…
Reference in New Issue