Replace Black with Ruff as formatter tool (#2152)

This commit is contained in:
Nguyễn Hồng Quân 2024-11-20 15:34:27 +07:00 committed by GitHub
parent fb017d2c29
commit 26a3577443
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 305 additions and 327 deletions

View File

@ -25,11 +25,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install black ruff
- name: Lint with black
run: |
black --check --skip-string-normalization --line-length 120 rq tests
pip install ruff
- name: Lint with ruff
run: |

View File

@ -21,5 +21,4 @@ force_release: clean
twine upload dist/*
lint:
@ black --check --skip-string-normalization --line-length 120 rq tests
@ ruff check --show-source rq tests

View File

@ -8,7 +8,7 @@ RQ requires Redis >= 3.0.0.
[![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
[![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Code style: Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
Full documentation can be found [here][d].

View File

@ -77,7 +77,6 @@ include = [
[tool.hatch.envs.test]
dependencies = [
"black",
"coverage",
"mypy",
"packaging",
@ -93,11 +92,6 @@ dependencies = [
cov = "pytest --cov=rq --cov-config=.coveragerc --cov-report=xml {args:tests}"
typing = "mypy --enable-incomplete-feature=Unpack rq"
[tool.black]
line-length = 120
target-version = ["py38"]
skip-string-normalization = true
[tool.mypy]
allow_redefinition = true
pretty = true
@ -120,13 +114,16 @@ lint.select = [
"I", # import sorting
"W", # pycodestyle warnings
]
line-length = 120 # To match black.
line-length = 120
target-version = "py38"
[tool.ruff.lint.isort]
known-first-party = ["rq"]
section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"]
[tool.ruff.format]
quote-style = "single"
[tool.pyright]
reportOptionalMemberAccess = false
reportOptionalOperand = false
reportOptionalOperand = false

View File

@ -5,14 +5,14 @@ from .version import VERSION
from .worker import SimpleWorker, Worker
__all__ = [
"Callback",
"Retry",
"cancel_job",
"get_current_job",
"requeue_job",
"Queue",
"SimpleWorker",
"Worker",
'Callback',
'Retry',
'cancel_job',
'get_current_job',
'requeue_job',
'Queue',
'SimpleWorker',
'Worker',
]
__version__ = VERSION

View File

@ -122,7 +122,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
@main.command()
@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
@click.option('--interval', '-i', type=float, help="Updates stats every N seconds (default: don't poll)")
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@ -164,7 +164,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@main.command()
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging_level', type=str, default="INFO", help='Set logging level')
@click.option('--logging_level', type=str, default='INFO', help='Set logging level')
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--name', '-n', help='Specify a different name')
@ -187,7 +187,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help="Disable RQ's default exception handler")
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@ -233,19 +233,19 @@ def worker(
logging.config.dictConfig(dict_config)
if pid:
with open(os.path.expanduser(pid), "w") as fp:
with open(os.path.expanduser(pid), 'w') as fp:
fp.write(str(os.getpid()))
worker_name = cli_config.worker_class.__qualname__
if worker_name in ["RoundRobinWorker", "RandomWorker"]:
strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
msg = f"WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead."
if worker_name in ['RoundRobinWorker', 'RandomWorker']:
strategy_alternative = 'random' if worker_name == 'RandomWorker' else 'round_robin'
msg = f'WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead.'
warnings.warn(msg, DeprecationWarning)
click.secho(msg, fg='yellow')
if dequeue_strategy not in ("default", "random", "round_robin"):
if dequeue_strategy not in ('default', 'random', 'round_robin'):
click.secho(
"ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red'
'ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.', err=True, fg='red'
)
sys.exit(1)
@ -309,19 +309,17 @@ def suspend(cli_config, duration, **options):
"""Suspends all workers, to resume run `rq resume`"""
if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
click.echo('Duration must be an integer greater than 1')
sys.exit(1)
connection_suspend(cli_config.connection, duration)
if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
automatically resume""".format(
duration
)
automatically resume""".format(duration)
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")
click.echo('Suspending workers. No new jobs will be started. But current jobs will be completed')
@main.command()
@ -329,7 +327,7 @@ def suspend(cli_config, duration, **options):
def resume(cli_config, **options):
"""Resumes processing of queues, that were suspended with `rq suspend`"""
connection_resume(cli_config.connection)
click.echo("Resuming workers.")
click.echo('Resuming workers.')
@main.command()
@ -427,12 +425,12 @@ def enqueue(
queue.schedule_job(job, schedule)
if not quiet:
click.echo('Enqueued %s with job-id \'%s\'.' % (blue(function_string), job.id))
click.echo("Enqueued %s with job-id '%s'." % (blue(function_string), job.id))
@main.command()
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging-level', '-l', type=str, default="INFO", help='Set logging level')
@click.option('--logging-level', '-l', type=str, default='INFO', help='Set logging level')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')

View File

@ -196,13 +196,13 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class, connection: R
queue_dict[queue] = worker_class.all(queue=queue, connection=connection)
if queue_dict:
max_length = max(len(q.name) for q, in queue_dict.keys())
max_length = max(len(q.name) for (q,) in queue_dict.keys())
else:
max_length = 0
for queue in queue_dict:
if queue_dict[queue]:
queues_str = ", ".join(
queues_str = ', '.join(
sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]))
)
else:
@ -238,7 +238,7 @@ def refresh(interval, func, *args):
def setup_loghandlers_from_args(verbose, quiet, date_format, log_format):
if verbose and quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
raise RuntimeError('Flags --verbose and --quiet are mutually exclusive.')
if verbose:
level = 'DEBUG'
@ -312,7 +312,7 @@ def parse_function_args(arguments):
keyword, value = parse_function_arg(argument, len(args) + 1)
if keyword is not None:
if keyword in kwargs:
raise click.BadParameter('You can\'t specify multiple values for the same keyword.')
raise click.BadParameter("You can't specify multiple values for the same keyword.")
kwargs[keyword] = value
else:
args.append(value)
@ -322,7 +322,7 @@ def parse_function_args(arguments):
def parse_schedule(schedule_in, schedule_at):
if schedule_in is not None:
if schedule_at is not None:
raise click.BadArgumentUsage('You can\'t specify both --schedule-in and --schedule-at')
raise click.BadArgumentUsage("You can't specify both --schedule-in and --schedule-at")
return datetime.now(timezone.utc) + timedelta(seconds=parse_timeout(schedule_in))
elif schedule_at is not None:
return datetime.strptime(schedule_at, '%Y-%m-%dT%H:%M:%S')

View File

@ -23,7 +23,7 @@ class Group:
self.key = '{0}{1}'.format(self.REDIS_GROUP_NAME_PREFIX, self.name)
def __repr__(self):
return "Group(id={})".format(self.name)
return 'Group(id={})'.format(self.name)
def _add_jobs(self, jobs: List[Job], pipeline: Pipeline):
"""Add jobs to the group"""

View File

@ -11,7 +11,6 @@ if TYPE_CHECKING:
class IntermediateQueue(object):
def __init__(self, queue_key: str, connection: Redis):
self.queue_key = queue_key
self.key = self.get_intermediate_queue_key(queue_key)
@ -103,7 +102,6 @@ class IntermediateQueue(object):
job = queue.fetch_job(job_id)
if job_id not in queue.started_job_registry:
if not job:
# If the job doesn't exist in the queue, we can safely remove it from the intermediate queue.
self.remove(job_id)

View File

@ -45,7 +45,7 @@ from .utils import (
utcformat,
)
logger = logging.getLogger("rq.job")
logger = logging.getLogger('rq.job')
class JobStatus(str, Enum):
@ -86,9 +86,9 @@ class Dependency:
"""
dependent_jobs = ensure_list(jobs)
if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job):
raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
raise ValueError('jobs: must contain objects of type Job and/or strings representing Job ids')
elif len(dependent_jobs) < 1:
raise ValueError("jobs: cannot be empty.")
raise ValueError('jobs: cannot be empty.')
self.dependencies = dependent_jobs
self.allow_failure = allow_failure
@ -126,9 +126,9 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J
job (Optional[Job]): The current Job running
"""
if connection:
warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning)
warnings.warn('connection argument for get_current_job is deprecated.', DeprecationWarning)
if job_class:
warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning)
warnings.warn('job_class argument for get_current_job is deprecated.', DeprecationWarning)
return _job_stack.top
@ -397,7 +397,7 @@ class Job:
if refresh:
status = self.connection.hget(self.key, 'status')
if not status:
raise InvalidJobOperation(f"Failed to retrieve status for job: {self.id}")
raise InvalidJobOperation(f'Failed to retrieve status for job: {self.id}')
self._status = JobStatus(as_text(status))
return self._status
@ -730,7 +730,7 @@ class Job:
if not isinstance(value, str):
raise TypeError('id must be a string, not {0}'.format(type(value)))
if ":" in value:
if ':' in value:
raise ValueError('id must not contain ":"')
self._id = value
@ -822,7 +822,7 @@ class Job:
"""
Get the latest result and returns `exc_info` only if the latest result is a failure.
"""
warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning)
warnings.warn('job.exc_info is deprecated, use job.latest_result() instead.', DeprecationWarning)
from .results import Result
@ -886,7 +886,7 @@ class Job:
seconds by default).
"""
warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning)
warnings.warn('job.result is deprecated, use job.return_value instead.', DeprecationWarning)
from .results import Result
@ -1064,7 +1064,7 @@ class Job:
try:
obj['result'] = self.serializer.dumps(self._result)
except: # noqa
obj['result'] = "Unserializable return value"
obj['result'] = 'Unserializable return value'
if self._exc_info is not None and include_result:
obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8'))
if self.timeout is not None:
@ -1091,10 +1091,10 @@ class Job:
if self.allow_dependency_failures is not None:
# convert boolean to integer to avoid redis.exception.DataError
obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
obj['allow_dependency_failures'] = int(self.allow_dependency_failures)
if self.enqueue_at_front is not None:
obj["enqueue_at_front"] = int(self.enqueue_at_front)
obj['enqueue_at_front'] = int(self.enqueue_at_front)
return obj
@ -1158,7 +1158,7 @@ class Job:
InvalidJobOperation: If the job has already been cancelled.
"""
if self.is_canceled:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
raise InvalidJobOperation('Cannot cancel already canceled job: {}'.format(self.get_id()))
from .queue import Queue
from .registry import CanceledJobRegistry

View File

@ -1,13 +1,14 @@
# ruff: noqa: E731
"""
werkzeug.local
~~~~~~~~~~~~~~
werkzeug.local
~~~~~~~~~~~~~~
This module implements context-local objects.
This module implements context-local objects.
:copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
:copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
# Since each thread has its own greenlet we can just use those as identifiers
# for the context. If greenlets are not available we fall back to the
# current thread ident.

View File

@ -7,36 +7,36 @@ from rq.defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT
class _Colorizer:
def __init__(self):
esc = "\x1b["
esc = '\x1b['
self.codes = {}
self.codes[""] = ""
self.codes["reset"] = esc + "39;49;00m"
self.codes[''] = ''
self.codes['reset'] = esc + '39;49;00m'
self.codes["bold"] = esc + "01m"
self.codes["faint"] = esc + "02m"
self.codes["standout"] = esc + "03m"
self.codes["underline"] = esc + "04m"
self.codes["blink"] = esc + "05m"
self.codes["overline"] = esc + "06m"
self.codes['bold'] = esc + '01m'
self.codes['faint'] = esc + '02m'
self.codes['standout'] = esc + '03m'
self.codes['underline'] = esc + '04m'
self.codes['blink'] = esc + '05m'
self.codes['overline'] = esc + '06m'
dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"]
light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"]
dark_colors = ['black', 'darkred', 'darkgreen', 'brown', 'darkblue', 'purple', 'teal', 'lightgray']
light_colors = ['darkgray', 'red', 'green', 'yellow', 'blue', 'fuchsia', 'turquoise', 'white']
x = 30
for dark, light in zip(dark_colors, light_colors):
self.codes[dark] = esc + "%im" % x
self.codes[light] = esc + "%i;01m" % x
self.codes[dark] = esc + '%im' % x
self.codes[light] = esc + '%i;01m' % x
x += 1
del dark, light, x
self.codes["darkteal"] = self.codes["turquoise"]
self.codes["darkyellow"] = self.codes["brown"]
self.codes["fuscia"] = self.codes["fuchsia"]
self.codes["white"] = self.codes["bold"]
self.codes['darkteal'] = self.codes['turquoise']
self.codes['darkyellow'] = self.codes['brown']
self.codes['fuscia'] = self.codes['fuchsia']
self.codes['white'] = self.codes['bold']
if hasattr(sys.stdout, "isatty"):
if hasattr(sys.stdout, 'isatty'):
self.notty = not sys.stdout.isatty()
else:
self.notty = True
@ -45,7 +45,7 @@ class _Colorizer:
if self.notty:
return text
else:
return self.codes[color_key] + text + self.codes["reset"]
return self.codes[color_key] + text + self.codes['reset']
colorizer = _Colorizer()
@ -98,7 +98,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
if self.is_tty:
# Don't colorize any traceback
parts = message.split('\n', 1)
parts[0] = " ".join([parts[0].split(" ", 1)[0], parts[0].split(" ", 1)[1]])
parts[0] = ' '.join([parts[0].split(' ', 1)[0], parts[0].split(' ', 1)[1]])
message = '\n'.join(parts)

View File

@ -19,7 +19,7 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry.
"""
warnings.warn(
"clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.",
'clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.',
DeprecationWarning,
)
intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection)

View File

@ -28,29 +28,29 @@ from .serializers import resolve_serializer
from .types import FunctionReferenceType, JobDependencyType
from .utils import as_text, backend_class, compact, get_version, import_attribute, now, parse_timeout
logger = logging.getLogger("rq.queue")
logger = logging.getLogger('rq.queue')
class EnqueueData(
namedtuple(
'EnqueueData',
[
"func",
"args",
"kwargs",
"timeout",
"result_ttl",
"ttl",
"failure_ttl",
"description",
"depends_on",
"job_id",
"at_front",
"meta",
"retry",
"on_success",
"on_failure",
"on_stopped",
'func',
'args',
'kwargs',
'timeout',
'result_ttl',
'ttl',
'failure_ttl',
'description',
'depends_on',
'job_id',
'at_front',
'meta',
'retry',
'on_success',
'on_failure',
'on_stopped',
],
)
):
@ -278,11 +278,7 @@ class Queue:
count = count + 1
end
return count
""".format(
self.job_class.redis_job_namespace_prefix
).encode(
"utf-8"
)
""".format(self.job_class.redis_job_namespace_prefix).encode('utf-8')
script = self.connection.register_script(script)
return script(keys=[self.key])
@ -809,23 +805,23 @@ class Queue:
def get_job_kwargs(job_data, initial_status):
return {
"func": job_data.func,
"args": job_data.args,
"kwargs": job_data.kwargs,
"result_ttl": job_data.result_ttl,
"ttl": job_data.ttl,
"failure_ttl": job_data.failure_ttl,
"description": job_data.description,
"depends_on": job_data.depends_on,
"job_id": job_data.job_id,
"meta": job_data.meta,
"status": initial_status,
"timeout": job_data.timeout,
"retry": job_data.retry,
"on_success": job_data.on_success,
"on_failure": job_data.on_failure,
"on_stopped": job_data.on_stopped,
"group_id": group_id,
'func': job_data.func,
'args': job_data.args,
'kwargs': job_data.kwargs,
'result_ttl': job_data.result_ttl,
'ttl': job_data.ttl,
'failure_ttl': job_data.failure_ttl,
'description': job_data.description,
'depends_on': job_data.depends_on,
'job_id': job_data.job_id,
'meta': job_data.meta,
'status': initial_status,
'timeout': job_data.timeout,
'retry': job_data.retry,
'on_success': job_data.on_success,
'on_failure': job_data.on_failure,
'on_stopped': job_data.on_stopped,
'group_id': group_id,
}
# Enqueue jobs without dependencies
@ -1298,11 +1294,11 @@ class Queue:
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
colored_queues = ', '.join(map(str, [green(str(queue)) for queue in queue_keys]))
logger.debug(f"Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}")
logger.debug(f'Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}')
assert connection
result = connection.blpop(queue_keys, timeout)
if result is None:
logger.debug(f"BLPOP timeout, no jobs found on queues {colored_queues}")
logger.debug(f'BLPOP timeout, no jobs found on queues {colored_queues}')
raise DequeueTimeout(timeout, queue_keys)
queue_key, job_id = result
return queue_key, job_id
@ -1324,10 +1320,10 @@ class Queue:
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
colored_queue = green(queue_key)
logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}")
logger.debug(f'Starting BLMOVE operation for {colored_queue} with timeout of {timeout}')
result: Optional[Any] = connection.blmove(queue_key, intermediate_queue.key, timeout)
if result is None:
logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}")
logger.debug(f'BLMOVE timeout, no jobs found on {colored_queue}')
raise DequeueTimeout(timeout, queue_key)
return queue_key, result
else: # non-blocking variant

View File

@ -21,7 +21,7 @@ if TYPE_CHECKING:
from rq.executions import Execution
logger = logging.getLogger("rq.registry")
logger = logging.getLogger('rq.registry')
class BaseRegistry:
@ -288,13 +288,13 @@ class StartedJobRegistry(BaseRegistry):
job.retry(queue, pipeline)
else:
exc_string = f"due to {AbandonedJobError.__name__}"
exc_string = f'due to {AbandonedJobError.__name__}'
logger.warning(
f'{self.__class__.__name__} cleanup: Moving job {job.id} to {FailedJobRegistry.__name__} '
f'({exc_string})'
)
job.set_status(JobStatus.FAILED)
job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}"
job._exc_info = f'Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}'
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl)
@ -434,7 +434,7 @@ class DeferredJobRegistry(BaseRegistry):
continue
job.set_status(JobStatus.FAILED, pipeline=pipeline)
exc_info = "Expired in DeferredJobRegistry, moved to FailedJobRegistry at %s" % datetime.now()
exc_info = 'Expired in DeferredJobRegistry, moved to FailedJobRegistry at %s' % datetime.now()
failed_job_registry.add(job, job.failure_ttl, exc_info, pipeline, True)
pipeline.zremrangebyscore(self.key, 0, score)

View File

@ -152,7 +152,7 @@ class Result:
# Unlike blpop, xread timeout is in miliseconds. "0-0" is the special value for the
# first item in the stream, like '-' for xrevrange.
timeout_ms = timeout * 1000
response = job.connection.xread({cls.get_key(job.id): "0-0"}, block=timeout_ms)
response = job.connection.xread({cls.get_key(job.id): '0-0'}, block=timeout_ms)
if not response:
return None
response = response[0] # Querying single stream only.

View File

@ -86,7 +86,7 @@ class TimerDeathPenalty(BaseDeathPenalty):
# Monkey-patch exception with the message ahead of time
# since PyThreadState_SetAsyncExc can only take a class
def init_with_message(self, *args, **kwargs): # noqa
super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout))
super(exception, self).__init__('Task exceeded maximum timeout value ({0} seconds)'.format(timeout))
self._exception.__init__ = init_with_message
@ -103,10 +103,10 @@ class TimerDeathPenalty(BaseDeathPenalty):
ctypes.c_long(self._target_thread_id), ctypes.py_object(self._exception)
)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(self._target_thread_id))
raise ValueError('Invalid thread ID {}'.format(self._target_thread_id))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._target_thread_id), 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
raise SystemError('PyThreadState_SetAsyncExc failed')
def setup_death_penalty(self):
"""Starts the timer."""

View File

@ -286,19 +286,19 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]:
"""
try:
# Getting the connection info for each job tanks performance, we can cache it on the connection object
if not getattr(connection, "__rq_redis_server_version", None):
if not getattr(connection, '__rq_redis_server_version', None):
# Cast the version string to a tuple of integers. Some Redis implementations may return a float.
version_str = str(connection.info("server")["redis_version"])
version_str = str(connection.info('server')['redis_version'])
version_parts = [int(i) for i in version_str.split('.')[:3]]
# Ensure the version tuple has exactly three elements
while len(version_parts) < 3:
version_parts.append(0)
setattr(
connection,
"__rq_redis_server_version",
'__rq_redis_server_version',
tuple(version_parts),
)
return getattr(connection, "__rq_redis_server_version")
return getattr(connection, '__rq_redis_server_version')
except ResponseError: # fakeredis doesn't implement Redis' INFO command
return (5, 0, 9)

View File

@ -76,7 +76,7 @@ except ImportError:
pass
logger = logging.getLogger("rq.worker")
logger = logging.getLogger('rq.worker')
class StopRequested(Exception):
@ -102,9 +102,9 @@ def signal_name(signum):
class DequeueStrategy(str, Enum):
DEFAULT = "default"
ROUND_ROBIN = "round_robin"
RANDOM = "random"
DEFAULT = 'default'
ROUND_ROBIN = 'round_robin'
RANDOM = 'random'
class WorkerStatus(str, Enum):
@ -151,13 +151,12 @@ class BaseWorker:
serializer=None,
work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None,
): # noqa
self.default_result_ttl = default_result_ttl
if worker_ttl:
self.worker_ttl = worker_ttl
elif default_worker_ttl:
warnings.warn("default_worker_ttl is deprecated, use worker_ttl.", DeprecationWarning, stacklevel=2)
warnings.warn('default_worker_ttl is deprecated, use worker_ttl.', DeprecationWarning, stacklevel=2)
self.worker_ttl = default_worker_ttl
else:
self.worker_ttl = DEFAULT_WORKER_TTL
@ -441,9 +440,9 @@ class BaseWorker:
Args:
connection (Optional[Redis]): The Redis Connection.
"""
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
current_socket_timeout = connection.connection_pool.connection_kwargs.get('socket_timeout')
if current_socket_timeout is None or current_socket_timeout < self.connection_timeout:
timeout_config = {"socket_timeout": self.connection_timeout}
timeout_config = {'socket_timeout': self.connection_timeout}
connection.connection_pool.connection_kwargs.update(timeout_config)
return connection
@ -559,7 +558,7 @@ class BaseWorker:
def work(
self,
burst: bool = False,
logging_level: str = "INFO",
logging_level: str = 'INFO',
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
@ -674,9 +673,9 @@ class BaseWorker:
if self._dequeue_strategy is None:
self._dequeue_strategy = DequeueStrategy.DEFAULT
if self._dequeue_strategy not in ("default", "random", "round_robin"):
if self._dequeue_strategy not in ('default', 'random', 'round_robin'):
raise ValueError(
f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`."
f'Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`.'
)
if self._dequeue_strategy == DequeueStrategy.DEFAULT:
return
@ -810,7 +809,7 @@ class BaseWorker:
def _set_state(self, state):
"""Raise a DeprecationWarning if ``worker.state = X`` is used"""
warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning)
warnings.warn('worker.state is deprecated, use worker.set_state() instead.', DeprecationWarning)
self.set_state(state)
def get_state(self) -> str:
@ -818,7 +817,7 @@ class BaseWorker:
def _get_state(self):
"""Raise a DeprecationWarning if ``worker.state == X`` is used"""
warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning)
warnings.warn('worker.state is deprecated, use worker.get_state() instead.', DeprecationWarning)
return self.get_state()
state = property(_get_state, _set_state)
@ -826,7 +825,7 @@ class BaseWorker:
def _start_scheduler(
self,
burst: bool = False,
logging_level: str = "INFO",
logging_level: str = 'INFO',
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
):
@ -909,7 +908,7 @@ class BaseWorker:
def bootstrap(
self,
logging_level: str = "INFO",
logging_level: str = 'INFO',
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
):
@ -993,7 +992,7 @@ class BaseWorker:
self.clean_registries()
Group.clean_registries(connection=self.connection)
def _pubsub_exception_handler(self, exc: Exception, pubsub: "PubSub", pubsub_thread: "PubSubWorkerThread") -> None:
def _pubsub_exception_handler(self, exc: Exception, pubsub: 'PubSub', pubsub_thread: 'PubSubWorkerThread') -> None:
"""
This exception handler allows the pubsub_thread to continue & retry to
connect after a connection problem the same way the main worker loop
@ -1003,13 +1002,13 @@ class BaseWorker:
"""
if isinstance(exc, (redis.exceptions.ConnectionError)):
self.log.error(
"Could not connect to Redis instance: %s Retrying in %d seconds...",
'Could not connect to Redis instance: %s Retrying in %d seconds...',
exc,
2,
)
time.sleep(2.0)
else:
self.log.warning("Pubsub thread exitin on %s" % exc)
self.log.warning('Pubsub thread exitin on %s' % exc)
raise
def handle_payload(self, message):
@ -1241,7 +1240,6 @@ class BaseWorker:
class Worker(BaseWorker):
@property
def is_horse(self):
"""Returns whether or not this is the worker or the work horse."""
@ -1395,8 +1393,8 @@ class Worker(BaseWorker):
job.ended_at = now()
# Unhandled failure: move the job to the failed queue
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ''
exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
signal_msg = f' (signal {os.WTERMSIG(ret_val)})' if ret_val and os.WIFSIGNALED(ret_val) else ''
exc_string = f'Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; '
self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
@ -1532,7 +1530,7 @@ class Worker(BaseWorker):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
self.log.debug('Saving job %s\'s successful execution result', job.id)
self.log.debug("Saving job %s's successful execution result", job.id)
job._handle_success(result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)

View File

@ -149,7 +149,7 @@ class WorkerPool:
name: str,
burst: bool,
_sleep: float = 0,
logging_level: str = "INFO",
logging_level: str = 'INFO',
) -> Process:
"""Returns the worker process"""
return Process(
@ -171,7 +171,7 @@ class WorkerPool:
count: Optional[int] = None,
burst: bool = True,
_sleep: float = 0,
logging_level: str = "INFO",
logging_level: str = 'INFO',
):
"""
Starts a worker and adds the data to worker_datas.
@ -184,7 +184,7 @@ class WorkerPool:
self.worker_dict[name] = worker_data
self.log.debug('Spawned worker: %s with PID %d', name, process.pid)
def start_workers(self, burst: bool = True, _sleep: float = 0, logging_level: str = "INFO"):
def start_workers(self, burst: bool = True, _sleep: float = 0, logging_level: str = 'INFO'):
"""
Run the workers
* sleep: waits for X seconds before creating worker, only for testing purposes
@ -214,7 +214,7 @@ class WorkerPool:
for worker_data in worker_datas:
self.stop_worker(worker_data)
def start(self, burst: bool = False, logging_level: str = "INFO"):
def start(self, burst: bool = False, logging_level: str = 'INFO'):
self._burst = burst
respawn = not burst # Don't respawn workers if burst mode is on
setup_loghandlers(logging_level, DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, name=__name__)
@ -250,7 +250,7 @@ def run_worker(
serializer: Type['Serializer'] = DefaultSerializer,
job_class: Type[Job] = Job,
burst: bool = True,
logging_level: str = "INFO",
logging_level: str = 'INFO',
_sleep: int = 0,
):
connection = connection_class(
@ -258,6 +258,6 @@ def run_worker(
)
queues = [Queue(name, connection=connection) for name in queue_names]
worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class)
worker.log.info("Starting worker started with PID %s", os.getpid())
worker.log.info('Starting worker started with PID %s', os.getpid())
time.sleep(_sleep)
worker.work(burst=burst, with_scheduler=True, logging_level=logging_level)

View File

@ -25,12 +25,12 @@ def find_empty_redis_database(ssl=False):
def slow(f):
f = pytest.mark.slow(f)
return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), "Slow tests disabled")(f)
return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), 'Slow tests disabled')(f)
def ssl_test(f):
f = pytest.mark.ssl_test(f)
return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), "SSL tests disabled")(f)
return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), 'SSL tests disabled')(f)
class TestCase(unittest.TestCase):

View File

@ -1 +1 @@
REDIS_HOST = "testhost.example.com"
REDIS_HOST = 'testhost.example.com'

View File

@ -1,4 +1,4 @@
REDIS_HOST = "testhost.example.com"
REDIS_HOST = 'testhost.example.com'
REDIS_PORT = 6378
REDIS_DB = 2
REDIS_PASSWORD = '123'

View File

@ -146,18 +146,18 @@ class Number:
class CallableObject:
def __call__(self):
return u"I'm callable"
return "I'm callable"
class UnicodeStringObject:
def __repr__(self):
return u'é'
return 'é'
class ClassWithAStaticMethod:
@staticmethod
def static_method():
return u"I'm a static method"
return "I'm a static method"
def black_hole(job, *exc_info):
@ -298,7 +298,7 @@ def save_exception(job, connection, type, value, traceback):
connection.set('failure_callback:%s' % job.id, str(value), ex=60)
def save_result_if_not_stopped(job, connection, result=""):
def save_result_if_not_stopped(job, connection, result=''):
connection.set('stopped_callback:%s' % job.id, result, ex=60)

View File

@ -36,12 +36,12 @@ class QueueCallbackTestCase(RQTestCase):
self.assertEqual(job.success_callback, print)
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("print"))
job = queue.enqueue(say_hello, on_success=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.success_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback("print"))
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.success_callback, print)
@ -65,12 +65,12 @@ class QueueCallbackTestCase(RQTestCase):
self.assertEqual(job.failure_callback, print)
# test string callbacks
job = queue.enqueue(say_hello, on_failure=Callback("print"))
job = queue.enqueue(say_hello, on_failure=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.failure_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback("print"))
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.failure_callback, print)
@ -94,12 +94,12 @@ class QueueCallbackTestCase(RQTestCase):
self.assertEqual(job.stopped_callback, print)
# test string callbacks
job = queue.enqueue(long_process, on_stopped=Callback("print"))
job = queue.enqueue(long_process, on_stopped=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.stopped_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback("print"))
job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback('print'))
job = Job.fetch(id=job.id, connection=self.connection)
self.assertEqual(job.stopped_callback, print)
@ -131,11 +131,11 @@ class SyncJobCallback(RQTestCase):
self.assertFalse(self.connection.exists('success_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.save_result'))
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(self.connection.get('success_callback:%s' % job.id).decode(), job.result)
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result'))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.connection.exists('success_callback:%s' % job.id))
@ -152,11 +152,11 @@ class SyncJobCallback(RQTestCase):
self.assertFalse(self.connection.exists('failure_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception"))
job = queue.enqueue(div_by_zero, on_failure=Callback('tests.fixtures.save_exception'))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertIn('div_by_zero', self.connection.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result'))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.connection.exists('failure_callback:%s' % job.id))
@ -173,7 +173,7 @@ class SyncJobCallback(RQTestCase):
self.assertTrue(self.connection.exists('stopped_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(long_process, on_stopped=Callback("tests.fixtures.save_result_if_not_stopped"))
job = queue.enqueue(long_process, on_stopped=Callback('tests.fixtures.save_result_if_not_stopped'))
job.execute_stopped_callback(
worker.death_penalty_class
) # Calling execute_stopped_callback directly for coverage
@ -198,12 +198,12 @@ class WorkerCallbackTestCase(RQTestCase):
self.assertFalse(self.connection.exists('success_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.save_result'))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(self.connection.get('success_callback:%s' % job.id).decode(), job.return_value())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result'))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.connection.exists('success_callback:%s' % job.id))
@ -219,7 +219,7 @@ class WorkerCallbackTestCase(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.FAILED)
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.erroneous_callback"))
job = queue.enqueue(say_hello, on_success=Callback('tests.fixtures.erroneous_callback'))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
@ -242,14 +242,14 @@ class WorkerCallbackTestCase(RQTestCase):
self.assertFalse(self.connection.exists('failure_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception"))
job = queue.enqueue(div_by_zero, on_failure=Callback('tests.fixtures.save_exception'))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
print(job.exc_info)
self.assertIn('div_by_zero', self.connection.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
job = queue.enqueue(div_by_zero, on_success=Callback('tests.fixtures.save_result'))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.connection.exists('failure_callback:%s' % job.id))
@ -278,7 +278,7 @@ class JobCallbackTestCase(RQTestCase):
self.assertEqual(job.success_callback, print)
# test string callbacks
job = Job.create(say_hello, on_success=Callback("print"), connection=self.connection)
job = Job.create(say_hello, on_success=Callback('print'), connection=self.connection)
self.assertIsNotNone(job._success_callback_name)
self.assertEqual(job.success_callback, print)
job.save()
@ -306,7 +306,7 @@ class JobCallbackTestCase(RQTestCase):
self.assertEqual(job.failure_callback, print)
# test string callbacks
job = Job.create(say_hello, on_failure=Callback("print"), connection=self.connection)
job = Job.create(say_hello, on_failure=Callback('print'), connection=self.connection)
self.assertIsNotNone(job._failure_callback_name)
self.assertEqual(job.failure_callback, print)
job.save()
@ -334,7 +334,7 @@ class JobCallbackTestCase(RQTestCase):
self.assertEqual(job.stopped_callback, print)
# test string callbacks
job = Job.create(say_hello, on_stopped=Callback("print"), connection=self.connection)
job = Job.create(say_hello, on_stopped=Callback('print'), connection=self.connection)
self.assertIsNotNone(job._stopped_callback_name)
self.assertEqual(job.stopped_callback, print)
job.save()

View File

@ -32,10 +32,10 @@ class CLITestCase(RQTestCase):
if result.exit_code == 0:
return True
else:
print("Non normal execution")
print("Exit Code: {}".format(result.exit_code))
print("Output: {}".format(result.output))
print("Exception: {}".format(result.exception))
print('Non normal execution')
print('Exit Code: {}'.format(result.exit_code))
print('Output: {}'.format(result.output))
print('Exception: {}'.format(result.exception))
self.assertEqual(result.exit_code, 0)
@ -48,10 +48,10 @@ class TestRQCli(CLITestCase):
if result.exit_code == 0:
return True
else:
print("Non normal execution")
print("Exit Code: {}".format(result.exit_code))
print("Output: {}".format(result.output))
print("Exception: {}".format(result.exception))
print('Non normal execution')
print('Exit Code: {}'.format(result.exit_code))
print('Output: {}'.format(result.output))
print('Exception: {}'.format(result.exception))
self.assertEqual(result.exit_code, 0)
"""Test rq_cli script"""
@ -108,7 +108,7 @@ class TestRQCli(CLITestCase):
self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], '123')
def test_config_env_vars(self):
os.environ['REDIS_HOST'] = "testhost.example.com"
os.environ['REDIS_HOST'] = 'testhost.example.com'
cli_config = CliConfig()
@ -414,7 +414,7 @@ class TestRQCli(CLITestCase):
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output)
self.assertIn('Duration must be an integer greater than 1', result.output)
def test_serializer(self):
"""rq worker -u <url> --serializer <serializer>"""
@ -434,8 +434,8 @@ class TestRQCli(CLITestCase):
result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello'])
self.assert_normal_execution(result)
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
suffix = '\'.\n'
prefix = "Enqueued tests.fixtures.say_hello() with job-id '"
suffix = "'.\n"
self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
@ -460,8 +460,8 @@ class TestRQCli(CLITestCase):
)
self.assert_normal_execution(result)
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
suffix = '\'.\n'
prefix = "Enqueued tests.fixtures.say_hello() with job-id '"
suffix = "'.\n"
self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
@ -508,7 +508,7 @@ class TestRQCli(CLITestCase):
args, kwargs = Job(job_id, connection=self.connection).result
self.assertEqual(args, ('hello', [1, {'key': 'value'}], {"test": True}, (1, 2)))
self.assertEqual(args, ('hello', [1, {'key': 'value'}], {'test': True}, (1, 2)))
self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{"test": true}\n'})
def test_cli_enqueue_schedule_in(self):
@ -653,7 +653,7 @@ class TestRQCli(CLITestCase):
result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value'])
self.assertNotEqual(result.exit_code, 0)
self.assertIn('You can\'t specify multiple values for the same keyword.', result.output)
self.assertIn("You can't specify multiple values for the same keyword.", result.output)
result = runner.invoke(
main,
@ -669,7 +669,7 @@ class TestRQCli(CLITestCase):
],
)
self.assertNotEqual(result.exit_code, 0)
self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output)
self.assertIn("You can't specify both --schedule-in and --schedule-at", result.output)
result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file'])
self.assertNotEqual(result.exit_code, 0)
@ -763,7 +763,7 @@ class TestRQCli(CLITestCase):
)
self.assert_normal_execution(result)
job = Job.fetch(id, connection=self.connection)
self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}}))
self.assertEqual((job.args, job.kwargs), ([], {'key': {'foo': True}}))
id = str(uuid4())
result = runner.invoke(

View File

@ -45,7 +45,7 @@ class TestCommands(RQTestCase):
assert worker.pubsub_thread.is_alive()
for client in connection.client_list():
connection.client_kill(client["addr"])
connection.client_kill(client['addr'])
time.sleep(0.0) # Allow other threads to run
assert worker.pubsub_thread.is_alive()
@ -55,7 +55,7 @@ class TestCommands(RQTestCase):
connection = self.connection
worker = Worker('foo', connection=connection)
with mock.patch("redis.client.PubSub.get_message", new_callable=raise_exc_mock):
with mock.patch('redis.client.PubSub.get_message', new_callable=raise_exc_mock):
worker.subscribe()
worker.pubsub_thread.join()

View File

@ -5,7 +5,6 @@ from tests import RQTestCase
class TestConnectionInheritance(RQTestCase):
def test_parse_connection(self):
"""Test parsing the connection"""
conn_class, pool_class, pool_kwargs = parse_connection(Redis(ssl=True))
@ -17,4 +16,4 @@ class TestConnectionInheritance(RQTestCase):
conn_class, pool_class, pool_kwargs = parse_connection(Redis(connection_pool=pool))
self.assertEqual(conn_class, Redis)
self.assertEqual(pool_class, UnixDomainSocketConnection)
self.assertEqual(pool_kwargs, {"path": path})
self.assertEqual(pool_kwargs, {'path': path})

View File

@ -104,7 +104,7 @@ class TestDependencies(RQTestCase):
q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True))
w.work(burst=True, max_jobs=1)
self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
self.assertEqual(q.job_ids, ['fake_job_id_2', 'fake_job_id_1'])
def test_multiple_jobs_with_dependencies(self):
"""Enqueue dependent jobs only when appropriate"""
@ -205,9 +205,9 @@ class TestDependencies(RQTestCase):
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.connection)
queue.empty()
queue.enqueue(say_hello, job_id="A")
queue.enqueue(say_hello, job_id="B")
job_c = queue.enqueue(check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
queue.enqueue(say_hello, job_id='A')
queue.enqueue(say_hello, job_id='B')
job_c = queue.enqueue(check_dependencies_are_met, job_id='C', depends_on=['A', 'B'])
job_c.dependencies_are_met()
w = Worker([queue], connection=self.connection)

View File

@ -24,8 +24,8 @@ class TestGroup(RQTestCase):
q.empty()
def test_group_repr(self):
group = Group.create(name="foo", connection=self.connection)
assert group.__repr__() == "Group(id=foo)"
group = Group.create(name='foo', connection=self.connection)
assert group.__repr__() == 'Group(id=foo)'
def test_group_jobs(self):
q = Queue(connection=self.connection)
@ -84,7 +84,7 @@ class TestGroup(RQTestCase):
q = Queue(connection=self.connection)
group = Group.create(connection=self.connection)
group.enqueue_many(q, [self.job_1_data])
redis_groups = {as_text(group) for group in self.connection.smembers("rq:groups")}
redis_groups = {as_text(group) for group in self.connection.smembers('rq:groups')}
assert group.name in redis_groups
q.empty()
@ -113,7 +113,7 @@ class TestGroup(RQTestCase):
w.work(burst=True, max_jobs=1)
sleep(2)
w.run_maintenance_tasks()
redis_groups = {as_text(group) for group in self.connection.smembers("rq:groups")}
redis_groups = {as_text(group) for group in self.connection.smembers('rq:groups')}
assert group.name not in redis_groups
@pytest.mark.slow
@ -130,18 +130,18 @@ class TestGroup(RQTestCase):
q.empty()
def test_get_group_key(self):
group = Group(name="foo", connection=self.connection)
self.assertEqual(Group.get_key(group.name), "rq:group:foo")
group = Group(name='foo', connection=self.connection)
self.assertEqual(Group.get_key(group.name), 'rq:group:foo')
def test_all_returns_all_groups(self):
q = Queue(connection=self.connection)
group1 = Group.create(name="group1", connection=self.connection)
Group.create(name="group2", connection=self.connection)
group1 = Group.create(name='group1', connection=self.connection)
Group.create(name='group2', connection=self.connection)
group1.enqueue_many(q, [self.job_1_data, self.job_2_data])
all_groups = Group.all(self.connection)
assert len(all_groups) == 1
assert "group1" in [group.name for group in all_groups]
assert "group2" not in [group.name for group in all_groups]
assert 'group1' in [group.name for group in all_groups]
assert 'group2' not in [group.name for group in all_groups]
def test_all_deletes_missing_groups(self):
q = Queue(connection=self.connection)

View File

@ -14,7 +14,6 @@ from tests.fixtures import say_hello
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
class TestIntermediateQueue(RQTestCase):
def test_set_first_seen(self):
"""Ensure that the first_seen attribute is set correctly."""
queue = Queue('foo', connection=self.connection)

View File

@ -32,8 +32,8 @@ class TestJob(RQTestCase):
"""Unicode in job description [issue405]"""
job = Job.create(
'myfunc',
args=[12, ""],
kwargs=dict(snowman="", null=None),
args=[12, ''],
kwargs=dict(snowman='', null=None),
connection=self.connection,
)
self.assertEqual(
@ -49,7 +49,7 @@ class TestJob(RQTestCase):
# Jobs have a random UUID and a creation date
self.assertIsNotNone(job.id)
self.assertIsNotNone(job.created_at)
self.assertEqual(str(job), "<Job %s: test job>" % job.id)
self.assertEqual(str(job), '<Job %s: test job>' % job.id)
# ...and nothing else
self.assertEqual(job.origin, '')
@ -70,8 +70,8 @@ class TestJob(RQTestCase):
def test_create_param_errors(self):
"""Creation of jobs may result in errors"""
self.assertRaises(TypeError, Job.create, fixtures.say_hello, args="string")
self.assertRaises(TypeError, Job.create, fixtures.say_hello, kwargs="string")
self.assertRaises(TypeError, Job.create, fixtures.say_hello, args='string')
self.assertRaises(TypeError, Job.create, fixtures.say_hello, kwargs='string')
self.assertRaises(TypeError, Job.create, func=42)
def test_create_typical_job(self):
@ -507,8 +507,8 @@ class TestJob(RQTestCase):
def test_multiple_dependencies_are_accepted_and_persisted(self):
"""Ensure job._dependency_ids accepts different input formats, and
is set and restored properly"""
job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id="A", connection=self.connection)
job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id="B", connection=self.connection)
job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id='A', connection=self.connection)
job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id='B', connection=self.connection)
# No dependencies
job = Job.create(func=fixtures.say_hello, connection=self.connection)
@ -518,22 +518,22 @@ class TestJob(RQTestCase):
# Various ways of specifying dependencies
cases = [
["A", ["A"]],
[job_A, ["A"]],
[["A", "B"], ["A", "B"]],
[[job_A, job_B], ["A", "B"]],
[["A", job_B], ["A", "B"]],
[("A", "B"), ["A", "B"]],
[(job_A, job_B), ["A", "B"]],
[(job_A, "B"), ["A", "B"]],
[Dependency("A"), ["A"]],
[Dependency(job_A), ["A"]],
[Dependency(["A", "B"]), ["A", "B"]],
[Dependency([job_A, job_B]), ["A", "B"]],
[Dependency(["A", job_B]), ["A", "B"]],
[Dependency(("A", "B")), ["A", "B"]],
[Dependency((job_A, job_B)), ["A", "B"]],
[Dependency((job_A, "B")), ["A", "B"]],
['A', ['A']],
[job_A, ['A']],
[['A', 'B'], ['A', 'B']],
[[job_A, job_B], ['A', 'B']],
[['A', job_B], ['A', 'B']],
[('A', 'B'), ['A', 'B']],
[(job_A, job_B), ['A', 'B']],
[(job_A, 'B'), ['A', 'B']],
[Dependency('A'), ['A']],
[Dependency(job_A), ['A']],
[Dependency(['A', 'B']), ['A', 'B']],
[Dependency([job_A, job_B]), ['A', 'B']],
[Dependency(['A', job_B]), ['A', 'B']],
[Dependency(('A', 'B')), ['A', 'B']],
[Dependency((job_A, job_B)), ['A', 'B']],
[Dependency((job_A, 'B')), ['A', 'B']],
]
for given, expected in cases:
job = Job.create(func=fixtures.say_hello, depends_on=given, connection=self.connection)
@ -546,10 +546,10 @@ class TestJob(RQTestCase):
job = Job.create(func=fixtures.say_hello, connection=self.connection)
job.save()
with self.connection.pipeline() as pipeline:
job.prepare_for_execution("worker_name", pipeline)
job.prepare_for_execution('worker_name', pipeline)
pipeline.execute()
job.refresh()
self.assertEqual(job.worker_name, "worker_name")
self.assertEqual(job.worker_name, 'worker_name')
self.assertEqual(job.get_status(), JobStatus.STARTED)
self.assertIsNotNone(job.last_heartbeat)
self.assertIsNotNone(job.started_at)
@ -894,8 +894,8 @@ class TestJob(RQTestCase):
def test_create_job_with_id(self):
"""test creating jobs with a custom ID"""
queue = Queue(connection=self.connection)
job = queue.enqueue(fixtures.say_hello, job_id="1234")
self.assertEqual(job.id, "1234")
job = queue.enqueue(fixtures.say_hello, job_id='1234')
self.assertEqual(job.id, '1234')
job.perform()
self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234)
@ -905,17 +905,17 @@ class TestJob(RQTestCase):
queue = Queue(connection=self.connection)
with self.assertRaises(ValueError):
queue.enqueue(fixtures.say_hello, job_id="1234:4321")
queue.enqueue(fixtures.say_hello, job_id='1234:4321')
def test_create_job_with_async(self):
"""test creating jobs with async function"""
queue = Queue(connection=self.connection)
async_job = queue.enqueue(fixtures.say_hello_async, job_id="async_job")
sync_job = queue.enqueue(fixtures.say_hello, job_id="sync_job")
async_job = queue.enqueue(fixtures.say_hello_async, job_id='async_job')
sync_job = queue.enqueue(fixtures.say_hello, job_id='sync_job')
self.assertEqual(async_job.id, "async_job")
self.assertEqual(sync_job.id, "sync_job")
self.assertEqual(async_job.id, 'async_job')
self.assertEqual(sync_job.id, 'sync_job')
async_task_result = async_job.perform()
sync_task_result = sync_job.perform()
@ -941,14 +941,14 @@ class TestJob(RQTestCase):
def test_create_job_with_ttl_should_have_ttl_after_enqueued(self):
"""test creating jobs with ttl and checks if get_jobs returns it properly [issue502]"""
queue = Queue(connection=self.connection)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10)
queue.enqueue(fixtures.say_hello, job_id='1234', ttl=10)
job = queue.get_jobs()[0]
self.assertEqual(job.ttl, 10)
def test_create_job_with_ttl_should_expire(self):
"""test if a job created with ttl expires [issue502]"""
queue = Queue(connection=self.connection)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
queue.enqueue(fixtures.say_hello, job_id='1234', ttl=1)
time.sleep(1.1)
self.assertEqual(0, len(queue.get_jobs()))
@ -1102,7 +1102,7 @@ class TestJob(RQTestCase):
def test_dependencies_key_should_have_prefixed_job_id(self):
job_id = 'random'
job = Job(id=job_id, connection=self.connection)
expected_key = Job.redis_job_namespace_prefix + ":" + job_id + ':dependencies'
expected_key = Job.redis_job_namespace_prefix + ':' + job_id + ':dependencies'
assert job.dependencies_key == expected_key
@ -1247,68 +1247,68 @@ class TestJob(RQTestCase):
connection_kwargs = self.connection.connection_pool.connection_kwargs
# When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
# Worker 1 will be busy with the slow job, so worker 2 will complete both fast jobs.
job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", connection_kwargs, True, 0.5], job_id='slow_job')
job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, True])
job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True])
job_slow = queue.enqueue(fixtures.rpush, args=[key, 'slow', connection_kwargs, True, 0.5], job_id='slow_job')
job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, True])
job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True])
fixtures.burst_two_workers(queue, connection=self.connection)
time.sleep(0.75)
jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 2)]
self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
self.assertEqual(jobs_completed, ["A:w2", "B:w2", "slow:w1"])
self.assertEqual(jobs_completed, ['A:w2', 'B:w2', 'slow:w1'])
self.connection.delete(key)
# When job "A" depends on the slow job, then job "B" finishes before "A".
# There is no clear requirement on which worker should take job "A", so we stay silent on that.
job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", connection_kwargs, True, 0.5], job_id='slow_job')
job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, False], depends_on='slow_job')
job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True])
job_slow = queue.enqueue(fixtures.rpush, args=[key, 'slow', connection_kwargs, True, 0.5], job_id='slow_job')
job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, False], depends_on='slow_job')
job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True])
fixtures.burst_two_workers(queue, connection=self.connection)
time.sleep(0.75)
jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 2)]
self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
self.assertEqual(jobs_completed, ["B:w2", "slow:w1", "A"])
self.assertEqual(jobs_completed, ['B:w2', 'slow:w1', 'A'])
def test_execution_order_with_dual_dependency(self):
queue = Queue(connection=self.connection)
key = 'test_job:job_order'
connection_kwargs = self.connection.connection_pool.connection_kwargs
# When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", connection_kwargs, True, 0.5], job_id='slow_1')
job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", connection_kwargs, True, 0.75], job_id='slow_2')
job_A = queue.enqueue(fixtures.rpush, args=[key, "A", connection_kwargs, True])
job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True])
job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, 'slow_1', connection_kwargs, True, 0.5], job_id='slow_1')
job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, 'slow_2', connection_kwargs, True, 0.75], job_id='slow_2')
job_A = queue.enqueue(fixtures.rpush, args=[key, 'A', connection_kwargs, True])
job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True])
fixtures.burst_two_workers(queue, connection=self.connection)
time.sleep(1)
jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 3)]
self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
self.assertEqual(jobs_completed, ["slow_1:w1", "A:w1", "B:w1", "slow_2:w2"])
self.assertEqual(jobs_completed, ['slow_1:w1', 'A:w1', 'B:w1', 'slow_2:w2'])
self.connection.delete(key)
# This time job "A" depends on two slow jobs, while job "B" depends only on the faster of
# the two. Job "B" should be completed before job "A".
# There is no clear requirement on which worker should take job "A", so we stay silent on that.
job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", connection_kwargs, True, 0.5], job_id='slow_1')
job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", connection_kwargs, True, 0.75], job_id='slow_2')
job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, 'slow_1', connection_kwargs, True, 0.5], job_id='slow_1')
job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, 'slow_2', connection_kwargs, True, 0.75], job_id='slow_2')
job_A = queue.enqueue(
fixtures.rpush, args=[key, "A", connection_kwargs, False], depends_on=['slow_1', 'slow_2']
fixtures.rpush, args=[key, 'A', connection_kwargs, False], depends_on=['slow_1', 'slow_2']
)
job_B = queue.enqueue(fixtures.rpush, args=[key, "B", connection_kwargs, True], depends_on=['slow_1'])
job_B = queue.enqueue(fixtures.rpush, args=[key, 'B', connection_kwargs, True], depends_on=['slow_1'])
fixtures.burst_two_workers(queue, connection=self.connection)
time.sleep(1)
jobs_completed = [v.decode() for v in self.connection.lrange(key, 0, 3)]
self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
self.assertEqual(jobs_completed, ['slow_1:w1', 'B:w1', 'slow_2:w2', 'A'])
@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
def test_blocking_result_fetch(self):
# Ensure blocking waits for the time to run the job, but not right up until the timeout.
job_sleep_seconds = 2
block_seconds = 5
queue_name = "test_blocking_queue"
queue_name = 'test_blocking_queue'
q = Queue(queue_name, connection=self.connection)
job = q.enqueue(fixtures.long_running_job, job_sleep_seconds)
started_at = time.time()

View File

@ -143,7 +143,7 @@ class TestQueue(RQTestCase):
self.assertEqual(0, q.get_job_position(job.id))
self.assertEqual(1, q.get_job_position(job2.id))
self.assertEqual(2, q.get_job_position(job3))
self.assertEqual(None, q.get_job_position("no_real_job"))
self.assertEqual(None, q.get_job_position('no_real_job'))
def test_remove(self):
"""Ensure queue.remove properly removes Job from queue."""
@ -516,8 +516,8 @@ class TestQueue(RQTestCase):
def test_enqueue_dependents_on_multiple_queues(self):
"""Enqueueing dependent jobs on multiple queues pushes jobs in the queues
and removes them from DeferredJobRegistry for each different queue."""
q_1 = Queue("queue_1", connection=self.connection)
q_2 = Queue("queue_2", connection=self.connection)
q_1 = Queue('queue_1', connection=self.connection)
q_2 = Queue('queue_2', connection=self.connection)
parent_job = Job.create(func=say_hello, connection=self.connection)
parent_job.save()
job_1 = q_1.enqueue(say_hello, depends_on=parent_job)

View File

@ -433,7 +433,7 @@ class TestDeferredRegistry(RQTestCase):
self.registry.add(job)
score = self.connection.zscore(key, job.id)
self.assertEqual(score, float("inf"))
self.assertEqual(score, float('inf'))
timestamp = current_timestamp()
ttl = 5

View File

@ -484,7 +484,7 @@ class TestQueue(RQTestCase):
connection_pool=redis.ConnectionPool(
connection_class=CustomRedisConnection,
db=4,
custom_arg="foo",
custom_arg='foo',
)
)
@ -494,7 +494,7 @@ class TestQueue(RQTestCase):
scheduler_connection = scheduler.connection.connection_pool.get_connection('info')
self.assertEqual(scheduler_connection.__class__, CustomRedisConnection)
self.assertEqual(scheduler_connection.get_custom_arg(), "foo")
self.assertEqual(scheduler_connection.get_custom_arg(), 'foo')
def test_no_custom_connection_pool(self):
"""Connection pool customizing must not interfere if we're using a standard

View File

@ -41,7 +41,7 @@ class TestTimeouts(RQTestCase):
w.work(burst=True)
self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
self.assertIn('rq.timeouts.JobTimeoutException', job.exc_info)
# Test negative timeout doesn't raise JobTimeoutException,
# which implies an unintended immediate timeout.

View File

@ -167,23 +167,23 @@ class TestUtils(RQTestCase):
def test_truncate_long_string(self):
"""Ensure truncate_long_string works properly"""
assert truncate_long_string("12", max_length=3) == "12"
assert truncate_long_string("123", max_length=3) == "123"
assert truncate_long_string("1234", max_length=3) == "123..."
assert truncate_long_string("12345", max_length=3) == "123..."
assert truncate_long_string('12', max_length=3) == '12'
assert truncate_long_string('123', max_length=3) == '123'
assert truncate_long_string('1234', max_length=3) == '123...'
assert truncate_long_string('12345', max_length=3) == '123...'
s = "long string but no max_length provided so no truncating should occur" * 10
s = 'long string but no max_length provided so no truncating should occur' * 10
assert truncate_long_string(s) == s
def test_get_call_string(self):
"""Ensure a case, when func_name, args and kwargs are not None, works properly"""
cs = get_call_string("f", ('some', 'args', 42), {"key1": "value1", "key2": True})
cs = get_call_string('f', ('some', 'args', 42), {'key1': 'value1', 'key2': True})
assert cs == "f('some', 'args', 42, key1='value1', key2=True)"
def test_get_call_string_with_max_length(self):
"""Ensure get_call_string works properly when max_length is provided"""
func_name = "f"
func_name = 'f'
args = (1234, 12345, 123456)
kwargs = {"len4": 1234, "len5": 12345, "len6": 123456}
kwargs = {'len4': 1234, 'len5': 12345, 'len6': 123456}
cs = get_call_string(func_name, args, kwargs, max_length=5)
assert cs == "f(1234, 12345, 12345..., len4=1234, len5=12345, len6=12345...)"
assert cs == 'f(1234, 12345, 12345..., len4=1234, len5=12345, len6=12345...)'

View File

@ -281,7 +281,7 @@ class TestWorker(RQTestCase):
w.perform_job(job, queue)
# An exception should be logged here at ERROR level
self.assertIn("Traceback", mock_logger_error.call_args[0][3])
self.assertIn('Traceback', mock_logger_error.call_args[0][3])
def test_heartbeat(self):
"""Heartbeat saves last_heartbeat"""
@ -642,10 +642,10 @@ class TestWorker(RQTestCase):
"""Cancel job and verify that when the parent job is finished,
the dependent job is not started."""
q = Queue("low", connection=self.connection)
parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
q = Queue('low', connection=self.connection)
parent_job = q.enqueue(long_running_job, 5, job_id='parent_job')
job = q.enqueue(say_hello, depends_on=parent_job, job_id='job1')
job2 = q.enqueue(say_hello, depends_on=job, job_id='job2')
job.cancel()
w = Worker([q])
@ -663,11 +663,11 @@ class TestWorker(RQTestCase):
def test_cancel_job_enqueue_dependent(self):
"""Cancel a job in a chain and enqueue the dependent jobs."""
q = Queue("low", connection=self.connection)
parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3")
q = Queue('low', connection=self.connection)
parent_job = q.enqueue(long_running_job, 5, job_id='parent_job')
job = q.enqueue(say_hello, depends_on=parent_job, job_id='job1')
job2 = q.enqueue(say_hello, depends_on=job, job_id='job2')
job3 = q.enqueue(say_hello, depends_on=job2, job_id='job3')
job.cancel(enqueue_dependents=True)
@ -1019,9 +1019,9 @@ class TestWorker(RQTestCase):
def test_worker_hash_(self):
"""Workers are hashed by their .name attribute"""
q = Queue('foo', connection=self.connection)
w1 = Worker([q], name="worker1")
w2 = Worker([q], name="worker2")
w3 = Worker([q], name="worker1")
w1 = Worker([q], name='worker1')
w2 = Worker([q], name='worker2')
w3 = Worker([q], name='worker1')
worker_set = set([w1, w2, w3])
self.assertEqual(len(worker_set), 2)
@ -1200,7 +1200,7 @@ class TestWorker(RQTestCase):
w = Worker([q], connection=self.connection)
q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w.dequeue_job_and_maintain_ttl(10)
self.assertIn("Frank", mock_logger_info.call_args[0][2])
self.assertIn('Frank', mock_logger_info.call_args[0][2])
@mock.patch('rq.worker.logger.info')
def test_log_job_description_false(self, mock_logger_info):
@ -1209,14 +1209,14 @@ class TestWorker(RQTestCase):
w = Worker([q], log_job_description=False, connection=self.connection)
q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w.dequeue_job_and_maintain_ttl(10)
self.assertNotIn("Frank", mock_logger_info.call_args[0][2])
self.assertNotIn('Frank', mock_logger_info.call_args[0][2])
def test_worker_configures_socket_timeout(self):
"""Ensures that the worker correctly updates Redis client connection to have a socket_timeout"""
q = Queue(connection=self.connection)
_ = Worker([q], connection=self.connection)
connection_kwargs = q.connection.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs["socket_timeout"], 415)
self.assertEqual(connection_kwargs['socket_timeout'], 415)
def test_worker_version(self):
q = Queue(connection=self.connection)
@ -1255,7 +1255,7 @@ class TestWorker(RQTestCase):
qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
w = Worker(qs, connection=self.connection)
w.work(burst=True, dequeue_strategy="random")
w.work(burst=True, dequeue_strategy='random')
start_times = []
for i in range(5):
@ -1299,7 +1299,7 @@ class TestWorker(RQTestCase):
qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
w = Worker(qs)
w.work(burst=True, dequeue_strategy="round_robin")
w.work(burst=True, dequeue_strategy='round_robin')
start_times = []
for i in range(5):
@ -1598,14 +1598,13 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
class TestExceptionHandlerMessageEncoding(RQTestCase):
def test_handle_exception_handles_non_ascii_in_exception_message(self):
"""worker.handle_exception doesn't crash on non-ascii in exception message."""
worker = Worker("foo", connection=self.connection)
worker = Worker('foo', connection=self.connection)
worker._exc_handlers = []
# Mimic how exception info is actually passed forwards
try:
raise Exception(u"💪")
raise Exception('💪')
except Exception:
exc_info = sys.exc_info()
worker.handle_exception(Mock(), *exc_info)

View File

@ -14,10 +14,8 @@ passenv=
; [testenv:lint]
; basepython = python3.10
; deps =
; black
; ruff
; commands =
; black --check rq tests
; ruff check rq tests
[testenv:py37]