mirror of https://github.com/celery/kombu.git
Redis did not respect QoS settings. (ask/celery #355)
This commit is contained in:
parent
2e0169d404
commit
079b4a06ff
|
@ -100,7 +100,8 @@ class MultiChannelPoller(object):
|
||||||
for fileno, event in events:
|
for fileno, event in events:
|
||||||
if event & eventio.POLL_READ:
|
if event & eventio.POLL_READ:
|
||||||
chan, type = self._fd_to_chan[fileno]
|
chan, type = self._fd_to_chan[fileno]
|
||||||
return chan.handlers[type](), self
|
if chan.qos.can_consume():
|
||||||
|
return chan.handlers[type](), self
|
||||||
elif event & eventio.POLL_HUP:
|
elif event & eventio.POLL_HUP:
|
||||||
chan, type = self._fd_to_chan[fileno]
|
chan, type = self._fd_to_chan[fileno]
|
||||||
chan._poll_error(type)
|
chan._poll_error(type)
|
||||||
|
|
|
@ -89,7 +89,7 @@ class QoS(object):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
pcount = self.prefetch_count
|
pcount = self.prefetch_count
|
||||||
return (not pcount or len(self._delivered) - len(self._dirty) < pcount)
|
return not pcount or len(self._delivered) - len(self._dirty) < pcount
|
||||||
|
|
||||||
def append(self, message, delivery_tag):
|
def append(self, message, delivery_tag):
|
||||||
"""Append message to transactional state."""
|
"""Append message to transactional state."""
|
||||||
|
|
Loading…
Reference in New Issue