diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 8e1ff9c4..d3eff2a0 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -64,6 +64,9 @@ class QoS(object): #: :class:`~collections.OrderedDict` of active messages. _delivered = None + #: acked and rejected tags. + _dirty = set() + def __init__(self, channel, prefetch_count=0): self.channel = channel self.prefetch_count = prefetch_count or 0 @@ -87,19 +90,30 @@ class QoS(object): def append(self, message, delivery_tag): """Append message to transactional state.""" self._delivered[delivery_tag] = message + if self._dirty: + self._flush() + + def _flush(self): + while 1: + try: + dirty_tag = self._dirty.pop() + except KeyError: + break + self._delivered.pop(dirty_tag, None) def ack(self, delivery_tag): """Acknowledge message and remove from transactional state.""" - self._delivered.pop(delivery_tag, None) + self._dirty.add(delivery_tag) def reject(self, delivery_tag, requeue=False): """Remove from transactional state and requeue message.""" - message = self._delivered.pop(delivery_tag) + message = self._dirty.add(delivery_tag) if requeue: self.channel._restore(message) def restore_unacked(self): """Restore all unacknowledged messages.""" + self._flush() delivered = self._delivered errors = [] @@ -123,6 +137,7 @@ class QoS(object): """ self._on_collect.cancel() + self._flush() state = self._delivered if not self.channel.do_restore or getattr(state, "restored"):