From 375ace1747a25bb26c080c5ad5f51da394bb8059 Mon Sep 17 00:00:00 2001 From: lowercase00 <21188280+lowercase00@users.noreply.github.com> Date: Sat, 1 Oct 2022 06:34:30 -0300 Subject: [PATCH] Typing (#1698) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Gitignore Venv + VScode * Add Typings, Add Test to Makefile * Fix, More typing, Redis Pipeline specific type * More types * Fix 3.7- Typing Compat, Add Tox Envs, Tests Dockerfile * fix listindex error (#1700) * More docstrings * More Types * Fix Typo on Dependency * Last Types Co-authored-by: Burak Yılmaz <46003469+yilmaz-burak@users.noreply.github.com> --- .github/workflows/dependencies.yml | 6 +- .gitignore | 2 + Makefile | 3 + rq/command.py | 92 +++++++++++++++---- rq/connections.py | 45 +++++++--- rq/decorators.py | 27 ++++-- rq/dummy.py | 4 +- rq/job.py | 137 +++++++++++++++-------------- rq/queue.py | 84 +++++++++--------- rq/registry.py | 83 ++++++++++++----- rq/scheduler.py | 4 +- rq/serializers.py | 2 +- rq/suspension.py | 34 +++++-- rq/utils.py | 114 ++++++++++++++++-------- rq/worker.py | 104 ++++++++++++---------- rq/worker_registration.py | 54 +++++++++--- run_tests_in_docker.sh | 3 - tests/Dockerfile | 48 ++++++++-- tests/test_dependencies.py | 2 +- tests/test_worker.py | 18 +--- tox.ini | 31 ++++++- 21 files changed, 593 insertions(+), 304 deletions(-) delete mode 100755 run_tests_in_docker.sh diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index 51180e9e..f478268d 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.5", "3.6", "3.7", "3.8.3", "3.9", "3.10"] + python-version: ["3.6", "3.7", "3.8.3", "3.9", "3.10"] redis-version: [3, 4, 5, 6, 7] redis-py-version: [3.5.0] @@ -49,7 +49,7 @@ jobs: strategy: matrix: - python-version: ["3.5", "3.6", "3.7", "3.8.3", "3.9", "3.10"] + python-version: ["3.6", "3.7", "3.8.3", "3.9", "3.10"] redis-version: [3, 4, 5, 6, 7] steps: @@ -68,7 +68,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install git+https://github.com/andymccurdy/redis-py + pip install git+https://github.com/redis/redis-py pip install git+https://github.com/pallets/click pip install -r dev-requirements.txt pip install -e . diff --git a/.gitignore b/.gitignore index bf5269f5..262d20bc 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,5 @@ Vagrantfile Gemfile Gemfile.lock _site/ +.venv/ +.vscode/ \ No newline at end of file diff --git a/Makefile b/Makefile index d43c5c91..a3ad247c 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,9 @@ all: clean: rm -rf build/ dist/ +test: + docker build -f tests/Dockerfile . -t rqtest && docker run -it --rm rqtest + release: clean # Check if latest tag is the current head we're releasing echo "Latest tag = $$(git tag | sort -nr | head -n1)" diff --git a/rq/command.py b/rq/command.py index e478cbc3..7ade232c 100644 --- a/rq/command.py +++ b/rq/command.py @@ -1,6 +1,11 @@ import json import os import signal +import typing as t + +if t.TYPE_CHECKING: + from redis import Redis + from .worker import Worker from rq.exceptions import InvalidJobOperation from rq.job import Job @@ -9,39 +14,74 @@ from rq.job import Job PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' -def send_command(connection, worker_name, command, **kwargs): - """Use connection' pubsub mechanism to send a command""" +def send_command(connection: 'Redis', worker_name: str, command, **kwargs): + """ + Use connection' pubsub mechanism to send a command + + Args: + connection (Redis): A Redis Connection + worker_name (str): The Job ID + """ payload = {'command': command} if kwargs: payload.update(kwargs) connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) -def parse_payload(payload): - """Returns a dict of command data""" +def parse_payload(payload: t.Dict[t.Any, t.Any]) -> t.Dict[t.Any, t.Any]: + """ + Returns a dict of command data + + Args: + payload (dict): Parses the payload dict. + """ return json.loads(payload.get('data').decode()) -def send_shutdown_command(connection, worker_name): - """Send shutdown command""" +def send_shutdown_command(connection: 'Redis', worker_name: str): + """ + Sends a shutdown command to the pubsub topic. + + Args: + connection (Redis): A Redis Connection + worker_name (str): The Job ID + """ send_command(connection, worker_name, 'shutdown') -def send_kill_horse_command(connection, worker_name): - """Tell worker to kill it's horse""" +def send_kill_horse_command(connection: 'Redis', worker_name: str): + """ + Tell worker to kill it's horse + + Args: + connection (Redis): A Redis Connection + worker_name (str): The Job ID + """ send_command(connection, worker_name, 'kill-horse') -def send_stop_job_command(connection, job_id, serializer=None): - """Instruct a worker to stop a job""" +def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None): + """ + Instruct a worker to stop a job + + Args: + connection (Redis): A Redis Connection + job_id (str): The Job ID + serializer (): The serializer + """ job = Job.fetch(job_id, connection=connection, serializer=serializer) if not job.worker_name: raise InvalidJobOperation('Job is not currently executing') send_command(connection, job.worker_name, 'stop-job', job_id=job_id) -def handle_command(worker, payload): - """Parses payload and routes commands""" +def handle_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): + """Parses payload and routes commands + + Args: + worker (Worker): The worker to use + payload (t.Dict[t.Any, t.Any]): The Payload + """ if payload['command'] == 'stop-job': handle_stop_job_command(worker, payload) elif payload['command'] == 'shutdown': @@ -50,15 +90,26 @@ def handle_command(worker, payload): handle_kill_worker_command(worker, payload) -def handle_shutdown_command(worker): - """Perform shutdown command""" +def handle_shutdown_command(worker: 'Worker'): + """Perform shutdown command. + + Args: + worker (Worker): The worker to use. + """ worker.log.info('Received shutdown command, sending SIGINT signal.') pid = os.getpid() os.kill(pid, signal.SIGINT) -def handle_kill_worker_command(worker, payload): - """Stops work horse""" +def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): + """ + Stops work horse + + Args: + worker (Worker): The worker to stop + payload (t.Dict[t.Any, t.Any]): The payload. + """ + worker.log.info('Received kill horse command.') if worker.horse_pid: worker.log.info('Kiling horse...') @@ -67,8 +118,13 @@ def handle_kill_worker_command(worker, payload): worker.log.info('Worker is not working, kill horse command ignored') -def handle_stop_job_command(worker, payload): - """Handles stop job command""" +def handle_stop_job_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]): + """Handles stop job command. + + Args: + worker (Worker): The worker to use + payload (t.Dict[t.Any, t.Any]): The payload. + """ job_id = payload.get('job_id') worker.log.debug('Received command to stop job %s', job_id) if job_id and worker.get_current_job_id() == job_id: diff --git a/rq/connections.py b/rq/connections.py index d6a20367..b37e7460 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,5 +1,5 @@ from contextlib import contextmanager - +import typing as t from redis import Redis from .local import LocalStack, release_local @@ -10,7 +10,7 @@ class NoRedisConnectionException(Exception): @contextmanager -def Connection(connection=None): # noqa +def Connection(connection: t.Optional['Redis'] = None): # noqa if connection is None: connection = Redis() push_connection(connection) @@ -23,19 +23,30 @@ def Connection(connection=None): # noqa 'Check your Redis connection setup.' -def push_connection(redis): - """Pushes the given connection on the stack.""" +def push_connection(redis: 'Redis'): + """ + Pushes the given connection on the stack. + + Args: + redis (Redis): A Redis connection + """ _connection_stack.push(redis) def pop_connection(): - """Pops the topmost connection from the stack.""" + """ + Pops the topmost connection from the stack. + """ return _connection_stack.pop() -def use_connection(redis=None): - """Clears the stack and uses the given connection. Protects against mixed +def use_connection(redis: t.Optional['Redis'] = None): + """ + Clears the stack and uses the given connection. Protects against mixed use of use_connection() and stacked connection contexts. + + Args: + redis (t.Optional[Redis], optional): A Redis Connection. Defaults to None. """ assert len(_connection_stack) <= 1, \ 'You should not mix Connection contexts with use_connection()' @@ -47,16 +58,28 @@ def use_connection(redis=None): def get_current_connection(): - """Returns the current Redis connection (i.e. the topmost on the + """ + Returns the current Redis connection (i.e. the topmost on the connection stack). """ return _connection_stack.top -def resolve_connection(connection=None): - """Convenience function to resolve the given or the current connection. - Raises an exception if it cannot resolve a connection now. +def resolve_connection(connection: t.Optional['Redis'] = None) -> 'Redis': """ + Convenience function to resolve the given or the current connection. + Raises an exception if it cannot resolve a connection now. + + Args: + connection (t.Optional[Redis], optional): A Redis connection. Defaults to None. + + Raises: + NoRedisConnectionException: If connection couldn't be resolved. + + Returns: + Redis: A Redis Connection + """ + if connection is not None: return connection diff --git a/rq/decorators.py b/rq/decorators.py index a38f066f..5398a7c5 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,4 +1,9 @@ from functools import wraps +import typing as t + +if t.TYPE_CHECKING: + from redis import Redis + from .job import Retry from rq.compat import string_types @@ -10,21 +15,25 @@ from .utils import backend_class class job: # noqa queue_class = Queue - def __init__(self, queue, connection=None, timeout=None, + def __init__(self, queue: 'Queue', connection: t.Optional['Redis'] = None, timeout=None, result_ttl=DEFAULT_RESULT_TTL, ttl=None, - queue_class=None, depends_on=None, at_front=None, meta=None, - description=None, failure_ttl=None, retry=None, on_failure=None, + queue_class=None, depends_on: t.Optional[t.List[t.Any]] = None, at_front: t.Optional[bool] = None, + meta=None, description=None, failure_ttl=None, retry: t.Optional['Retry'] = None, on_failure=None, on_success=None): - """A decorator that adds a ``delay`` method to the decorated function, + """ + A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string - denoting the queue name. For example: + denoting the queue name. For example:: - @job(queue='default') - def simple_add(x, y): - return x + y + ..codeblock:python:: - simple_add.delay(1, 2) # Puts simple_add function into queue + >>> @job(queue='default') + >>> def simple_add(x, y): + >>> return x + y + >>> ... + >>> # Puts `simple_add` function into queue + >>> simple_add.delay(1, 2) """ self.queue = queue self.queue_class = backend_class(self, 'queue_class', override=queue_class) diff --git a/rq/dummy.py b/rq/dummy.py index 3c9187da..12f360b5 100644 --- a/rq/dummy.py +++ b/rq/dummy.py @@ -10,7 +10,7 @@ def do_nothing(): pass -def sleep(secs): +def sleep(secs: int): time.sleep(secs) @@ -23,7 +23,7 @@ def div_by_zero(): 1 / 0 -def fib(n): +def fib(n: int): if n <= 1: return 1 else: diff --git a/rq/job.py b/rq/job.py index 0436cf4f..f321dd17 100644 --- a/rq/job.py +++ b/rq/job.py @@ -3,16 +3,21 @@ import json import pickle import warnings import zlib - +import typing as t import asyncio + from collections.abc import Iterable from datetime import datetime, timedelta, timezone from enum import Enum from functools import partial from uuid import uuid4 - from redis import WatchError +if t.TYPE_CHECKING: + from rq.queue import Queue + from redis import Redis + from redis.client import Pipeline + from rq.compat import as_text, decode_redis_hash, string_types from .connections import resolve_connection from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError @@ -39,18 +44,18 @@ class JobStatus(str, Enum): class Dependency: - def __init__(self, jobs, allow_failure: bool = False, enqueue_at_front: bool = False): - jobs = ensure_list(jobs) + def __init__(self, jobs: t.List[t.Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False): + dependent_jobs = ensure_list(jobs) if not all( isinstance(job, Job) or isinstance(job, str) - for job in jobs + for job in dependent_jobs if job ): raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids") - elif len(jobs) < 1: + elif len(dependent_jobs) < 1: raise ValueError("jobs: cannot be empty.") - self.dependencies = jobs + self.dependencies = dependent_jobs self.allow_failure = allow_failure self.enqueue_at_front = enqueue_at_front @@ -60,14 +65,14 @@ class Dependency: UNEVALUATED = object() -def cancel_job(job_id, connection=None, serializer=None, enqueue_dependents=False): +def cancel_job(job_id: str, connection: t.Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False): """Cancels the job with the given job ID, preventing execution. Discards any job info (i.e. it can't be requeued later). """ Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents) -def get_current_job(connection=None, job_class=None): +def get_current_job(connection: t.Optional['Redis'] = None, job_class: t.Optional['Job'] = None): """Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. """ @@ -77,7 +82,7 @@ def get_current_job(connection=None, job_class=None): return _job_stack.top -def requeue_job(job_id, connection, serializer=None): +def requeue_job(job_id: str, connection: 'Redis', serializer=None): job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() @@ -89,10 +94,10 @@ class Job: # Job construction @classmethod - def create(cls, func, args=None, kwargs=None, connection=None, + def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: t.Optional['Redis'] = None, result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None, origin=None, meta=None, - failure_ttl=None, serializer=None, *, on_success=None, on_failure=None): + failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -171,18 +176,18 @@ class Job: return q.get_job_position(self._id) return None - def get_status(self, refresh=True): + def get_status(self, refresh: bool = True) -> str: if refresh: self._status = as_text(self.connection.hget(self.key, 'status')) return self._status - def set_status(self, status, pipeline=None): + def set_status(self, status: str, pipeline: t.Optional['Pipeline'] = None): self._status = status - connection = pipeline if pipeline is not None else self.connection + connection: 'Redis' = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) - def get_meta(self, refresh=True): + def get_meta(self, refresh: bool = True): if refresh: meta = self.connection.hget(self.key, 'meta') self.meta = self.serializer.loads(meta) if meta else {} @@ -190,35 +195,35 @@ class Job: return self.meta @property - def is_finished(self): + def is_finished(self) -> bool: return self.get_status() == JobStatus.FINISHED @property - def is_queued(self): + def is_queued(self) -> bool: return self.get_status() == JobStatus.QUEUED @property - def is_failed(self): + def is_failed(self) -> bool: return self.get_status() == JobStatus.FAILED @property - def is_started(self): + def is_started(self) -> bool: return self.get_status() == JobStatus.STARTED @property - def is_deferred(self): + def is_deferred(self) -> bool: return self.get_status() == JobStatus.DEFERRED @property - def is_canceled(self): + def is_canceled(self) -> bool: return self.get_status() == JobStatus.CANCELED @property - def is_scheduled(self): + def is_scheduled(self) -> bool: return self.get_status() == JobStatus.SCHEDULED @property - def is_stopped(self): + def is_stopped(self) -> bool: return self.get_status() == JobStatus.STOPPED @property @@ -230,7 +235,7 @@ class Job: return self._dependency_ids[0] @property - def dependency(self): + def dependency(self) -> t.Optional['Job']: """Returns a job's first dependency. To avoid repeated Redis fetches, we cache job.dependency as job._dependency. """ @@ -243,7 +248,7 @@ class Job: return job @property - def dependent_ids(self): + def dependent_ids(self) -> t.List[str]: """Returns a list of ids of jobs whose execution depends on this job's successful execution.""" return list(map(as_text, self.connection.smembers(self.dependents_key))) @@ -358,13 +363,13 @@ class Job: self._data = UNEVALUATED @classmethod - def exists(cls, job_id, connection=None): + def exists(cls, job_id: str, connection: t.Optional['Redis'] = None) -> int: """Returns whether a job hash exists for the given job ID.""" conn = resolve_connection(connection) return conn.exists(cls.key_for(job_id)) @classmethod - def fetch(cls, id, connection=None, serializer=None): + def fetch(cls, id: str, connection: t.Optional['Redis'] = None, serializer=None) -> 'Job': """Fetches a persisted job from its corresponding Redis key and instantiates it. """ @@ -373,7 +378,7 @@ class Job: return job @classmethod - def fetch_many(cls, job_ids, connection, serializer=None): + def fetch_many(cls, job_ids: t.List[str], connection: 'Redis', serializer=None): """ Bulk version of Job.fetch @@ -385,7 +390,7 @@ class Job: pipeline.hgetall(cls.key_for(job_id)) results = pipeline.execute() - jobs = [] + jobs: t.List[t.Optional['Job']] = [] for i, job_id in enumerate(job_ids): if results[i]: job = cls(job_id, connection=connection, serializer=serializer) @@ -396,7 +401,7 @@ class Job: return jobs - def __init__(self, id=None, connection=None, serializer=None): + def __init__(self, id: str = None, connection: t.Optional['Redis'] = None, serializer=None): self.connection = resolve_connection(connection) self._id = id self.created_at = utcnow() @@ -411,27 +416,26 @@ class Job: self._failure_callback = UNEVALUATED self.description = None self.origin = None - self.enqueued_at = None - self.started_at = None - self.ended_at = None + self.enqueued_at: t.Optional[datetime] = None + self.started_at: t.Optional[datetime] = None + self.ended_at: t.Optional[datetime] = None self._result = None self.exc_info = None self.timeout = None - self.result_ttl = None - self.failure_ttl = None - self.ttl = None - self.worker_name = None + self.result_ttl: t.Optional[int] = None + self.failure_ttl: t.Optional[int] = None + self.ttl: t.Optional[int] = None + self.worker_name: t.Optional[str] = None self._status = None - self._dependency_ids = [] + self._dependency_ids: t.List[str] = [] self.meta = {} self.serializer = resolve_serializer(serializer) self.retries_left = None - # retry_intervals is a list of int e.g [60, 120, 240] - self.retry_intervals = None + self.retry_intervals: t.Optional[t.List[int]] = None self.redis_server_version = None - self.last_heartbeat = None - self.allow_dependency_failures = None - self.enqueue_at_front = None + self.last_heartbeat: t.Optional[datetime] = None + self.allow_dependency_failures: t.Optional[bool] = None + self.enqueue_at_front: t.Optional[bool] = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -443,7 +447,6 @@ class Job: self.id, self.description) - # Job equality def __eq__(self, other): # noqa return isinstance(other, self.__class__) and self.id == other.id @@ -459,13 +462,13 @@ class Job: self._id = str(uuid4()) return self._id - def set_id(self, value): + def set_id(self, value: str): """Sets a job ID for the given job.""" if not isinstance(value, string_types): raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value - def heartbeat(self, timestamp, ttl, pipeline=None, xx=False): + def heartbeat(self, timestamp: datetime, ttl: int, pipeline: t.Optional['Pipeline'] = None, xx: bool = False): self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -474,12 +477,12 @@ class Job: id = property(get_id, set_id) @classmethod - def key_for(cls, job_id): + def key_for(cls, job_id: str): """The Redis key that is used to store job hash under.""" return (cls.redis_job_namespace_prefix + job_id).encode('utf-8') @classmethod - def dependents_key_for(cls, job_id): + def dependents_key_for(cls, job_id: str): """The Redis key that is used to store job dependents hash under.""" return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id) @@ -497,7 +500,7 @@ class Job: def dependencies_key(self): return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) - def fetch_dependencies(self, watch=False, pipeline=None): + def fetch_dependencies(self, watch: bool = False, pipeline: t.Optional['Pipeline'] = None): """ Fetch all of a job's dependencies. If a pipeline is supplied, and watch is true, then set WATCH on all the keys of all dependencies. @@ -617,7 +620,7 @@ class Job: raise NoSuchJobError('No such job: {0}'.format(self.key)) self.restore(data) - def to_dict(self, include_meta=True): + def to_dict(self, include_meta: bool = True) -> dict: """ Returns a serialization of the current job instance @@ -678,7 +681,7 @@ class Job: return obj - def save(self, pipeline=None, include_meta=True): + def save(self, pipeline: t.Optional['Pipeline'] = None, include_meta: bool = True): """ Dumps the current job instance to its corresponding Redis key. @@ -710,7 +713,7 @@ class Job: meta = self.serializer.dumps(self.meta) self.connection.hset(self.key, 'meta', meta) - def cancel(self, pipeline=None, enqueue_dependents=False): + def cancel(self, pipeline: t.Optional['Pipeline'] = None, enqueue_dependents: bool = False): """Cancels the given job, which will prevent the job from ever being ran (or inspected). @@ -766,11 +769,11 @@ class Job: # handle it raise - def requeue(self, at_front=False): + def requeue(self, at_front: bool = False): """Requeues job.""" return self.failed_job_registry.requeue(self, at_front=at_front) - def _remove_from_registries(self, pipeline=None, remove_from_queue=True): + def _remove_from_registries(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True): if remove_from_queue: from .queue import Queue q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) @@ -818,7 +821,7 @@ class Job: serializer=self.serializer) registry.remove(self, pipeline=pipeline) - def delete(self, pipeline=None, remove_from_queue=True, + def delete(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents=False): """Cancels the job and deletes the job hash from Redis. Jobs depending on this job can optionally be deleted as well.""" @@ -832,7 +835,7 @@ class Job: connection.delete(self.key, self.dependents_key, self.dependencies_key) - def delete_dependents(self, pipeline=None): + def delete_dependents(self, pipeline: t.Optional['Pipeline'] = None): """Delete jobs depending on this job.""" connection = pipeline if pipeline is not None else self.connection for dependent_id in self.dependent_ids: @@ -856,7 +859,7 @@ class Job: assert self is _job_stack.pop() return self._result - def prepare_for_execution(self, worker_name, pipeline): + def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'): """Set job metadata before execution begins""" self.worker_name = worker_name self.last_heartbeat = utcnow() @@ -881,14 +884,14 @@ class Job: return coro_result return result - def get_ttl(self, default_ttl=None): + def get_ttl(self, default_ttl: t.Optional[int] = None): """Returns ttl for a job that determines how long a job will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.ttl is None else self.ttl - def get_result_ttl(self, default_ttl=None): + def get_result_ttl(self, default_ttl: t.Optional[int] = None): """Returns ttl for a job that determines how long a jobs result will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. @@ -902,7 +905,8 @@ class Job: """ return get_call_string(self.func_name, self.args, self.kwargs, max_length=75) - def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True): + def cleanup(self, ttl: t.Optional[int] = None, pipeline: t.Optional['Pipeline'] = None, + remove_from_queue: bool = True): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its result depends on the value of ttl: @@ -946,7 +950,7 @@ class Job: index = max(number_of_intervals - self.retries_left, 0) return self.retry_intervals[index] - def retry(self, queue, pipeline): + def retry(self, queue: 'Queue', pipeline: 'Pipeline'): """Requeue or schedule this job for execution""" retry_interval = self.get_retry_interval() self.retries_left = self.retries_left - 1 @@ -957,7 +961,7 @@ class Job: else: queue.enqueue_job(self, pipeline=pipeline) - def register_dependency(self, pipeline=None): + def register_dependency(self, pipeline: t.Optional['Pipeline'] = None): """Jobs may have dependencies. Jobs are enqueued only if the jobs they depend on are successfully performed. We record this relation as a reverse dependency (a Redis set), with a key that looks something @@ -989,7 +993,8 @@ class Job: return [Job.key_for(_id.decode()) for _id in dependencies] - def dependencies_are_met(self, parent_job=None, pipeline=None, exclude_job_id=None): + def dependencies_are_met(self, parent_job: t.Optional['Job'] = None, + pipeline: t.Optional['Pipeline'] = None, exclude_job_id: str = None): """Returns a boolean indicating if all of this job's dependencies are _FINISHED_ If a pipeline is passed, all dependencies are WATCHed. @@ -1009,7 +1014,7 @@ class Job: for _id in connection.smembers(self.dependencies_key)} if exclude_job_id: - dependencies_ids.discard(exclude_job_id) + dependencies_ids.discard(exclude_job_id) if parent_job.id == exclude_job_id: parent_job = None @@ -1050,7 +1055,7 @@ _job_stack = LocalStack() class Retry: - def __init__(self, max, interval=0): + def __init__(self, max, interval: int = 0): """`interval` can be a positive number or a list of ints""" super().__init__() if max < 1: diff --git a/rq/queue.py b/rq/queue.py index 97b74fb9..7eaf84f0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,12 +1,16 @@ import uuid import sys import warnings - +import typing as t from collections import namedtuple from datetime import datetime, timezone from redis import WatchError +if t.TYPE_CHECKING: + from redis import Redis + from redis.client import Pipeline + from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL @@ -33,13 +37,13 @@ class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout" @total_ordering class Queue: - job_class = Job - DEFAULT_TIMEOUT = 180 # Default timeout seconds. - redis_queue_namespace_prefix = 'rq:queue:' - redis_queues_keys = 'rq:queues' + job_class: t.Type['Job'] = Job + DEFAULT_TIMEOUT: int = 180 # Default timeout seconds. + redis_queue_namespace_prefix: str = 'rq:queue:' + redis_queues_keys: str = 'rq:queues' @classmethod - def all(cls, connection=None, job_class=None, serializer=None): + def all(cls, connection: t.Optional['Redis'] = None, job_class: t.Optional[t.Type['Job']] = None, serializer=None): """Returns an iterable of all Queues. """ connection = resolve_connection(connection) @@ -54,7 +58,8 @@ class Queue: if rq_key] @classmethod - def from_queue_key(cls, queue_key, connection=None, job_class=None, serializer=None): + def from_queue_key(cls, queue_key, connection: t.Optional['Redis'] = None, + job_class: t.Optional[t.Type['Job']] = None, serializer=None): """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their Redis keys. @@ -65,7 +70,7 @@ class Queue: name = queue_key[len(prefix):] return cls(name, connection=connection, job_class=job_class, serializer=serializer) - def __init__(self, name='default', default_timeout=None, connection=None, + def __init__(self, name='default', default_timeout=None, connection: t.Optional['Redis'] = None, is_async=True, job_class=None, serializer=None, **kwargs): self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix @@ -143,7 +148,7 @@ class Queue: script = self.connection.register_script(script) return script(keys=[self.key]) - def delete(self, delete_jobs=True): + def delete(self, delete_jobs: bool = True): """Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first.""" if delete_jobs: self.empty() @@ -162,7 +167,7 @@ class Queue: """Returns whether the current queue is async.""" return bool(self._is_async) - def fetch_job(self, job_id): + def fetch_job(self, job_id: str): try: job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: @@ -171,7 +176,7 @@ class Queue: if job.origin == self.name: return job - def get_job_position(self, job_or_id): + def get_job_position(self, job_or_id: t.Union[Job, str]): """Returns the position of a job within the queue Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of @@ -192,7 +197,7 @@ class Queue: return self.job_ids.index(job_id) return None - def get_job_ids(self, offset=0, length=-1): + def get_job_ids(self, offset: int = 0, length: int = -1): """Returns a slice of job IDs in the queue.""" start = offset if length >= 0: @@ -202,23 +207,23 @@ class Queue: return [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] - def get_jobs(self, offset=0, length=-1): + def get_jobs(self, offset: int = 0, length: int = -1): """Returns a slice of jobs in the queue.""" job_ids = self.get_job_ids(offset, length) return compact([self.fetch_job(job_id) for job_id in job_ids]) @property - def job_ids(self): + def job_ids(self) -> t.List[str]: """Returns a list of all job IDS in the queue.""" return self.get_job_ids() @property - def jobs(self): + def jobs(self) -> t.List['Job']: """Returns a list of all (valid) jobs in the queue.""" return self.get_jobs() @property - def count(self): + def count(self) -> int: """Returns a count of all messages in the queue.""" return self.connection.llen(self.key) @@ -259,7 +264,7 @@ class Queue: from rq.registry import CanceledJobRegistry return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) - def remove(self, job_or_id, pipeline=None): + def remove(self, job_or_id: t.Union['Job', str], pipeline: t.Optional['Pipeline'] = None): """Removes Job from queue, accepts either a Job instance or ID.""" job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id @@ -283,7 +288,7 @@ class Queue: if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id, pipeline=None, at_front=False): + def push_job_id(self, job_id: str, pipeline: t.Optional['Pipeline'] = None, at_front=False): """Pushes a job ID on the corresponding Redis queue. 'at_front' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection @@ -292,11 +297,11 @@ class Queue: else: connection.rpush(self.key, job_id) - def create_job(self, func, args=None, kwargs=None, timeout=None, + def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, description=None, depends_on=None, job_id=None, meta=None, status=JobStatus.QUEUED, retry=None, *, - on_success=None, on_failure=None): + on_success=None, on_failure=None) -> Job: """Creates a job based on parameters given.""" timeout = parse_timeout(timeout) @@ -327,11 +332,7 @@ class Queue: return job - def setup_dependencies( - self, - job, - pipeline=None - ): + def setup_dependencies(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None): # If a _dependent_ job depends on any unfinished job, register all the # _dependent_ job's dependencies instead of enqueueing it. # @@ -383,10 +384,10 @@ class Queue: pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, + def enqueue_call(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, description=None, - depends_on=None, job_id=None, at_front=False, meta=None, - retry=None, on_success=None, on_failure=None, pipeline=None): + depends_on=None, job_id: str = None, at_front: bool = False, meta=None, + retry=None, on_success=None, on_failure=None, pipeline=None) -> Job: """Creates a job to represent the delayed function call and enqueues it. It is much like `.enqueue()`, except that it takes the function's args @@ -414,7 +415,7 @@ class Queue: def prepare_data(func, args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, description=None, job_id=None, - at_front=False, meta=None, retry=None, on_success=None, on_failure=None): + at_front=False, meta=None, retry=None, on_success=None, on_failure=None) -> EnqueueData: # Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples # And can keep this logic within EnqueueData return EnqueueData( @@ -424,11 +425,7 @@ class Queue: at_front, meta, retry, on_success, on_failure ) - def enqueue_many( - self, - job_datas, - pipeline=None - ): + def enqueue_many(self, job_datas, pipeline: t.Optional['Pipeline'] = None) -> t.List[Job]: """ Creates multiple jobs (created via `Queue.prepare_data` calls) to represent the delayed function calls and enqueues them. @@ -456,7 +453,7 @@ class Queue: pipe.execute() return jobs - def run_job(self, job): + def run_job(self, job: 'Job') -> Job: job.perform() job.set_status(JobStatus.FINISHED) job.save(include_meta=False) @@ -464,7 +461,7 @@ class Queue: return job @classmethod - def parse_args(cls, f, *args, **kwargs): + def parse_args(cls, f: t.Union[t.Callable[..., t.Any], str], *args, **kwargs): """ Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()` @@ -519,7 +516,7 @@ class Queue: pipeline=pipeline ) - def enqueue_at(self, datetime, f, *args, **kwargs): + def enqueue_at(self, datetime: datetime, f, *args, **kwargs): """Schedules a job to be enqueued at specified time""" (f, timeout, description, result_ttl, ttl, failure_ttl, @@ -533,7 +530,7 @@ class Queue: return self.schedule_job(job, datetime, pipeline=pipeline) - def schedule_job(self, job, datetime, pipeline=None): + def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None): """Puts job on ScheduledJobRegistry""" from .registry import ScheduledJobRegistry registry = ScheduledJobRegistry(queue=self) @@ -553,7 +550,7 @@ class Queue: return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) - def enqueue_job(self, job, pipeline=None, at_front=False): + def enqueue_job(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution. If Queue is instantiated with is_async=False, job is executed immediately. @@ -583,7 +580,7 @@ class Queue: return job - def run_sync(self, job): + def run_sync(self, job: 'Job') -> 'Job': with self.connection.pipeline() as pipeline: job.prepare_for_execution('sync', pipeline) @@ -599,7 +596,7 @@ class Queue: return job - def enqueue_dependents(self, job, pipeline=None, exclude_job_id=None): + def enqueue_dependents(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, exclude_job_id=None): """Enqueues all jobs in the given job's dependents set and clears it. When called without a pipeline, this method uses WATCH/MULTI/EXEC. @@ -683,7 +680,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys, timeout, connection=None): + def lpop(cls, queue_keys, timeout, connection: t.Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -713,7 +710,8 @@ class Queue: return None @classmethod - def dequeue_any(cls, queues, timeout, connection=None, job_class=None, serializer=None): + def dequeue_any(cls, queues, timeout, connection: t.Optional['Redis'] = None, + job_class: t.Optional[t.Type['Job']] = None, serializer=None): """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. diff --git a/rq/registry.py b/rq/registry.py index cc989c84..4b8089c9 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,8 +1,13 @@ +import typing as t import calendar from rq.serializers import resolve_serializer import time from datetime import datetime, timedelta, timezone +if t.TYPE_CHECKING: + from redis import Redis + from redis.client import Pipeline + from .compat import as_text from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL @@ -21,8 +26,8 @@ class BaseRegistry: job_class = Job key_template = 'rq:registry:{0}' - def __init__(self, name='default', connection=None, job_class=None, - queue=None, serializer=None): + def __init__(self, name='default', connection: t.Optional['Redis'] = None, + job_class: t.Optional[t.Type['Job']] = None, queue=None, serializer=None): if queue: self.name = queue.name self.connection = resolve_connection(queue.connection) @@ -45,10 +50,13 @@ class BaseRegistry: self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs ) - def __contains__(self, item): + def __contains__(self, item: t.Union[str, 'Job']): """ Returns a boolean indicating registry contains the given job instance or job id. + + Args: + item (Union[str, Job]): A Job ID or a Job. """ job_id = item if isinstance(item, self.job_class): @@ -61,8 +69,15 @@ class BaseRegistry: self.cleanup() return self.connection.zcard(self.key) - def add(self, job, ttl=0, pipeline=None, xx=False): - """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf""" + def add(self, job: 'Job', ttl=0, pipeline: t.Optional['Pipeline'] = None, xx: bool = False): + """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf + + Args: + job (Job): The Job to add + ttl (int, optional): The time to live. Defaults to 0. + pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + xx (bool, optional): .... Defaults to False. + """ score = ttl if ttl < 0 else current_timestamp() + ttl if score == -1: score = '+inf' @@ -71,8 +86,14 @@ class BaseRegistry: return self.connection.zadd(self.key, {job.id: score}, xx=xx) - def remove(self, job, pipeline=None, delete_job=False): - """Removes job from registry and deletes it if `delete_job == True`""" + def remove(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, delete_job: bool = False): + """Removes job from registry and deletes it if `delete_job == True` + + Args: + job (Job): The Job to remove from the registry + pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + delete_job (bool, optional): If should delete the job.. Defaults to False. + """ connection = pipeline if pipeline is not None else self.connection job_id = job.id if isinstance(job, self.job_class) else job result = connection.zrem(self.key, job_id) @@ -84,7 +105,7 @@ class BaseRegistry: job_instance.delete() return result - def get_expired_job_ids(self, timestamp=None): + def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None): """Returns job ids whose score are less than current timestamp. Returns ids for jobs with an expiry time earlier than timestamp, @@ -95,7 +116,7 @@ class BaseRegistry: return [as_text(job_id) for job_id in self.connection.zrangebyscore(self.key, 0, score)] - def get_job_ids(self, start=0, end=-1): + def get_job_ids(self, start: int = 0, end: int = -1): """Returns list of all job ids.""" self.cleanup() return [as_text(job_id) for job_id in @@ -105,13 +126,28 @@ class BaseRegistry: """Returns Queue object associated with this registry.""" return Queue(self.name, connection=self.connection, serializer=self.serializer) - def get_expiration_time(self, job): - """Returns job's expiration time.""" + def get_expiration_time(self, job: 'Job') -> datetime: + """Returns job's expiration time. + + Args: + job (Job): The Job to get the expiration + """ score = self.connection.zscore(self.key, job.id) return datetime.utcfromtimestamp(score) - def requeue(self, job_or_id, at_front=False): - """Requeues the job with the given job ID.""" + def requeue(self, job_or_id: t.Union['Job', str], at_front: bool = False) -> 'Job': + """Requeues the job with the given job ID. + + Args: + job_or_id (t.Union['Job', str]): The Job or the Job ID + at_front (bool, optional): If the Job should be put at the front of the queue. Defaults to False. + + Raises: + InvalidJobOperation: If nothing is returned from the `ZREM` operation. + + Returns: + Job: The Requeued Job. + """ if isinstance(job_or_id, self.job_class): job = job_or_id serializer = job.serializer @@ -146,12 +182,15 @@ class StartedJobRegistry(BaseRegistry): """ key_template = 'rq:wip:{0}' - def cleanup(self, timestamp=None): + def cleanup(self, timestamp: t.Optional[datetime] = None): """Remove expired jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as seconds since the Unix epoch. timestamp defaults to call time if unspecified. Removed jobs are added to the global failed job queue. + + Args: + timestamp (datetime): The datetime to use as the limit. """ score = timestamp if timestamp is not None else current_timestamp() job_ids = self.get_expired_job_ids(score) @@ -194,7 +233,7 @@ class FinishedJobRegistry(BaseRegistry): """ key_template = 'rq:finished:{0}' - def cleanup(self, timestamp=None): + def cleanup(self, timestamp: t.Optional[datetime] = None): """Remove expired jobs from registry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -211,7 +250,7 @@ class FailedJobRegistry(BaseRegistry): """ key_template = 'rq:failed:{0}' - def cleanup(self, timestamp=None): + def cleanup(self, timestamp: t.Optional[datetime] = None): """Remove expired jobs from registry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -221,7 +260,7 @@ class FailedJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) - def add(self, job, ttl=None, exc_string='', pipeline=None): + def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None): """ Adds a job to a registry with expiry time of now + ttl. `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. @@ -270,7 +309,7 @@ class ScheduledJobRegistry(BaseRegistry): # make sense in this context self.get_jobs_to_enqueue = self.get_expired_job_ids - def schedule(self, job, scheduled_datetime, pipeline=None): + def schedule(self, job: 'Job', scheduled_datetime, pipeline: t.Optional['Pipeline'] = None): """ Adds job to registry, scored by its execution time (in UTC). If datetime has no tzinfo, it will assume localtimezone. @@ -295,19 +334,19 @@ class ScheduledJobRegistry(BaseRegistry): implemented in BaseRegistry.""" pass - def remove_jobs(self, timestamp=None, pipeline=None): + def remove_jobs(self, timestamp: t.Optional[datetime] = None, pipeline: t.Optional['Pipeline'] = None): """Remove jobs whose timestamp is in the past from registry.""" connection = pipeline if pipeline is not None else self.connection score = timestamp if timestamp is not None else current_timestamp() return connection.zremrangebyscore(self.key, 0, score) - def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000): + def get_jobs_to_schedule(self, timestamp: t.Optional[datetime] = None, chunk_size: int = 1000): """Remove jobs whose timestamp is in the past from registry.""" score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)] - def get_scheduled_time(self, job_or_id): + def get_scheduled_time(self, job_or_id: t.Union['Job', str]): """Returns datetime (UTC) at which job is scheduled to be enqueued""" if isinstance(job_or_id, self.job_class): job_id = job_or_id.id @@ -324,7 +363,7 @@ class ScheduledJobRegistry(BaseRegistry): class CanceledJobRegistry(BaseRegistry): key_template = 'rq:canceled:{0}' - def get_expired_job_ids(self, timestamp=None): + def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None): raise NotImplementedError def cleanup(self): diff --git a/rq/scheduler.py b/rq/scheduler.py index 2e470e58..ba818c84 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -122,7 +122,7 @@ class RQScheduler: return successful_locks - def prepare_registries(self, queue_names=None): + def prepare_registries(self, queue_names: str = None): """Prepare scheduled job registries for use""" self._scheduled_job_registries = [] if not queue_names: @@ -133,7 +133,7 @@ class RQScheduler: ) @classmethod - def get_locking_key(cls, name): + def get_locking_key(cls, name: str): """Returns scheduler key for a given queue name""" return SCHEDULER_LOCKING_KEY_TEMPLATE % name diff --git a/rq/serializers.py b/rq/serializers.py index 19cd7968..babab696 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -21,7 +21,7 @@ class JSONSerializer(): return json.loads(s.decode('utf-8'), *args, **kwargs) -def resolve_serializer(serializer): +def resolve_serializer(serializer: str): """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer The returned serializer objects implement ('dumps', 'loads') methods diff --git a/rq/suspension.py b/rq/suspension.py index 3e960147..c49dd15d 100644 --- a/rq/suspension.py +++ b/rq/suspension.py @@ -1,7 +1,20 @@ +import typing as t + +if t.TYPE_CHECKING: + from redis import Redis + from rq.worker import Worker + + WORKERS_SUSPENDED = 'rq:suspended' -def is_suspended(connection, worker=None): +def is_suspended(connection: 'Redis', worker: t.Optional['Worker'] = None): + """Checks whether a Worker is suspendeed on a given connection + + Args: + connection (Redis): The Redis Connection + worker (t.Optional[Worker], optional): The Worker. Defaults to None. + """ with connection.pipeline() as pipeline: if worker is not None: worker.heartbeat(pipeline=pipeline) @@ -11,14 +24,25 @@ def is_suspended(connection, worker=None): return pipeline.execute()[-1] -def suspend(connection, ttl=None): - """ttl = time to live in seconds. Default is no expiration - Note: If you pass in 0 it will invalidate right away +def suspend(connection: 'Redis', ttl: int = None): + """ + Suspends. + TTL of 0 will invalidate right away. + + Args: + connection (Redis): The Redis connection to use.. + ttl (int): time to live in seconds. Defaults to `None` """ connection.set(WORKERS_SUSPENDED, 1) if ttl is not None: connection.expire(WORKERS_SUSPENDED, ttl) -def resume(connection): +def resume(connection: 'Redis'): + """ + Resumes. + + Args: + connection (Redis): The Redis connection to use.. + """ return connection.delete(WORKERS_SUSPENDED) diff --git a/rq/utils.py b/rq/utils.py index ee393f24..abfb24bf 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -11,9 +11,13 @@ import importlib import logging import numbers import sys - +import datetime as dt +import typing as t from collections.abc import Iterable +if t.TYPE_CHECKING: + from redis import Redis + from redis.exceptions import ResponseError from .compat import as_text, string_types @@ -73,17 +77,18 @@ class _Colorizer: colorizer = _Colorizer() -def make_colorizer(color): +def make_colorizer(color: str): """Creates a function that colorizes text with the given color. - For example: + For example:: - green = make_colorizer('darkgreen') - red = make_colorizer('red') + ..codeblock::python - Then, you can use: - - print "It's either " + green('OK') + ' or ' + red('Oops') + >>> green = make_colorizer('darkgreen') + >>> red = make_colorizer('red') + >>> + >>> # You can then use: + >>> print("It's either " + green('OK') + ' or ' + red('Oops')) """ def inner(text): return colorizer.colorize(color, text) @@ -121,19 +126,31 @@ class ColorizingStreamHandler(logging.StreamHandler): return message -def import_attribute(name): - """Return an attribute from a dotted path name (e.g. "path.to.func").""" +def import_attribute(name: str): + """Returns an attribute from a dotted path name. Example: `path.to.func`. + + When the attribute we look for is a staticmethod, module name in its + dotted path is not the last-before-end word + + E.g.: package_a.package_b.module_a.ClassA.my_static_method + + Thus we remove the bits from the end of the name until we can import it + Sometimes the failure during importing is due to a genuine coding error in the imported module + In this case, the exception is logged as a warning for ease of debugging. + The above logic will apply anyways regardless of the cause of the import error. + + Args: + name (str): The name (reference) to the path. + + Raises: + ValueError: If no module is found or invalid attribute name. + + Returns: + t.Any: An attribute (normally a Callable) + """ name_bits = name.split('.') module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]] module = None - # When the attribute we look for is a staticmethod, module name in its - # dotted path is not the last-before-end word - # E.g.: package_a.package_b.module_a.ClassA.my_static_method - # Thus we remove the bits from the end of the name until we can import it - # - # Sometimes the failure during importing is due to a genuine coding error in the imported module - # In this case, the exception is logged as a warning for ease of debugging. - # The above logic will apply anyways regardless of the cause of the import error. while len(module_name_bits): try: module_name = '.'.join(module_name_bits) @@ -168,11 +185,11 @@ def utcnow(): _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' -def utcformat(dt): +def utcformat(dt: dt.datetime): return dt.strftime(as_text(_TIMESTAMP_FORMAT)) -def utcparse(string): +def utcparse(string: str): try: return datetime.datetime.strptime(string, _TIMESTAMP_FORMAT) except ValueError: @@ -180,7 +197,7 @@ def utcparse(string): return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ') -def first(iterable, default=None, key=None): +def first(iterable: t.Iterable, default=None, key=None): """ Return first element of `iterable` that evaluates true, else return None (or an optional default value). @@ -219,12 +236,12 @@ def first(iterable, default=None, key=None): return default -def is_nonstring_iterable(obj): +def is_nonstring_iterable(obj: t.Any) -> bool: """Returns whether the obj is an iterable, but not a string""" return isinstance(obj, Iterable) and not isinstance(obj, string_types) -def ensure_list(obj): +def ensure_list(obj: t.Any) -> t.List: """ When passed an iterable of objects, does nothing, otherwise, it returns a list with just that object in it. @@ -232,7 +249,7 @@ def ensure_list(obj): return obj if is_nonstring_iterable(obj) else [obj] -def current_timestamp(): +def current_timestamp() -> int: """Returns current UTC timestamp""" return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) @@ -247,14 +264,14 @@ def backend_class(holder, default_name, override=None): return override -def str_to_date(date_str): +def str_to_date(date_str: t.Optional[str]) -> t.Union[dt.datetime, t.Any]: if not date_str: return else: return utcparse(date_str.decode()) -def parse_timeout(timeout): +def parse_timeout(timeout: t.Any): """Transfer all kinds of timeout format to an integer representing seconds""" if not isinstance(timeout, numbers.Integral) and timeout is not None: try: @@ -272,10 +289,13 @@ def parse_timeout(timeout): return timeout -def get_version(connection): +def get_version(connection: 'Redis'): """ Returns tuple of Redis server version. This function also correctly handles 4 digit redis server versions. + + Args: + connection (Redis): The Redis connection. """ try: return tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]) @@ -288,33 +308,55 @@ def ceildiv(a, b): return -(-a // b) -def split_list(a_list, segment_size): - """ - Splits a list into multiple smaller lists having size `segment_size` +def split_list(a_list: t.List[t.Any], segment_size: int): + """Splits a list into multiple smaller lists having size `segment_size` + + Args: + a_list (t.List[t.Any]): A list to split + segment_size (int): The segment size to split into + + Yields: + list: The splitted listed """ for i in range(0, len(a_list), segment_size): yield a_list[i:i + segment_size] -def truncate_long_string(data, max_length=None): - """Truncate arguments with representation longer than max_length""" +def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str: + """Truncate arguments with representation longer than max_length + + Args: + data (str): The data to truncate + max_length (t.Optional[int], optional): The max length. Defaults to None. + """ if max_length is None: return data return (data[:max_length] + '...') if len(data) > max_length else data -def get_call_string(func_name, args, kwargs, max_length=None): - """Returns a string representation of the call, formatted as a regular +def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.Any, t.Any], + max_length: t.Optional[int] = None) -> t.Optional[str]: + """ + Returns a string representation of the call, formatted as a regular Python function invocation statement. If max_length is not None, truncate arguments with representation longer than max_length. + + Args: + func_name (str): The funtion name + args (t.Any): The function arguments + kwargs (t.Dict[t.Any, t.Any]): The function kwargs + max_length (int, optional): The max length. Defaults to None. + + Returns: + str: A String representation of the function call. """ if func_name is None: return None arg_list = [as_text(truncate_long_string(repr(arg), max_length)) for arg in args] - kwargs = ['{0}={1}'.format(k, as_text(truncate_long_string(repr(v), max_length))) for k, v in kwargs.items()] - arg_list += sorted(kwargs) + list_kwargs = ['{0}={1}'.format(k, as_text(truncate_long_string(repr(v), max_length))) for k, v in kwargs.items()] + arg_list += sorted(list_kwargs) args = ', '.join(arg_list) return '{0}({1})'.format(func_name, args) diff --git a/rq/worker.py b/rq/worker.py index 122e16e5..8a0c40e3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -8,6 +8,11 @@ import sys import time import traceback import warnings +import typing as t + +if t.TYPE_CHECKING: + from redis import Redis + from redis.client import Pipeline from datetime import timedelta from enum import Enum @@ -60,8 +65,8 @@ class StopRequested(Exception): pass -def compact(l): - return [x for x in l if x is not None] +def compact(a_list): + return [x for x in a_list if x is not None] _signames = dict((getattr(signal, signame), signame) @@ -106,7 +111,14 @@ class Worker: max_connection_wait_time = 60.0 @classmethod - def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None): + def all( + cls, + connection: t.Optional['Redis'] = None, + job_class: t.Type['Job'] = None, + queue_class: t.Optional[t.Type['Queue']] = None, + queue: t.Optional['Queue'] = None, + serializer=None + ) -> t.List['Worker']: """Returns an iterable of all Workers. """ if queue: @@ -123,18 +135,18 @@ class Worker: return compact(workers) @classmethod - def all_keys(cls, connection=None, queue=None): + def all_keys(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None): return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod - def count(cls, connection=None, queue=None): + def count(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None): """Returns the number of workers by queue or connection""" return len(get_keys(queue=queue, connection=connection)) @classmethod - def find_by_key(cls, worker_key, connection=None, job_class=None, - queue_class=None, serializer=None): + def find_by_key(cls, worker_key: str, connection: t.Optional['Redis'] = None, job_class: t.Type['Job'] = None, + queue_class: t.Type['Queue'] = None, serializer=None): """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. @@ -157,13 +169,13 @@ class Worker: return worker - def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL, - connection=None, exc_handler=None, exception_handlers=None, - default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None, - queue_class=None, log_job_description=True, + def __init__(self, queues, name: t.Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, + connection: t.Optional['Redis'] = None, exc_handler=None, exception_handlers=None, + default_worker_ttl=DEFAULT_WORKER_TTL, job_class: t.Type['Job'] = None, + queue_class=None, log_job_description: bool = True, job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, - disable_default_exception_handler=False, - prepare_for_work=True, serializer=None): # noqa + disable_default_exception_handler: bool = False, + prepare_for_work: bool = True, serializer=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -182,7 +194,7 @@ class Worker: if isinstance(q, string_types) else q for q in ensure_list(queues)] - self.name = name or uuid4().hex + self.name: str = name or uuid4().hex self.queues = queues self.validate_queues() self._ordered_queues = self.queues[:] @@ -192,21 +204,21 @@ class Worker: self.default_worker_ttl = default_worker_ttl self.job_monitoring_interval = job_monitoring_interval - self._state = 'starting' - self._is_horse = False - self._horse_pid = 0 - self._stop_requested = False + self._state: str = 'starting' + self._is_horse: bool = False + self._horse_pid: int = 0 + self._stop_requested: bool = False self._stopped_job_id = None self.log = logger self.log_job_description = log_job_description self.last_cleaned_at = None - self.successful_job_count = 0 - self.failed_job_count = 0 - self.total_working_time = 0 - self.current_job_working_time = 0 + self.successful_job_count: int = 0 + self.failed_job_count: int = 0 + self.total_working_time: int = 0 + self.current_job_working_time: int = 0 self.birth_date = None - self.scheduler = None + self.scheduler: t.Optional[RQScheduler] = None self.pubsub = None self.pubsub_thread = None @@ -368,7 +380,7 @@ class Worker: if death_timestamp is not None: return utcparse(as_text(death_timestamp)) - def set_state(self, state, pipeline=None): + def set_state(self, state, pipeline: t.Optional['Pipeline'] = None): self._state = state connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'state', state) @@ -394,12 +406,12 @@ class Worker: state = property(_get_state, _set_state) - def set_current_job_working_time(self, current_job_working_time, pipeline=None): + def set_current_job_working_time(self, current_job_working_time, pipeline: t.Optional['Pipeline'] = None): self.current_job_working_time = current_job_working_time connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'current_job_working_time', current_job_working_time) - def set_current_job_id(self, job_id, pipeline=None): + def set_current_job_id(self, job_id: t.Optional[str] = None, pipeline: t.Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection if job_id is None: @@ -407,7 +419,7 @@ class Worker: else: connection.hset(self.key, 'current_job', job_id) - def get_current_job_id(self, pipeline=None): + def get_current_job_id(self, pipeline: t.Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection return as_text(connection.hget(self.key, 'current_job')) @@ -553,8 +565,8 @@ class Worker: def reorder_queues(self, reference_queue): pass - def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): + def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -701,7 +713,7 @@ class Worker: self.heartbeat() return result - def heartbeat(self, timeout=None, pipeline=None): + def heartbeat(self, timeout=None, pipeline: t.Optional['Pipeline'] = None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -759,11 +771,11 @@ class Worker: job_class=self.job_class, serializer=self.serializer) for queue in queues.split(',')] - def increment_failed_job_count(self, pipeline=None): + def increment_failed_job_count(self, pipeline: t.Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'failed_job_count', 1) - def increment_successful_job_count(self, pipeline=None): + def increment_successful_job_count(self, pipeline: t.Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'successful_job_count', 1) @@ -771,7 +783,7 @@ class Worker: pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) - def fork_work_horse(self, job, queue): + def fork_work_horse(self, job: 'Job', queue: 'Queue'): """Spawns a work horse to perform the actual work and passes it a job. """ child_pid = os.fork() @@ -785,14 +797,14 @@ class Worker: self._horse_pid = child_pid self.procline('Forked {0} at {1}'.format(child_pid, time.time())) - def get_heartbeat_ttl(self, job): + def get_heartbeat_ttl(self, job: 'Job'): if job.timeout and job.timeout > 0: remaining_execution_time = job.timeout - self.current_job_working_time return min(remaining_execution_time, self.job_monitoring_interval) + 60 else: return self.job_monitoring_interval + 60 - def monitor_work_horse(self, job, queue): + def monitor_work_horse(self, job: 'Job', queue: 'Queue'): """The worker will monitor the work horse and make sure that it either executes successfully or the status of the job is set to failed @@ -863,7 +875,7 @@ class Worker: "(waitpid returned %s)" % ret_val ) - def execute_job(self, job, queue): + def execute_job(self, job: 'Job', queue: 'Queue'): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes within the given timeout bounds, or will end the work horse with @@ -874,7 +886,7 @@ class Worker: self.monitor_work_horse(job, queue) self.set_state(WorkerStatus.IDLE) - def maintain_heartbeats(self, job): + def maintain_heartbeats(self, job: 'Job'): """Updates worker and job's last heartbeat field. If job was enqueued with `result_ttl=0`, a race condition could happen where this heartbeat arrives after job has been deleted, leaving a job key that contains only @@ -895,7 +907,7 @@ class Worker: if results[2] == 1: self.connection.delete(job.key) - def main_work_horse(self, job, queue): + def main_work_horse(self, job: 'Job', queue: 'Queue'): """This is the entry point of the newly spawned work horse.""" # After fork()'ing, always assure we are generating random sequences # that are different from the worker. @@ -923,7 +935,7 @@ class Worker: signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_DFL) - def prepare_job_execution(self, job): + def prepare_job_execution(self, job: 'Job'): """Performs misc bookkeeping like updating states prior to job execution. """ @@ -942,7 +954,7 @@ class Worker: msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job, queue, started_job_registry=None, + def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): """Handles the failure or an executing job by: 1. Setting the job status to failed @@ -1006,7 +1018,7 @@ class Worker: # even if Redis is down pass - def handle_job_success(self, job, queue, started_job_registry): + def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry): self.log.debug('Handling successful execution of job %s', job.id) with self.connection.pipeline() as pipeline: @@ -1046,7 +1058,7 @@ class Worker: except redis.exceptions.WatchError: continue - def execute_success_callback(self, job, result): + def execute_success_callback(self, job: 'Job', result): """Executes success_callback with timeout""" job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): @@ -1058,7 +1070,7 @@ class Worker: with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.failure_callback(job, self.connection, *sys.exc_info()) - def perform_job(self, job, queue): + def perform_job(self, job: 'Job', queue: 'Queue'): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ @@ -1126,7 +1138,7 @@ class Worker: return True - def handle_exception(self, job, *exc_info): + def handle_exception(self, job: 'Job', *exc_info): """Walks the exception handler stack to delegate exception handling.""" exc_string = ''.join(traceback.format_exception(*exc_info)) @@ -1206,13 +1218,13 @@ class Worker: class SimpleWorker(Worker): - def execute_job(self, job, queue): + def execute_job(self, job: 'Job', queue: 'Queue'): """Execute job in same thread/process, do not fork()""" self.set_state(WorkerStatus.BUSY) self.perform_job(job, queue) self.set_state(WorkerStatus.IDLE) - def get_heartbeat_ttl(self, job): + def get_heartbeat_ttl(self, job: 'Job'): # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. # # We should just stick to DEFAULT_WORKER_TTL. if job.timeout == -1: diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 6f24bccd..787c7a1e 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -1,3 +1,11 @@ +import typing as t + +if t.TYPE_CHECKING: + from redis import Redis + from redis.client import Pipeline + from .worker import Worker + from .queue import Queue + from .compat import as_text from rq.utils import split_list @@ -7,8 +15,14 @@ REDIS_WORKER_KEYS = 'rq:workers' MAX_KEYS = 1000 -def register(worker, pipeline=None): - """Store worker key in Redis so we can easily discover active workers.""" +def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): + """ + Store worker key in Redis so we can easily discover active workers. + + Args: + worker (Worker): The Worker + pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + """ connection = pipeline if pipeline is not None else worker.connection connection.sadd(worker.redis_workers_keys, worker.key) for name in worker.queue_names(): @@ -16,8 +30,13 @@ def register(worker, pipeline=None): connection.sadd(redis_key, worker.key) -def unregister(worker, pipeline=None): - """Remove worker key from Redis.""" +def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): + """Remove Worker key from Redis + + Args: + worker (Worker): The Worker + pipeline (t.Optional[Pipeline], optional): Redis Pipeline. Defaults to None. + """ if pipeline is None: connection = worker.connection.pipeline() else: @@ -32,23 +51,38 @@ def unregister(worker, pipeline=None): connection.execute() -def get_keys(queue=None, connection=None): - """Returnes a list of worker keys for a queue""" +def get_keys(queue: t.Optional['Queue'] = None, connection: t.Optional['Redis'] = None) -> t.Set[t.Any]: + """Returns a list of worker keys for a given queue. + + Args: + queue (t.Optional['Queue'], optional): The Queue. Defaults to None. + connection (t.Optional['Redis'], optional): The Redis Connection. Defaults to None. + + Raises: + ValueError: If no Queue or Connection is provided. + + Returns: + set: A Set with keys. + """ if queue is None and connection is None: - raise ValueError('"queue" or "connection" argument is required') + raise ValueError('"Queue" or "connection" argument is required') if queue: redis = queue.connection redis_key = WORKERS_BY_QUEUE_KEY % queue.name else: - redis = connection + redis = connection # type: ignore redis_key = REDIS_WORKER_KEYS return {as_text(key) for key in redis.smembers(redis_key)} -def clean_worker_registry(queue): - """Delete invalid worker keys in registry""" +def clean_worker_registry(queue: 'Queue'): + """Delete invalid worker keys in registry. + + Args: + queue (Queue): The Queue + """ keys = list(get_keys(queue)) with queue.connection.pipeline() as pipeline: diff --git a/run_tests_in_docker.sh b/run_tests_in_docker.sh deleted file mode 100755 index 770957fe..00000000 --- a/run_tests_in_docker.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -docker build -f tests/Dockerfile . -t rqtest && docker run -it --rm rqtest diff --git a/tests/Dockerfile b/tests/Dockerfile index 6cd1f184..d131b7e8 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -1,19 +1,51 @@ -FROM ubuntu:latest +FROM ubuntu:20.04 -RUN apt-get update \ - && apt-get install -y \ +ARG DEBIAN_FRONTEND=noninteractive +ENV LANG C.UTF-8 +ENV LC_ALL C.UTF-8 + +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install -y \ + build-essential \ + zlib1g-dev \ + libncurses5-dev \ + libgdbm-dev \ + libnss3-dev \ + libssl-dev \ + libreadline-dev \ + libffi-dev wget \ + software-properties-common && \ + add-apt-repository ppa:deadsnakes/ppa && \ + apt-get update + +RUN apt-get install -y \ redis-server \ python3-pip \ - stunnel + stunnel \ + python3.6 \ + python3.7 \ + python3.8 \ + python3.9 \ + python3.10 \ + python3.6-distutils \ + python3.7-distutils + +RUN apt-get clean && \ + rm -rf /var/lib/apt/lists/* COPY tests/ssl_config/private.pem tests/ssl_config/stunnel.conf /etc/stunnel/ COPY . /tmp/rq WORKDIR /tmp/rq -RUN pip3 install -r /tmp/rq/requirements.txt -r /tmp/rq/dev-requirements.txt \ - && python3 /tmp/rq/setup.py build \ - && python3 /tmp/rq/setup.py install + +RUN set -e && \ + python3 -m pip install --upgrade pip && \ + python3 -m pip install --no-cache-dir tox && \ + pip3 install -r /tmp/rq/requirements.txt -r /tmp/rq/dev-requirements.txt && \ + python3 /tmp/rq/setup.py build && \ + python3 /tmp/rq/setup.py install CMD stunnel \ & redis-server \ - & RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 pytest /tmp/rq/tests/ --durations=5 -v --log-cli-level 10 + & RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 tox diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index d379ed9f..26b115d4 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -160,4 +160,4 @@ class TestDependencies(RQTestCase): job_c.dependencies_are_met() w = Worker([queue]) w.work(burst=True) - assert job_c.result \ No newline at end of file + assert job_c.result diff --git a/tests/test_worker.py b/tests/test_worker.py index c3a5e3cf..5aeffd7f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -28,7 +28,7 @@ from tests.fixtures import ( ) from rq import Queue, SimpleWorker, Worker, get_current_connection -from rq.compat import as_text, PY2 +from rq.compat import as_text from rq.job import Job, JobStatus, Retry from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry from rq.suspension import resume, suspend @@ -68,22 +68,6 @@ class TestWorker(RQTestCase): self.assertEqual(w.queues[0].name, 'foo') self.assertEqual(w.queues[1].name, 'bar') - # Also accept byte strings in Python 2 - if PY2: - # With single byte string argument - w = Worker(b'foo') - self.assertEqual(w.queues[0].name, 'foo') - - # With list of byte strings - w = Worker([b'foo', b'bar']) - self.assertEqual(w.queues[0].name, 'foo') - self.assertEqual(w.queues[1].name, 'bar') - - # With iterable of byte strings - w = Worker(iter([b'foo', b'bar'])) - self.assertEqual(w.queues[0].name, 'foo') - self.assertEqual(w.queues[1].name, 'bar') - # With single Queue w = Worker(Queue('foo')) self.assertEqual(w.queues[0].name, 'foo') diff --git a/tox.ini b/tox.ini index e6fcbbdc..4552ab24 100644 --- a/tox.ini +++ b/tox.ini @@ -1,11 +1,14 @@ [tox] -envlist=py35,py36,py37,py38,py39,pypy,flake8 +envlist=py36,py37,py38,py39,py310,flake8 [testenv] commands=pytest --cov rq --durations=5 {posargs} deps= pytest pytest-cov + sentry-sdk + codecov + psutil [testenv:flake8] basepython = python3.6 @@ -13,3 +16,29 @@ deps = flake8 commands = flake8 rq tests + + +[testenv:py36] +skipdist = True +basepython = python3.6 +deps = {[testenv]deps} + +[testenv:py37] +skipdist = True +basepython = python3.7 +deps = {[testenv]deps} + +[testenv:py38] +skipdist = True +basepython = python3.8 +deps = {[testenv]deps} + +[testenv:py39] +skipdist = True +basepython = python3.9 +deps = {[testenv]deps} + +[testenv:py310] +skipdist = True +basepython = python3.10 +deps = {[testenv]deps}