diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 9348a1ba..95578509 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -23,6 +23,7 @@ from kombu.utils.scheduling import cycle_by_name from kombu.utils.url import _parse_url from kombu.utils.uuid import uuid from kombu.utils.compat import _detect_environment +from kombu.utils.functional import accepts_argument from . import virtual @@ -43,6 +44,8 @@ crit, warn = logger.critical, logger.warn DEFAULT_PORT = 6379 DEFAULT_DB = 0 +DEFAULT_HEALTH_CHECK_INTERVAL = 25 + PRIORITY_STEPS = [0, 3, 6, 9] error_classes_t = namedtuple('error_classes_t', ( @@ -903,6 +906,14 @@ class Channel(virtual.Channel): 'socket_keepalive': self.socket_keepalive, 'socket_keepalive_options': self.socket_keepalive_options, } + + conn_class = self.connection_class + if ( + hasattr(conn_class, '__init__') and + accepts_argument(conn_class.__init__, 'health_check_interval') + ): + connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL + if conninfo.ssl: # Connection(ssl={}) must be a dict containing the keys: # 'ssl_cert_reqs', 'ssl_ca_certs', 'ssl_certfile', 'ssl_keyfile' @@ -1053,7 +1064,10 @@ class Transport(virtual.Transport): [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) - loop.call_repeatedly(30, cycle.maybe_check_subclient_health) + loop.call_repeatedly( + DEFAULT_HEALTH_CHECK_INTERVAL, + cycle.maybe_check_subclient_health + ) def on_readable(self, fileno): """Handle AIO event for one of our file descriptors.""" diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 30c3ae8c..a793150e 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals import random import sys import threading +import inspect from collections import OrderedDict @@ -18,7 +19,7 @@ from time import sleep, time from vine.utils import wraps from kombu.five import ( - UserDict, items, keys, python_2_unicode_compatible, string_t, + UserDict, items, keys, python_2_unicode_compatible, string_t, PY3, ) from .encoding import safe_repr as _safe_repr @@ -372,6 +373,19 @@ def reprcall(name, args=(), kwargs=None, sep=', '): ) +def accepts_argument(func, argument_name): + if PY3: + argument_spec = inspect.getfullargspec(func) + return ( + argument_name in argument_spec.args or + argument_name in argument_spec.kwonlyargs + ) + + argument_spec = inspect.getargspec(func) + argument_names = argument_spec.args + return argument_name in argument_names + + # Compat names (before kombu 3.0) promise = lazy maybe_promise = maybe_evaluate diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 58e22413..6c0a5229 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -738,7 +738,7 @@ class test_Channel: transport.cycle.on_poll_init.assert_called_with(loop.poller) loop.call_repeatedly.assert_has_calls([ call(10, transport.cycle.maybe_restore_messages), - call(30, transport.cycle.maybe_check_subclient_health), + call(25, transport.cycle.maybe_check_subclient_health), ]) loop.on_tick.add.assert_called() on_poll_start = loop.on_tick.add.call_args[0][0] diff --git a/t/unit/utils/test_functional.py b/t/unit/utils/test_functional.py index 2ed64761..b5ec03af 100644 --- a/t/unit/utils/test_functional.py +++ b/t/unit/utils/test_functional.py @@ -8,11 +8,14 @@ from itertools import count from case import Mock, mock, skip -from kombu.five import items +from kombu.five import ( + items, PY3, +) from kombu.utils import functional as utils from kombu.utils.functional import ( ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy, maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time, + accepts_argument, ) @@ -310,3 +313,26 @@ def test_reprkwargs(): def test_reprcall(): assert reprcall('add', (2, 2), {'copy': True}) + + +class test_accepts_arg: + def function(self, foo, bar, baz="baz"): + pass + + def test_valid_argument(self): + assert accepts_argument(self.function, 'self') + assert accepts_argument(self.function, 'foo') + assert accepts_argument(self.function, 'baz') + + def test_invalid_argument(self): + assert not accepts_argument(self.function, 'random_argument') + if PY3: + assert not accepts_argument(test_accepts_arg, 'foo') + + def test_raise_exception(self): + with pytest.raises(Exception): + accepts_argument(None, 'foo') + + if not PY3: + with pytest.raises(Exception): + accepts_argument(test_accepts_arg, 'foo')