mirror of https://github.com/celery/kombu.git
ConnectionPool no longer forces connect at acquire, and ensure now supports an on_revive callback
This commit is contained in:
parent
781a407ca2
commit
d895696563
|
@ -185,7 +185,7 @@ class BrokerConnection(object):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def ensure(self, obj, fun, errback=None, max_retries=None,
|
def ensure(self, obj, fun, errback=None, max_retries=None,
|
||||||
interval_start=1, interval_step=1, interval_max=1):
|
interval_start=1, interval_step=1, interval_max=1, on_revive=None):
|
||||||
"""Ensure operation completes, regardless of any channel/connection
|
"""Ensure operation completes, regardless of any channel/connection
|
||||||
errors occuring.
|
errors occuring.
|
||||||
|
|
||||||
|
@ -244,13 +244,16 @@ class BrokerConnection(object):
|
||||||
interval_start,
|
interval_start,
|
||||||
interval_step,
|
interval_step,
|
||||||
interval_max)
|
interval_max)
|
||||||
obj.revive(self.channel())
|
new_channel = self.channel()
|
||||||
|
obj.revive(new_channel)
|
||||||
|
if on_revive:
|
||||||
|
on_revive(new_channel)
|
||||||
got_connection += 1
|
got_connection += 1
|
||||||
|
|
||||||
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
|
_insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
|
||||||
return _insured
|
return _insured
|
||||||
|
|
||||||
def autoretry(self, fun, channel, on_revive=None, **ensure_options):
|
def autoretry(self, fun, channel, **ensure_options):
|
||||||
"""Decorator for functions supporting a ``channel`` keyword argument.
|
"""Decorator for functions supporting a ``channel`` keyword argument.
|
||||||
|
|
||||||
The resulting callable will retry calling the function if
|
The resulting callable will retry calling the function if
|
||||||
|
@ -268,6 +271,7 @@ class BrokerConnection(object):
|
||||||
channel.close()
|
channel.close()
|
||||||
"""
|
"""
|
||||||
channels = [channel]
|
channels = [channel]
|
||||||
|
create_channel = self.channel
|
||||||
|
|
||||||
class Revival(object):
|
class Revival(object):
|
||||||
__name__ = fun.__name__
|
__name__ = fun.__name__
|
||||||
|
@ -276,10 +280,10 @@ class BrokerConnection(object):
|
||||||
|
|
||||||
def revive(self, channel):
|
def revive(self, channel):
|
||||||
channels[0] = channel
|
channels[0] = channel
|
||||||
if on_revive:
|
|
||||||
on_revive(channel)
|
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
def __call__(self, *args, **kwargs):
|
||||||
|
if channels[0] is None:
|
||||||
|
self.revive(create_channel())
|
||||||
kwargs["channel"] = channels[0]
|
kwargs["channel"] = channels[0]
|
||||||
return fun(*args, **kwargs), channels[0]
|
return fun(*args, **kwargs), channels[0]
|
||||||
|
|
||||||
|
@ -652,7 +656,6 @@ class ConnectionPool(Resource):
|
||||||
|
|
||||||
def prepare(self, resource):
|
def prepare(self, resource):
|
||||||
resource._debug("acquired")
|
resource._debug("acquired")
|
||||||
resource.connect()
|
|
||||||
return resource
|
return resource
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue