diff --git a/rq/defaults.py b/rq/defaults.py index 2e99a289..bb7ec791 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -10,4 +10,5 @@ DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120 DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' +DEFAULT_MAINTENANCE_TASK_INTERVAL = 10 * 60 CALLBACK_TIMEOUT = 60 diff --git a/rq/scheduler.py b/rq/scheduler.py index 4305cdea..de8f26e8 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -118,7 +118,7 @@ class RQScheduler: # If auto_start is requested and scheduler is not started, # run self.start() if self._acquired_locks and auto_start: - if not self._process: + if not self._process or not self._process.is_alive(): self.start() return successful_locks diff --git a/rq/worker.py b/rq/worker.py index 278b3eec..6d013712 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -32,7 +32,7 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL, +from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException @@ -546,7 +546,7 @@ class Worker: """ # No need to try to start scheduler on first run if self.last_cleaned_at: - if self.scheduler and not self.scheduler._process: + if self.scheduler and (not self.scheduler._process or not self.scheduler._process.is_alive()): self.scheduler.acquire_locks(auto_start=True) self.clean_registries() @@ -1240,7 +1240,7 @@ class Worker: """Maintenance tasks should run on first startup or every 10 minutes.""" if self.last_cleaned_at is None: return True - if (utcnow() - self.last_cleaned_at) > timedelta(minutes=10): + if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL): return True return False diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 76a26855..c6683e4c 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -11,6 +11,7 @@ from rq.scheduler import RQScheduler from rq.serializers import JSONSerializer from rq.utils import current_timestamp from rq.worker import Worker +from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from tests import RQTestCase, find_empty_redis_database, ssl_test @@ -139,7 +140,7 @@ class TestScheduler(RQTestCase): # scheduler.should_reacquire_locks always returns False if # scheduler.acquired_locks and scheduler._queue_names are the same self.assertFalse(scheduler.should_reacquire_locks) - scheduler.lock_acquisition_time = datetime.now() - timedelta(minutes=16) + scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL+6) self.assertFalse(scheduler.should_reacquire_locks) scheduler._queue_names = set(['default', 'foo']) @@ -176,11 +177,24 @@ class TestScheduler(RQTestCase): self.assertEqual(mocked.call_count, 1) # If process has started, scheduler.start() won't be called + running_process = mock.MagicMock() + running_process.is_alive.return_value = True scheduler = RQScheduler(['auto-start2'], self.testconn) - scheduler._process = 1 + scheduler._process = running_process with mock.patch.object(scheduler, 'start') as mocked: scheduler.acquire_locks(auto_start=True) self.assertEqual(mocked.call_count, 0) + self.assertEqual(running_process.is_alive.call_count, 1) + + # If the process has stopped for some reason, the scheduler should restart + scheduler = RQScheduler(['auto-start3'], self.testconn) + stopped_process = mock.MagicMock() + stopped_process.is_alive.return_value = False + scheduler._process = stopped_process + with mock.patch.object(scheduler, 'start') as mocked: + scheduler.acquire_locks(auto_start=True) + self.assertEqual(mocked.call_count, 1) + self.assertEqual(stopped_process.is_alive.call_count, 1) def test_heartbeat(self): """Test that heartbeat updates locking keys TTL""" @@ -272,15 +286,32 @@ class TestWorker(RQTestCase): worker.run_maintenance_tasks() self.assertEqual(mocked.call_count, 0) + # if scheduler object exists and it's a first start, acquire locks should not run worker.last_cleaned_at = None worker.scheduler = RQScheduler([queue], connection=self.testconn) worker.run_maintenance_tasks() self.assertEqual(mocked.call_count, 0) + # the scheduler exists and it's NOT a first start, since the process doesn't exists, + # should call acquire_locks to start the process worker.last_cleaned_at = datetime.now() worker.run_maintenance_tasks() self.assertEqual(mocked.call_count, 1) + # the scheduler exists, the process exists, but the process is not alive + running_process = mock.MagicMock() + running_process.is_alive.return_value = False + worker.scheduler._process = running_process + worker.run_maintenance_tasks() + self.assertEqual(mocked.call_count, 2) + self.assertEqual(running_process.is_alive.call_count, 1) + + # the scheduler exists, the process exits, and it is alive. acquire_locks shouldn't run + running_process.is_alive.return_value = True + worker.run_maintenance_tasks() + self.assertEqual(mocked.call_count, 2) + self.assertEqual(running_process.is_alive.call_count, 2) + def test_work(self): queue = Queue(connection=self.testconn) worker = Worker(queues=[queue], connection=self.testconn)