Ruff and typing improvements (#2148)

* Ruff and typing improvements

* Add `kill_horse()` method to BaseWorker

* Add Python 3.13 to test matrix
This commit is contained in:
Selwin Ong 2024-11-16 15:54:32 +07:00 committed by GitHub
parent aa5c58f222
commit a07b23b821
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 51 deletions

View File

@ -35,7 +35,7 @@ jobs:
timeout-minutes: 10 timeout-minutes: 10
strategy: strategy:
matrix: matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
redis-version: [4, 5, 6, 7] redis-version: [4, 5, 6, 7]
redis-py-version: [3.5.0] redis-py-version: [3.5.0]

View File

@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING: if TYPE_CHECKING:
from redis import Redis from redis import Redis
from .worker import Worker from .worker import BaseWorker
from rq.exceptions import InvalidJobOperation from rq.exceptions import InvalidJobOperation
from rq.job import Job from rq.job import Job
@ -82,7 +82,7 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
send_command(connection, job.worker_name, 'stop-job', job_id=job_id) send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
def handle_command(worker: 'Worker', payload: Dict[Any, Any]): def handle_command(worker: 'BaseWorker', payload: Dict[Any, Any]):
"""Parses payload and routes commands to the worker. """Parses payload and routes commands to the worker.
Args: Args:
@ -97,7 +97,7 @@ def handle_command(worker: 'Worker', payload: Dict[Any, Any]):
handle_kill_worker_command(worker, payload) handle_kill_worker_command(worker, payload)
def handle_shutdown_command(worker: 'Worker'): def handle_shutdown_command(worker: 'BaseWorker'):
"""Perform shutdown command. """Perform shutdown command.
Args: Args:
@ -108,7 +108,7 @@ def handle_shutdown_command(worker: 'Worker'):
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)
def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]): def handle_kill_worker_command(worker: 'BaseWorker', payload: Dict[Any, Any]):
""" """
Stops work horse Stops work horse
@ -125,7 +125,7 @@ def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]):
worker.log.info('Worker is not working, kill horse command ignored') worker.log.info('Worker is not working, kill horse command ignored')
def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]): def handle_stop_job_command(worker: 'BaseWorker', payload: Dict[Any, Any]):
"""Handles stop job command. """Handles stop job command.
Args: Args:

View File

@ -511,6 +511,42 @@ class BaseWorker:
"""Only supported by Redis server >= 5.0 is required.""" """Only supported by Redis server >= 5.0 is required."""
return self.get_redis_server_version() >= (5, 0, 0) return self.get_redis_server_version() >= (5, 0, 0)
def request_stop(self, signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
Args:
signum (Any): Signum
frame (Any): Frame
"""
self.log.debug('Got signal %s', signal_name(signum))
self._shutdown_requested_date = now()
signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)
self.handle_warm_shutdown_request()
self._shutdown()
def _shutdown(self):
"""
If shutdown is requested in the middle of a job, wait until
finish before shutting down and save the request in redis
"""
if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.')
if self.scheduler:
self.stop_scheduler()
else:
if self.scheduler:
self.stop_scheduler()
raise StopRequested()
def request_force_stop(self, signum: int, frame: Optional[FrameType]):
raise NotImplementedError()
def _install_signal_handlers(self): def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM gracefully.""" """Installs signal handlers for handling SIGINT and SIGTERM gracefully."""
signal.signal(signal.SIGINT, self.request_stop) signal.signal(signal.SIGINT, self.request_stop)
@ -864,6 +900,13 @@ class BaseWorker:
p.expire(self.key, 60) p.expire(self.key, 60)
p.execute() p.execute()
@property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
0 in the horse part of the fork.
"""
return self._horse_pid
def bootstrap( def bootstrap(
self, self,
logging_level: str = "INFO", logging_level: str = "INFO",
@ -969,6 +1012,12 @@ class BaseWorker:
self.log.warning("Pubsub thread exitin on %s" % exc) self.log.warning("Pubsub thread exitin on %s" % exc)
raise raise
def handle_payload(self, message):
"""Handle external commands"""
self.log.debug('Received message: %s', message)
payload = parse_payload(message)
handle_command(self, payload)
def subscribe(self): def subscribe(self):
"""Subscribe to this worker's channel""" """Subscribe to this worker's channel"""
self.log.info('Subscribing to channel %s', self.pubsub_channel_name) self.log.info('Subscribing to channel %s', self.pubsub_channel_name)
@ -1187,14 +1236,11 @@ class BaseWorker:
"""Pushes an exception handler onto the exc handler stack.""" """Pushes an exception handler onto the exc handler stack."""
self._exc_handlers.append(handler_func) self._exc_handlers.append(handler_func)
def kill_horse(self, sig: signal.Signals = SIGKILL):
raise NotImplementedError()
class Worker(BaseWorker): class Worker(BaseWorker):
@property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
0 in the horse part of the fork.
"""
return self._horse_pid
@property @property
def is_horse(self): def is_horse(self):
@ -1253,39 +1299,6 @@ class Worker(BaseWorker):
self.wait_for_horse() self.wait_for_horse()
raise SystemExit() raise SystemExit()
def request_stop(self, signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
Args:
signum (Any): Signum
frame (Any): Frame
"""
self.log.debug('Got signal %s', signal_name(signum))
self._shutdown_requested_date = now()
signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)
self.handle_warm_shutdown_request()
self._shutdown()
def _shutdown(self):
"""
If shutdown is requested in the middle of a job, wait until
finish before shutting down and save the request in redis
"""
if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.')
if self.scheduler:
self.stop_scheduler()
else:
if self.scheduler:
self.stop_scheduler()
raise StopRequested()
def fork_work_horse(self, job: 'Job', queue: 'Queue'): def fork_work_horse(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job. """Spawns a work horse to perform the actual work and passes it a job.
This is where the `fork()` actually happens. This is where the `fork()` actually happens.
@ -1639,12 +1652,6 @@ class Worker(BaseWorker):
"""The hash does not take the database/connection into account""" """The hash does not take the database/connection into account"""
return hash(self.name) return hash(self.name)
def handle_payload(self, message):
"""Handle external commands"""
self.log.debug('Received message: %s', message)
payload = parse_payload(message)
handle_command(self, payload)
class SimpleWorker(Worker): class SimpleWorker(Worker):
def execute_job(self, job: 'Job', queue: 'Queue'): def execute_job(self, job: 'Job', queue: 'Queue'):