mirror of https://github.com/celery/kombu.git
Redis: Make sure basic_cancel is not called while reading from socket. Closes celery/celery#1773
This commit is contained in:
parent
26bf9faccb
commit
7f84c4dd51
|
@ -217,6 +217,12 @@ class QoS(virtual.QoS):
|
|||
class MultiChannelPoller(object):
|
||||
eventflags = READ | ERR
|
||||
|
||||
#: Set by :meth:`get` while reading from the socket.
|
||||
_in_protected_read = False
|
||||
|
||||
#: Set of one-shot callbacks to call after reading from socket.
|
||||
after_read = None
|
||||
|
||||
def __init__(self):
|
||||
# active channels
|
||||
self._channels = set()
|
||||
|
@ -226,6 +232,8 @@ class MultiChannelPoller(object):
|
|||
self._chan_to_sock = {}
|
||||
# poll implementation (epoll/kqueue/select)
|
||||
self.poller = poll()
|
||||
# one-shot callbacks called after reading from socket.
|
||||
self.after_read = set()
|
||||
|
||||
def close(self):
|
||||
for fd in values(self._chan_to_sock):
|
||||
|
@ -312,24 +320,35 @@ class MultiChannelPoller(object):
|
|||
chan._poll_error(type)
|
||||
|
||||
def get(self, timeout=None):
|
||||
for channel in self._channels:
|
||||
if channel.active_queues: # BRPOP mode?
|
||||
if channel.qos.can_consume():
|
||||
self._register_BRPOP(channel)
|
||||
if channel.active_fanout_queues: # LISTEN mode?
|
||||
self._register_LISTEN(channel)
|
||||
self._in_protected_read = True
|
||||
try:
|
||||
for channel in self._channels:
|
||||
if channel.active_queues: # BRPOP mode?
|
||||
if channel.qos.can_consume():
|
||||
self._register_BRPOP(channel)
|
||||
if channel.active_fanout_queues: # LISTEN mode?
|
||||
self._register_LISTEN(channel)
|
||||
|
||||
events = self.poller.poll(timeout)
|
||||
for fileno, event in events or []:
|
||||
ret = self.handle_event(fileno, event)
|
||||
if ret:
|
||||
return ret
|
||||
events = self.poller.poll(timeout)
|
||||
for fileno, event in events or []:
|
||||
ret = self.handle_event(fileno, event)
|
||||
if ret:
|
||||
return ret
|
||||
|
||||
# - no new data, so try to restore messages.
|
||||
# - reset active redis commands.
|
||||
self.maybe_restore_messages()
|
||||
# - no new data, so try to restore messages.
|
||||
# - reset active redis commands.
|
||||
self.maybe_restore_messages()
|
||||
|
||||
raise Empty()
|
||||
raise Empty()
|
||||
finally:
|
||||
self._in_protected_read = False
|
||||
while self.after_read:
|
||||
try:
|
||||
fun = self.after_read.pop()
|
||||
except KeyError:
|
||||
break
|
||||
else:
|
||||
fun()
|
||||
|
||||
@property
|
||||
def fds(self):
|
||||
|
@ -465,6 +484,14 @@ class Channel(virtual.Channel):
|
|||
return ret
|
||||
|
||||
def basic_cancel(self, consumer_tag):
|
||||
# If we are busy reading messages we may experience
|
||||
# a race condition where a message is consumed after
|
||||
# cancelling, so we must delay this operation until reading
|
||||
# is complete (Issue celery/celery#1773).
|
||||
if self.connection.cycle._in_protected_read:
|
||||
return self.connection.cycle.after_read.add(
|
||||
promise(self.basic_cancel, (consumer_tag, )),
|
||||
)
|
||||
try:
|
||||
queue = self._tag_to_queue[consumer_tag]
|
||||
except KeyError:
|
||||
|
|
Loading…
Reference in New Issue