diff --git a/rq/queue.py b/rq/queue.py index 77a6f3e3..43b31ebd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -71,8 +71,11 @@ class Queue: @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, - serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None + cls, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer=None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> List['Queue']: """Returns an iterable of all Queues. @@ -89,8 +92,11 @@ class Queue: def to_queue(queue_key): return cls.from_queue_key( - as_text(queue_key), connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class + as_text(queue_key), + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) @@ -99,12 +105,12 @@ class Queue: @classmethod def from_queue_key( - cls, - queue_key: str, - connection: Optional['Redis'] = None, - job_class: Optional[Type['Job']] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their @@ -126,20 +132,25 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix):] - return cls(name, connection=connection, job_class=job_class, serializer=serializer, - death_penalty_class=death_penalty_class) + name = queue_key[len(prefix) :] + return cls( + name, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) def __init__( - self, - name: str = 'default', - default_timeout: Optional[int] = None, - connection: Optional['Redis'] = None, - is_async: bool = True, - job_class: Union[str, Type['Job'], None] = None, - serializer: Any = None, - death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, - **kwargs, + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, + **kwargs, ): """Initializes a Queue object. @@ -207,6 +218,7 @@ class Queue: @property def scheduler_pid(self) -> int: from rq.scheduler import RQScheduler + pid = self.connection.get(RQScheduler.get_locking_key(self.name)) return int(pid.decode()) if pid is not None else None @@ -467,23 +479,23 @@ class Queue: self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result) def create_job( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - meta: Optional[Dict] = None, - status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, - *, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> Job: """Creates a job based on parameters given @@ -609,23 +621,23 @@ class Queue: return job def enqueue_call( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, - pipeline: Optional['Pipeline'] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, ) -> Job: """Creates a job to represent the delayed function call and enqueues it. @@ -676,20 +688,20 @@ class Queue: @staticmethod def prepare_data( - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -1001,7 +1013,6 @@ class Queue: return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) return job - def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution without checking dependencies. @@ -1071,7 +1082,7 @@ class Queue: return job def enqueue_dependents( - self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None ): """Enqueues all jobs in the given job's dependents set and clears it. @@ -1108,7 +1119,7 @@ class Queue: dependent_job_ids, connection=self.connection, serializer=self.serializer ) if dependent_job - and dependent_job.dependencies_are_met( + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -1208,13 +1219,13 @@ class Queue: @classmethod def dequeue_any( - cls, - queues: List['Queue'], - timeout: Optional[int], - connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queues: List['Queue'], + timeout: Optional[int], + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1248,8 +1259,13 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class) + queue = cls.from_queue_key( + queue_key, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: diff --git a/rq/worker.py b/rq/worker.py index 80c03842..fd9e9124 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,8 +13,7 @@ import warnings from datetime import timedelta from enum import Enum from random import shuffle -from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, - Union) +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union from uuid import uuid4 if TYPE_CHECKING: @@ -57,7 +56,17 @@ from .scheduler import RQScheduler from .serializers import resolve_serializer from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text +from .utils import ( + backend_class, + ensure_list, + get_version, + make_colorizer, + utcformat, + utcnow, + utcparse, + compact, + as_text, +) from .version import VERSION from .serializers import resolve_serializer @@ -249,7 +258,7 @@ class Worker: disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None, - work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None + work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None, ): # noqa self.default_result_ttl = default_result_ttl self.worker_ttl = default_worker_ttl @@ -267,8 +276,13 @@ class Worker: self.serializer = resolve_serializer(serializer) queues = [ - self.queue_class(name=q, connection=connection, job_class=self.job_class, - serializer=self.serializer, death_penalty_class=self.death_penalty_class,) + self.queue_class( + name=q, + connection=connection, + job_class=self.job_class, + serializer=self.serializer, + death_penalty_class=self.death_penalty_class, + ) if isinstance(q, str) else q for q in ensure_list(queues) @@ -706,7 +720,7 @@ class Worker: return if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN: pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] return if self._dequeue_strategy == DequeueStrategy.RANDOM: shuffle(self._ordered_queues) @@ -716,7 +730,7 @@ class Worker: self, logging_level: str = "INFO", date_format: str = DEFAULT_LOGGING_DATE_FORMAT, - log_format: str = DEFAULT_LOGGING_FORMAT + log_format: str = DEFAULT_LOGGING_FORMAT, ): """Bootstraps the worker. Runs the basic tasks that should run when the worker actually starts working. @@ -772,6 +786,12 @@ class Worker: else: self.scheduler.start() + def wait_for_processing_unit(self): + """Wait for a processing unit to be available. + This is used to limit the number of jobs that can be processed at the same time. + """ + return + def work( self, burst: bool = False, @@ -781,7 +801,7 @@ class Worker: max_jobs: Optional[int] = None, max_idle_time: Optional[int] = None, with_scheduler: bool = False, - dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT + dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT, ) -> bool: """Starts the work loop. @@ -814,6 +834,12 @@ class Worker: self._install_signal_handlers() try: while True: + self.wait_for_processing_unit() + + if max_jobs is not None: + if completed_jobs >= max_jobs: + self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs) + break try: self.check_for_suspension(burst) @@ -836,12 +862,7 @@ class Worker: job, queue = result self.execute_job(job, queue) self.heartbeat() - completed_jobs += 1 - if max_jobs is not None: - if completed_jobs >= max_jobs: - self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs) - break except redis.exceptions.TimeoutError: self.log.error('Worker %s: Redis connection timeout, quitting...', self.key) @@ -881,7 +902,9 @@ class Worker: pass self.scheduler._process.join() - def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']: + def dequeue_job_and_maintain_ttl( + self, timeout: Optional[int], max_idle_time: Optional[int] = None + ) -> Tuple['Job', 'Queue']: """Dequeues a job while maintaining the TTL. Returns: @@ -1167,10 +1190,7 @@ class Worker: self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string) self.handle_work_horse_killed(job, retpid, ret_val, rusage) - self.handle_job_failure( - job, queue=queue, - exc_string=exc_string - ) + self.handle_job_failure(job, queue=queue, exc_string=exc_string) def execute_job(self, job: 'Job', queue: 'Queue'): """Spawns a work horse to perform the actual work and passes it a job. @@ -1458,9 +1478,7 @@ class Worker: extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error( - '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra - ) + self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1598,7 +1616,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] class RandomWorker(Worker):