diff --git a/kombu/compat.py b/kombu/compat.py index 1ca7bb7e..5dd260d8 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -234,15 +234,11 @@ class ConsumerSet(messaging.Consumer): return self.purge() def add_consumer_from_dict(self, queue, **options): - queue = entry_to_queue(queue, **options)(self.channel) - if self.auto_declare: - queue.declare() - self.queues.append(queue) - return queue + return self.add_queue(entry_to_queue(queue, **options)) def add_consumer(self, consumer): for queue in consumer.queues: - self.queues.append(queue(self.channel)) + self.add_queue(queue) def revive(self, channel): self.backend = channel diff --git a/kombu/messaging.py b/kombu/messaging.py index 2fc034ae..b78a720d 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -138,6 +138,9 @@ class Producer(object): self.channel = channel self.exchange.revive(channel) + def close(self): + pass + def _prepare(self, body, serializer=None, content_type=None, content_encoding=None, compression=None, headers=None): @@ -261,6 +264,13 @@ class Consumer(object): def __exit__(self, *exc_info): self.cancel() + def add_queue(self, queue): + queue = queue(self.channel) + if self.auto_declare: + queue.declare() + self.queues.append(queue) + return queue + def consume(self, no_ack=None): """Register consumer on server. diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index b0b6d1de..c51dfae7 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -229,7 +229,7 @@ class AbstractChannel(object): """Return the number of messages in `queue` as an :class:`int`.""" return 0 - def _delete(self, queue): + def _delete(self, queue, *args, **kwargs): """Delete `queue`. This just purges the queue, if you need to do more you can