mirror of https://github.com/celery/kombu.git
compat consumer should use connection.default_channel
This commit is contained in:
parent
00398a9d5a
commit
8020481d72
|
@ -92,7 +92,6 @@ class Consumer(messaging.Consumer):
|
|||
def __init__(self, connection, queue=None, exchange=None,
|
||||
routing_key=None, exchange_type=None, durable=None,
|
||||
exclusive=None, auto_delete=None, **kwargs):
|
||||
self.backend = connection.channel()
|
||||
|
||||
if durable is not None:
|
||||
self.durable = durable
|
||||
|
@ -117,15 +116,10 @@ class Consumer(messaging.Consumer):
|
|||
durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete)
|
||||
super(Consumer, self).__init__(self.backend, queue, **kwargs)
|
||||
|
||||
def revive(self, channel):
|
||||
self.backend = channel
|
||||
super(Consumer, self).revive(channel)
|
||||
super(Consumer, self).__init__(connection, queue, **kwargs)
|
||||
|
||||
def close(self):
|
||||
self.cancel()
|
||||
self.backend.close()
|
||||
self._closed = True
|
||||
|
||||
def __enter__(self):
|
||||
|
@ -170,13 +164,14 @@ class Consumer(messaging.Consumer):
|
|||
raise StopIteration
|
||||
yield item
|
||||
|
||||
@property
|
||||
def backend(self):
|
||||
return self.channel
|
||||
|
||||
|
||||
class ConsumerSet(messaging.Consumer):
|
||||
|
||||
def __init__(self, connection, from_dict=None, consumers=None,
|
||||
callbacks=None, **kwargs):
|
||||
self.backend = connection.channel()
|
||||
|
||||
def __init__(self, connection, from_dict=None, consumers=None, **kwargs):
|
||||
queues = []
|
||||
if consumers:
|
||||
for consumer in consumers:
|
||||
|
@ -185,7 +180,7 @@ class ConsumerSet(messaging.Consumer):
|
|||
for queue_name, queue_options in from_dict.items():
|
||||
queues.append(entry_to_queue(queue_name, **queue_options))
|
||||
|
||||
super(ConsumerSet, self).__init__(self.backend, queues, **kwargs)
|
||||
super(ConsumerSet, self).__init__(connection, queues, **kwargs)
|
||||
|
||||
def iterconsume(self, limit=None, no_ack=False):
|
||||
return _iterconsume(self.connection, self, no_ack, limit)
|
||||
|
@ -200,10 +195,10 @@ class ConsumerSet(messaging.Consumer):
|
|||
for queue in consumer.queues:
|
||||
self.add_queue(queue)
|
||||
|
||||
def revive(self, channel):
|
||||
self.backend = channel
|
||||
super(ConsumerSet, self).revive(channel)
|
||||
|
||||
def close(self):
|
||||
self.cancel()
|
||||
self.channel.close()
|
||||
|
||||
@property
|
||||
def backend(self):
|
||||
return self.channel
|
||||
|
|
|
@ -168,7 +168,6 @@ class test_Consumer(TestCase):
|
|||
x = c.__enter__()
|
||||
self.assertIs(x, c)
|
||||
x.__exit__()
|
||||
self.assertIn("close", c.backend)
|
||||
self.assertTrue(c._closed)
|
||||
|
||||
def test_revive(self, n="test_revive"):
|
||||
|
|
Loading…
Reference in New Issue