diff --git a/kombu/backends/pypika.py b/kombu/backends/pypika.py index 5422660f..d0cb195e 100644 --- a/kombu/backends/pypika.py +++ b/kombu/backends/pypika.py @@ -1,7 +1,3 @@ -import weakref -import functools -import itertools - from pika import asyncore_adapter from pika import blocking_adapter from pika import channel diff --git a/kombu/compat.py b/kombu/compat.py index 6c76b543..224f2e33 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -4,7 +4,7 @@ from kombu import entity from kombu import messaging -def iterconsume(connection, consumer): +def iterconsume(connection, consumer, no_ack=False, limit=None): consumer.consume(no_ack=no_ack) for iteration in count(0): if limit and iteration >= limit: @@ -161,7 +161,7 @@ class Consumer(messaging.Consumer): return self.purge() def iterconsume(self, limit=None, no_ack=None): - return iterconsume(self.connection, self) + return iterconsume(self.connection, self, no_ack, limit) def wait(self, limit=None): it = self.iterconsume(limit) @@ -175,6 +175,7 @@ class Consumer(messaging.Consumer): raise StopIteration yield item + class _CSet(messaging.Consumer): def __init__(self, connection, *args, **kwargs): @@ -182,8 +183,8 @@ class _CSet(messaging.Consumer): self.backend = connection.channel() super(_CSet, self).__init__(self.backend, *args, **kwargs) - def iterconsume(self): - return iterconsume(self.connection, self) + def iterconsume(self, limit=None, no_ack=False): + return iterconsume(self.connection, self, no_ack, limit) def discard_all(self): return self.purge() @@ -204,7 +205,8 @@ def ConsumerSet(connection, from_dict=None, consumers=None, bindings = [] if consumers: - map(bindings.extend, consumer.bindings) + for consumer in consumers: + map(bindings.extend, consumer.bindings) if from_dict: for queue_name, queue_options in from_dict.items(): bindings.append(entry_to_binding(queue_name, **queue_options)) diff --git a/kombu/messaging.py b/kombu/messaging.py index b3af942d..11c72ca8 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -68,7 +68,6 @@ class Producer(object): immediate) - class Consumer(object): no_ack = False auto_declare = True