diff --git a/README.rst b/README.rst index 6cef2c9f..3e6766d1 100644 --- a/README.rst +++ b/README.rst @@ -36,6 +36,10 @@ Proposed API:: image_binding = Binding("image", exchange=media_exchange, key="image") consumer = Consumer(channel, [video_binding, image_binding]) + consumer.consume() + + while True: + connection.drain_events() diff --git a/kombu/messaging.py b/kombu/messaging.py index 2cc3c129..b3af942d 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -75,6 +75,7 @@ class Consumer(object): callbacks = None on_decode_error = None _next_tag = count(1).next # global + _consuming = False def __init__(self, channel, bindings, no_ack=None, auto_declare=None, callbacks=None): @@ -96,7 +97,6 @@ class Consumer(object): if self.auto_declare: self.declare() - self.consume() def __enter__(self): return self @@ -109,16 +109,18 @@ class Consumer(object): binding.declare() def consume(self): - H, T = self.bindings[:-1], self.bindings[-1] - for binding in H: - binding.consume(self._add_tag(binding), - self._receive_callback, - self.no_ack, - nowait=True) - T.consume(self._add_tag(T), - self._receive_callback, - self.no_ack, - nowait=False) + if not self._consuming: + H, T = self.bindings[:-1], self.bindings[-1] + for binding in H: + binding.consume(self._add_tag(binding), + self._receive_callback, + self.no_ack, + nowait=True) + T.consume(self._add_tag(T), + self._receive_callback, + self.no_ack, + nowait=False) + self._consuming = False def _add_tag(self, binding): tag = self._active_tags[binding] = str(self._next_tag()) @@ -150,7 +152,8 @@ class Consumer(object): def cancel(self): for binding, tag in self._active_tags.items(): binding.cancel(tag) - self._active_tags = {} + self._active_tags.clear() + self._consuming = False def flow(self, active): self.channel.flow(active)