mirror of https://github.com/celery/kombu.git
Implement connection revive() for kombu.compat classes
This commit is contained in:
parent
dabfcbadf8
commit
387e979840
|
@ -100,6 +100,10 @@ class Publisher(messaging.Producer):
|
||||||
def send(self, *args, **kwargs):
|
def send(self, *args, **kwargs):
|
||||||
return self.publish(*args, **kwargs)
|
return self.publish(*args, **kwargs)
|
||||||
|
|
||||||
|
def revive(self, channel):
|
||||||
|
self.backend = channel
|
||||||
|
super(Publisher, self).revive(channel)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.backend.close()
|
self.backend.close()
|
||||||
self._closed = True
|
self._closed = True
|
||||||
|
@ -153,6 +157,10 @@ class Consumer(messaging.Consumer):
|
||||||
auto_delete=self.auto_delete)
|
auto_delete=self.auto_delete)
|
||||||
super(Consumer, self).__init__(self.backend, queue, **kwargs)
|
super(Consumer, self).__init__(self.backend, queue, **kwargs)
|
||||||
|
|
||||||
|
def revive(self, channel):
|
||||||
|
self.backend = channel
|
||||||
|
super(Consumer, self).revive(channel)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cancel()
|
self.cancel()
|
||||||
self.backend.close()
|
self.backend.close()
|
||||||
|
@ -233,6 +241,10 @@ class ConsumerSet(messaging.Consumer):
|
||||||
for queue in consumer.queues:
|
for queue in consumer.queues:
|
||||||
self.queues.append(queue(self.channel))
|
self.queues.append(queue(self.channel))
|
||||||
|
|
||||||
|
def revive(self, channel):
|
||||||
|
self.backend = channel
|
||||||
|
super(ConsumerSet, self).revive(channel)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cancel()
|
self.cancel()
|
||||||
self.channel.close()
|
self.channel.close()
|
||||||
|
|
Loading…
Reference in New Issue