mirror of https://github.com/celery/kombu.git
Tests passing
This commit is contained in:
parent
aa0ac8a8e5
commit
e4104694f2
|
@ -234,15 +234,11 @@ class ConsumerSet(messaging.Consumer):
|
||||||
return self.purge()
|
return self.purge()
|
||||||
|
|
||||||
def add_consumer_from_dict(self, queue, **options):
|
def add_consumer_from_dict(self, queue, **options):
|
||||||
queue = entry_to_queue(queue, **options)(self.channel)
|
return self.add_queue(entry_to_queue(queue, **options))
|
||||||
if self.auto_declare:
|
|
||||||
queue.declare()
|
|
||||||
self.queues.append(queue)
|
|
||||||
return queue
|
|
||||||
|
|
||||||
def add_consumer(self, consumer):
|
def add_consumer(self, consumer):
|
||||||
for queue in consumer.queues:
|
for queue in consumer.queues:
|
||||||
self.queues.append(queue(self.channel))
|
self.add_queue(queue)
|
||||||
|
|
||||||
def revive(self, channel):
|
def revive(self, channel):
|
||||||
self.backend = channel
|
self.backend = channel
|
||||||
|
|
|
@ -138,6 +138,9 @@ class Producer(object):
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
self.exchange.revive(channel)
|
self.exchange.revive(channel)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
def _prepare(self, body, serializer=None,
|
def _prepare(self, body, serializer=None,
|
||||||
content_type=None, content_encoding=None, compression=None,
|
content_type=None, content_encoding=None, compression=None,
|
||||||
headers=None):
|
headers=None):
|
||||||
|
@ -261,6 +264,13 @@ class Consumer(object):
|
||||||
def __exit__(self, *exc_info):
|
def __exit__(self, *exc_info):
|
||||||
self.cancel()
|
self.cancel()
|
||||||
|
|
||||||
|
def add_queue(self, queue):
|
||||||
|
queue = queue(self.channel)
|
||||||
|
if self.auto_declare:
|
||||||
|
queue.declare()
|
||||||
|
self.queues.append(queue)
|
||||||
|
return queue
|
||||||
|
|
||||||
def consume(self, no_ack=None):
|
def consume(self, no_ack=None):
|
||||||
"""Register consumer on server.
|
"""Register consumer on server.
|
||||||
|
|
||||||
|
|
|
@ -229,7 +229,7 @@ class AbstractChannel(object):
|
||||||
"""Return the number of messages in `queue` as an :class:`int`."""
|
"""Return the number of messages in `queue` as an :class:`int`."""
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def _delete(self, queue):
|
def _delete(self, queue, *args, **kwargs):
|
||||||
"""Delete `queue`.
|
"""Delete `queue`.
|
||||||
|
|
||||||
This just purges the queue, if you need to do more you can
|
This just purges the queue, if you need to do more you can
|
||||||
|
|
Loading…
Reference in New Issue