mirror of https://github.com/celery/kombu.git
Virtual QoS: Removing tags from _delivered must be done by the appending thread
This commit is contained in:
parent
40cc82871f
commit
835ce39840
|
@ -64,6 +64,9 @@ class QoS(object):
|
||||||
#: :class:`~collections.OrderedDict` of active messages.
|
#: :class:`~collections.OrderedDict` of active messages.
|
||||||
_delivered = None
|
_delivered = None
|
||||||
|
|
||||||
|
#: acked and rejected tags.
|
||||||
|
_dirty = set()
|
||||||
|
|
||||||
def __init__(self, channel, prefetch_count=0):
|
def __init__(self, channel, prefetch_count=0):
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
self.prefetch_count = prefetch_count or 0
|
self.prefetch_count = prefetch_count or 0
|
||||||
|
@ -87,19 +90,30 @@ class QoS(object):
|
||||||
def append(self, message, delivery_tag):
|
def append(self, message, delivery_tag):
|
||||||
"""Append message to transactional state."""
|
"""Append message to transactional state."""
|
||||||
self._delivered[delivery_tag] = message
|
self._delivered[delivery_tag] = message
|
||||||
|
if self._dirty:
|
||||||
|
self._flush()
|
||||||
|
|
||||||
|
def _flush(self):
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
dirty_tag = self._dirty.pop()
|
||||||
|
except KeyError:
|
||||||
|
break
|
||||||
|
self._delivered.pop(dirty_tag, None)
|
||||||
|
|
||||||
def ack(self, delivery_tag):
|
def ack(self, delivery_tag):
|
||||||
"""Acknowledge message and remove from transactional state."""
|
"""Acknowledge message and remove from transactional state."""
|
||||||
self._delivered.pop(delivery_tag, None)
|
self._dirty.add(delivery_tag)
|
||||||
|
|
||||||
def reject(self, delivery_tag, requeue=False):
|
def reject(self, delivery_tag, requeue=False):
|
||||||
"""Remove from transactional state and requeue message."""
|
"""Remove from transactional state and requeue message."""
|
||||||
message = self._delivered.pop(delivery_tag)
|
message = self._dirty.add(delivery_tag)
|
||||||
if requeue:
|
if requeue:
|
||||||
self.channel._restore(message)
|
self.channel._restore(message)
|
||||||
|
|
||||||
def restore_unacked(self):
|
def restore_unacked(self):
|
||||||
"""Restore all unacknowledged messages."""
|
"""Restore all unacknowledged messages."""
|
||||||
|
self._flush()
|
||||||
delivered = self._delivered
|
delivered = self._delivered
|
||||||
errors = []
|
errors = []
|
||||||
|
|
||||||
|
@ -123,6 +137,7 @@ class QoS(object):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self._on_collect.cancel()
|
self._on_collect.cancel()
|
||||||
|
self._flush()
|
||||||
state = self._delivered
|
state = self._delivered
|
||||||
|
|
||||||
if not self.channel.do_restore or getattr(state, "restored"):
|
if not self.channel.do_restore or getattr(state, "restored"):
|
||||||
|
|
Loading…
Reference in New Issue