diff --git a/dev-requirements.txt b/dev-requirements.txt index a002971b..d16aa411 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -3,3 +3,5 @@ psutil pytest pytest-cov sentry-sdk +redis +click diff --git a/rq/command.py b/rq/command.py index 7ade232c..b98082ce 100644 --- a/rq/command.py +++ b/rq/command.py @@ -1,9 +1,10 @@ import json import os import signal -import typing as t -if t.TYPE_CHECKING: +from typing import TYPE_CHECKING, Dict, Any + +if TYPE_CHECKING: from redis import Redis from .worker import Worker @@ -28,7 +29,7 @@ def send_command(connection: 'Redis', worker_name: str, command, **kwargs): connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) -def parse_payload(payload: t.Dict[t.Any, t.Any]) -> t.Dict[t.Any, t.Any]: +def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]: """ Returns a dict of command data @@ -75,12 +76,12 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None): send_command(connection, job.worker_name, 'stop-job', job_id=job_id) -def handle_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): +def handle_command(worker: 'Worker', payload: Dict[Any, Any]): """Parses payload and routes commands Args: worker (Worker): The worker to use - payload (t.Dict[t.Any, t.Any]): The Payload + payload (Dict[Any, Any]): The Payload """ if payload['command'] == 'stop-job': handle_stop_job_command(worker, payload) @@ -101,13 +102,13 @@ def handle_shutdown_command(worker: 'Worker'): os.kill(pid, signal.SIGINT) -def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): +def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]): """ Stops work horse Args: worker (Worker): The worker to stop - payload (t.Dict[t.Any, t.Any]): The payload. + payload (Dict[Any, Any]): The payload. """ worker.log.info('Received kill horse command.') @@ -118,12 +119,12 @@ def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): worker.log.info('Worker is not working, kill horse command ignored') -def handle_stop_job_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): +def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]): """Handles stop job command. Args: worker (Worker): The worker to use - payload (t.Dict[t.Any, t.Any]): The payload. + payload (Dict[Any, Any]): The payload. """ job_id = payload.get('job_id') worker.log.debug('Received command to stop job %s', job_id) diff --git a/rq/connections.py b/rq/connections.py index b37e7460..fd88099a 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,5 +1,6 @@ from contextlib import contextmanager import typing as t +import warnings from redis import Redis from .local import LocalStack, release_local @@ -11,6 +12,21 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection: t.Optional['Redis'] = None): # noqa + """The context manager for handling connections in a clean way. + It will push the connection to the LocalStack, and pop the connection + when leaving the context + Example: + ..codeblock:python:: + + with Connection(): + w = Worker() + w.work() + + Args: + connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None. + """ + warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.", + DeprecationWarning) if connection is None: connection = Redis() push_connection(connection) @@ -33,9 +49,12 @@ def push_connection(redis: 'Redis'): _connection_stack.push(redis) -def pop_connection(): +def pop_connection() -> 'Redis': """ Pops the topmost connection from the stack. + + Returns: + redis (Redis): A Redis connection """ return _connection_stack.pop() @@ -57,10 +76,13 @@ def use_connection(redis: t.Optional['Redis'] = None): push_connection(redis) -def get_current_connection(): +def get_current_connection() -> 'Redis': """ Returns the current Redis connection (i.e. the topmost on the connection stack). + + Returns: + Redis: A Redis Connection """ return _connection_stack.top diff --git a/rq/decorators.py b/rq/decorators.py index 3c8dc839..70a61aad 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,7 +1,7 @@ from functools import wraps -import typing as t +from typing import TYPE_CHECKING, Callable, Dict, Optional, List, Any, Union -if t.TYPE_CHECKING: +if TYPE_CHECKING: from redis import Redis from .job import Retry @@ -13,13 +13,13 @@ from .utils import backend_class class job: # noqa queue_class = Queue - def __init__(self, queue: 'Queue', connection: t.Optional['Redis'] = None, timeout=None, - result_ttl=DEFAULT_RESULT_TTL, ttl=None, - queue_class=None, depends_on: t.Optional[t.List[t.Any]] = None, at_front: t.Optional[bool] = None, - meta=None, description=None, failure_ttl=None, retry: t.Optional['Retry'] = None, on_failure=None, - on_success=None): - """ - A decorator that adds a ``delay`` method to the decorated function, + def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None, + result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None, + queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None, + meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None, + retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None, + on_success: Optional[Callable[..., Any]] = None): + """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string denoting the queue name. For example:: @@ -32,6 +32,22 @@ class job: # noqa >>> ... >>> # Puts `simple_add` function into queue >>> simple_add.delay(1, 2) + + Args: + queue (Union['Queue', str]): The queue to use, can be the Queue class itself, or the queue name (str) + connection (Optional[Redis], optional): Redis Connection. Defaults to None. + timeout (Optional[int], optional): Job timeout. Defaults to None. + result_ttl (int, optional): Result time to live. Defaults to DEFAULT_RESULT_TTL. + ttl (Optional[int], optional): Time to live. Defaults to None. + queue_class (Optional[Queue], optional): A custom class that inherits from `Queue`. Defaults to None. + depends_on (Optional[List[Any]], optional): A list of dependents jobs. Defaults to None. + at_front (Optional[bool], optional): Whether to enqueue the job at front of the queue. Defaults to None. + meta (Optional[Dict[Any, Any]], optional): Arbitraty metadata about the job. Defaults to None. + description (Optional[str], optional): Job description. Defaults to None. + failure_ttl (Optional[int], optional): Failture time to live. Defaults to None. + retry (Optional[Retry], optional): A Retry object. Defaults to None. + on_failure (Optional[Callable[..., Any]], optional): Callable to run on failure. Defaults to None. + on_success (Optional[Callable[..., Any]], optional): Callable to run on success. Defaults to None. """ self.queue = queue self.queue_class = backend_class(self, 'queue_class', override=queue_class) diff --git a/rq/defaults.py b/rq/defaults.py index bb7ec791..ef766787 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -1,14 +1,91 @@ DEFAULT_JOB_CLASS = 'rq.job.Job' +""" The path for the default Job class to use. +Defaults to the main `Job` class within the `rq.job` module +""" + + DEFAULT_QUEUE_CLASS = 'rq.Queue' +""" The path for the default Queue class to use. +Defaults to the main `Queue` class within the `rq.queue` module +""" + + DEFAULT_WORKER_CLASS = 'rq.Worker' +""" The path for the default Worker class to use. +Defaults to the main `Worker` class within the `rq.worker` module +""" + + DEFAULT_SERIALIZER_CLASS = 'rq.serializers.DefaultSerializer' +""" The path for the default Serializer class to use. +Defaults to the main `DefaultSerializer` class within the `rq.serializers` module +""" + + DEFAULT_CONNECTION_CLASS = 'redis.Redis' +""" The path for the default Redis client class to use. +Defaults to the main `Redis` class within the `redis` module +As imported like `from redis import Redis` +""" + + DEFAULT_WORKER_TTL = 420 +""" The default Time To Live (TTL) for the Worker in seconds +Defines the effective timeout period for a worker +""" + + DEFAULT_JOB_MONITORING_INTERVAL = 30 +""" The interval in seconds for Job monitoring +""" + + DEFAULT_RESULT_TTL = 500 -DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds -DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' +""" The Time To Live (TTL) in seconds to keep job results +Means that the results will be expired from Redis +after `DEFAULT_RESULT_TTL` seconds +""" + + +DEFAULT_FAILURE_TTL = 31536000 +""" The Time To Live (TTL) in seconds to keep job failure information +Means that the failure information will be expired from Redis +after `DEFAULT_FAILURE_TTL` seconds. +Defaults to 1 YEAR in seconds +""" + + DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120 -DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' +""" The amount in seconds it will take for a new scheduler +to pickup tasks after a scheduler has died. +This is used as a safety net to avoid race conditions and duplicates +when using multiple schedulers +""" + + DEFAULT_MAINTENANCE_TASK_INTERVAL = 10 * 60 +""" The interval to run maintenance tasks +in seconds. Defaults to 10 minutes. +""" + + CALLBACK_TIMEOUT = 60 +""" The timeout period in seconds for Callback functions +Means that Functions used in `success_callback` and `failure_callback` +will timeout after N seconds +""" + + +DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' +""" The Date Format to use for RQ logging. +Defaults to Hour:Minute:Seconds on 24hour format +eg.: `15:45:23` +""" + + +DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' +""" The default Logging Format to use +Uses Python's default attributes as defined +https://docs.python.org/3/library/logging.html#logrecord-attributes +""" + diff --git a/rq/job.py b/rq/job.py index c41c17d0..cfd74188 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,40 +1,37 @@ import inspect import json -import pickle import warnings import zlib -import typing as t import asyncio from collections.abc import Iterable from datetime import datetime, timedelta, timezone from enum import Enum -from functools import partial from redis import WatchError -from typing import Any, List, Optional +from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List, + Optional, Tuple, Union) from uuid import uuid4 -if t.TYPE_CHECKING: + +if TYPE_CHECKING: from .results import Result from .queue import Queue from redis import Redis from redis.client import Pipeline from .connections import resolve_connection -from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError +from .exceptions import (DeserializationError, InvalidJobOperation, + NoSuchJobError) from .local import LocalStack from .serializers import resolve_serializer -from .utils import (get_version, import_attribute, parse_timeout, str_to_date, - utcformat, utcnow, ensure_list, get_call_string, as_text, - decode_redis_hash) - -# Serialize pickle dumps using the highest pickle protocol (binary, default -# uses ascii) -dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) -loads = pickle.loads +from .types import FunctionReferenceType, JobDependencyType +from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string, + get_version, import_attribute, parse_timeout, str_to_date, + utcformat, utcnow) class JobStatus(str, Enum): + """The Status of Job within its lifecycle at any given time. """ QUEUED = 'queued' FINISHED = 'finished' FAILED = 'failed' @@ -46,7 +43,21 @@ class JobStatus(str, Enum): class Dependency: - def __init__(self, jobs: t.List[t.Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False): + def __init__(self, jobs: List[Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False): + """The definition of a Dependency. + + Args: + jobs (List[Union[Job, str]]): A list of Job instances or Job IDs. + Anything different will raise a ValueError + allow_failure (bool, optional): Whether to allow for failure when running the depency, + meaning, the dependencies should continue running even after one of them failed. + Defaults to False. + enqueue_at_front (bool, optional): Whether this dependecy should be enqueued at the front of the queue. + Defaults to False. + + Raises: + ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty + """ dependent_jobs = ensure_list(jobs) if not all( isinstance(job, Job) or isinstance(job, str) @@ -62,47 +73,119 @@ class Dependency: self.enqueue_at_front = enqueue_at_front -# Sentinel value to mark that some of our lazily evaluated properties have not -# yet been evaluated. UNEVALUATED = object() +"""Sentinel value to mark that some of our lazily evaluated properties have not +yet been evaluated. +""" -def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False): - """Cancels the job with the given job ID, preventing execution. Discards - any job info (i.e. it can't be requeued later). +def cancel_job(job_id: str, connection: Optional['Redis'] = None, + serializer=None, enqueue_dependents: bool = False): + """Cancels the job with the given job ID, preventing execution. + Use with caution. This will discard any job info (i.e. it can't be requeued later). + + Args: + job_id (str): The Job ID + connection (Optional[Redis], optional): The Redis Connection. Defaults to None. + serializer (str, optional): The string of the path to the serializer to use. Defaults to None. + enqueue_dependents (bool, optional): Whether dependents should still be enqueued. Defaults to False. """ Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents) -def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None): - """Returns the Job instance that is currently being executed. If this - function is invoked from outside a job context, None is returned. - """ +def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None) -> Optional['Job']: + """Returns the Job instance that is currently being executed. + If this function is invoked from outside a job context, None is returned. + + Args: + connection (Optional[Redis], optional): The connection to use. Defaults to None. + job_class (Optional[Job], optional): The job class (DEPRECATED). Defaults to None. + + Returns: + job (Optional[Job]): The current Job running + """ + if connection: + warnings.warn("connection argument for get_current_job is deprecated.", + DeprecationWarning) if job_class: warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning) return _job_stack.top -def requeue_job(job_id: str, connection: 'Redis', serializer=None): +def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job': + """Fetches a Job by ID and requeues it using the `requeue()` method. + + Args: + job_id (str): The Job ID that should be requeued. + connection (Redis): The Redis Connection to use + serializer (Optional[str], optional): The serializer. Defaults to None. + + Returns: + Job: The requeued Job object. + """ job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() class Job: - """A Job is just a convenient datastructure to pass around job (meta) data. - """ + """A Job is just a convenient datastructure to pass around job (meta) data.""" redis_job_namespace_prefix = 'rq:job:' - # Job construction @classmethod - def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: Optional['Redis'] = None, - result_ttl=None, ttl=None, status: JobStatus = None, description=None, - depends_on=None, timeout=None, id=None, origin=None, meta=None, - failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job': + def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None, + kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None, + result_ttl: Optional[int] = None, ttl: Optional[int] = None, + status: Optional[JobStatus] = None, description: Optional[str] =None, + depends_on: Optional[JobDependencyType] = None, + timeout: Optional[int] = None, id: Optional[str] = None, + origin=None, meta: Optional[Dict[str, Any]] = None, + failure_ttl: Optional[int] = None, serializer=None, *, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. - """ + + Args: + func (FunctionReference): The function/method/callable for the Job. This can be + a reference to a concrete callable or a string representing the path of function/method to be + imported. Effectively this is the only required attribute when creating a new Job. + args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the callable. + Defaults to None, meaning no args being passed. + kwargs (Optional[Dict], optional): A Dictionary of keyword arguments to pass the callable. + Defaults to None, meaning no kwargs being passed. + connection (Optional[Redis], optional): The Redis connection to use. Defaults to None. + This will be "resolved" using the `resolve_connection` function when initialzing the Job Class. + result_ttl (Optional[int], optional): The amount of time in seconds the results should live. + Defaults to None. + ttl (Optional[int], optional): The Time To Live (TTL) for the job itself. Defaults to None. + status (JobStatus, optional): The Job Status. Defaults to None. + description (Optional[str], optional): The Job Description. Defaults to None. + depends_on (Union['Dependency', List[Union['Dependency', 'Job']]], optional): What the jobs depends on. + This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a `Job` + list of `Job`. Defaults to None. + timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job execution. Defaults to None. + id (Optional[str], optional): An Optional ID (str) for the Job. Defaults to None. + origin (Optional[str], optional): The queue of origin. Defaults to None. + meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. Defaults to None. + failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. Defaults to None. + serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import + path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None. + on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run + when/if the Job finishes sucessfully. Defaults to None. + on_failure (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run + when/if the Job fails. Defaults to None. + + Raises: + TypeError: If `args` is not a tuple/list + TypeError: If `kwargs` is not a dict + TypeError: If the `func` is something other than a string or a Callable reference + ValueError: If `on_failure` is not a function + ValueError: If `on_success` is not a function + + Returns: + Job: A job instance. + """ if args is None: args = () if kwargs is None: @@ -171,25 +254,51 @@ class Job: return job - def get_position(self): + def get_position(self) -> Optional[int]: + """Get's the job's position on the queue + + Returns: + position (Optional[int]): The position + """ from .queue import Queue if self.origin: q = Queue(name=self.origin, connection=self.connection) return q.get_job_position(self._id) return None - def get_status(self, refresh: bool = True) -> str: + def get_status(self, refresh: bool = True) -> JobStatus: + """Gets the Job Status + + Args: + refresh (bool, optional): Whether to refresh the Job. Defaults to True. + + Returns: + status (JobStatus): The Job Status + """ if refresh: self._status = as_text(self.connection.hget(self.key, 'status')) - return self._status - def set_status(self, status: str, pipeline: Optional['Pipeline'] = None): + def set_status(self, status: JobStatus, pipeline: Optional['Pipeline'] = None) -> None: + """Set's the Job Status + + Args: + status (JobStatus): The Job Status to be set + pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None. + """ self._status = status connection: 'Redis' = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) - def get_meta(self, refresh: bool = True): + def get_meta(self, refresh: bool = True) -> Dict: + """Get's the metadata for a Job, an arbitrary dictionary. + + Args: + refresh (bool, optional): Whether to refresh. Defaults to True. + + Returns: + meta (Dict): The dictionary of metadata + """ if refresh: meta = self.connection.hget(self.key, 'meta') self.meta = self.serializer.loads(meta) if meta else {} @@ -231,7 +340,7 @@ class Job: @property def _dependency_id(self): """Returns the first item in self._dependency_ids. Present to - preserve compatibility with third party packages.. + preserve compatibility with third party packages. """ if self._dependency_ids: return self._dependency_ids[0] @@ -250,7 +359,7 @@ class Job: return job @property - def dependent_ids(self) -> t.List[str]: + def dependent_ids(self) -> List[str]: """Returns a list of ids of jobs whose execution depends on this job's successful execution.""" return list(map(as_text, self.connection.smembers(self.dependents_key))) @@ -287,10 +396,15 @@ class Job: return self._failure_callback def _deserialize_data(self): + """Deserializes the Job `data` into a tuple. + This includes the `_func_name`, `_instance`, `_args` and `_kwargs` + + Raises: + DeserializationError: Cathes any deserialization error (since serializers are generic) + """ try: self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) except Exception as e: - # catch anything because serializers are generic raise DeserializationError() from e @property @@ -365,45 +479,71 @@ class Job: self._data = UNEVALUATED @classmethod - def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> int: - """Returns whether a job hash exists for the given job ID.""" + def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> bool: + """Checks whether a Job Hash exists for the given Job ID + + Args: + job_id (str): The Job ID + connection (Optional[Redis], optional): Optional connection to use. Defaults to None. + + Returns: + job_exists (bool): Whether the Job exists + """ conn = resolve_connection(connection) - return conn.exists(cls.key_for(job_id)) + job_key = cls.key_for(job_id) + job_exists = conn.exists(job_key) + return bool(job_exists) @classmethod def fetch(cls, id: str, connection: Optional['Redis'] = None, serializer=None) -> 'Job': - """Fetches a persisted job from its corresponding Redis key and - instantiates it. + """Fetches a persisted Job from its corresponding Redis key and instantiates it + + Args: + id (str): The Job to fetch + connection (Optional['Redis'], optional): An optional Redis connection. Defaults to None. + serializer (_type_, optional): The serializer to use. Defaults to None. + + Returns: + Job: The Job instance """ job = cls(id, connection=connection, serializer=serializer) job.refresh() return job @classmethod - def fetch_many(cls, job_ids: t.Iterable[str], connection: 'Redis', serializer=None): + def fetch_many(cls, job_ids: Iterable[str], connection: 'Redis', serializer=None) -> List['Job']: """ Bulk version of Job.fetch For any job_ids which a job does not exist, the corresponding item in the returned list will be None. + + Args: + job_ids (Iterable[str]): A list of job ids. + connection (Redis): Redis connection + serializer (Callable): A serializer + + Returns: + jobs (list[Job]): A list of Jobs instances. """ with connection.pipeline() as pipeline: for job_id in job_ids: pipeline.hgetall(cls.key_for(job_id)) results = pipeline.execute() - jobs: t.List[Optional['Job']] = [] + jobs: List[Optional['Job']] = [] for i, job_id in enumerate(job_ids): - if results[i]: - job = cls(job_id, connection=connection, serializer=serializer) - job.restore(results[i]) - jobs.append(job) - else: + if not results[i]: jobs.append(None) + continue + + job = cls(job_id, connection=connection, serializer=serializer) + job.restore(results[i]) + jobs.append(job) return jobs - def __init__(self, id: str = None, connection: Optional['Redis'] = None, serializer=None): + def __init__(self, id: Optional[str] = None, connection: Optional['Redis'] = None, serializer=None): self.connection = resolve_connection(connection) self._id = id self.created_at = utcnow() @@ -429,12 +569,12 @@ class Job: self.ttl: Optional[int] = None self.worker_name: Optional[str] = None self._status = None - self._dependency_ids: t.List[str] = [] + self._dependency_ids: List[str] = [] self.meta = {} self.serializer = resolve_serializer(serializer) self.retries_left = None - self.retry_intervals: Optional[t.List[int]] = None - self.redis_server_version = None + self.retry_intervals: Optional[List[int]] = None + self.redis_server_version: Optional[Tuple[int, int, int]] = None self.last_heartbeat: Optional[datetime] = None self.allow_dependency_failures: Optional[bool] = None self.enqueue_at_front: Optional[bool] = None @@ -459,21 +599,38 @@ class Job: return hash(self.id) # Data access - def get_id(self): # noqa + def get_id(self) -> str: # noqa """The job ID for this job instance. Generates an ID lazily the first time the ID is requested. + + Returns: + job_id (str): The Job ID """ if self._id is None: self._id = str(uuid4()) return self._id - def set_id(self, value: str): - """Sets a job ID for the given job.""" + def set_id(self, value: str) -> None: + """Sets a job ID for the given job + + Args: + value (str): The value to set as Job ID + """ if not isinstance(value, str): raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False): + """Sets the heartbeat for a job. + It will set a hash in Redis with the `last_heartbeat` key and datetime value. + If a Redis' pipeline is passed, it will use that, else, it will use the job's own connection. + + Args: + timestamp (datetime): The timestamp to use + ttl (int): The time to live + pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None. + xx (bool, optional): Only sets the key if already exists. Defaults to False. + """ self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -482,13 +639,27 @@ class Job: id = property(get_id, set_id) @classmethod - def key_for(cls, job_id: str): - """The Redis key that is used to store job hash under.""" + def key_for(cls, job_id: str) -> bytes: + """The Redis key that is used to store job hash under. + + Args: + job_id (str): The Job ID + + Returns: + redis_job_key (bytes): The Redis fully qualified key for the job + """ return (cls.redis_job_namespace_prefix + job_id).encode('utf-8') @classmethod - def dependents_key_for(cls, job_id: str): - """The Redis key that is used to store job dependents hash under.""" + def dependents_key_for(cls, job_id: str) -> str: + """The Redis key that is used to store job dependents hash under. + + Args: + job_id (str): The "parent" job id + + Returns: + dependents_key (str): The dependents key + """ return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id) @property @@ -505,25 +676,29 @@ class Job: def dependencies_key(self): return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) - def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None): - """ - Fetch all of a job's dependencies. If a pipeline is supplied, and + def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None) -> List['Job']: + """Fetch all of a job's dependencies. If a pipeline is supplied, and watch is true, then set WATCH on all the keys of all dependencies. Returned jobs will use self's connection, not the pipeline supplied. If a job has been deleted from redis, it is not returned. - """ + + Args: + watch (bool, optional): Wether to WATCH the keys. Defaults to False. + pipeline (Optional[Pipeline]): The Redis' pipeline to use. Defaults to None. + + Returns: + jobs (list[Job]): A list of Jobs + """ connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) - jobs = [job - for job in self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer) - if job] - + dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer) + jobs = [job for job in dependencies_list if job] return jobs @property @@ -545,18 +720,29 @@ class Job: return self._exc_info - def return_value(self, refresh=False) -> Any: - """Returns the return value of the latest execution, if it was successful""" + def return_value(self, refresh: bool = False) -> Optional[Any]: + """Returns the return value of the latest execution, if it was successful + + Args: + refresh (bool, optional): Whether to refresh the current status. Defaults to False. + + Returns: + result (Optional[Any]): The job return value. + """ from .results import Result if refresh: self._cached_result = None - if self.supports_redis_streams: - if not self._cached_result: - self._cached_result = self.latest_result() + if not self.supports_redis_streams: + return None - if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL: - return self._cached_result.return_value + if not self._cached_result: + self._cached_result = self.latest_result() + + if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL: + return self._cached_result.return_value + + return None @property def result(self) -> Any: @@ -597,17 +783,33 @@ class Job: return self._result def results(self) -> List['Result']: - """Returns all Result objects""" + """Returns all Result objects + + Returns: + all_results (List[Result]): A list of 'Result' objects + """ from .results import Result return Result.all(self, serializer=self.serializer) def latest_result(self) -> Optional['Result']: + """Get the latest job result. + + Returns: + result (Result): The Result object + """ """Returns the latest Result object""" from .results import Result return Result.fetch_latest(self, serializer=self.serializer) - def restore(self, raw_data): - """Overwrite properties with the provided values stored in Redis""" + def restore(self, raw_data) -> Any: + """Overwrite properties with the provided values stored in Redis. + + Args: + raw_data (_type_): The raw data to load the job data from + + Raises: + NoSuchJobError: If there way an error getting the job data + """ obj = decode_redis_hash(raw_data) try: raw_data = obj['data'] @@ -680,11 +882,17 @@ class Job: self.restore(data) def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dict: - """ - Returns a serialization of the current job instance + """Returns a serialization of the current job instance You can exclude serializing the `meta` dictionary by setting `include_meta=False`. + + Args: + include_meta (bool, optional): Whether to include the Job's metadata. Defaults to True. + include_result (bool, optional): Whether to include the Job's result. Defaults to True. + + Returns: + dict: The Job serialized as a dictionary """ obj = { 'created_at': utcformat(self.created_at or utcnow()), @@ -741,15 +949,19 @@ class Job: return obj def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, - include_result=True): - """ - Dumps the current job instance to its corresponding Redis key. + include_result: bool = True): + """Dumps the current job instance to its corresponding Redis key. Exclude saving the `meta` dictionary by setting `include_meta=False`. This is useful to prevent clobbering user metadata without an expensive `refresh()` call first. Redis key persistence may be altered by `cleanup()` method. + + Args: + pipeline (Optional[Pipeline], optional): The Redis' pipeline to use. Defaults to None. + include_meta (bool, optional): Whether to include the job's metadata. Defaults to True. + include_result (bool, optional): Whether to include the job's result. Defaults to True. """ key = self.key connection = pipeline if pipeline is not None else self.connection @@ -766,9 +978,13 @@ class Job: """Only supported by Redis server >= 5.0 is required.""" return self.get_redis_server_version() >= (5, 0, 0) - def get_redis_server_version(self): - """Return Redis server version of connection""" - if not self.redis_server_version: + def get_redis_server_version(self) -> Tuple[int, int, int]: + """Return Redis server version of connection + + Returns: + redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9) + """ + if self.redis_server_version is None: self.redis_server_version = get_version(self.connection) return self.redis_server_version @@ -788,6 +1004,13 @@ class Job: You can enqueue the jobs dependents optionally, Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in. + + Args: + pipeline (Optional[Pipeline], optional): The Redis' pipeline to use. Defaults to None. + enqueue_dependents (bool, optional): Whether to enqueue dependents jobs. Defaults to False. + + Raises: + InvalidJobOperation: If the job has already been cancelled. """ if self.is_canceled: raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) @@ -834,8 +1057,15 @@ class Job: # handle it raise - def requeue(self, at_front: bool = False): - """Requeues job.""" + def requeue(self, at_front: bool = False) -> 'Job': + """Requeues job + + Args: + at_front (bool, optional): Whether the job should be requeued at the front of the queue. Defaults to False. + + Returns: + job (Job): The requeued Job instance + """ return self.failed_job_registry.requeue(self, at_front=at_front) def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): @@ -888,10 +1118,15 @@ class Job: registry.remove(self, pipeline=pipeline) def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, - delete_dependents=False): + delete_dependents: bool = False): """Cancels the job and deletes the job hash from Redis. Jobs depending - on this job can optionally be deleted as well.""" + on this job can optionally be deleted as well. + Args: + pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None. + remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True. + delete_dependents (bool, optional): Whether job dependents should also be deleted. Defaults to False. + """ connection = pipeline if pipeline is not None else self.connection self._remove_from_registries(pipeline=pipeline, remove_from_queue=remove_from_queue) @@ -902,21 +1137,29 @@ class Job: connection.delete(self.key, self.dependents_key, self.dependencies_key) def delete_dependents(self, pipeline: Optional['Pipeline'] = None): - """Delete jobs depending on this job.""" + """Delete jobs depending on this job. + + Args: + pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None. + """ connection = pipeline if pipeline is not None else self.connection for dependent_id in self.dependent_ids: try: job = Job.fetch(dependent_id, connection=self.connection, serializer=self.serializer) - job.delete(pipeline=pipeline, - remove_from_queue=False) + job.delete(pipeline=pipeline, remove_from_queue=False) except NoSuchJobError: # It could be that the dependent job was never saved to redis pass connection.delete(self.dependents_key) # Job execution - def perform(self): # noqa - """Invokes the job function with the job arguments.""" + def perform(self) -> Any: # noqa + """The main execution method. Invokes the job function with the job arguments. + This is the method that actually performs the job - it's what its called by the worker. + + Returns: + result (Any): The job result + """ self.connection.persist(self.key) _job_stack.push(self) try: @@ -926,13 +1169,19 @@ class Job: return self._result def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'): - """Set job metadata before execution begins""" + """Prepares the job for execution, setting the worker name, + heartbeat information, status and other metadata before execution begins. + + Args: + worker_name (str): The worker that will perform the job + pipeline (Pipeline): The Redis' piipeline to use + """ self.worker_name = worker_name self.last_heartbeat = utcnow() self.started_at = self.last_heartbeat self._status = JobStatus.STARTED mapping = { - 'last_heartbeat': utcformat(self.last_heartbeat), # type: ignore + 'last_heartbeat': utcformat(self.last_heartbeat), 'status': self._status, 'started_at': utcformat(self.started_at), # type: ignore 'worker_name': worker_name @@ -940,9 +1189,17 @@ class Job: if self.get_redis_server_version() >= (4, 0, 0): pipeline.hset(self.key, mapping=mapping) else: - pipeline.hmset(self.key, mapping) + pipeline.hmset(self.key, mapping=mapping) - def _execute(self): + def _execute(self) -> Any: + """Actually runs the function with it's *args and **kwargs. + It will use the `func` property, which was already resolved and ready to run at this point. + If the function is a coroutine (it's an async function/method), then the `result` + will have to be awaited within an event loop. + + Returns: + result (Any): The function result + """ result = self.func(*self.args, **self.kwargs) if asyncio.iscoroutine(result): loop = asyncio.new_event_loop() @@ -950,36 +1207,56 @@ class Job: return coro_result return result - def get_ttl(self, default_ttl: Optional[int] = None): + def get_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]: """Returns ttl for a job that determines how long a job will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. + + Args: + default_ttl (Optional[int]): The default time to live for the job + + Returns: + ttl (int): The time to live """ return default_ttl if self.ttl is None else self.ttl - def get_result_ttl(self, default_ttl: Optional[int] = None): + def get_result_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]: """Returns ttl for a job that determines how long a jobs result will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. + + Args: + default_ttl (Optional[int]): The default time to live for the job result + + Returns: + ttl (int): The time to live for the result """ return default_ttl if self.result_ttl is None else self.result_ttl # Representation - def get_call_string(self): # noqa + def get_call_string(self) -> Optional[str]: # noqa """Returns a string representation of the call, formatted as a regular Python function invocation statement. + + Returns: + call_repr (str): The string representation """ - return get_call_string(self.func_name, self.args, self.kwargs, max_length=75) + call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75) + return call_repr def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): - """Prepare job for eventual deletion (if needed). This method is usually - called after successful execution. How long we persist the job and its - result depends on the value of ttl: + """Prepare job for eventual deletion (if needed). + This method is usually called after successful execution. + How long we persist the job and its result depends on the value of ttl: - If ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If ttl is negative, don't set an expiry to it (persist - forever) + - If ttl is negative, don't set an expiry to it (persist forever) + + Args: + ttl (Optional[int], optional): Time to live. Defaults to None. + pipeline (Optional[Pipeline], optional): Redis' pipeline. Defaults to None. + remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True. """ if ttl == 0: self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) @@ -1005,10 +1282,13 @@ class Job: job_class=self.__class__, serializer=self.serializer) - def get_retry_interval(self): + def get_retry_interval(self) -> int: """Returns the desired retry interval. If number of retries is bigger than length of intervals, the first value in the list will be used multiple times. + + Returns: + retry_interval (int): The desired retry interval """ if self.retry_intervals is None: return 0 @@ -1017,7 +1297,15 @@ class Job: return self.retry_intervals[index] def retry(self, queue: 'Queue', pipeline: 'Pipeline'): - """Requeue or schedule this job for execution""" + """Requeue or schedule this job for execution. + If the the `retry_interval` was set on the job itself, + it will calculate a scheduled time for the job to run, and instead + of just regularly `enqueing` the job, it will `schedule` it. + + Args: + queue (Queue): The queue to retry the job on + pipeline (Pipeline): The Redis' pipeline to use + """ retry_interval = self.get_retry_interval() self.retries_left = self.retries_left - 1 if retry_interval: @@ -1032,11 +1320,15 @@ class Job: depend on are successfully performed. We record this relation as a reverse dependency (a Redis set), with a key that looks something like: + ..codeblock:python:: rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} This method adds the job in its dependencies' dependents sets, and adds the job to DeferredJobRegistry. + + Args: + pipeline (Optional[Pipeline]): The Redis' pipeline. Defaults to None """ from .registry import DeferredJobRegistry @@ -1054,14 +1346,14 @@ class Job: connection.sadd(self.dependencies_key, dependency_id) @property - def dependency_ids(self): + def dependency_ids(self) -> List[bytes]: dependencies = self.connection.smembers(self.dependencies_key) return [Job.key_for(_id.decode()) for _id in dependencies] - def dependencies_are_met(self, parent_job: Optional['Job'] = None, - pipeline: Optional['Pipeline'] = None, exclude_job_id: str = None): - """Returns a boolean indicating if all of this job's dependencies are _FINISHED_ + def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None, + exclude_job_id: Optional[str] = None) -> bool: + """Returns a boolean indicating if all of this job's dependencies are `FINISHED` If a pipeline is passed, all dependencies are WATCHed. @@ -1069,6 +1361,14 @@ class Job: This is useful when enqueueing the dependents of a _successful_ job -- that status of `FINISHED` may not be yet set in redis, but said job is indeed _done_ and this method is _called_ in the _stack_ of its dependents are being enqueued. + + Args: + parent_job (Optional[Job], optional): The parent Job. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis' pipeline. Defaults to None. + exclude_job_id (Optional[str], optional): Whether to exclude the job id.. Defaults to None. + + Returns: + are_met (bool): Whether the dependencies were met. """ connection = pipeline if pipeline is not None else self.connection @@ -1076,8 +1376,7 @@ class Job: connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) - dependencies_ids = {_id.decode() - for _id in connection.smembers(self.dependencies_key)} + dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)} if exclude_job_id: dependencies_ids.discard(exclude_job_id) @@ -1104,10 +1403,9 @@ class Job: dependencies_statuses = pipeline.execute() + allowed_statuses = [JobStatus.FINISHED] if self.allow_dependency_failures: - allowed_statuses = [JobStatus.FINISHED, JobStatus.FAILED] - else: - allowed_statuses = [JobStatus.FINISHED] + allowed_statuses.append(JobStatus.FAILED) return all( status.decode() in allowed_statuses @@ -1121,8 +1419,18 @@ _job_stack = LocalStack() class Retry: - def __init__(self, max, interval: int = 0): - """`interval` can be a positive number or a list of ints""" + def __init__(self, max: int, interval: Union[int, List[int]] = 0): + """The main object to defined Retry logics for jobs. + + Args: + max (int): The max number of times a job should be retried + interval (Union[int, List[int]], optional): The interval between retries. + Can be a positive number (int) or a list of ints. Defaults to 0 (meaning no interval between retries). + + Raises: + ValueError: If the `max` argument is lower than 1 + ValueError: If the interval param is negative or the list contains negative numbers + """ super().__init__() if max < 1: raise ValueError('max: please enter a value greater than 0') diff --git a/rq/serializers.py b/rq/serializers.py index 00fd0a7b..9e63bc74 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,6 +1,7 @@ from functools import partial import pickle import json +from typing import Optional, Union from .utils import import_attribute @@ -20,11 +21,17 @@ class JSONSerializer(): return json.loads(s.decode('utf-8'), *args, **kwargs) -def resolve_serializer(serializer: str): +def resolve_serializer(serializer=None): """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer The returned serializer objects implement ('dumps', 'loads') methods - Also accepts a string path to serializer that will be loaded as the serializer + Also accepts a string path to serializer that will be loaded as the serializer. + + Args: + serializer (Callable): The serializer to resolve. + + Returns: + serializer (Callable): An object that implements the SerializerProtocol """ if not serializer: return DefaultSerializer diff --git a/rq/types.py b/rq/types.py new file mode 100644 index 00000000..fe8e002d --- /dev/null +++ b/rq/types.py @@ -0,0 +1,17 @@ +from typing import TYPE_CHECKING, Any, Callable, List, TypeVar, Union + +if TYPE_CHECKING: + from .job import Dependency, Job + + +FunctionReferenceType = TypeVar('FunctionReferenceType', str, Callable[..., Any]) +"""Custom type definition for what a `func` is in the context of a job. +A `func` can be a string with the function import path (eg.: `myfile.mymodule.myfunc`) +or a direct callable (function/method). +""" + + +JobDependencyType = TypeVar('JobDependencyType', 'Dependency', 'Job', str, List[Union['Dependency', 'Job']]) +"""Custom type definition for a job dependencies. +A simple helper definition for the `depends_on` parameter when creating a job. +""" diff --git a/rq/utils.py b/rq/utils.py index a9fbb1d7..7f9e90be 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -148,11 +148,20 @@ def as_text(v): raise ValueError('Unknown type %r' % type(v)) -def decode_redis_hash(h): +def decode_redis_hash(h) -> t.Dict[str, t.Any]: + """Decodes the Redis hash, ensuring that keys are strings + Most importantly, decodes bytes strings, ensuring the dict has str keys. + + Args: + h (Dict[Any, Any]): The Redis hash + + Returns: + Dict[str, t.Any]: The decoded Redis data (Dictionary) + """ return dict((as_text(k), h[k]) for k in h) -def import_attribute(name: str): +def import_attribute(name: str) -> t.Callable[..., t.Any]: """Returns an attribute from a dotted path name. Example: `path.to.func`. When the attribute we look for is a staticmethod, module name in its @@ -216,11 +225,11 @@ def now(): _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' -def utcformat(dt: dt.datetime): +def utcformat(dt: dt.datetime) -> str: return dt.strftime(as_text(_TIMESTAMP_FORMAT)) -def utcparse(string: str): +def utcparse(string: str) -> dt.datetime: try: return datetime.datetime.strptime(string, _TIMESTAMP_FORMAT) except ValueError: @@ -320,13 +329,16 @@ def parse_timeout(timeout: t.Any): return timeout -def get_version(connection: 'Redis'): +def get_version(connection: 'Redis') -> t.Tuple[int, int, int]: """ Returns tuple of Redis server version. This function also correctly handles 4 digit redis server versions. Args: connection (Redis): The Redis connection. + + Returns: + version (Tuple[int, int, int]): A tuple representing the semantic versioning format (eg. (5, 0, 9)) """ try: # Getting the connection info for each job tanks performance, we can cache it on the connection object @@ -366,6 +378,9 @@ def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str: Args: data (str): The data to truncate max_length (t.Optional[int], optional): The max length. Defaults to None. + + Returns: + truncated (str): The truncated string """ if max_length is None: return data