From 4cfe6798cb24c257615ae753728b3179d0c7e059 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 8 Nov 2010 13:35:11 +0100 Subject: [PATCH] virtual: Prefetch count was totally broken, because of a logic error --- kombu/transport/virtual/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 7c7178dc..3b826e57 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -79,7 +79,7 @@ class QoS(object): """ pcount = self.prefetch_count - return (not pcount or len(self._delivered) > pcount) + return (not pcount or len(self._delivered) < pcount) def append(self, message, delivery_tag): """Append message to transactional state.""" @@ -236,6 +236,7 @@ class Channel(AbstractChannel): self._consumers = set() self._tag_to_queue = {} self._qos = None + self.closed = False # instantiate exchange types self.exchange_types = dict((typ, cls(self)) @@ -429,6 +430,7 @@ class Channel(AbstractChannel): def close(self): """Close channel, cancel all consumers, and requeue unacked messages.""" + self.closed = True map(self.basic_cancel, list(self._consumers)) self.qos.restore_unacked_once() self.connection.close_channel(self)