mirror of https://github.com/celery/kombu.git
connection.ensure now works
This commit is contained in:
parent
4b59b55909
commit
a59cee238c
|
@ -43,6 +43,16 @@ class MaybeChannelBound(Object):
|
||||||
self._is_bound = True
|
self._is_bound = True
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def revive(self, channel):
|
||||||
|
"""Revive channel afer connection re-established.
|
||||||
|
|
||||||
|
Used by :meth:`~kombu.connection.BrokerConnection.ensure`.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if self.is_bound:
|
||||||
|
self._channel = channel
|
||||||
|
self.when_bound()
|
||||||
|
|
||||||
def when_bound(self):
|
def when_bound(self):
|
||||||
"""Callback called when the class is bound."""
|
"""Callback called when the class is bound."""
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -133,7 +133,6 @@ class BrokerConnection(object):
|
||||||
each retry.
|
each retry.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
print("CONNECTION ERRORS: %r" % (self.connection_errors, ))
|
|
||||||
retry_over_time(self.connect, self.connection_errors, (), {},
|
retry_over_time(self.connect, self.connection_errors, (), {},
|
||||||
errback, max_retries,
|
errback, max_retries,
|
||||||
interval_start, interval_step, interval_max)
|
interval_start, interval_step, interval_max)
|
||||||
|
@ -198,7 +197,7 @@ class BrokerConnection(object):
|
||||||
interval_start,
|
interval_start,
|
||||||
interval_step,
|
interval_step,
|
||||||
interval_max)
|
interval_max)
|
||||||
obj.channel = self.channel()
|
obj.revive(self.channel())
|
||||||
got_connection += 1
|
got_connection += 1
|
||||||
|
|
||||||
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
|
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
|
||||||
|
|
|
@ -11,7 +11,7 @@ class Producer(object):
|
||||||
"""Message Producer.
|
"""Message Producer.
|
||||||
|
|
||||||
:param channel: Connection channel.
|
:param channel: Connection channel.
|
||||||
:keyword exchange: Exchange to publish to.
|
:keyword exchange: Default exchange.
|
||||||
:keyword routing_key: Default routing key.
|
:keyword routing_key: Default routing key.
|
||||||
:keyword serializer: Default serializer. Default is ``"json"``.
|
:keyword serializer: Default serializer. Default is ``"json"``.
|
||||||
:keyword compression: Default compression method. Default is no
|
:keyword compression: Default compression method. Default is no
|
||||||
|
@ -19,9 +19,9 @@ class Producer(object):
|
||||||
:keyword auto_declare: Automatically declare the exchange
|
:keyword auto_declare: Automatically declare the exchange
|
||||||
at instantiation. Default is ``True``.
|
at instantiation. Default is ``True``.
|
||||||
:keyword on_return: Callback to call for undeliverable messages,
|
:keyword on_return: Callback to call for undeliverable messages,
|
||||||
when ``mandatory`` or ``imediate`` is used. This callback
|
when the ``mandatory`` or ``imediate`` arguments to
|
||||||
needs the following signature:
|
:meth:`publish` is used. This callback needs the following
|
||||||
``(exception, exchange, routing_key, message)``.
|
signature: ``(exception, exchange, routing_key, message)``.
|
||||||
|
|
||||||
.. attribute:: channel
|
.. attribute:: channel
|
||||||
|
|
||||||
|
@ -71,6 +71,10 @@ class Producer(object):
|
||||||
if self.on_return:
|
if self.on_return:
|
||||||
self.channel.events["basic_return"].append(self.on_return)
|
self.channel.events["basic_return"].append(self.on_return)
|
||||||
|
|
||||||
|
def revive(self, channel):
|
||||||
|
self.channel = channel
|
||||||
|
self.exchange.revive(channel)
|
||||||
|
|
||||||
def declare(self):
|
def declare(self):
|
||||||
"""Declare the exchange.
|
"""Declare the exchange.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue