diff --git a/kombu/abstract.py b/kombu/abstract.py index ec378a6e..50c70011 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -43,6 +43,16 @@ class MaybeChannelBound(Object): self._is_bound = True return self + def revive(self, channel): + """Revive channel afer connection re-established. + + Used by :meth:`~kombu.connection.BrokerConnection.ensure`. + + """ + if self.is_bound: + self._channel = channel + self.when_bound() + def when_bound(self): """Callback called when the class is bound.""" pass diff --git a/kombu/connection.py b/kombu/connection.py index 663f3220..c30db523 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -133,7 +133,6 @@ class BrokerConnection(object): each retry. """ - print("CONNECTION ERRORS: %r" % (self.connection_errors, )) retry_over_time(self.connect, self.connection_errors, (), {}, errback, max_retries, interval_start, interval_step, interval_max) @@ -198,7 +197,7 @@ class BrokerConnection(object): interval_start, interval_step, interval_max) - obj.channel = self.channel() + obj.revive(self.channel()) got_connection += 1 _insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__ diff --git a/kombu/messaging.py b/kombu/messaging.py index 1ce7e555..b3db5c17 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -11,7 +11,7 @@ class Producer(object): """Message Producer. :param channel: Connection channel. - :keyword exchange: Exchange to publish to. + :keyword exchange: Default exchange. :keyword routing_key: Default routing key. :keyword serializer: Default serializer. Default is ``"json"``. :keyword compression: Default compression method. Default is no @@ -19,9 +19,9 @@ class Producer(object): :keyword auto_declare: Automatically declare the exchange at instantiation. Default is ``True``. :keyword on_return: Callback to call for undeliverable messages, - when ``mandatory`` or ``imediate`` is used. This callback - needs the following signature: - ``(exception, exchange, routing_key, message)``. + when the ``mandatory`` or ``imediate`` arguments to + :meth:`publish` is used. This callback needs the following + signature: ``(exception, exchange, routing_key, message)``. .. attribute:: channel @@ -71,6 +71,10 @@ class Producer(object): if self.on_return: self.channel.events["basic_return"].append(self.on_return) + def revive(self, channel): + self.channel = channel + self.exchange.revive(channel) + def declare(self): """Declare the exchange.