From 26a357744347782c0803323344b33052baaaf160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20H=E1=BB=93ng=20Qu=C3=A2n?= Date: Wed, 20 Nov 2024 15:34:27 +0700 Subject: [PATCH] Replace Black with Ruff as formatter tool (#2152) --- .github/workflows/lint.yml | 6 +- Makefile | 1 - README.md | 2 +- pyproject.toml | 13 ++-- rq/__init__.py | 16 ++-- rq/cli/cli.py | 32 ++++---- rq/cli/helpers.py | 10 +-- rq/group.py | 2 +- rq/intermediate_queue.py | 2 - rq/job.py | 26 +++---- rq/local.py | 11 +-- rq/logutils.py | 40 +++++----- rq/maintenance.py | 2 +- rq/queue.py | 82 ++++++++++---------- rq/registry.py | 8 +- rq/results.py | 2 +- rq/timeouts.py | 6 +- rq/utils.py | 8 +- rq/worker.py | 42 +++++------ rq/worker_pool.py | 12 +-- tests/__init__.py | 4 +- tests/config_files/dummy.py | 2 +- tests/config_files/dummy_override.py | 2 +- tests/fixtures.py | 8 +- tests/test_callbacks.py | 38 +++++----- tests/test_cli.py | 36 ++++----- tests/test_commands.py | 4 +- tests/test_connection.py | 3 +- tests/test_dependencies.py | 8 +- tests/test_group.py | 20 ++--- tests/test_intermediate_queue.py | 1 - tests/test_job.py | 108 +++++++++++++-------------- tests/test_queue.py | 6 +- tests/test_registry.py | 2 +- tests/test_scheduler.py | 4 +- tests/test_timeouts.py | 2 +- tests/test_utils.py | 18 ++--- tests/test_worker.py | 41 +++++----- tox.ini | 2 - 39 files changed, 305 insertions(+), 327 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c992d3ac..0a86c151 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,11 +25,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install black ruff - - - name: Lint with black - run: | - black --check --skip-string-normalization --line-length 120 rq tests + pip install ruff - name: Lint with ruff run: | diff --git a/Makefile b/Makefile index e6a43525..19c841ad 100644 --- a/Makefile +++ b/Makefile @@ -21,5 +21,4 @@ force_release: clean twine upload dist/* lint: - @ black --check --skip-string-normalization --line-length 120 rq tests @ ruff check --show-source rq tests diff --git a/README.md b/README.md index 940f757f..ae4242e9 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ RQ requires Redis >= 3.0.0. [![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22) [![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq) [![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq) -[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) +[![Code style: Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) Full documentation can be found [here][d]. diff --git a/pyproject.toml b/pyproject.toml index ed376e96..d5283e3d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,6 @@ include = [ [tool.hatch.envs.test] dependencies = [ - "black", "coverage", "mypy", "packaging", @@ -93,11 +92,6 @@ dependencies = [ cov = "pytest --cov=rq --cov-config=.coveragerc --cov-report=xml {args:tests}" typing = "mypy --enable-incomplete-feature=Unpack rq" -[tool.black] -line-length = 120 -target-version = ["py38"] -skip-string-normalization = true - [tool.mypy] allow_redefinition = true pretty = true @@ -120,13 +114,16 @@ lint.select = [ "I", # import sorting "W", # pycodestyle warnings ] -line-length = 120 # To match black. +line-length = 120 target-version = "py38" [tool.ruff.lint.isort] known-first-party = ["rq"] section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"] +[tool.ruff.format] +quote-style = "single" + [tool.pyright] reportOptionalMemberAccess = false -reportOptionalOperand = false \ No newline at end of file +reportOptionalOperand = false diff --git a/rq/__init__.py b/rq/__init__.py index 81095c7f..f0e46df1 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -5,14 +5,14 @@ from .version import VERSION from .worker import SimpleWorker, Worker __all__ = [ - "Callback", - "Retry", - "cancel_job", - "get_current_job", - "requeue_job", - "Queue", - "SimpleWorker", - "Worker", + 'Callback', + 'Retry', + 'cancel_job', + 'get_current_job', + 'requeue_job', + 'Queue', + 'SimpleWorker', + 'Worker', ] __version__ = VERSION diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 0d53ccbc..7e7a1fe9 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -122,7 +122,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): @main.command() -@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)') +@click.option('--interval', '-i', type=float, help="Updates stats every N seconds (default: don't poll)") @click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') @click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') @click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') @@ -164,7 +164,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @main.command() @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') -@click.option('--logging_level', type=str, default="INFO", help='Set logging level') +@click.option('--logging_level', type=str, default='INFO', help='Set logging level') @click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs') @click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs') @click.option('--name', '-n', help='Specify a different name') @@ -187,7 +187,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') -@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') +@click.option('--disable-default-exception-handler', '-d', is_flag=True, help="Disable RQ's default exception handler") @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') @click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @@ -233,19 +233,19 @@ def worker( logging.config.dictConfig(dict_config) if pid: - with open(os.path.expanduser(pid), "w") as fp: + with open(os.path.expanduser(pid), 'w') as fp: fp.write(str(os.getpid())) worker_name = cli_config.worker_class.__qualname__ - if worker_name in ["RoundRobinWorker", "RandomWorker"]: - strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" - msg = f"WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead." + if worker_name in ['RoundRobinWorker', 'RandomWorker']: + strategy_alternative = 'random' if worker_name == 'RandomWorker' else 'round_robin' + msg = f'WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead.' warnings.warn(msg, DeprecationWarning) click.secho(msg, fg='yellow') - if dequeue_strategy not in ("default", "random", "round_robin"): + if dequeue_strategy not in ('default', 'random', 'round_robin'): click.secho( - "ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red' + 'ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.', err=True, fg='red' ) sys.exit(1) @@ -309,19 +309,17 @@ def suspend(cli_config, duration, **options): """Suspends all workers, to resume run `rq resume`""" if duration is not None and duration < 1: - click.echo("Duration must be an integer greater than 1") + click.echo('Duration must be an integer greater than 1') sys.exit(1) connection_suspend(cli_config.connection, duration) if duration: msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will - automatically resume""".format( - duration - ) + automatically resume""".format(duration) click.echo(msg) else: - click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed") + click.echo('Suspending workers. No new jobs will be started. But current jobs will be completed') @main.command() @@ -329,7 +327,7 @@ def suspend(cli_config, duration, **options): def resume(cli_config, **options): """Resumes processing of queues, that were suspended with `rq suspend`""" connection_resume(cli_config.connection) - click.echo("Resuming workers.") + click.echo('Resuming workers.') @main.command() @@ -427,12 +425,12 @@ def enqueue( queue.schedule_job(job, schedule) if not quiet: - click.echo('Enqueued %s with job-id \'%s\'.' % (blue(function_string), job.id)) + click.echo("Enqueued %s with job-id '%s'." % (blue(function_string), job.id)) @main.command() @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') -@click.option('--logging-level', '-l', type=str, default="INFO", help='Set logging level') +@click.option('--logging-level', '-l', type=str, default='INFO', help='Set logging level') @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs') diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index db924520..c4a24d81 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -196,13 +196,13 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class, connection: R queue_dict[queue] = worker_class.all(queue=queue, connection=connection) if queue_dict: - max_length = max(len(q.name) for q, in queue_dict.keys()) + max_length = max(len(q.name) for (q,) in queue_dict.keys()) else: max_length = 0 for queue in queue_dict: if queue_dict[queue]: - queues_str = ", ".join( + queues_str = ', '.join( sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])) ) else: @@ -238,7 +238,7 @@ def refresh(interval, func, *args): def setup_loghandlers_from_args(verbose, quiet, date_format, log_format): if verbose and quiet: - raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") + raise RuntimeError('Flags --verbose and --quiet are mutually exclusive.') if verbose: level = 'DEBUG' @@ -312,7 +312,7 @@ def parse_function_args(arguments): keyword, value = parse_function_arg(argument, len(args) + 1) if keyword is not None: if keyword in kwargs: - raise click.BadParameter('You can\'t specify multiple values for the same keyword.') + raise click.BadParameter("You can't specify multiple values for the same keyword.") kwargs[keyword] = value else: args.append(value) @@ -322,7 +322,7 @@ def parse_function_args(arguments): def parse_schedule(schedule_in, schedule_at): if schedule_in is not None: if schedule_at is not None: - raise click.BadArgumentUsage('You can\'t specify both --schedule-in and --schedule-at') + raise click.BadArgumentUsage("You can't specify both --schedule-in and --schedule-at") return datetime.now(timezone.utc) + timedelta(seconds=parse_timeout(schedule_in)) elif schedule_at is not None: return datetime.strptime(schedule_at, '%Y-%m-%dT%H:%M:%S') diff --git a/rq/group.py b/rq/group.py index 15e90256..10405288 100644 --- a/rq/group.py +++ b/rq/group.py @@ -23,7 +23,7 @@ class Group: self.key = '{0}{1}'.format(self.REDIS_GROUP_NAME_PREFIX, self.name) def __repr__(self): - return "Group(id={})".format(self.name) + return 'Group(id={})'.format(self.name) def _add_jobs(self, jobs: List[Job], pipeline: Pipeline): """Add jobs to the group""" diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py index 5bdb057d..c92984c5 100644 --- a/rq/intermediate_queue.py +++ b/rq/intermediate_queue.py @@ -11,7 +11,6 @@ if TYPE_CHECKING: class IntermediateQueue(object): - def __init__(self, queue_key: str, connection: Redis): self.queue_key = queue_key self.key = self.get_intermediate_queue_key(queue_key) @@ -103,7 +102,6 @@ class IntermediateQueue(object): job = queue.fetch_job(job_id) if job_id not in queue.started_job_registry: - if not job: # If the job doesn't exist in the queue, we can safely remove it from the intermediate queue. self.remove(job_id) diff --git a/rq/job.py b/rq/job.py index 7a964aef..fd8d3026 100644 --- a/rq/job.py +++ b/rq/job.py @@ -45,7 +45,7 @@ from .utils import ( utcformat, ) -logger = logging.getLogger("rq.job") +logger = logging.getLogger('rq.job') class JobStatus(str, Enum): @@ -86,9 +86,9 @@ class Dependency: """ dependent_jobs = ensure_list(jobs) if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job): - raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids") + raise ValueError('jobs: must contain objects of type Job and/or strings representing Job ids') elif len(dependent_jobs) < 1: - raise ValueError("jobs: cannot be empty.") + raise ValueError('jobs: cannot be empty.') self.dependencies = dependent_jobs self.allow_failure = allow_failure @@ -126,9 +126,9 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J job (Optional[Job]): The current Job running """ if connection: - warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning) + warnings.warn('connection argument for get_current_job is deprecated.', DeprecationWarning) if job_class: - warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning) + warnings.warn('job_class argument for get_current_job is deprecated.', DeprecationWarning) return _job_stack.top @@ -397,7 +397,7 @@ class Job: if refresh: status = self.connection.hget(self.key, 'status') if not status: - raise InvalidJobOperation(f"Failed to retrieve status for job: {self.id}") + raise InvalidJobOperation(f'Failed to retrieve status for job: {self.id}') self._status = JobStatus(as_text(status)) return self._status @@ -730,7 +730,7 @@ class Job: if not isinstance(value, str): raise TypeError('id must be a string, not {0}'.format(type(value))) - if ":" in value: + if ':' in value: raise ValueError('id must not contain ":"') self._id = value @@ -822,7 +822,7 @@ class Job: """ Get the latest result and returns `exc_info` only if the latest result is a failure. """ - warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning) + warnings.warn('job.exc_info is deprecated, use job.latest_result() instead.', DeprecationWarning) from .results import Result @@ -886,7 +886,7 @@ class Job: seconds by default). """ - warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning) + warnings.warn('job.result is deprecated, use job.return_value instead.', DeprecationWarning) from .results import Result @@ -1064,7 +1064,7 @@ class Job: try: obj['result'] = self.serializer.dumps(self._result) except: # noqa - obj['result'] = "Unserializable return value" + obj['result'] = 'Unserializable return value' if self._exc_info is not None and include_result: obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8')) if self.timeout is not None: @@ -1091,10 +1091,10 @@ class Job: if self.allow_dependency_failures is not None: # convert boolean to integer to avoid redis.exception.DataError - obj["allow_dependency_failures"] = int(self.allow_dependency_failures) + obj['allow_dependency_failures'] = int(self.allow_dependency_failures) if self.enqueue_at_front is not None: - obj["enqueue_at_front"] = int(self.enqueue_at_front) + obj['enqueue_at_front'] = int(self.enqueue_at_front) return obj @@ -1158,7 +1158,7 @@ class Job: InvalidJobOperation: If the job has already been cancelled. """ if self.is_canceled: - raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) + raise InvalidJobOperation('Cannot cancel already canceled job: {}'.format(self.get_id())) from .queue import Queue from .registry import CanceledJobRegistry diff --git a/rq/local.py b/rq/local.py index ea2ca1be..5757212f 100644 --- a/rq/local.py +++ b/rq/local.py @@ -1,13 +1,14 @@ # ruff: noqa: E731 """ - werkzeug.local - ~~~~~~~~~~~~~~ +werkzeug.local +~~~~~~~~~~~~~~ - This module implements context-local objects. +This module implements context-local objects. - :copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details. - :license: BSD, see LICENSE for more details. +:copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details. +:license: BSD, see LICENSE for more details. """ + # Since each thread has its own greenlet we can just use those as identifiers # for the context. If greenlets are not available we fall back to the # current thread ident. diff --git a/rq/logutils.py b/rq/logutils.py index 9a1c6c5f..95bffde4 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -7,36 +7,36 @@ from rq.defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT class _Colorizer: def __init__(self): - esc = "\x1b[" + esc = '\x1b[' self.codes = {} - self.codes[""] = "" - self.codes["reset"] = esc + "39;49;00m" + self.codes[''] = '' + self.codes['reset'] = esc + '39;49;00m' - self.codes["bold"] = esc + "01m" - self.codes["faint"] = esc + "02m" - self.codes["standout"] = esc + "03m" - self.codes["underline"] = esc + "04m" - self.codes["blink"] = esc + "05m" - self.codes["overline"] = esc + "06m" + self.codes['bold'] = esc + '01m' + self.codes['faint'] = esc + '02m' + self.codes['standout'] = esc + '03m' + self.codes['underline'] = esc + '04m' + self.codes['blink'] = esc + '05m' + self.codes['overline'] = esc + '06m' - dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"] - light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] + dark_colors = ['black', 'darkred', 'darkgreen', 'brown', 'darkblue', 'purple', 'teal', 'lightgray'] + light_colors = ['darkgray', 'red', 'green', 'yellow', 'blue', 'fuchsia', 'turquoise', 'white'] x = 30 for dark, light in zip(dark_colors, light_colors): - self.codes[dark] = esc + "%im" % x - self.codes[light] = esc + "%i;01m" % x + self.codes[dark] = esc + '%im' % x + self.codes[light] = esc + '%i;01m' % x x += 1 del dark, light, x - self.codes["darkteal"] = self.codes["turquoise"] - self.codes["darkyellow"] = self.codes["brown"] - self.codes["fuscia"] = self.codes["fuchsia"] - self.codes["white"] = self.codes["bold"] + self.codes['darkteal'] = self.codes['turquoise'] + self.codes['darkyellow'] = self.codes['brown'] + self.codes['fuscia'] = self.codes['fuchsia'] + self.codes['white'] = self.codes['bold'] - if hasattr(sys.stdout, "isatty"): + if hasattr(sys.stdout, 'isatty'): self.notty = not sys.stdout.isatty() else: self.notty = True @@ -45,7 +45,7 @@ class _Colorizer: if self.notty: return text else: - return self.codes[color_key] + text + self.codes["reset"] + return self.codes[color_key] + text + self.codes['reset'] colorizer = _Colorizer() @@ -98,7 +98,7 @@ class ColorizingStreamHandler(logging.StreamHandler): if self.is_tty: # Don't colorize any traceback parts = message.split('\n', 1) - parts[0] = " ".join([parts[0].split(" ", 1)[0], parts[0].split(" ", 1)[1]]) + parts[0] = ' '.join([parts[0].split(' ', 1)[0], parts[0].split(' ', 1)[1]]) message = '\n'.join(parts) diff --git a/rq/maintenance.py b/rq/maintenance.py index a77aac3a..3cd453d3 100644 --- a/rq/maintenance.py +++ b/rq/maintenance.py @@ -19,7 +19,7 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None: We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry. """ warnings.warn( - "clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.", + 'clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.', DeprecationWarning, ) intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection) diff --git a/rq/queue.py b/rq/queue.py index 50febe2c..b17d98d1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -28,29 +28,29 @@ from .serializers import resolve_serializer from .types import FunctionReferenceType, JobDependencyType from .utils import as_text, backend_class, compact, get_version, import_attribute, now, parse_timeout -logger = logging.getLogger("rq.queue") +logger = logging.getLogger('rq.queue') class EnqueueData( namedtuple( 'EnqueueData', [ - "func", - "args", - "kwargs", - "timeout", - "result_ttl", - "ttl", - "failure_ttl", - "description", - "depends_on", - "job_id", - "at_front", - "meta", - "retry", - "on_success", - "on_failure", - "on_stopped", + 'func', + 'args', + 'kwargs', + 'timeout', + 'result_ttl', + 'ttl', + 'failure_ttl', + 'description', + 'depends_on', + 'job_id', + 'at_front', + 'meta', + 'retry', + 'on_success', + 'on_failure', + 'on_stopped', ], ) ): @@ -278,11 +278,7 @@ class Queue: count = count + 1 end return count - """.format( - self.job_class.redis_job_namespace_prefix - ).encode( - "utf-8" - ) + """.format(self.job_class.redis_job_namespace_prefix).encode('utf-8') script = self.connection.register_script(script) return script(keys=[self.key]) @@ -809,23 +805,23 @@ class Queue: def get_job_kwargs(job_data, initial_status): return { - "func": job_data.func, - "args": job_data.args, - "kwargs": job_data.kwargs, - "result_ttl": job_data.result_ttl, - "ttl": job_data.ttl, - "failure_ttl": job_data.failure_ttl, - "description": job_data.description, - "depends_on": job_data.depends_on, - "job_id": job_data.job_id, - "meta": job_data.meta, - "status": initial_status, - "timeout": job_data.timeout, - "retry": job_data.retry, - "on_success": job_data.on_success, - "on_failure": job_data.on_failure, - "on_stopped": job_data.on_stopped, - "group_id": group_id, + 'func': job_data.func, + 'args': job_data.args, + 'kwargs': job_data.kwargs, + 'result_ttl': job_data.result_ttl, + 'ttl': job_data.ttl, + 'failure_ttl': job_data.failure_ttl, + 'description': job_data.description, + 'depends_on': job_data.depends_on, + 'job_id': job_data.job_id, + 'meta': job_data.meta, + 'status': initial_status, + 'timeout': job_data.timeout, + 'retry': job_data.retry, + 'on_success': job_data.on_success, + 'on_failure': job_data.on_failure, + 'on_stopped': job_data.on_stopped, + 'group_id': group_id, } # Enqueue jobs without dependencies @@ -1298,11 +1294,11 @@ class Queue: if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') colored_queues = ', '.join(map(str, [green(str(queue)) for queue in queue_keys])) - logger.debug(f"Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}") + logger.debug(f'Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}') assert connection result = connection.blpop(queue_keys, timeout) if result is None: - logger.debug(f"BLPOP timeout, no jobs found on queues {colored_queues}") + logger.debug(f'BLPOP timeout, no jobs found on queues {colored_queues}') raise DequeueTimeout(timeout, queue_keys) queue_key, job_id = result return queue_key, job_id @@ -1324,10 +1320,10 @@ class Queue: if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') colored_queue = green(queue_key) - logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}") + logger.debug(f'Starting BLMOVE operation for {colored_queue} with timeout of {timeout}') result: Optional[Any] = connection.blmove(queue_key, intermediate_queue.key, timeout) if result is None: - logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}") + logger.debug(f'BLMOVE timeout, no jobs found on {colored_queue}') raise DequeueTimeout(timeout, queue_key) return queue_key, result else: # non-blocking variant diff --git a/rq/registry.py b/rq/registry.py index 921868b6..620850b7 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from rq.executions import Execution -logger = logging.getLogger("rq.registry") +logger = logging.getLogger('rq.registry') class BaseRegistry: @@ -288,13 +288,13 @@ class StartedJobRegistry(BaseRegistry): job.retry(queue, pipeline) else: - exc_string = f"due to {AbandonedJobError.__name__}" + exc_string = f'due to {AbandonedJobError.__name__}' logger.warning( f'{self.__class__.__name__} cleanup: Moving job {job.id} to {FailedJobRegistry.__name__} ' f'({exc_string})' ) job.set_status(JobStatus.FAILED) - job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}" + job._exc_info = f'Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}' job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl) @@ -434,7 +434,7 @@ class DeferredJobRegistry(BaseRegistry): continue job.set_status(JobStatus.FAILED, pipeline=pipeline) - exc_info = "Expired in DeferredJobRegistry, moved to FailedJobRegistry at %s" % datetime.now() + exc_info = 'Expired in DeferredJobRegistry, moved to FailedJobRegistry at %s' % datetime.now() failed_job_registry.add(job, job.failure_ttl, exc_info, pipeline, True) pipeline.zremrangebyscore(self.key, 0, score) diff --git a/rq/results.py b/rq/results.py index a5923880..d719a8bd 100644 --- a/rq/results.py +++ b/rq/results.py @@ -152,7 +152,7 @@ class Result: # Unlike blpop, xread timeout is in miliseconds. "0-0" is the special value for the # first item in the stream, like '-' for xrevrange. timeout_ms = timeout * 1000 - response = job.connection.xread({cls.get_key(job.id): "0-0"}, block=timeout_ms) + response = job.connection.xread({cls.get_key(job.id): '0-0'}, block=timeout_ms) if not response: return None response = response[0] # Querying single stream only. diff --git a/rq/timeouts.py b/rq/timeouts.py index 12eef56d..7831736b 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -86,7 +86,7 @@ class TimerDeathPenalty(BaseDeathPenalty): # Monkey-patch exception with the message ahead of time # since PyThreadState_SetAsyncExc can only take a class def init_with_message(self, *args, **kwargs): # noqa - super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout)) + super(exception, self).__init__('Task exceeded maximum timeout value ({0} seconds)'.format(timeout)) self._exception.__init__ = init_with_message @@ -103,10 +103,10 @@ class TimerDeathPenalty(BaseDeathPenalty): ctypes.c_long(self._target_thread_id), ctypes.py_object(self._exception) ) if ret == 0: - raise ValueError("Invalid thread ID {}".format(self._target_thread_id)) + raise ValueError('Invalid thread ID {}'.format(self._target_thread_id)) elif ret > 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._target_thread_id), 0) - raise SystemError("PyThreadState_SetAsyncExc failed") + raise SystemError('PyThreadState_SetAsyncExc failed') def setup_death_penalty(self): """Starts the timer.""" diff --git a/rq/utils.py b/rq/utils.py index a1d382cf..5e7b9f06 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -286,19 +286,19 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: """ try: # Getting the connection info for each job tanks performance, we can cache it on the connection object - if not getattr(connection, "__rq_redis_server_version", None): + if not getattr(connection, '__rq_redis_server_version', None): # Cast the version string to a tuple of integers. Some Redis implementations may return a float. - version_str = str(connection.info("server")["redis_version"]) + version_str = str(connection.info('server')['redis_version']) version_parts = [int(i) for i in version_str.split('.')[:3]] # Ensure the version tuple has exactly three elements while len(version_parts) < 3: version_parts.append(0) setattr( connection, - "__rq_redis_server_version", + '__rq_redis_server_version', tuple(version_parts), ) - return getattr(connection, "__rq_redis_server_version") + return getattr(connection, '__rq_redis_server_version') except ResponseError: # fakeredis doesn't implement Redis' INFO command return (5, 0, 9) diff --git a/rq/worker.py b/rq/worker.py index 7835a266..6b760268 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -76,7 +76,7 @@ except ImportError: pass -logger = logging.getLogger("rq.worker") +logger = logging.getLogger('rq.worker') class StopRequested(Exception): @@ -102,9 +102,9 @@ def signal_name(signum): class DequeueStrategy(str, Enum): - DEFAULT = "default" - ROUND_ROBIN = "round_robin" - RANDOM = "random" + DEFAULT = 'default' + ROUND_ROBIN = 'round_robin' + RANDOM = 'random' class WorkerStatus(str, Enum): @@ -151,13 +151,12 @@ class BaseWorker: serializer=None, work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None, ): # noqa - self.default_result_ttl = default_result_ttl if worker_ttl: self.worker_ttl = worker_ttl elif default_worker_ttl: - warnings.warn("default_worker_ttl is deprecated, use worker_ttl.", DeprecationWarning, stacklevel=2) + warnings.warn('default_worker_ttl is deprecated, use worker_ttl.', DeprecationWarning, stacklevel=2) self.worker_ttl = default_worker_ttl else: self.worker_ttl = DEFAULT_WORKER_TTL @@ -441,9 +440,9 @@ class BaseWorker: Args: connection (Optional[Redis]): The Redis Connection. """ - current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout") + current_socket_timeout = connection.connection_pool.connection_kwargs.get('socket_timeout') if current_socket_timeout is None or current_socket_timeout < self.connection_timeout: - timeout_config = {"socket_timeout": self.connection_timeout} + timeout_config = {'socket_timeout': self.connection_timeout} connection.connection_pool.connection_kwargs.update(timeout_config) return connection @@ -559,7 +558,7 @@ class BaseWorker: def work( self, burst: bool = False, - logging_level: str = "INFO", + logging_level: str = 'INFO', date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, max_jobs: Optional[int] = None, @@ -674,9 +673,9 @@ class BaseWorker: if self._dequeue_strategy is None: self._dequeue_strategy = DequeueStrategy.DEFAULT - if self._dequeue_strategy not in ("default", "random", "round_robin"): + if self._dequeue_strategy not in ('default', 'random', 'round_robin'): raise ValueError( - f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`." + f'Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`.' ) if self._dequeue_strategy == DequeueStrategy.DEFAULT: return @@ -810,7 +809,7 @@ class BaseWorker: def _set_state(self, state): """Raise a DeprecationWarning if ``worker.state = X`` is used""" - warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning) + warnings.warn('worker.state is deprecated, use worker.set_state() instead.', DeprecationWarning) self.set_state(state) def get_state(self) -> str: @@ -818,7 +817,7 @@ class BaseWorker: def _get_state(self): """Raise a DeprecationWarning if ``worker.state == X`` is used""" - warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning) + warnings.warn('worker.state is deprecated, use worker.get_state() instead.', DeprecationWarning) return self.get_state() state = property(_get_state, _set_state) @@ -826,7 +825,7 @@ class BaseWorker: def _start_scheduler( self, burst: bool = False, - logging_level: str = "INFO", + logging_level: str = 'INFO', date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, ): @@ -909,7 +908,7 @@ class BaseWorker: def bootstrap( self, - logging_level: str = "INFO", + logging_level: str = 'INFO', date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, ): @@ -993,7 +992,7 @@ class BaseWorker: self.clean_registries() Group.clean_registries(connection=self.connection) - def _pubsub_exception_handler(self, exc: Exception, pubsub: "PubSub", pubsub_thread: "PubSubWorkerThread") -> None: + def _pubsub_exception_handler(self, exc: Exception, pubsub: 'PubSub', pubsub_thread: 'PubSubWorkerThread') -> None: """ This exception handler allows the pubsub_thread to continue & retry to connect after a connection problem the same way the main worker loop @@ -1003,13 +1002,13 @@ class BaseWorker: """ if isinstance(exc, (redis.exceptions.ConnectionError)): self.log.error( - "Could not connect to Redis instance: %s Retrying in %d seconds...", + 'Could not connect to Redis instance: %s Retrying in %d seconds...', exc, 2, ) time.sleep(2.0) else: - self.log.warning("Pubsub thread exitin on %s" % exc) + self.log.warning('Pubsub thread exitin on %s' % exc) raise def handle_payload(self, message): @@ -1241,7 +1240,6 @@ class BaseWorker: class Worker(BaseWorker): - @property def is_horse(self): """Returns whether or not this is the worker or the work horse.""" @@ -1395,8 +1393,8 @@ class Worker(BaseWorker): job.ended_at = now() # Unhandled failure: move the job to the failed queue - signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else '' - exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; " + signal_msg = f' (signal {os.WTERMSIG(ret_val)})' if ret_val and os.WIFSIGNALED(ret_val) else '' + exc_string = f'Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; ' self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string) self.handle_work_horse_killed(job, retpid, ret_val, rusage) @@ -1532,7 +1530,7 @@ class Worker(BaseWorker): result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - self.log.debug('Saving job %s\'s successful execution result', job.id) + self.log.debug("Saving job %s's successful execution result", job.id) job._handle_success(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) diff --git a/rq/worker_pool.py b/rq/worker_pool.py index 900b7b9b..36616a2d 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -149,7 +149,7 @@ class WorkerPool: name: str, burst: bool, _sleep: float = 0, - logging_level: str = "INFO", + logging_level: str = 'INFO', ) -> Process: """Returns the worker process""" return Process( @@ -171,7 +171,7 @@ class WorkerPool: count: Optional[int] = None, burst: bool = True, _sleep: float = 0, - logging_level: str = "INFO", + logging_level: str = 'INFO', ): """ Starts a worker and adds the data to worker_datas. @@ -184,7 +184,7 @@ class WorkerPool: self.worker_dict[name] = worker_data self.log.debug('Spawned worker: %s with PID %d', name, process.pid) - def start_workers(self, burst: bool = True, _sleep: float = 0, logging_level: str = "INFO"): + def start_workers(self, burst: bool = True, _sleep: float = 0, logging_level: str = 'INFO'): """ Run the workers * sleep: waits for X seconds before creating worker, only for testing purposes @@ -214,7 +214,7 @@ class WorkerPool: for worker_data in worker_datas: self.stop_worker(worker_data) - def start(self, burst: bool = False, logging_level: str = "INFO"): + def start(self, burst: bool = False, logging_level: str = 'INFO'): self._burst = burst respawn = not burst # Don't respawn workers if burst mode is on setup_loghandlers(logging_level, DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, name=__name__) @@ -250,7 +250,7 @@ def run_worker( serializer: Type['Serializer'] = DefaultSerializer, job_class: Type[Job] = Job, burst: bool = True, - logging_level: str = "INFO", + logging_level: str = 'INFO', _sleep: int = 0, ): connection = connection_class( @@ -258,6 +258,6 @@ def run_worker( ) queues = [Queue(name, connection=connection) for name in queue_names] worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class) - worker.log.info("Starting worker started with PID %s", os.getpid()) + worker.log.info('Starting worker started with PID %s', os.getpid()) time.sleep(_sleep) worker.work(burst=burst, with_scheduler=True, logging_level=logging_level) diff --git a/tests/__init__.py b/tests/__init__.py index 80963d8e..d01ea38c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -25,12 +25,12 @@ def find_empty_redis_database(ssl=False): def slow(f): f = pytest.mark.slow(f) - return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), "Slow tests disabled")(f) + return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), 'Slow tests disabled')(f) def ssl_test(f): f = pytest.mark.ssl_test(f) - return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), "SSL tests disabled")(f) + return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), 'SSL tests disabled')(f) class TestCase(unittest.TestCase): diff --git a/tests/config_files/dummy.py b/tests/config_files/dummy.py index 14042500..cfe00ef5 100644 --- a/tests/config_files/dummy.py +++ b/tests/config_files/dummy.py @@ -1 +1 @@ -REDIS_HOST = "testhost.example.com" +REDIS_HOST = 'testhost.example.com' diff --git a/tests/config_files/dummy_override.py b/tests/config_files/dummy_override.py index 2b87a0cf..347c5052 100644 --- a/tests/config_files/dummy_override.py +++ b/tests/config_files/dummy_override.py @@ -1,4 +1,4 @@ -REDIS_HOST = "testhost.example.com" +REDIS_HOST = 'testhost.example.com' REDIS_PORT = 6378 REDIS_DB = 2 REDIS_PASSWORD = '123' diff --git a/tests/fixtures.py b/tests/fixtures.py index c3d07584..413b4dc5 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -146,18 +146,18 @@ class Number: class CallableObject: def __call__(self): - return u"I'm callable" + return "I'm callable" class UnicodeStringObject: def __repr__(self): - return u'é' + return 'é' class ClassWithAStaticMethod: @staticmethod def static_method(): - return u"I'm a static method" + return "I'm a static method" def black_hole(job, *exc_info): @@ -298,7 +298,7 @@ def save_exception(job, connection, type, value, traceback): connection.set('failure_callback:%s' % job.id, str(value), ex=60) -def save_result_if_not_stopped(job, connection, result=""): +def save_result_if_not_stopped(job, connection, result=''): connection.set('stopped_callback:%s' % job.id, result, ex=60) diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 8d081bac..2633a92f 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -36,12 +36,12 @@ class QueueCallbackTestCase(RQTestCase): self.assertEqual(job.success_callback, print) # test string callbacks - job = queue.enqueue(say_hello, on_success=Callback("print")) + job = queue.enqueue(say_hello, on_success=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.success_callback, print) - job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback("print")) + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.success_callback, print) @@ -65,12 +65,12 @@ class QueueCallbackTestCase(RQTestCase): self.assertEqual(job.failure_callback, print) # test string callbacks - job = queue.enqueue(say_hello, on_failure=Callback("print")) + job = queue.enqueue(say_hello, on_failure=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.failure_callback, print) - job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback("print")) + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.failure_callback, print) @@ -94,12 +94,12 @@ class QueueCallbackTestCase(RQTestCase): self.assertEqual(job.stopped_callback, print) # test string callbacks - job = queue.enqueue(long_process, on_stopped=Callback("print")) + job = queue.enqueue(long_process, on_stopped=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.stopped_callback, print) - job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback("print")) + job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback('print')) job = Job.fetch(id=job.id, connection=self.connection) self.assertEqual(job.stopped_callback, print) @@ -131,11 +131,11 @@ class SyncJobCallback(RQTestCase): self.assertFalse(self.connection.exists('success_callback:%s' % job.id)) # test string callbacks - job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.save_result')) self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(self.connection.get('success_callback:%s' % job.id).decode(), job.result) - job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result')) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.connection.exists('success_callback:%s' % job.id)) @@ -152,11 +152,11 @@ class SyncJobCallback(RQTestCase): self.assertFalse(self.connection.exists('failure_callback:%s' % job.id)) # test string callbacks - job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception")) + job = queue.enqueue(div_by_zero, on_failure=Callback('tests.fixtures.save_exception')) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertIn('div_by_zero', self.connection.get('failure_callback:%s' % job.id).decode()) - job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result')) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.connection.exists('failure_callback:%s' % job.id)) @@ -173,7 +173,7 @@ class SyncJobCallback(RQTestCase): self.assertTrue(self.connection.exists('stopped_callback:%s' % job.id)) # test string callbacks - job = queue.enqueue(long_process, on_stopped=Callback("tests.fixtures.save_result_if_not_stopped")) + job = queue.enqueue(long_process, on_stopped=Callback('tests.fixtures.save_result_if_not_stopped')) job.execute_stopped_callback( worker.death_penalty_class ) # Calling execute_stopped_callback directly for coverage @@ -198,12 +198,12 @@ class WorkerCallbackTestCase(RQTestCase): self.assertFalse(self.connection.exists('success_callback:%s' % job.id)) # test string callbacks - job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.save_result')) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(self.connection.get('success_callback:%s' % job.id).decode(), job.return_value()) - job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result')) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.connection.exists('success_callback:%s' % job.id)) @@ -219,7 +219,7 @@ class WorkerCallbackTestCase(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) # test string callbacks - job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.erroneous_callback")) + job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.erroneous_callback')) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -242,14 +242,14 @@ class WorkerCallbackTestCase(RQTestCase): self.assertFalse(self.connection.exists('failure_callback:%s' % job.id)) # test string callbacks - job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception")) + job = queue.enqueue(div_by_zero, on_failure=Callback('tests.fixtures.save_exception')) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) job.refresh() print(job.exc_info) self.assertIn('div_by_zero', self.connection.get('failure_callback:%s' % job.id).decode()) - job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result')) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.connection.exists('failure_callback:%s' % job.id)) @@ -278,7 +278,7 @@ class JobCallbackTestCase(RQTestCase): self.assertEqual(job.success_callback, print) # test string callbacks - job = Job.create(say_hello, on_success=Callback("print"), connection=self.connection) + job = Job.create(say_hello, on_success=Callback('print'), connection=self.connection) self.assertIsNotNone(job._success_callback_name) self.assertEqual(job.success_callback, print) job.save() @@ -306,7 +306,7 @@ class JobCallbackTestCase(RQTestCase): self.assertEqual(job.failure_callback, print) # test string callbacks - job = Job.create(say_hello, on_failure=Callback("print"), connection=self.connection) + job = Job.create(say_hello, on_failure=Callback('print'), connection=self.connection) self.assertIsNotNone(job._failure_callback_name) self.assertEqual(job.failure_callback, print) job.save() @@ -334,7 +334,7 @@ class JobCallbackTestCase(RQTestCase): self.assertEqual(job.stopped_callback, print) # test string callbacks - job = Job.create(say_hello, on_stopped=Callback("print"), connection=self.connection) + job = Job.create(say_hello, on_stopped=Callback('print'), connection=self.connection) self.assertIsNotNone(job._stopped_callback_name) self.assertEqual(job.stopped_callback, print) job.save() diff --git a/tests/test_cli.py b/tests/test_cli.py index fbdb4c46..c48e824b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -32,10 +32,10 @@ class CLITestCase(RQTestCase): if result.exit_code == 0: return True else: - print("Non normal execution") - print("Exit Code: {}".format(result.exit_code)) - print("Output: {}".format(result.output)) - print("Exception: {}".format(result.exception)) + print('Non normal execution') + print('Exit Code: {}'.format(result.exit_code)) + print('Output: {}'.format(result.output)) + print('Exception: {}'.format(result.exception)) self.assertEqual(result.exit_code, 0) @@ -48,10 +48,10 @@ class TestRQCli(CLITestCase): if result.exit_code == 0: return True else: - print("Non normal execution") - print("Exit Code: {}".format(result.exit_code)) - print("Output: {}".format(result.output)) - print("Exception: {}".format(result.exception)) + print('Non normal execution') + print('Exit Code: {}'.format(result.exit_code)) + print('Output: {}'.format(result.output)) + print('Exception: {}'.format(result.exception)) self.assertEqual(result.exit_code, 0) """Test rq_cli script""" @@ -108,7 +108,7 @@ class TestRQCli(CLITestCase): self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], '123') def test_config_env_vars(self): - os.environ['REDIS_HOST'] = "testhost.example.com" + os.environ['REDIS_HOST'] = 'testhost.example.com' cli_config = CliConfig() @@ -414,7 +414,7 @@ class TestRQCli(CLITestCase): result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0]) self.assertEqual(result.exit_code, 1) - self.assertIn("Duration must be an integer greater than 1", result.output) + self.assertIn('Duration must be an integer greater than 1', result.output) def test_serializer(self): """rq worker -u --serializer """ @@ -434,8 +434,8 @@ class TestRQCli(CLITestCase): result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello']) self.assert_normal_execution(result) - prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' - suffix = '\'.\n' + prefix = "Enqueued tests.fixtures.say_hello() with job-id '" + suffix = "'.\n" self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) @@ -460,8 +460,8 @@ class TestRQCli(CLITestCase): ) self.assert_normal_execution(result) - prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' - suffix = '\'.\n' + prefix = "Enqueued tests.fixtures.say_hello() with job-id '" + suffix = "'.\n" self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) @@ -508,7 +508,7 @@ class TestRQCli(CLITestCase): args, kwargs = Job(job_id, connection=self.connection).result - self.assertEqual(args, ('hello', [1, {'key': 'value'}], {"test": True}, (1, 2))) + self.assertEqual(args, ('hello', [1, {'key': 'value'}], {'test': True}, (1, 2))) self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{"test": true}\n'}) def test_cli_enqueue_schedule_in(self): @@ -653,7 +653,7 @@ class TestRQCli(CLITestCase): result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value']) self.assertNotEqual(result.exit_code, 0) - self.assertIn('You can\'t specify multiple values for the same keyword.', result.output) + self.assertIn("You can't specify multiple values for the same keyword.", result.output) result = runner.invoke( main, @@ -669,7 +669,7 @@ class TestRQCli(CLITestCase): ], ) self.assertNotEqual(result.exit_code, 0) - self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output) + self.assertIn("You can't specify both --schedule-in and --schedule-at", result.output) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file']) self.assertNotEqual(result.exit_code, 0) @@ -763,7 +763,7 @@ class TestRQCli(CLITestCase): ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) - self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}})) + self.assertEqual((job.args, job.kwargs), ([], {'key': {'foo': True}})) id = str(uuid4()) result = runner.invoke( diff --git a/tests/test_commands.py b/tests/test_commands.py index b99f3aeb..32cd0ece 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -45,7 +45,7 @@ class TestCommands(RQTestCase): assert worker.pubsub_thread.is_alive() for client in connection.client_list(): - connection.client_kill(client["addr"]) + connection.client_kill(client['addr']) time.sleep(0.0) # Allow other threads to run assert worker.pubsub_thread.is_alive() @@ -55,7 +55,7 @@ class TestCommands(RQTestCase): connection = self.connection worker = Worker('foo', connection=connection) - with mock.patch("redis.client.PubSub.get_message", new_callable=raise_exc_mock): + with mock.patch('redis.client.PubSub.get_message', new_callable=raise_exc_mock): worker.subscribe() worker.pubsub_thread.join() diff --git a/tests/test_connection.py b/tests/test_connection.py index d59be881..bd35a3b7 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -5,7 +5,6 @@ from tests import RQTestCase class TestConnectionInheritance(RQTestCase): - def test_parse_connection(self): """Test parsing the connection""" conn_class, pool_class, pool_kwargs = parse_connection(Redis(ssl=True)) @@ -17,4 +16,4 @@ class TestConnectionInheritance(RQTestCase): conn_class, pool_class, pool_kwargs = parse_connection(Redis(connection_pool=pool)) self.assertEqual(conn_class, Redis) self.assertEqual(pool_class, UnixDomainSocketConnection) - self.assertEqual(pool_kwargs, {"path": path}) + self.assertEqual(pool_kwargs, {'path': path}) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index aabf227f..aa97f3fb 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -104,7 +104,7 @@ class TestDependencies(RQTestCase): q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True)) w.work(burst=True, max_jobs=1) - self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) + self.assertEqual(q.job_ids, ['fake_job_id_2', 'fake_job_id_1']) def test_multiple_jobs_with_dependencies(self): """Enqueue dependent jobs only when appropriate""" @@ -205,9 +205,9 @@ class TestDependencies(RQTestCase): def test_dependencies_are_met_at_execution_time(self): queue = Queue(connection=self.connection) queue.empty() - queue.enqueue(say_hello, job_id="A") - queue.enqueue(say_hello, job_id="B") - job_c = queue.enqueue(check_dependencies_are_met, job_id="C", depends_on=["A", "B"]) + queue.enqueue(say_hello, job_id='A') + queue.enqueue(say_hello, job_id='B') + job_c = queue.enqueue(check_dependencies_are_met, job_id='C', depends_on=['A', 'B']) job_c.dependencies_are_met() w = Worker([queue], connection=self.connection) diff --git a/tests/test_group.py b/tests/test_group.py index 6383498c..521add5a 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -24,8 +24,8 @@ class TestGroup(RQTestCase): q.empty() def test_group_repr(self): - group = Group.create(name="foo", connection=self.connection) - assert group.__repr__() == "Group(id=foo)" + group = Group.create(name='foo', connection=self.connection) + assert group.__repr__() == 'Group(id=foo)' def test_group_jobs(self): q = Queue(connection=self.connection) @@ -84,7 +84,7 @@ class TestGroup(RQTestCase): q = Queue(connection=self.connection) group = Group.create(connection=self.connection) group.enqueue_many(q, [self.job_1_data]) - redis_groups = {as_text(group) for group in self.connection.smembers("rq:groups")} + redis_groups = {as_text(group) for group in self.connection.smembers('rq:groups')} assert group.name in redis_groups q.empty() @@ -113,7 +113,7 @@ class TestGroup(RQTestCase): w.work(burst=True, max_jobs=1) sleep(2) w.run_maintenance_tasks() - redis_groups = {as_text(group) for group in self.connection.smembers("rq:groups")} + redis_groups = {as_text(group) for group in self.connection.smembers('rq:groups')} assert group.name not in redis_groups @pytest.mark.slow @@ -130,18 +130,18 @@ class TestGroup(RQTestCase): q.empty() def test_get_group_key(self): - group = Group(name="foo", connection=self.connection) - self.assertEqual(Group.get_key(group.name), "rq:group:foo") + group = Group(name='foo', connection=self.connection) + self.assertEqual(Group.get_key(group.name), 'rq:group:foo') def test_all_returns_all_groups(self): q = Queue(connection=self.connection) - group1 = Group.create(name="group1", connection=self.connection) - Group.create(name="group2", connection=self.connection) + group1 = Group.create(name='group1', connection=self.connection) + Group.create(name='group2', connection=self.connection) group1.enqueue_many(q, [self.job_1_data, self.job_2_data]) all_groups = Group.all(self.connection) assert len(all_groups) == 1 - assert "group1" in [group.name for group in all_groups] - assert "group2" not in [group.name for group in all_groups] + assert 'group1' in [group.name for group in all_groups] + assert 'group2' not in [group.name for group in all_groups] def test_all_deletes_missing_groups(self): q = Queue(connection=self.connection) diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index c9347495..1c8eb6b7 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -14,7 +14,6 @@ from tests.fixtures import say_hello @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') class TestIntermediateQueue(RQTestCase): - def test_set_first_seen(self): """Ensure that the first_seen attribute is set correctly.""" queue = Queue('foo', connection=self.connection) diff --git a/tests/test_job.py b/tests/test_job.py index 9f93cfbf..b92a1f06 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -32,8 +32,8 @@ class TestJob(RQTestCase): """Unicode in job description [issue405]""" job = Job.create( 'myfunc', - args=[12, "☃"], - kwargs=dict(snowman="☃", null=None), + args=[12, '☃'], + kwargs=dict(snowman='☃', null=None), connection=self.connection, ) self.assertEqual( @@ -49,7 +49,7 @@ class TestJob(RQTestCase): # Jobs have a random UUID and a creation date self.assertIsNotNone(job.id) self.assertIsNotNone(job.created_at) - self.assertEqual(str(job), "" % job.id) + self.assertEqual(str(job), '' % job.id) # ...and nothing else self.assertEqual(job.origin, '') @@ -70,8 +70,8 @@ class TestJob(RQTestCase): def test_create_param_errors(self): """Creation of jobs may result in errors""" - self.assertRaises(TypeError, Job.create, fixtures.say_hello, args="string") - self.assertRaises(TypeError, Job.create, fixtures.say_hello, kwargs="string") + self.assertRaises(TypeError, Job.create, fixtures.say_hello, args='string') + self.assertRaises(TypeError, Job.create, fixtures.say_hello, kwargs='string') self.assertRaises(TypeError, Job.create, func=42) def test_create_typical_job(self): @@ -507,8 +507,8 @@ class TestJob(RQTestCase): def test_multiple_dependencies_are_accepted_and_persisted(self): """Ensure job._dependency_ids accepts different input formats, and is set and restored properly""" - job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id="A", connection=self.connection) - job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id="B", connection=self.connection) + job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id='A', connection=self.connection) + job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id='B', connection=self.connection) # No dependencies job = Job.create(func=fixtures.say_hello, connection=self.connection) @@ -518,22 +518,22 @@ class TestJob(RQTestCase): # Various ways of specifying dependencies cases = [ - ["A", ["A"]], - [job_A, ["A"]], - [["A", "B"], ["A", "B"]], - [[job_A, job_B], ["A", "B"]], - [["A", job_B], ["A", "B"]], - [("A", "B"), ["A", "B"]], - [(job_A, job_B), ["A", "B"]], - [(job_A, "B"), ["A", "B"]], - [Dependency("A"), ["A"]], - [Dependency(job_A), ["A"]], - [Dependency(["A", "B"]), ["A", "B"]], - [Dependency([job_A, job_B]), ["A", "B"]], - [Dependency(["A", job_B]), ["A", "B"]], - [Dependency(("A", "B")), ["A", "B"]], - [Dependency((job_A, job_B)), ["A", "B"]], - [Dependency((job_A, "B")), ["A", "B"]], + ['A', ['A']], + [job_A, ['A']], + [['A', 'B'], ['A', 'B']], + [[job_A, job_B], ['A', 'B']], + [['A', job_B], ['A', 'B']], + [('A', 'B'), ['A', 'B']], + [(job_A, job_B), ['A', 'B']], + [(job_A, 'B'), ['A', 'B']], + [Dependency('A'), ['A']], + [Dependency(job_A), ['A']], + [Dependency(['A', 'B']), ['A', 'B']], + [Dependency([job_A, job_B]), ['A', 'B']], + [Dependency(['A', job_B]), ['A', 'B']], + [Dependency(('A', 'B')), ['A', 'B']], + [Dependency((job_A, job_B)), ['A', 'B']], + [Dependency((job_A, 'B')), ['A', 'B']], ] for given, expected in cases: job = Job.create(func=fixtures.say_hello, depends_on=given, connection=self.connection) @@ -546,10 +546,10 @@ class TestJob(RQTestCase): job = Job.create(func=fixtures.say_hello, connection=self.connection) job.save() with self.connection.pipeline() as pipeline: - job.prepare_for_execution("worker_name", pipeline) + job.prepare_for_execution('worker_name', pipeline) pipeline.execute() job.refresh() - self.assertEqual(job.worker_name, "worker_name") + self.assertEqual(job.worker_name, 'worker_name') self.assertEqual(job.get_status(), JobStatus.STARTED) self.assertIsNotNone(job.last_heartbeat) self.assertIsNotNone(job.started_at) @@ -894,8 +894,8 @@ class TestJob(RQTestCase): def test_create_job_with_id(self): """test creating jobs with a custom ID""" queue = Queue(connection=self.connection) - job = queue.enqueue(fixtures.say_hello, job_id="1234") - self.assertEqual(job.id, "1234") + job = queue.enqueue(fixtures.say_hello, job_id='1234') + self.assertEqual(job.id, '1234') job.perform() self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234) @@ -905,17 +905,17 @@ class TestJob(RQTestCase): queue = Queue(connection=self.connection) with self.assertRaises(ValueError): - queue.enqueue(fixtures.say_hello, job_id="1234:4321") + queue.enqueue(fixtures.say_hello, job_id='1234:4321') def test_create_job_with_async(self): """test creating jobs with async function""" queue = Queue(connection=self.connection) - async_job = queue.enqueue(fixtures.say_hello_async, job_id="async_job") - sync_job = queue.enqueue(fixtures.say_hello, job_id="sync_job") + async_job = queue.enqueue(fixtures.say_hello_async, job_id='async_job') + sync_job = queue.enqueue(fixtures.say_hello, job_id='sync_job') - self.assertEqual(async_job.id, "async_job") - self.assertEqual(sync_job.id, "sync_job") + self.assertEqual(async_job.id, 'async_job') + self.assertEqual(sync_job.id, 'sync_job') async_task_result = async_job.perform() sync_task_result = sync_job.perform() @@ -941,14 +941,14 @@ class TestJob(RQTestCase): def test_create_job_with_ttl_should_have_ttl_after_enqueued(self): """test creating jobs with ttl and checks if get_jobs returns it properly [issue502]""" queue = Queue(connection=self.connection) - queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10) + queue.enqueue(fixtures.say_hello, job_id='1234', ttl=10) job = queue.get_jobs()[0] self.assertEqual(job.ttl, 10) def test_create_job_with_ttl_should_expire(self): """test if a job created with ttl expires [issue502]""" queue = Queue(connection=self.connection) - queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1) + queue.enqueue(fixtures.say_hello, job_id='1234', ttl=1) time.sleep(1.1) self.assertEqual(0, len(queue.get_jobs())) @@ -1102,7 +1102,7 @@ class TestJob(RQTestCase): def test_dependencies_key_should_have_prefixed_job_id(self): job_id = 'random' job = Job(id=job_id, connection=self.connection) - expected_key = Job.redis_job_namespace_prefix + ":" + job_id + ':dependencies' + expected_key = Job.redis_job_namespace_prefix + ':' + job_id + ':dependencies' assert job.dependencies_key == expected_key @@ -1247,68 +1247,68 @@ class TestJob(RQTestCase): connection_kwargs = self.connection.connection_pool.connection_kwargs # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued. # Worker 1 will be busy with the slow job, so worker 2 will complete both fast jobs. - job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", connection_kwargs, True, 0.5], job_id='slow_job') - job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, True]) - job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True]) + job_slow = queue.enqueue(fixtures.rpush, args=[key, 'slow', connection_kwargs, True, 0.5], job_id='slow_job') + job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, True]) + job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True]) fixtures.burst_two_workers(queue, connection=self.connection) time.sleep(0.75) jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 2)] self.assertEqual(queue.count, 0) self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B])) - self.assertEqual(jobs_completed, ["A:w2", "B:w2", "slow:w1"]) + self.assertEqual(jobs_completed, ['A:w2', 'B:w2', 'slow:w1']) self.connection.delete(key) # When job "A" depends on the slow job, then job "B" finishes before "A". # There is no clear requirement on which worker should take job "A", so we stay silent on that. - job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", connection_kwargs, True, 0.5], job_id='slow_job') - job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, False], depends_on='slow_job') - job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True]) + job_slow = queue.enqueue(fixtures.rpush, args=[key, 'slow', connection_kwargs, True, 0.5], job_id='slow_job') + job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, False], depends_on='slow_job') + job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True]) fixtures.burst_two_workers(queue, connection=self.connection) time.sleep(0.75) jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 2)] self.assertEqual(queue.count, 0) self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B])) - self.assertEqual(jobs_completed, ["B:w2", "slow:w1", "A"]) + self.assertEqual(jobs_completed, ['B:w2', 'slow:w1', 'A']) def test_execution_order_with_dual_dependency(self): queue = Queue(connection=self.connection) key = 'test_job:job_order' connection_kwargs = self.connection.connection_pool.connection_kwargs # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued. - job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", connection_kwargs, True, 0.5], job_id='slow_1') - job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", connection_kwargs, True, 0.75], job_id='slow_2') - job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, True]) - job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True]) + job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, 'slow_1', connection_kwargs, True, 0.5], job_id='slow_1') + job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, 'slow_2', connection_kwargs, True, 0.75], job_id='slow_2') + job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, True]) + job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True]) fixtures.burst_two_workers(queue, connection=self.connection) time.sleep(1) jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 3)] self.assertEqual(queue.count, 0) self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B])) - self.assertEqual(jobs_completed, ["slow_1:w1", "A:w1", "B:w1", "slow_2:w2"]) + self.assertEqual(jobs_completed, ['slow_1:w1', 'A:w1', 'B:w1', 'slow_2:w2']) self.connection.delete(key) # This time job "A" depends on two slow jobs, while job "B" depends only on the faster of # the two. Job "B" should be completed before job "A". # There is no clear requirement on which worker should take job "A", so we stay silent on that. - job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", connection_kwargs, True, 0.5], job_id='slow_1') - job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", connection_kwargs, True, 0.75], job_id='slow_2') + job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, 'slow_1', connection_kwargs, True, 0.5], job_id='slow_1') + job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, 'slow_2', connection_kwargs, True, 0.75], job_id='slow_2') job_A = queue.enqueue( - fixtures.rpush, args=[key, "A", connection_kwargs, False], depends_on=['slow_1', 'slow_2'] + fixtures.rpush, args=[key, 'A', connection_kwargs, False], depends_on=['slow_1', 'slow_2'] ) - job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True], depends_on=['slow_1']) + job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True], depends_on=['slow_1']) fixtures.burst_two_workers(queue, connection=self.connection) time.sleep(1) jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 3)] self.assertEqual(queue.count, 0) self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B])) - self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"]) + self.assertEqual(jobs_completed, ['slow_1:w1', 'B:w1', 'slow_2:w2', 'A']) @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0') def test_blocking_result_fetch(self): # Ensure blocking waits for the time to run the job, but not right up until the timeout. job_sleep_seconds = 2 block_seconds = 5 - queue_name = "test_blocking_queue" + queue_name = 'test_blocking_queue' q = Queue(queue_name, connection=self.connection) job = q.enqueue(fixtures.long_running_job, job_sleep_seconds) started_at = time.time() diff --git a/tests/test_queue.py b/tests/test_queue.py index a667a838..490e1e62 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -143,7 +143,7 @@ class TestQueue(RQTestCase): self.assertEqual(0, q.get_job_position(job.id)) self.assertEqual(1, q.get_job_position(job2.id)) self.assertEqual(2, q.get_job_position(job3)) - self.assertEqual(None, q.get_job_position("no_real_job")) + self.assertEqual(None, q.get_job_position('no_real_job')) def test_remove(self): """Ensure queue.remove properly removes Job from queue.""" @@ -516,8 +516,8 @@ class TestQueue(RQTestCase): def test_enqueue_dependents_on_multiple_queues(self): """Enqueueing dependent jobs on multiple queues pushes jobs in the queues and removes them from DeferredJobRegistry for each different queue.""" - q_1 = Queue("queue_1", connection=self.connection) - q_2 = Queue("queue_2", connection=self.connection) + q_1 = Queue('queue_1', connection=self.connection) + q_2 = Queue('queue_2', connection=self.connection) parent_job = Job.create(func=say_hello, connection=self.connection) parent_job.save() job_1 = q_1.enqueue(say_hello, depends_on=parent_job) diff --git a/tests/test_registry.py b/tests/test_registry.py index 7a2d1b07..36a681dd 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -433,7 +433,7 @@ class TestDeferredRegistry(RQTestCase): self.registry.add(job) score = self.connection.zscore(key, job.id) - self.assertEqual(score, float("inf")) + self.assertEqual(score, float('inf')) timestamp = current_timestamp() ttl = 5 diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index a3fea52f..f1673d26 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -484,7 +484,7 @@ class TestQueue(RQTestCase): connection_pool=redis.ConnectionPool( connection_class=CustomRedisConnection, db=4, - custom_arg="foo", + custom_arg='foo', ) ) @@ -494,7 +494,7 @@ class TestQueue(RQTestCase): scheduler_connection = scheduler.connection.connection_pool.get_connection('info') self.assertEqual(scheduler_connection.__class__, CustomRedisConnection) - self.assertEqual(scheduler_connection.get_custom_arg(), "foo") + self.assertEqual(scheduler_connection.get_custom_arg(), 'foo') def test_no_custom_connection_pool(self): """Connection pool customizing must not interfere if we're using a standard diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 5c9c7228..bd96b33a 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -41,7 +41,7 @@ class TestTimeouts(RQTestCase): w.work(burst=True) self.assertIn(job, failed_job_registry) job.refresh() - self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info) + self.assertIn('rq.timeouts.JobTimeoutException', job.exc_info) # Test negative timeout doesn't raise JobTimeoutException, # which implies an unintended immediate timeout. diff --git a/tests/test_utils.py b/tests/test_utils.py index 8b9778a9..5fcae687 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -167,23 +167,23 @@ class TestUtils(RQTestCase): def test_truncate_long_string(self): """Ensure truncate_long_string works properly""" - assert truncate_long_string("12", max_length=3) == "12" - assert truncate_long_string("123", max_length=3) == "123" - assert truncate_long_string("1234", max_length=3) == "123..." - assert truncate_long_string("12345", max_length=3) == "123..." + assert truncate_long_string('12', max_length=3) == '12' + assert truncate_long_string('123', max_length=3) == '123' + assert truncate_long_string('1234', max_length=3) == '123...' + assert truncate_long_string('12345', max_length=3) == '123...' - s = "long string but no max_length provided so no truncating should occur" * 10 + s = 'long string but no max_length provided so no truncating should occur' * 10 assert truncate_long_string(s) == s def test_get_call_string(self): """Ensure a case, when func_name, args and kwargs are not None, works properly""" - cs = get_call_string("f", ('some', 'args', 42), {"key1": "value1", "key2": True}) + cs = get_call_string('f', ('some', 'args', 42), {'key1': 'value1', 'key2': True}) assert cs == "f('some', 'args', 42, key1='value1', key2=True)" def test_get_call_string_with_max_length(self): """Ensure get_call_string works properly when max_length is provided""" - func_name = "f" + func_name = 'f' args = (1234, 12345, 123456) - kwargs = {"len4": 1234, "len5": 12345, "len6": 123456} + kwargs = {'len4': 1234, 'len5': 12345, 'len6': 123456} cs = get_call_string(func_name, args, kwargs, max_length=5) - assert cs == "f(1234, 12345, 12345..., len4=1234, len5=12345, len6=12345...)" + assert cs == 'f(1234, 12345, 12345..., len4=1234, len5=12345, len6=12345...)' diff --git a/tests/test_worker.py b/tests/test_worker.py index 32067416..aac5d6d3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -281,7 +281,7 @@ class TestWorker(RQTestCase): w.perform_job(job, queue) # An exception should be logged here at ERROR level - self.assertIn("Traceback", mock_logger_error.call_args[0][3]) + self.assertIn('Traceback', mock_logger_error.call_args[0][3]) def test_heartbeat(self): """Heartbeat saves last_heartbeat""" @@ -642,10 +642,10 @@ class TestWorker(RQTestCase): """Cancel job and verify that when the parent job is finished, the dependent job is not started.""" - q = Queue("low", connection=self.connection) - parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") - job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") - job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") + q = Queue('low', connection=self.connection) + parent_job = q.enqueue(long_running_job, 5, job_id='parent_job') + job = q.enqueue(say_hello, depends_on=parent_job, job_id='job1') + job2 = q.enqueue(say_hello, depends_on=job, job_id='job2') job.cancel() w = Worker([q]) @@ -663,11 +663,11 @@ class TestWorker(RQTestCase): def test_cancel_job_enqueue_dependent(self): """Cancel a job in a chain and enqueue the dependent jobs.""" - q = Queue("low", connection=self.connection) - parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") - job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") - job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") - job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3") + q = Queue('low', connection=self.connection) + parent_job = q.enqueue(long_running_job, 5, job_id='parent_job') + job = q.enqueue(say_hello, depends_on=parent_job, job_id='job1') + job2 = q.enqueue(say_hello, depends_on=job, job_id='job2') + job3 = q.enqueue(say_hello, depends_on=job2, job_id='job3') job.cancel(enqueue_dependents=True) @@ -1019,9 +1019,9 @@ class TestWorker(RQTestCase): def test_worker_hash_(self): """Workers are hashed by their .name attribute""" q = Queue('foo', connection=self.connection) - w1 = Worker([q], name="worker1") - w2 = Worker([q], name="worker2") - w3 = Worker([q], name="worker1") + w1 = Worker([q], name='worker1') + w2 = Worker([q], name='worker2') + w3 = Worker([q], name='worker1') worker_set = set([w1, w2, w3]) self.assertEqual(len(worker_set), 2) @@ -1200,7 +1200,7 @@ class TestWorker(RQTestCase): w = Worker([q], connection=self.connection) q.enqueue(say_hello, args=('Frank',), result_ttl=10) w.dequeue_job_and_maintain_ttl(10) - self.assertIn("Frank", mock_logger_info.call_args[0][2]) + self.assertIn('Frank', mock_logger_info.call_args[0][2]) @mock.patch('rq.worker.logger.info') def test_log_job_description_false(self, mock_logger_info): @@ -1209,14 +1209,14 @@ class TestWorker(RQTestCase): w = Worker([q], log_job_description=False, connection=self.connection) q.enqueue(say_hello, args=('Frank',), result_ttl=10) w.dequeue_job_and_maintain_ttl(10) - self.assertNotIn("Frank", mock_logger_info.call_args[0][2]) + self.assertNotIn('Frank', mock_logger_info.call_args[0][2]) def test_worker_configures_socket_timeout(self): """Ensures that the worker correctly updates Redis client connection to have a socket_timeout""" q = Queue(connection=self.connection) _ = Worker([q], connection=self.connection) connection_kwargs = q.connection.connection_pool.connection_kwargs - self.assertEqual(connection_kwargs["socket_timeout"], 415) + self.assertEqual(connection_kwargs['socket_timeout'], 415) def test_worker_version(self): q = Queue(connection=self.connection) @@ -1255,7 +1255,7 @@ class TestWorker(RQTestCase): qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) w = Worker(qs, connection=self.connection) - w.work(burst=True, dequeue_strategy="random") + w.work(burst=True, dequeue_strategy='random') start_times = [] for i in range(5): @@ -1299,7 +1299,7 @@ class TestWorker(RQTestCase): qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) w = Worker(qs) - w.work(burst=True, dequeue_strategy="round_robin") + w.work(burst=True, dequeue_strategy='round_robin') start_times = [] for i in range(5): @@ -1598,14 +1598,13 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): class TestExceptionHandlerMessageEncoding(RQTestCase): - def test_handle_exception_handles_non_ascii_in_exception_message(self): """worker.handle_exception doesn't crash on non-ascii in exception message.""" - worker = Worker("foo", connection=self.connection) + worker = Worker('foo', connection=self.connection) worker._exc_handlers = [] # Mimic how exception info is actually passed forwards try: - raise Exception(u"💪") + raise Exception('💪') except Exception: exc_info = sys.exc_info() worker.handle_exception(Mock(), *exc_info) diff --git a/tox.ini b/tox.ini index 66fbbabe..602f329e 100644 --- a/tox.ini +++ b/tox.ini @@ -14,10 +14,8 @@ passenv= ; [testenv:lint] ; basepython = python3.10 ; deps = -; black ; ruff ; commands = -; black --check rq tests ; ruff check rq tests [testenv:py37]