From d8956965635dc47cc032073f8a3814b7f2719a5e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 19 Jul 2011 16:33:47 +0100 Subject: [PATCH] ConnectionPool no longer forces connect at acquire, and ensure now supports an on_revive callback --- kombu/connection.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index e1b6b2ec..659705c7 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -185,7 +185,7 @@ class BrokerConnection(object): return self def ensure(self, obj, fun, errback=None, max_retries=None, - interval_start=1, interval_step=1, interval_max=1): + interval_start=1, interval_step=1, interval_max=1, on_revive=None): """Ensure operation completes, regardless of any channel/connection errors occuring. @@ -244,13 +244,16 @@ class BrokerConnection(object): interval_start, interval_step, interval_max) - obj.revive(self.channel()) + new_channel = self.channel() + obj.revive(new_channel) + if on_revive: + on_revive(new_channel) got_connection += 1 _insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__ return _insured - def autoretry(self, fun, channel, on_revive=None, **ensure_options): + def autoretry(self, fun, channel, **ensure_options): """Decorator for functions supporting a ``channel`` keyword argument. The resulting callable will retry calling the function if @@ -268,6 +271,7 @@ class BrokerConnection(object): channel.close() """ channels = [channel] + create_channel = self.channel class Revival(object): __name__ = fun.__name__ @@ -276,10 +280,10 @@ class BrokerConnection(object): def revive(self, channel): channels[0] = channel - if on_revive: - on_revive(channel) def __call__(self, *args, **kwargs): + if channels[0] is None: + self.revive(create_channel()) kwargs["channel"] = channels[0] return fun(*args, **kwargs), channels[0] @@ -652,7 +656,6 @@ class ConnectionPool(Resource): def prepare(self, resource): resource._debug("acquired") - resource.connect() return resource