From 7f84c4dd513a21e4749b174d64c62c54ad3a2aeb Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 14 Jan 2014 14:22:12 +0000 Subject: [PATCH] Redis: Make sure basic_cancel is not called while reading from socket. Closes celery/celery#1773 --- kombu/transport/redis.py | 57 +++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7a71ca0b..1e7a7f7f 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -217,6 +217,12 @@ class QoS(virtual.QoS): class MultiChannelPoller(object): eventflags = READ | ERR + #: Set by :meth:`get` while reading from the socket. + _in_protected_read = False + + #: Set of one-shot callbacks to call after reading from socket. + after_read = None + def __init__(self): # active channels self._channels = set() @@ -226,6 +232,8 @@ class MultiChannelPoller(object): self._chan_to_sock = {} # poll implementation (epoll/kqueue/select) self.poller = poll() + # one-shot callbacks called after reading from socket. + self.after_read = set() def close(self): for fd in values(self._chan_to_sock): @@ -312,24 +320,35 @@ class MultiChannelPoller(object): chan._poll_error(type) def get(self, timeout=None): - for channel in self._channels: - if channel.active_queues: # BRPOP mode? - if channel.qos.can_consume(): - self._register_BRPOP(channel) - if channel.active_fanout_queues: # LISTEN mode? - self._register_LISTEN(channel) + self._in_protected_read = True + try: + for channel in self._channels: + if channel.active_queues: # BRPOP mode? + if channel.qos.can_consume(): + self._register_BRPOP(channel) + if channel.active_fanout_queues: # LISTEN mode? + self._register_LISTEN(channel) - events = self.poller.poll(timeout) - for fileno, event in events or []: - ret = self.handle_event(fileno, event) - if ret: - return ret + events = self.poller.poll(timeout) + for fileno, event in events or []: + ret = self.handle_event(fileno, event) + if ret: + return ret - # - no new data, so try to restore messages. - # - reset active redis commands. - self.maybe_restore_messages() + # - no new data, so try to restore messages. + # - reset active redis commands. + self.maybe_restore_messages() - raise Empty() + raise Empty() + finally: + self._in_protected_read = False + while self.after_read: + try: + fun = self.after_read.pop() + except KeyError: + break + else: + fun() @property def fds(self): @@ -465,6 +484,14 @@ class Channel(virtual.Channel): return ret def basic_cancel(self, consumer_tag): + # If we are busy reading messages we may experience + # a race condition where a message is consumed after + # cancelling, so we must delay this operation until reading + # is complete (Issue celery/celery#1773). + if self.connection.cycle._in_protected_read: + return self.connection.cycle.after_read.add( + promise(self.basic_cancel, (consumer_tag, )), + ) try: queue = self._tag_to_queue[consumer_tag] except KeyError: