mirror of https://github.com/celery/kombu.git
Fix redis health checks (#1122)
* Fix redis transport health checks functionality * Add tests for accepts_argument util function * Reduce default health check interval to 25s
This commit is contained in:
parent
de8d8cf8c1
commit
ccc9e01f32
|
@ -23,6 +23,7 @@ from kombu.utils.scheduling import cycle_by_name
|
|||
from kombu.utils.url import _parse_url
|
||||
from kombu.utils.uuid import uuid
|
||||
from kombu.utils.compat import _detect_environment
|
||||
from kombu.utils.functional import accepts_argument
|
||||
|
||||
from . import virtual
|
||||
|
||||
|
@ -43,6 +44,8 @@ crit, warn = logger.critical, logger.warn
|
|||
DEFAULT_PORT = 6379
|
||||
DEFAULT_DB = 0
|
||||
|
||||
DEFAULT_HEALTH_CHECK_INTERVAL = 25
|
||||
|
||||
PRIORITY_STEPS = [0, 3, 6, 9]
|
||||
|
||||
error_classes_t = namedtuple('error_classes_t', (
|
||||
|
@ -903,6 +906,14 @@ class Channel(virtual.Channel):
|
|||
'socket_keepalive': self.socket_keepalive,
|
||||
'socket_keepalive_options': self.socket_keepalive_options,
|
||||
}
|
||||
|
||||
conn_class = self.connection_class
|
||||
if (
|
||||
hasattr(conn_class, '__init__') and
|
||||
accepts_argument(conn_class.__init__, 'health_check_interval')
|
||||
):
|
||||
connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL
|
||||
|
||||
if conninfo.ssl:
|
||||
# Connection(ssl={}) must be a dict containing the keys:
|
||||
# 'ssl_cert_reqs', 'ssl_ca_certs', 'ssl_certfile', 'ssl_keyfile'
|
||||
|
@ -1053,7 +1064,10 @@ class Transport(virtual.Transport):
|
|||
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
|
||||
loop.on_tick.add(on_poll_start)
|
||||
loop.call_repeatedly(10, cycle.maybe_restore_messages)
|
||||
loop.call_repeatedly(30, cycle.maybe_check_subclient_health)
|
||||
loop.call_repeatedly(
|
||||
DEFAULT_HEALTH_CHECK_INTERVAL,
|
||||
cycle.maybe_check_subclient_health
|
||||
)
|
||||
|
||||
def on_readable(self, fileno):
|
||||
"""Handle AIO event for one of our file descriptors."""
|
||||
|
|
|
@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals
|
|||
import random
|
||||
import sys
|
||||
import threading
|
||||
import inspect
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
|
@ -18,7 +19,7 @@ from time import sleep, time
|
|||
from vine.utils import wraps
|
||||
|
||||
from kombu.five import (
|
||||
UserDict, items, keys, python_2_unicode_compatible, string_t,
|
||||
UserDict, items, keys, python_2_unicode_compatible, string_t, PY3,
|
||||
)
|
||||
|
||||
from .encoding import safe_repr as _safe_repr
|
||||
|
@ -372,6 +373,19 @@ def reprcall(name, args=(), kwargs=None, sep=', '):
|
|||
)
|
||||
|
||||
|
||||
def accepts_argument(func, argument_name):
|
||||
if PY3:
|
||||
argument_spec = inspect.getfullargspec(func)
|
||||
return (
|
||||
argument_name in argument_spec.args or
|
||||
argument_name in argument_spec.kwonlyargs
|
||||
)
|
||||
|
||||
argument_spec = inspect.getargspec(func)
|
||||
argument_names = argument_spec.args
|
||||
return argument_name in argument_names
|
||||
|
||||
|
||||
# Compat names (before kombu 3.0)
|
||||
promise = lazy
|
||||
maybe_promise = maybe_evaluate
|
||||
|
|
|
@ -738,7 +738,7 @@ class test_Channel:
|
|||
transport.cycle.on_poll_init.assert_called_with(loop.poller)
|
||||
loop.call_repeatedly.assert_has_calls([
|
||||
call(10, transport.cycle.maybe_restore_messages),
|
||||
call(30, transport.cycle.maybe_check_subclient_health),
|
||||
call(25, transport.cycle.maybe_check_subclient_health),
|
||||
])
|
||||
loop.on_tick.add.assert_called()
|
||||
on_poll_start = loop.on_tick.add.call_args[0][0]
|
||||
|
|
|
@ -8,11 +8,14 @@ from itertools import count
|
|||
|
||||
from case import Mock, mock, skip
|
||||
|
||||
from kombu.five import items
|
||||
from kombu.five import (
|
||||
items, PY3,
|
||||
)
|
||||
from kombu.utils import functional as utils
|
||||
from kombu.utils.functional import (
|
||||
ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy,
|
||||
maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time,
|
||||
accepts_argument,
|
||||
)
|
||||
|
||||
|
||||
|
@ -310,3 +313,26 @@ def test_reprkwargs():
|
|||
|
||||
def test_reprcall():
|
||||
assert reprcall('add', (2, 2), {'copy': True})
|
||||
|
||||
|
||||
class test_accepts_arg:
|
||||
def function(self, foo, bar, baz="baz"):
|
||||
pass
|
||||
|
||||
def test_valid_argument(self):
|
||||
assert accepts_argument(self.function, 'self')
|
||||
assert accepts_argument(self.function, 'foo')
|
||||
assert accepts_argument(self.function, 'baz')
|
||||
|
||||
def test_invalid_argument(self):
|
||||
assert not accepts_argument(self.function, 'random_argument')
|
||||
if PY3:
|
||||
assert not accepts_argument(test_accepts_arg, 'foo')
|
||||
|
||||
def test_raise_exception(self):
|
||||
with pytest.raises(Exception):
|
||||
accepts_argument(None, 'foo')
|
||||
|
||||
if not PY3:
|
||||
with pytest.raises(Exception):
|
||||
accepts_argument(test_accepts_arg, 'foo')
|
||||
|
|
Loading…
Reference in New Issue