From 6f751f47aaaf2a705db155a8d111507e4b388bde Mon Sep 17 00:00:00 2001 From: Yibo Wei Date: Mon, 12 Aug 2024 22:00:20 -0500 Subject: [PATCH] Format with single quotes instead of double quotes --- rq/job.py | 600 +++++++++++++++++++++++++++--------------------------- 1 file changed, 300 insertions(+), 300 deletions(-) diff --git a/rq/job.py b/rq/job.py index f6361482..a98dfc36 100644 --- a/rq/job.py +++ b/rq/job.py @@ -52,32 +52,32 @@ from .utils import ( utcformat, ) -logger = logging.getLogger("rq.job") +logger = logging.getLogger('rq.job') class JobStatus(str, Enum): - """The Status of Job within its lifecycle at any given time.""" + '''The Status of Job within its lifecycle at any given time.''' - QUEUED = "queued" - FINISHED = "finished" - FAILED = "failed" - STARTED = "started" - DEFERRED = "deferred" - SCHEDULED = "scheduled" - STOPPED = "stopped" - CANCELED = "canceled" + QUEUED = 'queued' + FINISHED = 'finished' + FAILED = 'failed' + STARTED = 'started' + DEFERRED = 'deferred' + SCHEDULED = 'scheduled' + STOPPED = 'stopped' + CANCELED = 'canceled' def parse_job_id(job_or_execution_id: str) -> str: - """Parse a string and returns job ID. This function supports both job ID and execution composite key.""" - if ":" in job_or_execution_id: - return job_or_execution_id.split(":")[0] + '''Parse a string and returns job ID. This function supports both job ID and execution composite key.''' + if ':' in job_or_execution_id: + return job_or_execution_id.split(':')[0] return job_or_execution_id class Dependency: - def __init__(self, jobs: List[Union["Job", str]], allow_failure: bool = False, enqueue_at_front: bool = False): - """The definition of a Dependency. + def __init__(self, jobs: List[Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False): + '''The definition of a Dependency. Args: jobs (List[Union[Job, str]]): A list of Job instances or Job IDs. @@ -90,12 +90,12 @@ class Dependency: Raises: ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty - """ + ''' dependent_jobs = ensure_list(jobs) if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job): - raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids") + raise ValueError('jobs: must contain objects of type Job and/or strings representing Job ids') elif len(dependent_jobs) < 1: - raise ValueError("jobs: cannot be empty.") + raise ValueError('jobs: cannot be empty.') self.dependencies = dependent_jobs self.allow_failure = allow_failure @@ -103,13 +103,13 @@ class Dependency: UNEVALUATED = object() -"""Sentinel value to mark that some of our lazily evaluated properties have not +'''Sentinel value to mark that some of our lazily evaluated properties have not yet been evaluated. -""" +''' -def cancel_job(job_id: str, connection: "Redis", serializer=None, enqueue_dependents: bool = False): - """Cancels the job with the given job ID, preventing execution. +def cancel_job(job_id: str, connection: 'Redis', serializer=None, enqueue_dependents: bool = False): + '''Cancels the job with the given job ID, preventing execution. Use with caution. This will discard any job info (i.e. it can't be requeued later). Args: @@ -117,12 +117,12 @@ def cancel_job(job_id: str, connection: "Redis", serializer=None, enqueue_depend connection (Optional[Redis], optional): The Redis Connection. Defaults to None. serializer (str, optional): The string of the path to the serializer to use. Defaults to None. 
enqueue_dependents (bool, optional): Whether dependents should still be enqueued. Defaults to False. - """ + ''' Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents) -def get_current_job(connection: Optional["Redis"] = None, job_class: Optional["Job"] = None) -> Optional["Job"]: - """Returns the Job instance that is currently being executed. +def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None) -> Optional['Job']: + '''Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. Args: @@ -131,16 +131,16 @@ def get_current_job(connection: Optional["Redis"] = None, job_class: Optional["J Returns: job (Optional[Job]): The current Job running - """ + ''' if connection: - warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning) + warnings.warn('connection argument for get_current_job is deprecated.', DeprecationWarning) if job_class: - warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning) + warnings.warn('job_class argument for get_current_job is deprecated.', DeprecationWarning) return _job_stack.top -def requeue_job(job_id: str, connection: "Redis", serializer=None) -> "Job": - """Fetches a Job by ID and requeues it using the `requeue()` method. +def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job': + '''Fetches a Job by ID and requeues it using the `requeue()` method. Args: job_id (str): The Job ID that should be requeued. @@ -149,15 +149,15 @@ def requeue_job(job_id: str, connection: "Redis", serializer=None) -> "Job": Returns: Job: The requeued Job object. - """ + ''' job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() class Job: - """A Job is just a convenient datastructure to pass around job (meta) data.""" + '''A Job is just a convenient datastructure to pass around job (meta) data.''' - redis_job_namespace_prefix = "rq:job:" + redis_job_namespace_prefix = 'rq:job:' @classmethod def create( @@ -165,7 +165,7 @@ class Job: func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None, kwargs: Optional[Dict[str, Any]] = None, - connection: Optional["Redis"] = None, + connection: Optional['Redis'] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, status: Optional[JobStatus] = None, @@ -173,17 +173,17 @@ class Job: depends_on: Optional[JobDependencyType] = None, timeout: Optional[int] = None, id: Optional[str] = None, - origin: str = "", + origin: str = '', meta: Optional[Dict[str, Any]] = None, failure_ttl: Optional[int] = None, serializer=None, group_id: Optional[str] = None, *, - on_success: Optional[Union["Callback", Callable[..., Any]]] = None, # Callable is deprecated - on_failure: Optional[Union["Callback", Callable[..., Any]]] = None, # Callable is deprecated - on_stopped: Optional[Union["Callback", Callable[..., Any]]] = None, # Callable is deprecated - ) -> "Job": - """Creates a new Job instance for the given function, arguments, and + on_success: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated + on_failure: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated + on_stopped: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated + ) -> 'Job': + '''Creates a new Job instance for the given function, arguments, and keyword arguments. 
Args: @@ -195,7 +195,7 @@ class Job: kwargs (Optional[Dict], optional): A Dictionary of keyword arguments to pass the callable. Defaults to None, meaning no kwargs being passed. connection (Redis): The Redis connection to use. Defaults to None. - This will be "resolved" using the `resolve_connection` function when initialzing the Job Class. + This will be 'resolved' using the `resolve_connection` function when initialzing the Job Class. result_ttl (Optional[int], optional): The amount of time in seconds the results should live. Defaults to None. ttl (Optional[int], optional): The Time To Live (TTL) for the job itself. Defaults to None. @@ -232,16 +232,16 @@ class Job: Returns: Job: A job instance. - """ + ''' if args is None: args = () if kwargs is None: kwargs = {} if not isinstance(args, (tuple, list)): - raise TypeError("{0!r} is not a valid args list".format(args)) + raise TypeError('{0!r} is not a valid args list'.format(args)) if not isinstance(kwargs, dict): - raise TypeError("{0!r} is not a valid kwargs dict".format(kwargs)) + raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs)) job = cls(connection=connection, serializer=serializer) if id is not None: @@ -256,21 +256,21 @@ class Job: job._instance = func.__self__ job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): - job._func_name = "{0}.{1}".format(func.__module__, func.__qualname__) + job._func_name = '{0}.{1}'.format(func.__module__, func.__qualname__) elif isinstance(func, str): job._func_name = as_text(func) - elif not inspect.isclass(func) and hasattr(func, "__call__"): # a callable class instance + elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance job._instance = func - job._func_name = "__call__" + job._func_name = '__call__' else: - raise TypeError("Expected a callable or a string, but got: {0}".format(func)) + raise TypeError('Expected a callable or a string, but got: {0}'.format(func)) job._args = args job._kwargs = kwargs if on_success: if not isinstance(on_success, Callback): warnings.warn( - "Passing a string or function for `on_success` is deprecated, pass `Callback` instead", + 'Passing a string or function for `on_success` is deprecated, pass `Callback` instead', DeprecationWarning, ) on_success = Callback(on_success) # backward compatibility @@ -281,7 +281,7 @@ class Job: if on_failure: if not isinstance(on_failure, Callback): warnings.warn( - "Passing a string or function for `on_failure` is deprecated, pass `Callback` instead", + 'Passing a string or function for `on_failure` is deprecated, pass `Callback` instead', DeprecationWarning, ) on_failure = Callback(on_failure) # backward compatibility @@ -292,7 +292,7 @@ class Job: if on_stopped: if not isinstance(on_stopped, Callback): warnings.warn( - "Passing a string or function for `on_stopped` is deprecated, pass `Callback` instead", + 'Passing a string or function for `on_stopped` is deprecated, pass `Callback` instead', DeprecationWarning, ) on_stopped = Callback(on_stopped) # backward compatibility @@ -328,11 +328,11 @@ class Job: return job def get_position(self) -> Optional[int]: - """Get's the job's position on the queue + '''Get's the job's position on the queue Returns: position (Optional[int]): The position - """ + ''' from .queue import Queue if self.origin: @@ -341,7 +341,7 @@ class Job: return None def get_status(self, refresh: bool = True) -> JobStatus: - """Gets the Job Status + '''Gets the Job Status Args: refresh (bool, optional): Whether to refresh the 
Job. Defaults to True. @@ -351,36 +351,36 @@ class Job: Returns: status (JobStatus): The Job Status - """ + ''' if refresh: - status = self.connection.hget(self.key, "status") + status = self.connection.hget(self.key, 'status') if not status: - raise InvalidJobOperation(f"Failed to retrieve status for job: {self.id}") + raise InvalidJobOperation(f'Failed to retrieve status for job: {self.id}') self._status = JobStatus(as_text(status)) return self._status - def set_status(self, status: JobStatus, pipeline: Optional["Pipeline"] = None) -> None: - """Set's the Job Status + def set_status(self, status: JobStatus, pipeline: Optional['Pipeline'] = None) -> None: + '''Set's the Job Status Args: status (JobStatus): The Job Status to be set pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None. - """ + ''' self._status = status - connection: "Redis" = pipeline if pipeline is not None else self.connection - connection.hset(self.key, "status", self._status) + connection: 'Redis' = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'status', self._status) def get_meta(self, refresh: bool = True) -> Dict: - """Get's the metadata for a Job, an arbitrary dictionary. + '''Get's the metadata for a Job, an arbitrary dictionary. Args: refresh (bool, optional): Whether to refresh. Defaults to True. Returns: meta (Dict): The dictionary of metadata - """ + ''' if refresh: - meta = self.connection.hget(self.key, "meta") + meta = self.connection.hget(self.key, 'meta') self.meta = self.serializer.loads(meta) if meta else {} return self.meta @@ -419,20 +419,20 @@ class Job: @property def _dependency_id(self): - """Returns the first item in self._dependency_ids. Present to + '''Returns the first item in self._dependency_ids. Present to preserve compatibility with third party packages. - """ + ''' if self._dependency_ids: return self._dependency_ids[0] @property - def dependency(self) -> Optional["Job"]: - """Returns a job's first dependency. To avoid repeated Redis fetches, we cache + def dependency(self) -> Optional['Job']: + '''Returns a job's first dependency. To avoid repeated Redis fetches, we cache job.dependency as job._dependency. - """ + ''' if not self._dependency_ids: return None - if hasattr(self, "_dependency"): + if hasattr(self, '_dependency'): return self._dependency job = self.fetch(self._dependency_ids[0], connection=self.connection, serializer=self.serializer) self._dependency = job @@ -440,8 +440,8 @@ class Job: @property def dependent_ids(self) -> List[str]: - """Returns a list of ids of jobs whose execution depends on this - job's successful execution.""" + '''Returns a list of ids of jobs whose execution depends on this + job's successful execution.''' return list(map(as_text, self.connection.smembers(self.dependents_key))) @property @@ -507,12 +507,12 @@ class Job: return self._stopped_callback_timeout def _deserialize_data(self): - """Deserializes the Job `data` into a tuple. + '''Deserializes the Job `data` into a tuple. 
This includes the `_func_name`, `_instance`, `_args` and `_kwargs` Raises: DeserializationError: Cathes any deserialization error (since serializers are generic) - """ + ''' try: self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) except Exception as e: @@ -522,7 +522,7 @@ class Job: def data(self): if self._data is UNEVALUATED: if self._func_name is UNEVALUATED: - raise ValueError("Cannot build the job data") + raise ValueError('Cannot build the job data') if self._instance is UNEVALUATED: self._instance = None @@ -590,8 +590,8 @@ class Job: self._data = UNEVALUATED @classmethod - def exists(cls, job_id: str, connection: "Redis") -> bool: - """Checks whether a Job Hash exists for the given Job ID + def exists(cls, job_id: str, connection: 'Redis') -> bool: + '''Checks whether a Job Hash exists for the given Job ID Args: job_id (str): The Job ID @@ -599,14 +599,14 @@ class Job: Returns: job_exists (bool): Whether the Job exists - """ + ''' job_key = cls.key_for(job_id) job_exists = connection.exists(job_key) return bool(job_exists) @classmethod - def fetch(cls, id: str, connection: Optional["Redis"] = None, serializer=None) -> "Job": - """Fetches a persisted Job from its corresponding Redis key and instantiates it + def fetch(cls, id: str, connection: Optional['Redis'] = None, serializer=None) -> 'Job': + '''Fetches a persisted Job from its corresponding Redis key and instantiates it Args: id (str): The Job to fetch @@ -615,15 +615,15 @@ class Job: Returns: Job: The Job instance - """ + ''' # TODO: this method needs to support fetching jobs based on execution ID job = cls(parse_job_id(id), connection=connection, serializer=serializer) job.refresh() return job @classmethod - def fetch_many(cls, job_ids: Iterable[str], connection: "Redis", serializer=None) -> List[Optional["Job"]]: - """ + def fetch_many(cls, job_ids: Iterable[str], connection: 'Redis', serializer=None) -> List[Optional['Job']]: + ''' Bulk version of Job.fetch For any job_ids which a job does not exist, the corresponding item in @@ -636,14 +636,14 @@ class Job: Returns: jobs (list[Optional[Job]]): A list of Jobs instances, elements are None if a job_id does not exist. - """ + ''' parsed_ids = [parse_job_id(job_id) for job_id in job_ids] with connection.pipeline() as pipeline: for job_id in parsed_ids: pipeline.hgetall(cls.key_for(job_id)) results = pipeline.execute() - jobs: List[Optional["Job"]] = [] + jobs: List[Optional['Job']] = [] for i, job_id in enumerate(parsed_ids): if not results[i]: jobs.append(None) @@ -655,11 +655,11 @@ class Job: return jobs - def __init__(self, id: Optional[str] = None, connection: "Redis" = None, serializer=None): + def __init__(self, id: Optional[str] = None, connection: 'Redis' = None, serializer=None): # Manually check for the presence of the connection argument to preserve # backwards compatibility during the transition to RQ v2.0.0. 
         if not connection:
-            raise TypeError("Job.__init__() missing 1 required argument: 'connection'")
+            raise TypeError('Job.__init__() missing 1 required argument: \'connection\'')
         self.connection = connection
         self._id = id
         self.created_at = now()
@@ -675,7 +675,7 @@ class Job:
         self._stopped_callback_name = None
         self._stopped_callback = UNEVALUATED
         self.description: Optional[str] = None
-        self.origin: str = ""
+        self.origin: str = ''
         self.enqueued_at: Optional[datetime] = None
         self.started_at: Optional[datetime] = None
         self.ended_at: Optional[datetime] = None
@@ -706,10 +706,10 @@ class Job:
         self._cached_result: Optional[Result] = None

     def __repr__(self):  # noqa  # pragma: no cover
-        return "{0}({1!r}, enqueued_at={2!r})".format(self.__class__.__name__, self._id, self.enqueued_at)
+        return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, self._id, self.enqueued_at)

     def __str__(self):
-        return "<{0} {1}: {2}>".format(self.__class__.__name__, self.id, self.description)
+        return '<{0} {1}: {2}>'.format(self.__class__.__name__, self.id, self.description)

     def __eq__(self, other):  # noqa
         return isinstance(other, self.__class__) and self.id == other.id
@@ -719,28 +719,28 @@ class Job:

     # Data access
     def get_id(self) -> str:  # noqa
-        """The job ID for this job instance. Generates an ID lazily the
+        '''The job ID for this job instance. Generates an ID lazily the
         first time the ID is requested.

         Returns:
             job_id (str): The Job ID
-        """
+        '''
        if self._id is None:
            self._id = str(uuid4())
        return self._id

     def set_id(self, value: str) -> None:
-        """Sets a job ID for the given job
+        '''Sets a job ID for the given job

         Args:
             value (str): The value to set as Job ID
-        """
+        '''
         if not isinstance(value, str):
-            raise TypeError("id must be a string, not {0}".format(type(value)))
+            raise TypeError('id must be a string, not {0}'.format(type(value)))
         self._id = value

-    def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional["Pipeline"] = None, xx: bool = False):
-        """Sets the heartbeat for a job.
+    def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False):
+        '''Sets the heartbeat for a job.
         It will set a hash in Redis with the `last_heartbeat` key and datetime value.
         If a Redis' pipeline is passed, it will use that, else, it will use the job's own connection.
@@ -749,54 +749,54 @@ class Job:
             ttl (int): The time to live
             pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None.
             xx (bool, optional): Only sets the key if already exists. Defaults to False.
-        """
+        '''
         self.last_heartbeat = timestamp
         connection = pipeline if pipeline is not None else self.connection
-        connection.hset(self.key, "last_heartbeat", utcformat(self.last_heartbeat))
+        connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
         # self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx)

     id = property(get_id, set_id)

     @classmethod
     def key_for(cls, job_id: str) -> bytes:
-        """The Redis key that is used to store job hash under.
+        '''The Redis key that is used to store job hash under.

         Args:
             job_id (str): The Job ID

         Returns:
             redis_job_key (bytes): The Redis fully qualified key for the job
-        """
-        return (cls.redis_job_namespace_prefix + job_id).encode("utf-8")
+        '''
+        return (cls.redis_job_namespace_prefix + job_id).encode('utf-8')

     @classmethod
     def dependents_key_for(cls, job_id: str) -> str:
-        """The Redis key that is used to store job dependents hash under.
+ '''The Redis key that is used to store job dependents hash under. Args: - job_id (str): The "parent" job id + job_id (str): The 'parent' job id Returns: dependents_key (str): The dependents key - """ - return "{0}{1}:dependents".format(cls.redis_job_namespace_prefix, job_id) + ''' + return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id) @property def key(self): - """The Redis key that is used to store job hash under.""" + '''The Redis key that is used to store job hash under.''' return self.key_for(self.id) @property def dependents_key(self): - """The Redis key that is used to store job dependents hash under.""" + '''The Redis key that is used to store job dependents hash under.''' return self.dependents_key_for(self.id) @property def dependencies_key(self): - return "{0}:{1}:dependencies".format(self.redis_job_namespace_prefix, self.id) + return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) - def fetch_dependencies(self, watch: bool = False, pipeline: Optional["Pipeline"] = None) -> List["Job"]: - """Fetch all of a job's dependencies. If a pipeline is supplied, and + def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None) -> List['Job']: + '''Fetch all of a job's dependencies. If a pipeline is supplied, and watch is true, then set WATCH on all the keys of all dependencies. Returned jobs will use self's connection, not the pipeline supplied. @@ -809,7 +809,7 @@ class Job: Returns: jobs (list[Job]): A list of Jobs - """ + ''' connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: @@ -823,10 +823,10 @@ class Job: @property def exc_info(self) -> Optional[str]: - """ + ''' Get the latest result and returns `exc_info` only if the latest result is a failure. - """ - warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning) + ''' + warnings.warn('job.exc_info is deprecated, use job.latest_result() instead.', DeprecationWarning) from .results import Result @@ -840,14 +840,14 @@ class Job: return self._exc_info def return_value(self, refresh: bool = False) -> Optional[Any]: - """Returns the return value of the latest execution, if it was successful. + '''Returns the return value of the latest execution, if it was successful. Args: refresh (bool, optional): Whether to refresh the current status. Defaults to False. Returns: result (Optional[Any]): The job return value. - """ + ''' from .results import Result if refresh: @@ -857,7 +857,7 @@ class Job: if self._result is not None: return self._result - rv = self.connection.hget(self.key, "result") + rv = self.connection.hget(self.key, 'result') if rv is not None: # cache the result self._result = self.serializer.loads(rv) @@ -874,7 +874,7 @@ class Job: @property def result(self) -> Any: - """Returns the return value of the job. + '''Returns the return value of the job. Initially, right after enqueueing a job, the return value will be None. But when the job has been executed, and had a return value or @@ -888,9 +888,9 @@ class Job: been executed when its return value is None, since return values written back to Redis will expire after a given amount of time (500 seconds by default). 
- """ + ''' - warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning) + warnings.warn('job.result is deprecated, use job.return_value instead.', DeprecationWarning) from .results import Result @@ -903,49 +903,49 @@ class Job: # Fallback to old behavior of getting result from job hash if self._result is None: - rv = self.connection.hget(self.key, "result") + rv = self.connection.hget(self.key, 'result') if rv is not None: # cache the result self._result = self.serializer.loads(rv) return self._result - def results(self) -> List["Result"]: - """Returns all Result objects + def results(self) -> List['Result']: + '''Returns all Result objects Returns: all_results (List[Result]): A list of 'Result' objects - """ + ''' from .results import Result return Result.all(self, serializer=self.serializer) - def latest_result(self, timeout: int = 0) -> Optional["Result"]: - """Get the latest job result. + def latest_result(self, timeout: int = 0) -> Optional['Result']: + '''Get the latest job result. Args: timeout (int, optional): Number of seconds to block waiting for a result. Defaults to 0 (no blocking). Returns: result (Result): The Result object - """ + ''' from .results import Result return Result.fetch_latest(self, serializer=self.serializer, timeout=timeout) def restore(self, raw_data) -> Any: - """Overwrite properties with the provided values stored in Redis. + '''Overwrite properties with the provided values stored in Redis. Args: raw_data (_type_): The raw data to load the job data from Raises: NoSuchJobError: If there way an error getting the job data - """ + ''' obj = decode_redis_hash(raw_data) try: - raw_data = obj["data"] + raw_data = obj['data'] except KeyError: - raise NoSuchJobError("Unexpected job format: {0}".format(obj)) + raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) try: self.data = zlib.decompress(raw_data) @@ -953,61 +953,61 @@ class Job: # Fallback to uncompressed string self.data = raw_data - self.created_at = str_to_date(obj.get("created_at")) - self.origin = as_text(obj.get("origin")) if obj.get("origin") else "" - self.worker_name = obj.get("worker_name").decode() if obj.get("worker_name") else None - self.description = as_text(obj.get("description")) if obj.get("description") else None - self.enqueued_at = str_to_date(obj.get("enqueued_at")) - self.started_at = str_to_date(obj.get("started_at")) - self.ended_at = str_to_date(obj.get("ended_at")) - self.last_heartbeat = str_to_date(obj.get("last_heartbeat")) - self.group_id = as_text(obj.get("group_id")) if obj.get("group_id") else None - result = obj.get("result") + self.created_at = str_to_date(obj.get('created_at')) + self.origin = as_text(obj.get('origin')) if obj.get('origin') else '' + self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None + self.description = as_text(obj.get('description')) if obj.get('description') else None + self.enqueued_at = str_to_date(obj.get('enqueued_at')) + self.started_at = str_to_date(obj.get('started_at')) + self.ended_at = str_to_date(obj.get('ended_at')) + self.last_heartbeat = str_to_date(obj.get('last_heartbeat')) + self.group_id = as_text(obj.get('group_id')) if obj.get('group_id') else None + result = obj.get('result') if result: try: self._result = self.serializer.loads(result) except Exception: self._result = UNSERIALIZABLE_RETURN_VALUE_PAYLOAD - self.timeout = parse_timeout(obj.get("timeout")) if obj.get("timeout") else None - self.result_ttl = int(obj.get("result_ttl")) if 
obj.get("result_ttl") else None - self.failure_ttl = int(obj.get("failure_ttl")) if obj.get("failure_ttl") else None - self._status = JobStatus(as_text(obj.get("status"))) if obj.get("status") else None + self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None + self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None + self._status = JobStatus(as_text(obj.get('status'))) if obj.get('status') else None - if obj.get("success_callback_name"): - self._success_callback_name = obj.get("success_callback_name").decode() + if obj.get('success_callback_name'): + self._success_callback_name = obj.get('success_callback_name').decode() - if "success_callback_timeout" in obj: - self._success_callback_timeout = int(obj.get("success_callback_timeout")) + if 'success_callback_timeout' in obj: + self._success_callback_timeout = int(obj.get('success_callback_timeout')) - if obj.get("failure_callback_name"): - self._failure_callback_name = obj.get("failure_callback_name").decode() + if obj.get('failure_callback_name'): + self._failure_callback_name = obj.get('failure_callback_name').decode() - if "failure_callback_timeout" in obj: - self._failure_callback_timeout = int(obj.get("failure_callback_timeout")) + if 'failure_callback_timeout' in obj: + self._failure_callback_timeout = int(obj.get('failure_callback_timeout')) - if obj.get("stopped_callback_name"): - self._stopped_callback_name = obj.get("stopped_callback_name").decode() + if obj.get('stopped_callback_name'): + self._stopped_callback_name = obj.get('stopped_callback_name').decode() - if "stopped_callback_timeout" in obj: - self._stopped_callback_timeout = int(obj.get("stopped_callback_timeout")) + if 'stopped_callback_timeout' in obj: + self._stopped_callback_timeout = int(obj.get('stopped_callback_timeout')) - dep_ids = obj.get("dependency_ids") - dep_id = obj.get("dependency_id") # for backwards compatibility + dep_ids = obj.get('dependency_ids') + dep_id = obj.get('dependency_id') # for backwards compatibility self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else [] - allow_failures = obj.get("allow_dependency_failures") + allow_failures = obj.get('allow_dependency_failures') self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None - self.enqueue_at_front = bool(int(obj["enqueue_at_front"])) if "enqueue_at_front" in obj else None - self.ttl = int(obj.get("ttl")) if obj.get("ttl") else None + self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None + self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None try: - self.meta = self.serializer.loads(obj.get("meta")) if obj.get("meta") else {} + self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} except Exception: # depends on the serializer - self.meta = {"unserialized": obj.get("meta", {})} + self.meta = {'unserialized': obj.get('meta', {})} - self.retries_left = int(obj.get("retries_left")) if obj.get("retries_left") else None - if obj.get("retry_intervals"): - self.retry_intervals = json.loads(obj.get("retry_intervals").decode()) + self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None + if obj.get('retry_intervals'): + self.retry_intervals = json.loads(obj.get('retry_intervals').decode()) - raw_exc_info = obj.get("exc_info") + raw_exc_info = obj.get('exc_info') if raw_exc_info: try: 
self._exc_info = as_text(zlib.decompress(raw_exc_info)) @@ -1017,18 +1017,18 @@ class Job: # Persistence def refresh(self): # noqa - """Overwrite the current instance's properties with the values in the + '''Overwrite the current instance's properties with the values in the corresponding Redis key. Will raise a NoSuchJobError if no corresponding Redis key exists. - """ + ''' data = self.connection.hgetall(self.key) if not data: - raise NoSuchJobError("No such job: {0}".format(self.key)) + raise NoSuchJobError('No such job: {0}'.format(self.key)) self.restore(data) def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dict: - """Returns a serialization of the current job instance + '''Returns a serialization of the current job instance You can exclude serializing the `meta` dictionary by setting `include_meta=False`. @@ -1039,71 +1039,71 @@ class Job: Returns: dict: The Job serialized as a dictionary - """ + ''' obj = { - "created_at": utcformat(self.created_at or now()), - "data": zlib.compress(self.data), - "success_callback_name": self._success_callback_name if self._success_callback_name else "", - "failure_callback_name": self._failure_callback_name if self._failure_callback_name else "", - "stopped_callback_name": self._stopped_callback_name if self._stopped_callback_name else "", - "started_at": utcformat(self.started_at) if self.started_at else "", - "ended_at": utcformat(self.ended_at) if self.ended_at else "", - "last_heartbeat": utcformat(self.last_heartbeat) if self.last_heartbeat else "", - "worker_name": self.worker_name or "", - "group_id": self.group_id or "", + 'created_at': utcformat(self.created_at or now()), + 'data': zlib.compress(self.data), + 'success_callback_name': self._success_callback_name if self._success_callback_name else '', + 'failure_callback_name': self._failure_callback_name if self._failure_callback_name else '', + 'stopped_callback_name': self._stopped_callback_name if self._stopped_callback_name else '', + 'started_at': utcformat(self.started_at) if self.started_at else '', + 'ended_at': utcformat(self.ended_at) if self.ended_at else '', + 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', + 'worker_name': self.worker_name or '', + 'group_id': self.group_id or '', } if self.retries_left is not None: - obj["retries_left"] = self.retries_left + obj['retries_left'] = self.retries_left if self.retry_intervals is not None: - obj["retry_intervals"] = json.dumps(self.retry_intervals) + obj['retry_intervals'] = json.dumps(self.retry_intervals) if self.origin: - obj["origin"] = self.origin + obj['origin'] = self.origin if self.description is not None: - obj["description"] = self.description + obj['description'] = self.description if self.enqueued_at is not None: - obj["enqueued_at"] = utcformat(self.enqueued_at) + obj['enqueued_at'] = utcformat(self.enqueued_at) if self._result is not None and include_result: try: - obj["result"] = self.serializer.dumps(self._result) + obj['result'] = self.serializer.dumps(self._result) except: # noqa - obj["result"] = "Unserializable return value" + obj['result'] = 'Unserializable return value' if self._exc_info is not None and include_result: - obj["exc_info"] = zlib.compress(str(self._exc_info).encode("utf-8")) + obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8')) if self.timeout is not None: - obj["timeout"] = self.timeout + obj['timeout'] = self.timeout if self._success_callback_timeout is not None: - obj["success_callback_timeout"] = 
self._success_callback_timeout + obj['success_callback_timeout'] = self._success_callback_timeout if self._failure_callback_timeout is not None: - obj["failure_callback_timeout"] = self._failure_callback_timeout + obj['failure_callback_timeout'] = self._failure_callback_timeout if self._stopped_callback_timeout is not None: - obj["stopped_callback_timeout"] = self._stopped_callback_timeout + obj['stopped_callback_timeout'] = self._stopped_callback_timeout if self.result_ttl is not None: - obj["result_ttl"] = self.result_ttl + obj['result_ttl'] = self.result_ttl if self.failure_ttl is not None: - obj["failure_ttl"] = self.failure_ttl + obj['failure_ttl'] = self.failure_ttl if self._status is not None: - obj["status"] = self._status + obj['status'] = self._status if self._dependency_ids: - obj["dependency_id"] = self._dependency_ids[0] # for backwards compatibility - obj["dependency_ids"] = json.dumps(self._dependency_ids) + obj['dependency_id'] = self._dependency_ids[0] # for backwards compatibility + obj['dependency_ids'] = json.dumps(self._dependency_ids) if self.meta and include_meta: - obj["meta"] = self.serializer.dumps(self.meta) + obj['meta'] = self.serializer.dumps(self.meta) if self.ttl: - obj["ttl"] = self.ttl + obj['ttl'] = self.ttl if self.allow_dependency_failures is not None: # convert boolean to integer to avoid redis.exception.DataError - obj["allow_dependency_failures"] = int(self.allow_dependency_failures) + obj['allow_dependency_failures'] = int(self.allow_dependency_failures) if self.enqueue_at_front is not None: - obj["enqueue_at_front"] = int(self.enqueue_at_front) + obj['enqueue_at_front'] = int(self.enqueue_at_front) return obj - def save(self, pipeline: Optional["Pipeline"] = None, include_meta: bool = True, include_result: bool = True): - """Dumps the current job instance to its corresponding Redis key. + def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, include_result: bool = True): + '''Dumps the current job instance to its corresponding Redis key. Exclude saving the `meta` dictionary by setting `include_meta=False`. This is useful to prevent clobbering @@ -1115,7 +1115,7 @@ class Job: pipeline (Optional[Pipeline], optional): The Redis' pipeline to use. Defaults to None. include_meta (bool, optional): Whether to include the job's metadata. Defaults to True. include_result (bool, optional): Whether to include the job's result. Defaults to True. 
-        """
+        '''
         key = self.key
         connection = pipeline if pipeline is not None else self.connection
@@ -1124,27 +1124,27 @@
     @property
     def supports_redis_streams(self) -> bool:
-        """Only supported by Redis server >= 5.0 is required."""
+        '''Whether Redis streams are supported (requires Redis server >= 5.0).'''
         return self.get_redis_server_version() >= (5, 0, 0)

     def get_redis_server_version(self) -> Tuple[int, int, int]:
-        """Return Redis server version of connection
+        '''Return Redis server version of connection

         Returns:
             redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9)
-        """
+        '''
         if self.redis_server_version is None:
             self.redis_server_version = get_version(self.connection)
         return self.redis_server_version

     def save_meta(self):
-        """Stores job meta from the job instance to the corresponding Redis key."""
+        '''Stores job meta from the job instance to the corresponding Redis key.'''
         meta = self.serializer.dumps(self.meta)
-        self.connection.hset(self.key, "meta", meta)
+        self.connection.hset(self.key, 'meta', meta)

-    def cancel(self, pipeline: Optional["Pipeline"] = None, enqueue_dependents: bool = False):
-        """Cancels the given job, which will prevent the job from ever being
+    def cancel(self, pipeline: Optional['Pipeline'] = None, enqueue_dependents: bool = False):
+        '''Cancels the given job, which will prevent the job from ever being
         ran (or inspected).

         This method merely exists as a high-level API call to cancel jobs
@@ -1160,9 +1160,9 @@ class Job:

         Raises:
             InvalidJobOperation: If the job has already been cancelled.
-        """
+        '''
         if self.is_canceled:
-            raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
+            raise InvalidJobOperation('Cannot cancel already canceled job: {}'.format(self.get_id()))
         from .queue import Queue
         from .registry import CanceledJobRegistry
@@ -1198,27 +1198,27 @@ class Job:
                 # handle it
                 raise

-    def requeue(self, at_front: bool = False) -> "Job":
-        """Requeues job
+    def requeue(self, at_front: bool = False) -> 'Job':
+        '''Requeues job

         Args:
             at_front (bool, optional): Whether the job should be requeued at the front of the queue. Defaults to False.

         Returns:
             job (Job): The requeued Job instance
-        """
+        '''
         return self.failed_job_registry.requeue(self, at_front=at_front)

     @property
-    def execution_registry(self) -> "ExecutionRegistry":
+    def execution_registry(self) -> 'ExecutionRegistry':
         from .executions import ExecutionRegistry

         return ExecutionRegistry(self.id, connection=self.connection)

-    def get_executions(self) -> List["Execution"]:
+    def get_executions(self) -> List['Execution']:
         return self.execution_registry.get_executions()

-    def _remove_from_registries(self, pipeline: Optional["Pipeline"] = None, remove_from_queue: bool = True):
+    def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
         from .registry import BaseRegistry

         if remove_from_queue:
@@ -1275,16 +1275,16 @@ class Job:
             registry.remove(self, pipeline=pipeline)

     def delete(
-        self, pipeline: Optional["Pipeline"] = None, remove_from_queue: bool = True, delete_dependents: bool = False
+        self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents: bool = False
     ):
-        """Cancels the job and deletes the job hash from Redis. Jobs depending
+        '''Cancels the job and deletes the job hash from Redis. Jobs depending
         on this job can optionally be deleted as well.

         Args:
             pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None.
remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True. delete_dependents (bool, optional): Whether job dependents should also be deleted. Defaults to False. - """ + ''' connection = pipeline if pipeline is not None else self.connection self._remove_from_registries(pipeline=pipeline, remove_from_queue=remove_from_queue) @@ -1300,12 +1300,12 @@ class Job: connection.delete(self.key, self.dependents_key, self.dependencies_key) - def delete_dependents(self, pipeline: Optional["Pipeline"] = None): - """Delete jobs depending on this job. + def delete_dependents(self, pipeline: Optional['Pipeline'] = None): + '''Delete jobs depending on this job. Args: pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None. - """ + ''' connection = pipeline if pipeline is not None else self.connection for dependent_id in self.dependent_ids: try: @@ -1318,12 +1318,12 @@ class Job: # Job execution def perform(self) -> Any: # noqa - """The main execution method. Invokes the job function with the job arguments. + '''The main execution method. Invokes the job function with the job arguments. This is the method that actually performs the job - it's what its called by the worker. Returns: result (Any): The job result - """ + ''' self.connection.persist(self.key) _job_stack.push(self) try: @@ -1332,35 +1332,35 @@ class Job: assert self is _job_stack.pop() return self._result - def prepare_for_execution(self, worker_name: str, pipeline: "Pipeline"): - """Prepares the job for execution, setting the worker name, + def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'): + '''Prepares the job for execution, setting the worker name, heartbeat information, status and other metadata before execution begins. Args: worker_name (str): The worker that will perform the job pipeline (Pipeline): The Redis' piipeline to use - """ + ''' self.worker_name = worker_name self.last_heartbeat = now() self.started_at = self.last_heartbeat self._status = JobStatus.STARTED mapping = { - "last_heartbeat": utcformat(self.last_heartbeat), - "status": self._status, - "started_at": utcformat(self.started_at), # type: ignore - "worker_name": worker_name, + 'last_heartbeat': utcformat(self.last_heartbeat), + 'status': self._status, + 'started_at': utcformat(self.started_at), # type: ignore + 'worker_name': worker_name, } pipeline.hset(self.key, mapping=mapping) def _execute(self) -> Any: - """Actually runs the function with it's *args and **kwargs. + '''Actually runs the function with it's *args and **kwargs. It will use the `func` property, which was already resolved and ready to run at this point. If the function is a coroutine (it's an async function/method), then the `result` will have to be awaited within an event loop. Returns: result (Any): The function result - """ + ''' result = self.func(*self.args, **self.kwargs) if asyncio.iscoroutine(result): loop = asyncio.new_event_loop() @@ -1369,7 +1369,7 @@ class Job: return result def get_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]: - """Returns ttl for a job that determines how long a job will be + '''Returns ttl for a job that determines how long a job will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. 
@@ -1378,11 +1378,11 @@ class Job: Returns: ttl (int): The time to live - """ + ''' return default_ttl if self.ttl is None else self.ttl def get_result_ttl(self, default_ttl: int) -> int: - """Returns ttl for a job that determines how long a jobs result will + '''Returns ttl for a job that determines how long a jobs result will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. @@ -1391,22 +1391,22 @@ class Job: Returns: ttl (int): The time to live for the result - """ + ''' return default_ttl if self.result_ttl is None else self.result_ttl # Representation def get_call_string(self) -> Optional[str]: # noqa - """Returns a string representation of the call, formatted as a regular + '''Returns a string representation of the call, formatted as a regular Python function invocation statement. Returns: call_repr (str): The string representation - """ + ''' call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75) return call_repr - def cleanup(self, ttl: Optional[int] = None, pipeline: Optional["Pipeline"] = None, remove_from_queue: bool = True): - """Prepare job for eventual deletion (if needed). + def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): + '''Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its result depends on the value of ttl: - If ttl is 0, cleanup the job immediately. @@ -1417,7 +1417,7 @@ class Job: ttl (Optional[int], optional): Time to live. Defaults to None. pipeline (Optional[Pipeline], optional): Redis' pipeline. Defaults to None. remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True. - """ + ''' if ttl == 0: self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: @@ -1453,45 +1453,45 @@ class Job: ) def execute_success_callback(self, death_penalty_class: Type[BaseDeathPenalty], result: Any): - """Executes success_callback for a job. + '''Executes success_callback for a job. with timeout . Args: death_penalty_class (Type[BaseDeathPenalty]): The penalty class to use for timeout result (Any): The job's result. 
- """ + ''' if not self.success_callback: return - logger.debug("Running success callbacks for %s", self.id) + logger.debug('Running success callbacks for %s', self.id) with death_penalty_class(self.success_callback_timeout, JobTimeoutException, job_id=self.id): self.success_callback(self, self.connection, result) def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info): - """Executes failure_callback with possible timeout""" + '''Executes failure_callback with possible timeout''' if not self.failure_callback: return - logger.debug("Running failure callbacks for %s", self.id) + logger.debug('Running failure callbacks for %s', self.id) try: with death_penalty_class(self.failure_callback_timeout, JobTimeoutException, job_id=self.id): self.failure_callback(self, self.connection, *exc_info) except Exception: # noqa - logger.exception("Job %s: error while executing failure callback", self.id) + logger.exception('Job %s: error while executing failure callback', self.id) raise def execute_stopped_callback(self, death_penalty_class: Type[BaseDeathPenalty]): - """Executes stopped_callback with possible timeout""" - logger.debug("Running stopped callbacks for %s", self.id) + '''Executes stopped_callback with possible timeout''' + logger.debug('Running stopped callbacks for %s', self.id) try: with death_penalty_class(self.stopped_callback_timeout, JobTimeoutException, job_id=self.id): self.stopped_callback(self, self.connection) except Exception: # noqa - logger.exception("Job %s: error while executing stopped callback", self.id) + logger.exception('Job %s: error while executing stopped callback', self.id) raise - def _handle_success(self, result_ttl: int, pipeline: "Pipeline"): - """Saves and cleanup job after successful execution""" + def _handle_success(self, result_ttl: int, pipeline: 'Pipeline'): + '''Saves and cleanup job after successful execution''' # self.log.debug('Setting job %s status to finished', job.id) self.set_status(JobStatus.FINISHED, pipeline=pipeline) # Result should be saved in job hash only if server @@ -1511,7 +1511,7 @@ class Job: finished_job_registry = self.finished_job_registry finished_job_registry.add(self, result_ttl, pipeline) - def _handle_failure(self, exc_string: str, pipeline: "Pipeline"): + def _handle_failure(self, exc_string: str, pipeline: 'Pipeline'): failed_job_registry = self.failed_job_registry # Exception should be saved in job hash if server # doesn't support Redis streams @@ -1529,21 +1529,21 @@ class Job: Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) def get_retry_interval(self) -> int: - """Returns the desired retry interval. + '''Returns the desired retry interval. If number of retries is bigger than length of intervals, the first value in the list will be used multiple times. Returns: retry_interval (int): The desired retry interval - """ + ''' if self.retry_intervals is None: return 0 number_of_intervals = len(self.retry_intervals) index = max(number_of_intervals - self.retries_left, 0) return self.retry_intervals[index] - def retry(self, queue: "Queue", pipeline: "Pipeline"): - """Requeue or schedule this job for execution. + def retry(self, queue: 'Queue', pipeline: 'Pipeline'): + '''Requeue or schedule this job for execution. If the the `retry_interval` was set on the job itself, it will calculate a scheduled time for the job to run, and instead of just regularly `enqueing` the job, it will `schedule` it. 
@@ -1551,7 +1551,7 @@ class Job:
         Args:
             queue (Queue): The queue to retry the job on
             pipeline (Pipeline): The Redis' pipeline to use
-        """
+        '''
         retry_interval = self.get_retry_interval()
         self.retries_left = self.retries_left - 1
         if retry_interval:
@@ -1559,14 +1559,14 @@ class Job:
             self.set_status(JobStatus.SCHEDULED)
             queue.schedule_job(self, scheduled_datetime, pipeline=pipeline)
             logger.info(
-                "Job %s: scheduled for retry at %s, %s remaining", self.id, scheduled_datetime, self.retries_left
+                'Job %s: scheduled for retry at %s, %s remaining', self.id, scheduled_datetime, self.retries_left
             )
         else:
             queue._enqueue_job(self, pipeline=pipeline)
-            logger.info("Job %s: enqueued for retry, %s remaining")
+            logger.info('Job %s: enqueued for retry, %s remaining', self.id, self.retries_left)

-    def register_dependency(self, pipeline: Optional["Pipeline"] = None):
-        """Jobs may have dependencies. Jobs are enqueued only if the jobs they
+    def register_dependency(self, pipeline: Optional['Pipeline'] = None):
+    '''Jobs may have dependencies. Jobs are enqueued only if the jobs they
         depend on are successfully performed. We record this relation as
         a reverse dependency (a Redis set), with a key that looks something
         like:
@@ -1579,7 +1579,7 @@ class Job:

         Args:
             pipeline (Optional[Pipeline]): The Redis' pipeline. Defaults to None
-        """
+        '''
         from .registry import DeferredJobRegistry

         registry = DeferredJobRegistry(
@@ -1601,11 +1601,11 @@ class Job:

     def dependencies_are_met(
         self,
-        parent_job: Optional["Job"] = None,
-        pipeline: Optional["Pipeline"] = None,
+        parent_job: Optional['Job'] = None,
+        pipeline: Optional['Pipeline'] = None,
         exclude_job_id: Optional[str] = None,
     ) -> bool:
-        """Returns a boolean indicating if all of this job's dependencies are `FINISHED`
+        '''Returns a boolean indicating if all of this job's dependencies are `FINISHED`

         If a pipeline is passed, all dependencies are WATCHed.

@@ -1621,7 +1621,7 @@ class Job:

         Returns:
             are_met (bool): Whether the dependencies were met.
-        """
+        '''
         connection = pipeline if pipeline is not None else self.connection

         if pipeline is not None:
@@ -1650,7 +1650,7 @@ class Job:

         with connection.pipeline() as pipeline:
             for key in dependencies_ids:
-                pipeline.hget(self.key_for(key), "status")
+                pipeline.hget(self.key_for(key), 'status')

             dependencies_statuses = pipeline.execute()

@@ -1666,7 +1666,7 @@ _job_stack = LocalStack()

 class Retry:
     def __init__(self, max: int, interval: Union[int, Iterable[int]] = 0):
-        """The main object to defined Retry logics for jobs.
+        '''The main object to define retry logic for jobs.
Args: max (int): The max number of times a job should be retried @@ -1676,19 +1676,19 @@ class Retry: Raises: ValueError: If the `max` argument is lower than 1 ValueError: If the interval param is negative or the list contains negative numbers - """ + ''' super().__init__() if max < 1: - raise ValueError("max: please enter a value greater than 0") + raise ValueError('max: please enter a value greater than 0') if isinstance(interval, int): if interval < 0: - raise ValueError("interval: negative numbers are not allowed") + raise ValueError('interval: negative numbers are not allowed') intervals = [interval] elif isinstance(interval, Iterable): for i in interval: if i < 0: - raise ValueError("interval: negative numbers are not allowed") + raise ValueError('interval: negative numbers are not allowed') intervals = interval self.max = max @@ -1696,15 +1696,15 @@ class Retry: class CallbackType(str, Enum): - SUCCESS = "success" - FAILURE = "failure" - STOPPED = "stopped" + SUCCESS = 'success' + FAILURE = 'failure' + STOPPED = 'stopped' @runtime_checkable class SuccessCallbackFunc(Protocol): def __call__(self, job: Job, connection: Redis, result: Any, *args: Any, **kwargs: Any) -> Any: - """ + ''' Callback function to be called when a job is successfully executed. Args: @@ -1713,7 +1713,7 @@ class SuccessCallbackFunc(Protocol): result (Any): the result of the job execution *args (Any): additional positional arguments **kwargs (Any): additional keyword arguments - """ + ''' @runtime_checkable @@ -1721,7 +1721,7 @@ class FailureCallbackFunc(Protocol): def __call__( self, job: Job, connection: Redis, type: Type[BaseException], value: BaseException, traceback: Any ) -> Any: - """ + ''' Callback function to be called when a job fails during execution. Args: @@ -1730,19 +1730,19 @@ class FailureCallbackFunc(Protocol): Type (type[BaseException]): the type of the exception value (BaseException): the exception instance traceback (Any): the traceback of the exception - """ + ''' @runtime_checkable class StoppedCallbackFunc(Protocol): def __call__(self, job: Job, connection: Redis) -> Any: - """ + ''' Callback function to be called when a job is stopped before execution. 
Args: job (Job): the job that was executed connection (Redis): the Redis connection - """ + ''' CALLBACK_PROTOCOLS = { @@ -1752,11 +1752,11 @@ CALLBACK_PROTOCOLS = { } EXPECTED_SIGNATURE = { - CallbackType.SUCCESS: "(job: Job, connection: Redis, result: Any, *args: Any, **kwargs: Any) -> Any", + CallbackType.SUCCESS: '(job: Job, connection: Redis, result: Any, *args: Any, **kwargs: Any) -> Any', CallbackType.FAILURE: ( - "(job: Job, connection: Redis, type: Type[BaseException], " "value: BaseException, traceback: Any) -> Any" + '(job: Job, connection: Redis, type: Type[BaseException], ' 'value: BaseException, traceback: Any) -> Any' ), - CallbackType.STOPPED: "(job: Job, connection: Redis) -> Any", + CallbackType.STOPPED: '(job: Job, connection: Redis) -> Any', } @@ -1767,7 +1767,7 @@ class Callback: timeout: Optional[Any] = None, ): if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func): - raise ValueError("Callback `func` must be a string or function") + raise ValueError('Callback `func` must be a string or function') self.func = func self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT @@ -1776,10 +1776,10 @@ class Callback: def name(self) -> str: if isinstance(self.func, str): return self.func - return "{0}.{1}".format(self.func.__module__, self.func.__qualname__) + return '{0}.{1}'.format(self.func.__module__, self.func.__qualname__) def assert_type(self, callback_type: CallbackType) -> None: - """ + ''' Assert that the callback function matches the expected signature for the given callback type. Args: @@ -1787,7 +1787,7 @@ class Callback: Raises: InvalidCallbackException: if the callback function does not match the expected signature - """ + ''' if isinstance(self.func, str): return @@ -1798,6 +1798,6 @@ class Callback: class InvalidCallbackException(Exception): def __init__(self, callback: Callback, callback_type: CallbackType) -> None: super().__init__( - f"Callback {callback.name} does not match the expected signature\n" - f"Expected: def {callback.name}{EXPECTED_SIGNATURE[callback_type]}" + f'Callback {callback.name} does not match the expected signature\n' + f'Expected: def {callback.name}{EXPECTED_SIGNATURE[callback_type]}' )
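
Note on the quoting convention this patch applies: plain string literals and docstrings use single quotes, while a literal that itself contains a single quote must either escape the inner quote or keep double quotes, since 'missing: 'connection'' is a SyntaxError. A minimal sketch of the rule (illustrative only, not part of the diff; `describe` and `message` are made-up names):

    import logging

    # Plain strings use single quotes, matching the style applied above.
    logger = logging.getLogger('rq.job')

    def describe(job_id: str) -> str:
        '''Docstrings follow the same single-quote style in this patch.'''
        return 'Job {0}'.format(job_id)

    # A literal containing a single quote escapes it (or keeps double quotes):
    message = 'missing 1 required argument: \'connection\''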