From 3e54fc47dfe81d5beb346016d63b635420e78ba4 Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Thu, 13 Feb 2014 12:56:25 -0500 Subject: [PATCH] Message ACKing now works, and drain_events should work properly now --- AUTHORS | 2 +- kombu/transport/qpid.py | 186 ++++++++++++++++------------------------ 2 files changed, 74 insertions(+), 114 deletions(-) diff --git a/AUTHORS b/AUTHORS index 95af8369..f7aea2ff 100644 --- a/AUTHORS +++ b/AUTHORS @@ -17,7 +17,7 @@ Ask Solem Basil Mironenko Bobby Beever Brian Bernstein -Brian Bouterse +Brian Bouterse C Anthony Risinger Christophe Chauvet Christopher Grebs diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index fa25a8cd..31e20ec3 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -15,11 +15,13 @@ import uuid import base64 import threading import Queue +from time import clock from itertools import count from multiprocessing.util import Finalize from kombu.transport import virtual +from kombu.five import Empty from kombu.utils import kwdict from kombu.utils.compat import OrderedDict from kombu.utils.encoding import str_to_bytes, bytes_to_str @@ -81,29 +83,13 @@ class QoS(object): #: :class:`~collections.OrderedDict` of active messages. #: *NOTE*: Can only be modified by the consuming thread. - _delivered = None - - #: acks can be done by other threads than the consuming thread. - #: Instead of a mutex, which doesn't perform well here, we mark - #: the delivery tags as dirty, so subsequent calls to append() can remove - #: them. - _dirty = None - - #: If disabled, unacked messages won't be restored at shutdown. - restore_at_shutdown = True + _not_yet_acked = None def __init__(self, channel, prefetch_count=0): self.channel = channel self.prefetch_count = prefetch_count or 0 - - self._delivered = OrderedDict() - self._delivered.restored = False - self._dirty = set() - self._quick_ack = self._dirty.add - self._quick_append = self._delivered.__setitem__ - self._on_collect = Finalize( - self, self.restore_unacked_once, exitpriority=1, - ) + self._not_yet_acked = OrderedDict() + self._qpid_session = self.channel.connection.fd_shim._qpid_session def can_consume(self): """Return true if the channel can be consumed from. @@ -113,7 +99,8 @@ class QoS(object): """ pcount = self.prefetch_count - return not pcount or len(self._delivered) - len(self._dirty) < pcount + return True + return not pcount or len(self._not_yet_acked) - len(self._dirty) < pcount def can_consume_max_estimate(self): """Returns the maximum number of messages allowed to be returned. @@ -129,7 +116,7 @@ class QoS(object): pcount = self.prefetch_count count = None if pcount: - count = pcount - (len(self._delivered) - len(self._dirty)) + count = pcount - (len(self._not_yet_acked) - len(self._dirty)) if count < 1: return 1 @@ -138,92 +125,19 @@ class QoS(object): def append(self, message, delivery_tag): """Append message to transactional state.""" - if self._dirty: - self._flush() - self._quick_append(delivery_tag, message) + self._not_yet_acked[delivery_tag] = message def get(self, delivery_tag): - return self._delivered[delivery_tag] - - def _flush(self): - """Flush dirty (acked/rejected) tags from.""" - dirty = self._dirty - delivered = self._delivered - while 1: - try: - dirty_tag = dirty.pop() - except KeyError: - break - delivered.pop(dirty_tag, None) + return self._not_yet_acked[delivery_tag] def ack(self, delivery_tag): """Acknowledge message and remove from transactional state.""" - self._quick_ack(delivery_tag) + message = self._not_yet_acked[delivery_tag] + self._qpid_session.acknowledge(message=message) def reject(self, delivery_tag, requeue=False): """Remove from transactional state and requeue message.""" - if requeue: - self.channel._restore_at_beginning(self._delivered[delivery_tag]) - self._quick_ack(delivery_tag) - - def restore_unacked(self): - """Restore all unacknowledged messages.""" - self._flush() - delivered = self._delivered - errors = [] - restore = self.channel._restore - pop_message = delivered.popitem - - while delivered: - try: - _, message = pop_message() - except KeyError: # pragma: no cover - break - - try: - restore(message) - except BaseException as exc: - errors.append((exc, message)) - delivered.clear() - return errors - - def restore_unacked_once(self): - """Restores all unacknowledged messages at shutdown/gc collect. - - Will only be done once for each instance. - - """ - self._on_collect.cancel() - self._flush() - state = self._delivered - - if not self.restore_at_shutdown or not self.channel.do_restore: - return - if getattr(state, 'restored', None): - assert not state - return - try: - if state: - say('Restoring {0!r} unacknowledged message(s).', - len(self._delivered)) - unrestored = self.restore_unacked() - - if unrestored: - errors, messages = list(zip(*unrestored)) - say('UNABLE TO RESTORE {0} MESSAGES: {1}', - len(errors), errors) - emergency_dump_state(messages) - finally: - state.restored = True - - def restore_visible(self, *args, **kwargs): - """Restore any pending unackwnowledged messages for visibility_timeout - style implementations. - - Optional: Currently only used by the Redis transport. - - """ - pass + self._not_yet_acked.pop(delivery_tag) class Message(base.Message): @@ -286,6 +200,8 @@ class Channel(base.StdChannel): self._qpid_session = qpid_publish_connection.session() self._broker = BrokerAgent(qpid_qmf_connection) self._qos = None + self._consumers = set() + self.closed = False def _get(self, queue): raise NotImplementedError('_get Not Implemented') @@ -365,6 +281,10 @@ class Channel(base.StdChannel): def basic_get(self, queue, *args, **kwargs): raise NotImplementedError('basic_get Not Implemented') + def basic_ack(self, delivery_tag): + """Acknowledge message.""" + self.qos.ack(delivery_tag) + def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): self._tag_to_queue[consumer_tag] = queue @@ -372,14 +292,30 @@ class Channel(base.StdChannel): raw_message = qpid_message.content message = self.Message(self, raw_message) if not no_ack: - self.qos.append(message, message.delivery_tag) + delivery_tag = message.delivery_tag + self.qos.append(qpid_message, delivery_tag) return callback(message) self.connection._callbacks[queue] = _callback + self._consumers.add(consumer_tag) self.connection.fd_shim.signaling_queue.put(['sub', queue]) def basic_cancel(self, consumer_tag): - raise NotImplementedError('basic_cancel Not Implemented') + """Cancel consumer by consumer tag.""" + if consumer_tag in self._consumers: + self._consumers.remove(consumer_tag) + queue = self._tag_to_queue.pop(consumer_tag, None) + self.connection._callbacks.pop(queue, None) + + def close(self): + """Close channel, cancel all consumers, and requeue unacked + messages.""" + if not self.closed: + self.closed = True + for consumer in list(self._consumers): + self.basic_cancel(consumer) + if self.connection is not None: + self.connection.close_channel(self) @property def qos(self): @@ -512,8 +448,7 @@ class FDShim(object): while True: message = self.recv() self.queue_from_fdshim.put(message) - #message_contents = message.content['body'] - os.write(self._w, 'readyy') + os.write(self._w, 'ready') class Transport(base.Transport): @@ -538,14 +473,13 @@ class Transport(base.Transport): self._avail_channels = [] self._callbacks = {} self.fd_shim = None - self.queue_from_fdshim = None - - def register_with_event_loop(self, connection, loop): self.queue_from_fdshim = Queue.Queue() - self.fd_shim = FDShim(connection, self.queue_from_fdshim) + self.fd_shim = FDShim(self, self.queue_from_fdshim) fdshim_thread = threading.Thread(target=self.fd_shim.listen) fdshim_thread.daemon = True fdshim_thread.start() + + def register_with_event_loop(self, connection, loop): loop.add_reader(self.fd_shim.r, self.on_readable, connection, loop) def establish_connection(self): @@ -555,6 +489,30 @@ class Transport(base.Transport): self._avail_channels.append(self.create_channel(self)) return self # for drain events + def close_connection(self, connection): + for l in self._avail_channels, self.channels: + while l: + try: + channel = l.pop() + except (IndexError, KeyError): # pragma: no cover + pass + else: + channel.close() + + def drain_events(self, connection, timeout=0, **kwargs): + start_time = clock() + elapsed_time = 0 + while elapsed_time < timeout: + try: + message = self.queue_from_fdshim.get(block=True, timeout=timeout) + except Queue.Empty: + pass + else: + queue = message.subject + self._callbacks[queue](message) + elapsed_time = clock() - start_time + raise Empty() + def create_channel(self, connection): try: return self._avail_channels.pop() @@ -563,11 +521,13 @@ class Transport(base.Transport): self.channels.append(channel) return channel - def on_readable(self, connection, loop): + def close_channel(self, channel): try: - message = self.queue_from_fdshim.get(False) - except Queue.Empty: + self.channels.remove(channel) + except ValueError: pass - else: - queue = message.subject - self._callbacks[queue](message) \ No newline at end of file + finally: + channel.connection = None + + def on_readable(self, connection, loop): + self.drain_events(connection) \ No newline at end of file