diff --git a/rq/queue.py b/rq/queue.py index bde1ceb6..3c483e8f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -3,11 +3,11 @@ import sys import traceback import uuid import warnings - from collections import namedtuple from datetime import datetime, timezone, timedelta from functools import total_ordering from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union + from redis import WatchError if TYPE_CHECKING: @@ -24,7 +24,6 @@ from .types import FunctionReferenceType, JobDependencyType from .serializers import resolve_serializer from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact - green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') @@ -69,7 +68,7 @@ class Queue: @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None + cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None ) -> List['Queue']: """Returns an iterable of all Queues. @@ -94,11 +93,11 @@ class Queue: @classmethod def from_queue_key( - cls, - queue_key: str, - connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, - serializer: Any = None, + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. 
Can be used to reverse-lookup Queues by their @@ -119,18 +118,18 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix) :] + name = queue_key[len(prefix):] return cls(name, connection=connection, job_class=job_class, serializer=serializer) def __init__( - self, - name: str = 'default', - default_timeout: Optional[int] = None, - connection: Optional['Redis'] = None, - is_async: bool = True, - job_class: Union[str, Type['Job'], None] = None, - serializer: Any = None, - **kwargs, + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + **kwargs, ): """Initializes a Queue object. @@ -196,6 +195,12 @@ class Queue: """Redis key used to indicate this queue has been cleaned.""" return 'rq:clean_registries:%s' % self.name + @property + def scheduler_pid(self) -> Optional[int]: + from rq.scheduler import RQScheduler + pid = self.connection.get(RQScheduler.get_locking_key(self.name)) + return int(pid.decode()) if pid is not None else None + def acquire_cleaning_lock(self) -> bool: """Returns a boolean indicating whether a lock to clean this queue is acquired. 
A lock expires in 899 seconds (15 minutes - 1 second) @@ -453,23 +458,23 @@ class Queue: self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") def create_job( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - meta: Optional[Dict] = None, - status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, - *, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> Job: """Creates a job based on parameters given @@ -595,23 +600,23 @@ class Queue: return job def enqueue_call( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable[..., Any]] = None, - on_failure: 
Optional[Callable[..., Any]] = None, - pipeline: Optional['Pipeline'] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, ) -> Job: """Creates a job to represent the delayed function call and enqueues it. @@ -667,20 +672,20 @@ class Queue: @staticmethod def prepare_data( - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -771,7 +776,7 @@ class 
Queue: Job: _description_ """ job.perform() - result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL) + result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL) with self.connection.pipeline() as pipeline: job._handle_success(result_ttl=result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) @@ -1043,7 +1048,7 @@ class Queue: return job def enqueue_dependents( - self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None ): """Enqueues all jobs in the given job's dependents set and clears it. @@ -1081,7 +1086,7 @@ class Queue: dependent_job_ids, connection=self.connection, serializer=self.serializer ) if dependent_job - and dependent_job.dependencies_are_met( + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -1181,12 +1186,12 @@ class Queue: @classmethod def dequeue_any( - cls, - queues: List['Queue'], - timeout: int, - connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, - serializer: Any = None, + cls, + queues: List['Queue'], + timeout: int, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. 
diff --git a/rq/scheduler.py b/rq/scheduler.py index da59b0d5..6dfd3a41 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -35,14 +35,14 @@ class RQScheduler: Status = SchedulerStatus def __init__( - self, - queues, - connection, - interval=1, - logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, - serializer=None, + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index c6683e4c..a907ff57 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,9 +1,10 @@ import os from datetime import datetime, timedelta, timezone from multiprocessing import Process - from unittest import mock + from rq import Queue +from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from rq.exceptions import NoSuchJobError from rq.job import Job, Retry from rq.registry import FinishedJobRegistry, ScheduledJobRegistry @@ -11,10 +12,7 @@ from rq.scheduler import RQScheduler from rq.serializers import JSONSerializer from rq.utils import current_timestamp from rq.worker import Worker -from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL - from tests import RQTestCase, find_empty_redis_database, ssl_test - from .fixtures import kill_worker, say_hello @@ -140,7 +138,7 @@ class TestScheduler(RQTestCase): # scheduler.should_reacquire_locks always returns False if # scheduler.acquired_locks and scheduler._queue_names are the same self.assertFalse(scheduler.should_reacquire_locks) - scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL+6) + scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 6) self.assertFalse(scheduler.should_reacquire_locks) 
scheduler._queue_names = set(['default', 'foo']) @@ -196,6 +194,12 @@ class TestScheduler(RQTestCase): self.assertEqual(mocked.call_count, 1) self.assertEqual(stopped_process.is_alive.call_count, 1) + def test_queue_scheduler_pid(self): + queue = Queue(connection=self.testconn) + scheduler = RQScheduler([queue, ], connection=self.testconn) + scheduler.acquire_locks() + assert queue.scheduler_pid == os.getpid() + def test_heartbeat(self): """Test that heartbeat updates locking keys TTL""" name_1 = 'lock-test-1'