mirror of https://github.com/celery/kombu.git
Instantiating a Consumer does not declare consumer. Use Consumer.consume()
This commit is contained in:
parent
cc08721c01
commit
d380f7cea8
|
@ -36,6 +36,10 @@ Proposed API::
|
||||||
image_binding = Binding("image", exchange=media_exchange, key="image")
|
image_binding = Binding("image", exchange=media_exchange, key="image")
|
||||||
|
|
||||||
consumer = Consumer(channel, [video_binding, image_binding])
|
consumer = Consumer(channel, [video_binding, image_binding])
|
||||||
|
consumer.consume()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
connection.drain_events()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -75,6 +75,7 @@ class Consumer(object):
|
||||||
callbacks = None
|
callbacks = None
|
||||||
on_decode_error = None
|
on_decode_error = None
|
||||||
_next_tag = count(1).next # global
|
_next_tag = count(1).next # global
|
||||||
|
_consuming = False
|
||||||
|
|
||||||
def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
|
def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
|
||||||
callbacks=None):
|
callbacks=None):
|
||||||
|
@ -96,7 +97,6 @@ class Consumer(object):
|
||||||
|
|
||||||
if self.auto_declare:
|
if self.auto_declare:
|
||||||
self.declare()
|
self.declare()
|
||||||
self.consume()
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
|
@ -109,6 +109,7 @@ class Consumer(object):
|
||||||
binding.declare()
|
binding.declare()
|
||||||
|
|
||||||
def consume(self):
|
def consume(self):
|
||||||
|
if not self._consuming:
|
||||||
H, T = self.bindings[:-1], self.bindings[-1]
|
H, T = self.bindings[:-1], self.bindings[-1]
|
||||||
for binding in H:
|
for binding in H:
|
||||||
binding.consume(self._add_tag(binding),
|
binding.consume(self._add_tag(binding),
|
||||||
|
@ -119,6 +120,7 @@ class Consumer(object):
|
||||||
self._receive_callback,
|
self._receive_callback,
|
||||||
self.no_ack,
|
self.no_ack,
|
||||||
nowait=False)
|
nowait=False)
|
||||||
|
self._consuming = False
|
||||||
|
|
||||||
def _add_tag(self, binding):
|
def _add_tag(self, binding):
|
||||||
tag = self._active_tags[binding] = str(self._next_tag())
|
tag = self._active_tags[binding] = str(self._next_tag())
|
||||||
|
@ -150,7 +152,8 @@ class Consumer(object):
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
for binding, tag in self._active_tags.items():
|
for binding, tag in self._active_tags.items():
|
||||||
binding.cancel(tag)
|
binding.cancel(tag)
|
||||||
self._active_tags = {}
|
self._active_tags.clear()
|
||||||
|
self._consuming = False
|
||||||
|
|
||||||
def flow(self, active):
|
def flow(self, active):
|
||||||
self.channel.flow(active)
|
self.channel.flow(active)
|
||||||
|
|
Loading…
Reference in New Issue