Message ACKing now works, and drain_events should now work properly

Brian Bouterse 2014-02-13 12:56:25 -05:00
parent d700e464de
commit 3e54fc47df
2 changed files with 74 additions and 114 deletions

View File

@@ -17,7 +17,7 @@ Ask Solem <ask@celeryproject.org>
Basil Mironenko <bmironenko@ddn.com>
Bobby Beever <bobby.beever@yahoo.com>
Brian Bernstein
Brian Bouterse <bbouters@redhat.com>
Brian Bouterse <bmbouter@redhat.com>
C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>

View File

@@ -15,11 +15,13 @@ import uuid
import base64
import threading
import Queue
from time import clock
from itertools import count
from multiprocessing.util import Finalize
from kombu.transport import virtual
from kombu.five import Empty
from kombu.utils import kwdict
from kombu.utils.compat import OrderedDict
from kombu.utils.encoding import str_to_bytes, bytes_to_str
@@ -81,29 +83,13 @@ class QoS(object):
#: :class:`~collections.OrderedDict` of active messages.
#: *NOTE*: Can only be modified by the consuming thread.
_delivered = None
#: acks can be done by other threads than the consuming thread.
#: Instead of a mutex, which doesn't perform well here, we mark
#: the delivery tags as dirty, so subsequent calls to append() can remove
#: them.
_dirty = None
#: If disabled, unacked messages won't be restored at shutdown.
restore_at_shutdown = True
_not_yet_acked = None
def __init__(self, channel, prefetch_count=0):
self.channel = channel
self.prefetch_count = prefetch_count or 0
self._delivered = OrderedDict()
self._delivered.restored = False
self._dirty = set()
self._quick_ack = self._dirty.add
self._quick_append = self._delivered.__setitem__
self._on_collect = Finalize(
self, self.restore_unacked_once, exitpriority=1,
)
self._not_yet_acked = OrderedDict()
self._qpid_session = self.channel.connection.fd_shim._qpid_session
def can_consume(self):
"""Return true if the channel can be consumed from.
@@ -113,7 +99,8 @@ class QoS(object):
"""
pcount = self.prefetch_count
return not pcount or len(self._delivered) - len(self._dirty) < pcount
return True
return not pcount or len(self._not_yet_acked) - len(self._dirty) < pcount
def can_consume_max_estimate(self):
"""Returns the maximum number of messages allowed to be returned.
@@ -129,7 +116,7 @@ class QoS(object):
pcount = self.prefetch_count
count = None
if pcount:
count = pcount - (len(self._delivered) - len(self._dirty))
count = pcount - (len(self._not_yet_acked) - len(self._dirty))
if count < 1:
return 1
@@ -138,92 +125,19 @@ class QoS(object):
def append(self, message, delivery_tag):
"""Append message to transactional state."""
if self._dirty:
self._flush()
self._quick_append(delivery_tag, message)
self._not_yet_acked[delivery_tag] = message
def get(self, delivery_tag):
return self._delivered[delivery_tag]
def _flush(self):
"""Flush dirty (acked/rejected) tags from."""
dirty = self._dirty
delivered = self._delivered
while 1:
try:
dirty_tag = dirty.pop()
except KeyError:
break
delivered.pop(dirty_tag, None)
return self._not_yet_acked[delivery_tag]
def ack(self, delivery_tag):
"""Acknowledge message and remove from transactional state."""
self._quick_ack(delivery_tag)
message = self._not_yet_acked[delivery_tag]
self._qpid_session.acknowledge(message=message)
def reject(self, delivery_tag, requeue=False):
"""Remove from transactional state and requeue message."""
if requeue:
self.channel._restore_at_beginning(self._delivered[delivery_tag])
self._quick_ack(delivery_tag)
def restore_unacked(self):
"""Restore all unacknowledged messages."""
self._flush()
delivered = self._delivered
errors = []
restore = self.channel._restore
pop_message = delivered.popitem
while delivered:
try:
_, message = pop_message()
except KeyError: # pragma: no cover
break
try:
restore(message)
except BaseException as exc:
errors.append((exc, message))
delivered.clear()
return errors
def restore_unacked_once(self):
"""Restores all unacknowledged messages at shutdown/gc collect.
Will only be done once for each instance.
"""
self._on_collect.cancel()
self._flush()
state = self._delivered
if not self.restore_at_shutdown or not self.channel.do_restore:
return
if getattr(state, 'restored', None):
assert not state
return
try:
if state:
say('Restoring {0!r} unacknowledged message(s).',
len(self._delivered))
unrestored = self.restore_unacked()
if unrestored:
errors, messages = list(zip(*unrestored))
say('UNABLE TO RESTORE {0} MESSAGES: {1}',
len(errors), errors)
emergency_dump_state(messages)
finally:
state.restored = True
def restore_visible(self, *args, **kwargs):
"""Restore any pending unackwnowledged messages for visibility_timeout
style implementations.
Optional: Currently only used by the Redis transport.
"""
pass
self._not_yet_acked.pop(delivery_tag)
class Message(base.Message):
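
The QoS hunk above drops the inherited _delivered/_dirty/Finalize machinery and tracks outstanding deliveries in a single _not_yet_acked map, acknowledging straight on the qpid session. The sketch below is not the commit's code: it is a minimal, self-contained illustration of that pattern with a stubbed session, it leaves out the _dirty set, and unlike the commit's ack() it also removes the entry once the broker has been told.

# --- illustrative sketch, not part of the diff ---
# ``FakeSession`` stands in for the real qpid.messaging session object.
from collections import OrderedDict


class FakeSession(object):
    def acknowledge(self, message=None):
        print('acknowledged %r' % (message,))


class SimpleQoS(object):
    def __init__(self, session, prefetch_count=0):
        self.session = session
        self.prefetch_count = prefetch_count or 0
        self._not_yet_acked = OrderedDict()

    def can_consume(self):
        # Consume only while the unacked count is below the prefetch
        # limit (0 means unlimited).
        pcount = self.prefetch_count
        return not pcount or len(self._not_yet_acked) < pcount

    def can_consume_max_estimate(self):
        pcount = self.prefetch_count
        if pcount:
            return max(pcount - len(self._not_yet_acked), 1)
        return None

    def append(self, message, delivery_tag):
        self._not_yet_acked[delivery_tag] = message

    def ack(self, delivery_tag):
        # Unlike the commit's ack(), the entry is also removed here.
        message = self._not_yet_acked.pop(delivery_tag)
        self.session.acknowledge(message=message)

    def reject(self, delivery_tag, requeue=False):
        self._not_yet_acked.pop(delivery_tag, None)


qos = SimpleQoS(FakeSession(), prefetch_count=10)
qos.append({'body': 'hello'}, delivery_tag=1)
assert qos.can_consume_max_estimate() == 9
qos.ack(delivery_tag=1)
# --- end sketch ---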
@@ -286,6 +200,8 @@ class Channel(base.StdChannel):
self._qpid_session = qpid_publish_connection.session()
self._broker = BrokerAgent(qpid_qmf_connection)
self._qos = None
self._consumers = set()
self.closed = False
def _get(self, queue):
raise NotImplementedError('_get Not Implemented')
@@ -365,6 +281,10 @@ class Channel(base.StdChannel):
def basic_get(self, queue, *args, **kwargs):
raise NotImplementedError('basic_get Not Implemented')
def basic_ack(self, delivery_tag):
"""Acknowledge message."""
self.qos.ack(delivery_tag)
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
self._tag_to_queue[consumer_tag] = queue
@@ -372,14 +292,30 @@ class Channel(base.StdChannel):
raw_message = qpid_message.content
message = self.Message(self, raw_message)
if not no_ack:
self.qos.append(message, message.delivery_tag)
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self.connection.fd_shim.signaling_queue.put(['sub', queue])
def basic_cancel(self, consumer_tag):
raise NotImplementedError('basic_cancel Not Implemented')
"""Cancel consumer by consumer tag."""
if consumer_tag in self._consumers:
self._consumers.remove(consumer_tag)
queue = self._tag_to_queue.pop(consumer_tag, None)
self.connection._callbacks.pop(queue, None)
def close(self):
"""Close channel, cancel all consumers, and requeue unacked
messages."""
if not self.closed:
self.closed = True
for consumer in list(self._consumers):
self.basic_cancel(consumer)
if self.connection is not None:
self.connection.close_channel(self)
@property
def qos(self):
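
The consumer bookkeeping added above is spread across the channel (_tag_to_queue, _consumers) and the connection (_callbacks). The sketch below, which is not from the commit, collapses it into one object to show the lifecycle: basic_consume registers a tag, basic_cancel unregisters it, and close() cancels whatever is left so no callback outlives the channel.

# --- illustrative sketch, not part of the diff ---
# The real Channel also notifies FDShim and the broker as consumers
# come and go; this only shows the local bookkeeping.
class ConsumerRegistry(object):

    def __init__(self):
        self._tag_to_queue = {}   # consumer_tag -> queue name
        self._callbacks = {}      # queue name -> callback
        self._consumers = set()   # active consumer tags
        self.closed = False

    def basic_consume(self, queue, callback, consumer_tag):
        self._tag_to_queue[consumer_tag] = queue
        self._callbacks[queue] = callback
        self._consumers.add(consumer_tag)

    def basic_cancel(self, consumer_tag):
        if consumer_tag in self._consumers:
            self._consumers.remove(consumer_tag)
            queue = self._tag_to_queue.pop(consumer_tag, None)
            self._callbacks.pop(queue, None)

    def close(self):
        if not self.closed:
            self.closed = True
            # Iterate over a copy because basic_cancel mutates the set.
            for consumer_tag in list(self._consumers):
                self.basic_cancel(consumer_tag)


registry = ConsumerRegistry()
registry.basic_consume('my_queue', callback=lambda msg: None,
                       consumer_tag='tag1')
registry.close()
assert not registry._consumers and not registry._callbacks
# --- end sketch ---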
@@ -512,8 +448,7 @@ class FDShim(object):
while True:
message = self.recv()
self.queue_from_fdshim.put(message)
#message_contents = message.content['body']
os.write(self._w, 'readyy')
os.write(self._w, 'ready')
class Transport(base.Transport):
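
FDShim's listen loop above parks each received message on queue_from_fdshim and then writes a token to a pipe so the event loop wakes up (the 'readyy' to 'ready' change fixes the token string). Below is a standalone sketch of that wakeup pattern, independent of the commit.

# --- illustrative sketch, not part of the diff ---
# A producer thread puts work on a Queue and writes one token byte to a
# pipe; the consumer selects on the read end and drains the Queue when
# the pipe becomes readable.
import os
import select
import threading
import Queue  # Python 2, matching the module above

work_queue = Queue.Queue()
r, w = os.pipe()


def producer():
    work_queue.put('a message')
    os.write(w, 'ready')              # wake up whoever selects on ``r``


threading.Thread(target=producer).start()

readable, _, _ = select.select([r], [], [], 5.0)
if readable:
    os.read(r, len('ready'))          # clear the wakeup token
    print(work_queue.get(block=False))
# --- end sketch ---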
@@ -538,14 +473,13 @@ class Transport(base.Transport):
self._avail_channels = []
self._callbacks = {}
self.fd_shim = None
self.queue_from_fdshim = None
def register_with_event_loop(self, connection, loop):
self.queue_from_fdshim = Queue.Queue()
self.fd_shim = FDShim(connection, self.queue_from_fdshim)
self.fd_shim = FDShim(self, self.queue_from_fdshim)
fdshim_thread = threading.Thread(target=self.fd_shim.listen)
fdshim_thread.daemon = True
fdshim_thread.start()
def register_with_event_loop(self, connection, loop):
loop.add_reader(self.fd_shim.r, self.on_readable, connection, loop)
def establish_connection(self):
@@ -555,6 +489,30 @@ class Transport(base.Transport):
self._avail_channels.append(self.create_channel(self))
return self # for drain events
def close_connection(self, connection):
for l in self._avail_channels, self.channels:
while l:
try:
channel = l.pop()
except (IndexError, KeyError): # pragma: no cover
pass
else:
channel.close()
def drain_events(self, connection, timeout=0, **kwargs):
start_time = clock()
elapsed_time = 0
while elapsed_time < timeout:
try:
message = self.queue_from_fdshim.get(block=True, timeout=timeout)
except Queue.Empty:
pass
else:
queue = message.subject
self._callbacks[queue](message)
elapsed_time = clock() - start_time
raise Empty()
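
drain_events above polls queue_from_fdshim until the timeout elapses and raises Empty when nothing more arrives. The standalone sketch below is not the commit's code: it uses wall-clock time.time() rather than time.clock(), and it shrinks the per-get timeout each pass so the loop cannot block past the deadline; both are choices of the sketch, not of the commit.

# --- illustrative sketch, not part of the diff ---
import Queue  # Python 2
from collections import namedtuple
from time import time

Msg = namedtuple('Msg', ['subject', 'content'])


def drain(queue_from_fdshim, callbacks, timeout=1.0):
    start_time = time()
    elapsed_time = 0
    while elapsed_time < timeout:
        try:
            message = queue_from_fdshim.get(block=True,
                                            timeout=timeout - elapsed_time)
        except Queue.Empty:
            pass
        else:
            # Dispatch on the message subject, which carries the queue name.
            callbacks[message.subject](message)
        elapsed_time = time() - start_time
    raise Queue.Empty()


received = []
q = Queue.Queue()
q.put(Msg('my_queue', 'hello'))
try:
    drain(q, {'my_queue': received.append}, timeout=0.2)
except Queue.Empty:
    pass
assert received and received[0].content == 'hello'
# --- end sketch ---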
def create_channel(self, connection):
try:
return self._avail_channels.pop()
@@ -563,11 +521,13 @@ class Transport(base.Transport):
self.channels.append(channel)
return channel
def on_readable(self, connection, loop):
def close_channel(self, channel):
try:
message = self.queue_from_fdshim.get(False)
except Queue.Empty:
self.channels.remove(channel)
except ValueError:
pass
else:
queue = message.subject
self._callbacks[queue](message)
finally:
channel.connection = None
def on_readable(self, connection, loop):
self.drain_events(connection)
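
With the change above, on_readable simply delegates to drain_events, and register_with_event_loop is what hands the FDShim pipe to the hub via loop.add_reader(fd, callback, *args). The sketch below is invented for illustration and is not kombu's real hub; it shows the minimum such a loop has to do for that registration to work.

# --- illustrative sketch, not part of the diff ---
import select


class MiniHub(object):

    def __init__(self):
        self._readers = {}  # fd -> (callback, args)

    def add_reader(self, fd, callback, *args):
        self._readers[fd] = (callback, args)

    def run_once(self, poll_timeout=1.0):
        readable, _, _ = select.select(list(self._readers), [], [],
                                       poll_timeout)
        for fd in readable:
            callback, args = self._readers[fd]
            callback(*args)


# Usage against this transport would look roughly like:
#   hub = MiniHub()
#   transport.register_with_event_loop(connection, hub)
#   hub.run_once()   # fires transport.on_readable(connection, hub)
# --- end sketch ---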