From a07b23b821bf536eaae54c7f7e7a112aaa709f62 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 16 Nov 2024 15:54:32 +0700 Subject: [PATCH] Ruff and typing improvements (#2148) * Ruff and typing improvements * Add `kill_horse()` method to BaseWorker * Add Python 3.13 to test matrix --- .github/workflows/workflow.yml | 2 +- rq/command.py | 10 ++-- rq/worker.py | 97 ++++++++++++++++++---------------- 3 files changed, 58 insertions(+), 51 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index e1f9b82b..63b38c50 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -35,7 +35,7 @@ jobs: timeout-minutes: 10 strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] redis-version: [4, 5, 6, 7] redis-py-version: [3.5.0] diff --git a/rq/command.py b/rq/command.py index 5d113592..442b0d00 100644 --- a/rq/command.py +++ b/rq/command.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Dict if TYPE_CHECKING: from redis import Redis - from .worker import Worker + from .worker import BaseWorker from rq.exceptions import InvalidJobOperation from rq.job import Job @@ -82,7 +82,7 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None): send_command(connection, job.worker_name, 'stop-job', job_id=job_id) -def handle_command(worker: 'Worker', payload: Dict[Any, Any]): +def handle_command(worker: 'BaseWorker', payload: Dict[Any, Any]): """Parses payload and routes commands to the worker. Args: @@ -97,7 +97,7 @@ def handle_command(worker: 'Worker', payload: Dict[Any, Any]): handle_kill_worker_command(worker, payload) -def handle_shutdown_command(worker: 'Worker'): +def handle_shutdown_command(worker: 'BaseWorker'): """Perform shutdown command. Args: @@ -108,7 +108,7 @@ def handle_shutdown_command(worker: 'Worker'): os.kill(pid, signal.SIGINT) -def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]): +def handle_kill_worker_command(worker: 'BaseWorker', payload: Dict[Any, Any]): """ Stops work horse @@ -125,7 +125,7 @@ def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]): worker.log.info('Worker is not working, kill horse command ignored') -def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]): +def handle_stop_job_command(worker: 'BaseWorker', payload: Dict[Any, Any]): """Handles stop job command. Args: diff --git a/rq/worker.py b/rq/worker.py index 2122e58c..7835a266 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -511,6 +511,42 @@ class BaseWorker: """Only supported by Redis server >= 5.0 is required.""" return self.get_redis_server_version() >= (5, 0, 0) + def request_stop(self, signum, frame): + """Stops the current worker loop but waits for child processes to + end gracefully (warm shutdown). + + Args: + signum (Any): Signum + frame (Any): Frame + """ + self.log.debug('Got signal %s', signal_name(signum)) + self._shutdown_requested_date = now() + + signal.signal(signal.SIGINT, self.request_force_stop) + signal.signal(signal.SIGTERM, self.request_force_stop) + + self.handle_warm_shutdown_request() + self._shutdown() + + def _shutdown(self): + """ + If shutdown is requested in the middle of a job, wait until + finish before shutting down and save the request in redis + """ + if self.get_state() == WorkerStatus.BUSY: + self._stop_requested = True + self.set_shutdown_requested_date() + self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.') + if self.scheduler: + self.stop_scheduler() + else: + if self.scheduler: + self.stop_scheduler() + raise StopRequested() + + def request_force_stop(self, signum: int, frame: Optional[FrameType]): + raise NotImplementedError() + def _install_signal_handlers(self): """Installs signal handlers for handling SIGINT and SIGTERM gracefully.""" signal.signal(signal.SIGINT, self.request_stop) @@ -864,6 +900,13 @@ class BaseWorker: p.expire(self.key, 60) p.execute() + @property + def horse_pid(self): + """The horse's process ID. Only available in the worker. Will return + 0 in the horse part of the fork. + """ + return self._horse_pid + def bootstrap( self, logging_level: str = "INFO", @@ -969,6 +1012,12 @@ class BaseWorker: self.log.warning("Pubsub thread exitin on %s" % exc) raise + def handle_payload(self, message): + """Handle external commands""" + self.log.debug('Received message: %s', message) + payload = parse_payload(message) + handle_command(self, payload) + def subscribe(self): """Subscribe to this worker's channel""" self.log.info('Subscribing to channel %s', self.pubsub_channel_name) @@ -1187,14 +1236,11 @@ class BaseWorker: """Pushes an exception handler onto the exc handler stack.""" self._exc_handlers.append(handler_func) + def kill_horse(self, sig: signal.Signals = SIGKILL): + raise NotImplementedError() + class Worker(BaseWorker): - @property - def horse_pid(self): - """The horse's process ID. Only available in the worker. Will return - 0 in the horse part of the fork. - """ - return self._horse_pid @property def is_horse(self): @@ -1253,39 +1299,6 @@ class Worker(BaseWorker): self.wait_for_horse() raise SystemExit() - def request_stop(self, signum, frame): - """Stops the current worker loop but waits for child processes to - end gracefully (warm shutdown). - - Args: - signum (Any): Signum - frame (Any): Frame - """ - self.log.debug('Got signal %s', signal_name(signum)) - self._shutdown_requested_date = now() - - signal.signal(signal.SIGINT, self.request_force_stop) - signal.signal(signal.SIGTERM, self.request_force_stop) - - self.handle_warm_shutdown_request() - self._shutdown() - - def _shutdown(self): - """ - If shutdown is requested in the middle of a job, wait until - finish before shutting down and save the request in redis - """ - if self.get_state() == WorkerStatus.BUSY: - self._stop_requested = True - self.set_shutdown_requested_date() - self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.') - if self.scheduler: - self.stop_scheduler() - else: - if self.scheduler: - self.stop_scheduler() - raise StopRequested() - def fork_work_horse(self, job: 'Job', queue: 'Queue'): """Spawns a work horse to perform the actual work and passes it a job. This is where the `fork()` actually happens. @@ -1639,12 +1652,6 @@ class Worker(BaseWorker): """The hash does not take the database/connection into account""" return hash(self.name) - def handle_payload(self, message): - """Handle external commands""" - self.log.debug('Received message: %s', message) - payload = parse_payload(message) - handle_command(self, payload) - class SimpleWorker(Worker): def execute_job(self, job: 'Job', queue: 'Queue'):