Add an exception handler for the pubsub thread (#2132)

* Add an exception handler for the pubsub thread

After a connection error to Redis the pubsub thread was stopped and the
worker could not receive commands anymore.

This commit adds an exception handler for the thread that adds a log
message and ignores `redis.exceptions.ConnectionError`. Any other
exception is re-raised.

redis-py internal mechanism allows the pubsub thread to recover its
connection and reinstall the pubsub channel subscription to allow the
worker to receive commands again after connection errors.

It tries to behave the same as the main worker loop retry mechanism but
without the backoff wait factor.

Fixes #1836
Fixes #2070

* Add test for untested line, improve logging and comments

Add *args & **kwargs to tests.fixtures.raise_exc to pass tests with Python 3.7
This commit is contained in:
François Charlier 2024-10-17 06:15:47 +02:00 committed by GitHub
parent f4283afe68
commit 3e2e26e702
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 4 deletions

View File

@ -23,7 +23,7 @@ if TYPE_CHECKING:
except ImportError: except ImportError:
pass pass
from redis import Redis from redis import Redis
from redis.client import Pipeline, PubSub from redis.client import Pipeline, PubSub, PubSubWorkerThread
try: try:
from signal import SIGKILL from signal import SIGKILL
@ -950,12 +950,33 @@ class BaseWorker:
self.clean_registries() self.clean_registries()
Group.clean_registries(connection=self.connection) Group.clean_registries(connection=self.connection)
def _pubsub_exception_handler(self, exc: Exception, pubsub: "PubSub", pubsub_thread: "PubSubWorkerThread") -> None:
"""
This exception handler allows the pubsub_thread to continue & retry to
connect after a connection problem the same way the main worker loop
indefinitely retries.
redis-py internal mechanism will restore the channels subscriptions
once the connection is re-established.
"""
if isinstance(exc, (redis.exceptions.ConnectionError)):
self.log.error(
"Could not connect to Redis instance: %s Retrying in %d seconds...",
exc,
2,
)
time.sleep(2.0)
else:
self.log.warning("Pubsub thread exitin on %s" % exc)
raise
def subscribe(self): def subscribe(self):
"""Subscribe to this worker's channel""" """Subscribe to this worker's channel"""
self.log.info('Subscribing to channel %s', self.pubsub_channel_name) self.log.info('Subscribing to channel %s', self.pubsub_channel_name)
self.pubsub = self.connection.pubsub() self.pubsub = self.connection.pubsub()
self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload}) self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload})
self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=0.2, daemon=True) self.pubsub_thread = self.pubsub.run_in_thread(
sleep_time=0.2, daemon=True, exception_handler=self._pubsub_exception_handler
)
def get_heartbeat_ttl(self, job: 'Job') -> int: def get_heartbeat_ttl(self, job: 'Job') -> int:
"""Get's the TTL for the next heartbeat. """Get's the TTL for the next heartbeat.

View File

@ -47,7 +47,7 @@ def do_nothing():
pass pass
def raise_exc(): def raise_exc(*args, **kwargs):
raise Exception('raise_exc error') raise Exception('raise_exc error')

View File

@ -1,5 +1,6 @@
import time import time
from multiprocessing import Process from multiprocessing import Process
from unittest import mock
from redis import Redis from redis import Redis
@ -9,7 +10,7 @@ from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.serializers import JSONSerializer from rq.serializers import JSONSerializer
from rq.worker import WorkerStatus from rq.worker import WorkerStatus
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job, raise_exc_mock
def start_work(queue_name, worker_name, connection_kwargs): def start_work(queue_name, worker_name, connection_kwargs):
@ -35,6 +36,31 @@ class TestCommands(RQTestCase):
worker.work() worker.work()
p.join(1) p.join(1)
def test_pubsub_thread_survives_connection_error(self):
"""Ensure that the pubsub thread is still alive after its Redis connection is killed"""
connection = self.connection
worker = Worker('foo', connection=connection)
worker.subscribe()
assert worker.pubsub_thread.is_alive()
for client in connection.client_list():
connection.client_kill(client["addr"])
time.sleep(0.0) # Allow other threads to run
assert worker.pubsub_thread.is_alive()
def test_pubsub_thread_exits_other_error(self):
"""Ensure that the pubsub thread exits on other than redis.exceptions.ConnectionError"""
connection = self.connection
worker = Worker('foo', connection=connection)
with mock.patch("redis.client.PubSub.get_message", new_callable=raise_exc_mock):
worker.subscribe()
worker.pubsub_thread.join()
assert not worker.pubsub_thread.is_alive()
def test_kill_horse_command(self): def test_kill_horse_command(self):
"""Ensure that shutdown command works properly.""" """Ensure that shutdown command works properly."""
connection = self.connection connection = self.connection