Cleaned up cli.py (#1876)

This commit is contained in:
Selwin Ong 2023-04-08 18:55:28 +07:00 committed by GitHub
parent ed7a171460
commit b639ee6406
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 80 deletions

View File

@ -2,7 +2,6 @@
RQ command line tool
"""
from functools import update_wrapper
import os
import sys
import warnings
@ -18,22 +17,18 @@ from rq.cli.helpers import (
show_both,
show_queues,
show_workers,
CliConfig,
parse_function_args,
parse_schedule,
pass_cli_config,
)
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (
DEFAULT_CONNECTION_CLASS,
DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS,
DEFAULT_WORKER_CLASS,
DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL,
DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
@ -45,50 +40,6 @@ from rq.job import JobStatus
blue = make_colorizer('darkblue')
# Disable the warning that Click displays (as of Click version 5.0) when users
# use unicode_literals in Python 2.
# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
click.disable_unicode_literals_warning = True
shared_options = [
click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
click.option(
'--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
),
click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
click.option(
'--connection-class',
envvar='RQ_CONNECTION_CLASS',
default=DEFAULT_CONNECTION_CLASS,
help='Redis client class to use',
),
click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
click.option(
'--serializer',
'-S',
default=DEFAULT_SERIALIZER_CLASS,
help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
),
]
def pass_cli_config(func):
# add all the shared options to the command
for option in shared_options:
func = option(func)
# pass the cli config object into the command
def wrapper(*args, **kwargs):
ctx = click.get_current_context()
cli_config = CliConfig(**kwargs)
return ctx.invoke(func, cli_config, *args[1:], **kwargs)
return update_wrapper(wrapper, func)
@click.group()
@click.version_option(version)
def main():
@ -105,8 +56,10 @@ def empty(cli_config, all, queues, serializer, **options):
if all:
queues = cli_config.queue_class.all(
connection=cli_config.connection, job_class=cli_config.job_class,
death_penalty_class=cli_config.death_penalty_class, serializer=serializer
connection=cli_config.connection,
job_class=cli_config.job_class,
death_penalty_class=cli_config.death_penalty_class,
serializer=serializer,
)
else:
queues = [
@ -206,7 +159,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
'--maintenance-interval',
type=int,
default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
help='Maintenance task interval (in seconds) to be used'
help='Maintenance task interval (in seconds) to be used',
)
@click.option(
'--job-monitoring-interval',
@ -227,7 +180,9 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues')
@click.option(
'--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues'
)
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
@ -274,12 +229,14 @@ def worker(
worker_name = cli_config.worker_class.__qualname__
if worker_name in ["RoundRobinWorker", "RandomWorker"]:
strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
msg = f"WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead."
warnings.warn(msg, DeprecationWarning)
click.secho(msg, fg='yellow')
if dequeue_strategy not in ("default", "random", "round_robin"):
click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
click.secho(
"ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red'
)
sys.exit(1)
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
@ -313,7 +270,7 @@ def worker(
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
serializer=serializer
serializer=serializer,
)
# Should we configure Sentry?
@ -335,7 +292,7 @@ def worker(
max_jobs=max_jobs,
max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
dequeue_strategy=dequeue_strategy
dequeue_strategy=dequeue_strategy,
)
except ConnectionError as e:
print(e)

View File

@ -2,7 +2,8 @@ import sys
import importlib
import time
import os
from functools import partial
from functools import partial, update_wrapper
from enum import Enum
from datetime import datetime, timezone, timedelta
@ -13,8 +14,15 @@ from shutil import get_terminal_size
import click
from redis import Redis
from redis.sentinel import Sentinel
from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, \
DEFAULT_DEATH_PENALTY_CLASS
from rq.defaults import (
DEFAULT_CONNECTION_CLASS,
DEFAULT_DEATH_PENALTY_CLASS,
DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS,
DEFAULT_WORKER_CLASS,
DEFAULT_SERIALIZER_CLASS,
)
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus
@ -42,7 +50,7 @@ def get_redis_from_config(settings, connection_class=Redis):
elif settings.get('SENTINEL') is not None:
instances = settings['SENTINEL'].get('INSTANCES', [('localhost', 26379)])
master_name = settings['SENTINEL'].get('MASTER_NAME', 'mymaster')
connection_kwargs = {
'db': settings['SENTINEL'].get('DB', 0),
'username': settings['SENTINEL'].get('USERNAME', None),
@ -52,10 +60,8 @@ def get_redis_from_config(settings, connection_class=Redis):
}
connection_kwargs.update(settings['SENTINEL'].get('CONNECTION_KWARGS', {}))
sentinel_kwargs = settings['SENTINEL'].get('SENTINEL_KWARGS', {})
sn = Sentinel(
instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs
)
sn = Sentinel(instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
return sn.master_for(master_name)
ssl = settings.get('REDIS_SSL', False)
@ -124,13 +130,22 @@ def show_queues(queues, raw, by_queue, queue_class, worker_class):
count = counts[q]
if not raw:
chart = green('|' + '' * int(ratio * count))
line = '%-12s %s %d, %d executing, %d finished, %d failed' \
% (q.name, chart, count, q.started_job_registry.count, \
q.finished_job_registry.count, q.failed_job_registry.count)
line = '%-12s %s %d, %d executing, %d finished, %d failed' % (
q.name,
chart,
count,
q.started_job_registry.count,
q.finished_job_registry.count,
q.failed_job_registry.count,
)
else:
line = 'queue %s %d, %d executing, %d finished, %d failed' \
% (q.name, count, q.started_job_registry.count, \
q.finished_job_registry.count, q.failed_job_registry.count)
line = 'queue %s %d, %d executing, %d finished, %d failed' % (
q.name,
count,
q.started_job_registry.count,
q.finished_job_registry.count,
q.failed_job_registry.count,
)
click.echo(line)
num_jobs += count
@ -155,14 +170,22 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)
if not raw:
line = '%s: %s %s. jobs: %d finished, %d failed' \
% (name, state_symbol(worker.get_state()), queue_names, \
worker.successful_job_count, worker.failed_job_count)
line = '%s: %s %s. jobs: %d finished, %d failed' % (
name,
state_symbol(worker.get_state()),
queue_names,
worker.successful_job_count,
worker.failed_job_count,
)
click.echo(line)
else:
line = 'worker %s %s %s. jobs: %d finished, %d failed' \
% (name, worker.get_state(), queue_names,\
worker.successful_job_count, worker.failed_job_count)
line = 'worker %s %s %s. jobs: %d finished, %d failed' % (
name,
worker.get_state(),
queue_names,
worker.successful_job_count,
worker.failed_job_count,
)
click.echo(line)
else:
@ -318,7 +341,7 @@ class CliConfig:
connection_class=DEFAULT_CONNECTION_CLASS,
path=None,
*args,
**kwargs
**kwargs,
):
self._connection = None
self.url = url
@ -363,3 +386,41 @@ class CliConfig:
else:
self._connection = get_redis_from_config(os.environ, self.connection_class)
return self._connection
shared_options = [
click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
click.option(
'--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
),
click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
click.option(
'--connection-class',
envvar='RQ_CONNECTION_CLASS',
default=DEFAULT_CONNECTION_CLASS,
help='Redis client class to use',
),
click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
click.option(
'--serializer',
'-S',
default=DEFAULT_SERIALIZER_CLASS,
help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
),
]
def pass_cli_config(func):
# add all the shared options to the command
for option in shared_options:
func = option(func)
# pass the cli config object into the command
def wrapper(*args, **kwargs):
ctx = click.get_current_context()
cli_config = CliConfig(**kwargs)
return ctx.invoke(func, cli_config, *args[1:], **kwargs)
return update_wrapper(wrapper, func)