diff --git a/rq/worker.py b/rq/worker.py index c9fe4b4a..c2fab62d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -371,8 +371,7 @@ class Worker(object): signal.signal(signal.SIGINT, self.request_force_stop) signal.signal(signal.SIGTERM, self.request_force_stop) - msg = 'Warm shut down requested' - self.log.warning(msg) + self.handle_warm_shutdown_request() # If shutdown is requested in the middle of a job, wait until # finish before shutting down and save the request in redis @@ -384,6 +383,9 @@ class Worker(object): else: raise StopRequested() + def handle_warm_shutdown_request(self): + self.log.warning('Warm shut down requested') + def check_for_suspension(self, burst): """Check to see if workers have been suspended by `rq suspend`""" @@ -727,11 +729,10 @@ class HerokuWorker(Worker): Modified version of rq worker which: * stops work horses getting killed with SIGTERM * sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish - Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover """ - imminent_shutdown_delay = 8 - frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_lasti', 'f_lineno', 'f_locals', - 'f_restricted', 'f_trace'] + imminent_shutdown_delay = 6 + frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', + 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace'] def main_work_horse(self, job, queue): """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" @@ -747,36 +748,13 @@ class HerokuWorker(Worker): success = self.perform_job(job, queue) os._exit(int(not success)) - def request_stop(self, signum, frame): - """Stops the current worker loop but waits for child processes to - end gracefully (warm shutdown). - """ - self.log.debug('Got signal {0}'.format(signal_name(signum))) - - signal.signal(signal.SIGINT, self.request_force_stop) - signal.signal(signal.SIGTERM, self.request_force_stop) - - # start altered { + def handle_warm_shutdown_request(self): + """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal') - try: - os.kill(self.horse_pid, signal.SIGRTMIN) - except OSError as e: - if e.errno != errno.ESRCH: - self.log.debug('Horse already down') - raise + os.kill(self.horse_pid, signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') - # } end altered - - # If shutdown is requested in the middle of a job, wait until - # finish before shutting down - if self.get_state() == 'busy': - self._stop_requested = True - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') - else: - raise StopRequested() def handle_shutdown_imminent(self, signum, frame): if self.imminent_shutdown_delay == 0: diff --git a/tests/fixtures.py b/tests/fixtures.py index 2d15d881..6b05ba1c 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -9,9 +9,10 @@ from __future__ import (absolute_import, division, print_function, import os import time -from rq import Connection, get_current_job, get_current_connection +from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job from rq.compat import PY2 +from rq.worker import HerokuWorker def say_pid(): @@ -102,3 +103,24 @@ def black_hole(job, *exc_info): def long_running_job(timeout=10): time.sleep(timeout) return 'Done sleeping...' + + +def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay): + """ + Run a simplified heroku worker where perform_job job just creates two files 2 seconds apart + :param sandbox: directory to create files in + :param _imminent_shutdown_delay: delay to use for TestHerokuWorker + :return: + """ + class TestHerokuWorker(HerokuWorker): + imminent_shutdown_delay = _imminent_shutdown_delay + + def perform_job(self, job, queue): + create_file(os.path.join(sandbox, 'started')) + # have to loop here rather than one sleep to avoid holding the GIL and preventing signals being recieved + for i in range(20): + time.sleep(0.1) + create_file(os.path.join(sandbox, 'finished')) + + w = TestHerokuWorker(Queue('dummy')) + w.main_work_horse(None, None) diff --git a/tests/test_worker.py b/tests/test_worker.py index c63f97b6..3b4b53e5 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +import shutil from datetime import timedelta from time import sleep import signal @@ -13,7 +14,7 @@ import subprocess from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, - access_self) + run_dummy_heroku_worker, access_self) from tests.helpers import strip_microseconds from rq import (get_failed_queue, Queue, SimpleWorker, Worker, @@ -23,6 +24,7 @@ from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend from rq.utils import utcnow +from rq.worker import HerokuWorker class CustomJob(Job): @@ -473,6 +475,7 @@ class TestWorker(RQTestCase): assert q.count == 0 self.assertEqual(os.path.exists(SENTINEL_FILE), True) + @slow def test_suspend_with_duration(self): q = Queue() for _ in range(5): @@ -575,7 +578,7 @@ def kill_worker(pid, double_kill): os.kill(pid, signal.SIGTERM) -class TestWorkerShutdown(RQTestCase): +class TimeoutTestCase: def setUp(self): # we want tests to fail if signal are ignored and the work remain # running, so set a signal to kill them after X seconds @@ -588,6 +591,8 @@ class TestWorkerShutdown(RQTestCase): "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout ) + +class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): @slow def test_idle_worker_warm_shutdown(self): """worker with no ongoing job receiving single SIGTERM signal and shutting down""" @@ -616,12 +621,12 @@ class TestWorkerShutdown(RQTestCase): w.work() p.join(2) + self.assertFalse(p.is_alive()) self.assertTrue(w._stop_requested) self.assertTrue(os.path.exists(sentinel_file)) - shutdown_requested_date = w.shutdown_requested_date - self.assertIsNotNone(shutdown_requested_date) - self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') + self.assertIsNotNone(w.shutdown_requested_date) + self.assertEqual(type(w.shutdown_requested_date).__name__, 'datetime') @slow def test_working_worker_cold_shutdown(self): @@ -675,3 +680,79 @@ class TestWorkerSubprocess(RQTestCase): subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) assert get_failed_queue().count == 0 assert q.count == 0 + + +class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): + def setUp(self): + super(HerokuWorkerShutdownTestCase, self).setUp() + self.sandbox = '/tmp/rq_shutdown/' + os.makedirs(self.sandbox) + + def tearDown(self): + shutil.rmtree(self.sandbox, ignore_errors=True) + + @slow + def test_idle_worker_shutdown(self): + """worker with no ongoing job receiving single SIGTERM signal and shutting down""" + w = HerokuWorker('foo') + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(1) + self.assertFalse(w._stop_requested) + + @slow + def test_immediate_shutdown(self): + """Heroku work horse shutdown with immediate (0 second) kill""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 0)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + + p.join(2) + self.assertEqual(p.exitcode, 1) + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + + @slow + def test_1_sec_shutdown(self): + """Heroku work horse shutdown with 1 second kill""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 1)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + time.sleep(0.1) + self.assertEqual(p.exitcode, None) + p.join(2) + self.assertEqual(p.exitcode, 1) + + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + + def test_handle_shutdown_request(self): + """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" + w = HerokuWorker('foo') + + path = os.path.join(self.sandbox, 'shouldnt_exist') + p = Process(target=create_file_after_timeout, args=(path, 2)) + p.start() + self.assertEqual(p.exitcode, None) + + w._horse_pid = p.pid + w.handle_warm_shutdown_request() + p.join(2) + self.assertEqual(p.exitcode, -34) + self.assertFalse(os.path.exists(path)) + + def test_handle_shutdown_request_no_horse(self): + """Mutate HerokuWorker so _horse_pid refers to non existent process and test handle_warm_shutdown_request""" + w = HerokuWorker('foo') + + w._horse_pid = 19999 + with self.assertRaises(OSError): + w.handle_warm_shutdown_request()