mirror of https://github.com/celery/kombu.git
Merge remote-tracking branch 'upstream/master' into brian-qpid-transport
This commit is contained in:
commit
39dfd0a81b
|
@ -34,9 +34,9 @@ class BroadcastCursor(object):
|
||||||
"""Cursor for broadcast queues."""
|
"""Cursor for broadcast queues."""
|
||||||
|
|
||||||
def __init__(self, cursor):
|
def __init__(self, cursor):
|
||||||
# Fast forward the cursor past old events
|
self._cursor = cursor
|
||||||
self._cursor = cursor.skip(cursor.count())
|
|
||||||
self._offset = cursor.count()
|
self.purge(rewind=False)
|
||||||
|
|
||||||
def get_size(self):
|
def get_size(self):
|
||||||
return self._cursor.count() - self._offset
|
return self._cursor.count() - self._offset
|
||||||
|
@ -44,15 +44,33 @@ class BroadcastCursor(object):
|
||||||
def close(self):
|
def close(self):
|
||||||
self._cursor.close()
|
self._cursor.close()
|
||||||
|
|
||||||
def purge(self):
|
def purge(self, rewind=True):
|
||||||
self._cursor.rewind()
|
if rewind:
|
||||||
self._cursor = self._cursor.skip(self._cursor.count())
|
self._cursor.rewind()
|
||||||
|
|
||||||
|
# Fast forward the cursor past old events
|
||||||
|
self._offset = self._cursor.count()
|
||||||
|
self._cursor = self._cursor.skip(self._offset)
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def next(self):
|
def next(self):
|
||||||
msg = next(self._cursor)
|
while True:
|
||||||
|
try:
|
||||||
|
msg = next(self._cursor)
|
||||||
|
except pymongo.errors.OperationFailure, e:
|
||||||
|
# In some cases tailed cursor can become invalid
|
||||||
|
# and have to be reinitalized
|
||||||
|
if 'not valid at server' in e.message:
|
||||||
|
self.purge()
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
self._offset += 1
|
self._offset += 1
|
||||||
|
|
||||||
return msg
|
return msg
|
||||||
|
|
Loading…
Reference in New Issue