diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 950565d9..3e8ee4da 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -409,3 +409,16 @@ for worker in workers: if worker.state = WorkerStatus.BUSY: send_kill_horse_command(redis, worker.name) ``` + +_New in version 1.7.0._ +* `send_stop_job_command()`: tells worker to stop a job. + +```python +from redis import Redis +from rq.command import send_stop_job_command + +redis = Redis() + +# This will raise an exception if job is invalid or not currently executing +send_stop_job_command(redis, job_id) +``` diff --git a/rq/command.py b/rq/command.py index be8fea81..94eed967 100644 --- a/rq/command.py +++ b/rq/command.py @@ -1,13 +1,20 @@ import json +import os +import signal + +from rq.exceptions import InvalidJobOperation +from rq.job import Job PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' -def send_command(redis, worker_name, command): - """Use Redis' pubsub mechanism to send a command""" +def send_command(connection, worker_name, command, **kwargs): + """Use connection' pubsub mechanism to send a command""" payload = {'command': command} - redis.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) + if kwargs: + payload.update(kwargs) + connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) def parse_payload(payload): @@ -15,11 +22,56 @@ def parse_payload(payload): return json.loads(payload.get('data').decode()) -def send_shutdown_command(redis, worker_name): +def send_shutdown_command(connection, worker_name): """Send shutdown command""" - send_command(redis, worker_name, 'shutdown') + send_command(connection, worker_name, 'shutdown') -def send_kill_horse_command(redis, worker_name): +def send_kill_horse_command(connection, worker_name): """Tell worker to kill it's horse""" - send_command(redis, worker_name, 'kill-horse') + send_command(connection, worker_name, 'kill-horse') + + +def send_stop_job_command(connection, job_id): + """Instruct a worker to stop a job""" + job = Job.fetch(job_id, connection=connection) + if not job.worker_name: + raise InvalidJobOperation('Job is not currently executing') + send_command(connection, job.worker_name, 'stop-job', job_id=job_id) + + +def handle_command(worker, payload): + """Parses payload and routes commands""" + if payload['command'] == 'stop-job': + handle_stop_job_command(worker, payload) + elif payload['command'] == 'shutdown': + handle_shutdown_command(worker) + elif payload['command'] == 'kill-horse': + handle_kill_worker_command(worker, payload) + + +def handle_shutdown_command(worker): + """Perform shutdown command""" + worker.log.info('Received shutdown command, sending SIGINT signal.') + pid = os.getpid() + os.kill(pid, signal.SIGINT) + + +def handle_kill_worker_command(worker, payload): + """Stops work horse""" + worker.log.info('Received kill horse command.') + if worker.horse_pid: + worker.log.info('Kiling horse...') + worker.kill_horse() + else: + worker.log.info('Worker is not working, kill horse command ignored') + + +def handle_stop_job_command(worker, payload): + """Handles stop job command""" + job_id = payload.get('job_id') + worker.log.debug('Received command to stop job %s', job_id) + if job_id and worker.get_current_job_id() == job_id: + worker.kill_horse() + else: + worker.log.info('Not working on job %s, command ignored.', job_id) diff --git a/rq/job.py b/rq/job.py index 0f16d3f8..eb3ca457 100644 --- a/rq/job.py +++ b/rq/job.py @@ -45,7 +45,8 @@ def truncate_long_string(data, maxlen=75): """ Truncates strings longer than maxlen """ return (data[:maxlen] + '...') if len(data) > maxlen else data - + + def cancel_job(job_id, connection=None): """Cancels the job with the given job ID, preventing execution. Discards any job info (i.e. it can't be requeued later). diff --git a/rq/worker.py b/rq/worker.py index cbe7073b..ad5d0aea 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -25,7 +25,7 @@ except ImportError: from redis import WatchError from . import worker_registration -from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE +from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .compat import as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection @@ -1071,18 +1071,10 @@ class Worker(object): return False def handle_payload(self, message): + """Handle external commands""" + self.log.debug('Received message: %s', message) payload = parse_payload(message) - if payload['command'] == 'shutdown': - self.log.info('Received shutdown command, sending SIGINT signal.') - pid = os.getpid() - os.kill(pid, signal.SIGINT) - elif payload['command'] == 'kill-horse': - self.log.info('Received kill horse command.') - if self.horse_pid: - self.log.info('Kiling horse...') - self.kill_horse() - else: - self.log.info('Worker is not working, ignoring kill horse command') + handle_command(self, payload) class SimpleWorker(Worker): diff --git a/tests/test_commands.py b/tests/test_commands.py index 647a17a0..a7a0dd60 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -6,7 +6,9 @@ from tests import RQTestCase from tests.fixtures import long_running_job from rq import Queue, Worker -from rq.command import send_command, send_kill_horse_command, send_shutdown_command +from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command +from rq.exceptions import InvalidJobOperation, NoSuchJobError +from rq.worker import WorkerStatus class TestCommands(RQTestCase): @@ -42,4 +44,54 @@ class TestCommands(RQTestCase): worker.work(burst=True) p.join(1) job.refresh() - self.assertTrue(job.id in queue.failed_job_registry) \ No newline at end of file + self.assertTrue(job.id in queue.failed_job_registry) + + def start_work(): + worker.work() + + p = Process(target=start_work) + p.start() + p.join(2) + + send_kill_horse_command(connection, worker.name) + worker.refresh() + # Since worker is not busy, command will be ignored + self.assertEqual(worker.get_state(), WorkerStatus.IDLE) + send_shutdown_command(connection, worker.name) + + def test_stop_job_command(self): + """Ensure that stop_job command works properly.""" + + connection = self.testconn + queue = Queue('foo', connection=connection) + job = queue.enqueue(long_running_job, 3) + worker = Worker('foo', connection=connection) + + # If job is not executing, an error is raised + with self.assertRaises(InvalidJobOperation): + send_stop_job_command(connection, job_id=job.id) + + # An exception is raised if job ID is invalid + with self.assertRaises(NoSuchJobError): + send_stop_job_command(connection, job_id='1') + + def start_work(): + worker.work(burst=True) + + p = Process(target=start_work) + p.start() + p.join(1) + + time.sleep(0.1) + + send_command(connection, worker.name, 'stop-job', job_id=1) + time.sleep(0.25) + # Worker still working due to job_id mismatch + worker.refresh() + self.assertEqual(worker.get_state(), WorkerStatus.BUSY) + + send_stop_job_command(connection, job_id=job.id) + time.sleep(0.25) + # Worker has stopped working + worker.refresh() + self.assertEqual(worker.get_state(), WorkerStatus.IDLE)