diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 2453d60e..9d08af16 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -100,7 +100,8 @@ class MultiChannelPoller(object): for fileno, event in events: if event & eventio.POLL_READ: chan, type = self._fd_to_chan[fileno] - return chan.handlers[type](), self + if chan.qos.can_consume(): + return chan.handlers[type](), self elif event & eventio.POLL_HUP: chan, type = self._fd_to_chan[fileno] chan._poll_error(type) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 37a7dc44..4c59f750 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -89,7 +89,7 @@ class QoS(object): """ pcount = self.prefetch_count - return (not pcount or len(self._delivered) - len(self._dirty) < pcount) + return not pcount or len(self._delivered) - len(self._dirty) < pcount def append(self, message, delivery_tag): """Append message to transactional state."""