import json import os from datetime import datetime, timedelta, timezone from time import sleep from uuid import uuid4 import pytest from click.testing import CliRunner from redis import Redis from rq import Queue from rq.cli import main from rq.cli.helpers import CliConfig, parse_function_arg, parse_schedule, read_config_file from rq.job import Job, JobStatus from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.scheduler import RQScheduler from rq.serializers import JSONSerializer from rq.timeouts import UnixSignalDeathPenalty from rq.worker import Worker, WorkerStatus from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello class CLITestCase(RQTestCase): def setUp(self): super().setUp() db_num = self.connection.connection_pool.connection_kwargs['db'] self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num self.connection = Redis.from_url(self.redis_url) def assert_normal_execution(self, result): if result.exit_code == 0: return True else: print('Non normal execution') print('Exit Code: {}'.format(result.exit_code)) print('Output: {}'.format(result.output)) print('Exception: {}'.format(result.exception)) self.assertEqual(result.exit_code, 0) class TestRQCli(CLITestCase): @pytest.fixture(autouse=True) def set_tmpdir(self, tmpdir): self.tmpdir = tmpdir def assert_normal_execution(self, result): if result.exit_code == 0: return True else: print('Non normal execution') print('Exit Code: {}'.format(result.exit_code)) print('Output: {}'.format(result.output)) print('Exception: {}'.format(result.exception)) self.assertEqual(result.exit_code, 0) """Test rq_cli script""" def setUp(self): super().setUp() job = Job.create(func=div_by_zero, args=(1, 2, 3), connection=self.connection) job.origin = 'fake' job.save() def test_config_file(self): settings = read_config_file('tests.config_files.dummy') self.assertIn('REDIS_HOST', settings) self.assertEqual(settings['REDIS_HOST'], 'testhost.example.com') def test_config_file_logging(self): runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '-c', 'tests.config_files.dummy_logging']) self.assert_normal_execution(result) def test_config_file_option(self): """""" cli_config = CliConfig(config='tests.config_files.dummy') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) runner = CliRunner() result = runner.invoke(main, ['info', '--config', cli_config.config]) self.assertEqual(result.exit_code, 1) def test_config_file_default_options(self): """""" cli_config = CliConfig(config='tests.config_files.dummy') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6379) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 0) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], None) def test_config_file_default_options_override(self): """""" cli_config = CliConfig(config='tests.config_files.dummy_override') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6378) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 2) self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], '123') def test_config_env_vars(self): os.environ['REDIS_HOST'] = 'testhost.example.com' cli_config = CliConfig() self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) def test_death_penalty_class(self): cli_config = CliConfig() self.assertEqual(UnixSignalDeathPenalty, cli_config.death_penalty_class) cli_config = CliConfig(death_penalty_class='rq.job.Job') self.assertEqual(Job, cli_config.death_penalty_class) with self.assertRaises(ValueError): CliConfig(death_penalty_class='rq.abcd') def test_empty_nothing(self): """rq empty -u """ runner = CliRunner() result = runner.invoke(main, ['empty', '-u', self.redis_url]) self.assert_normal_execution(result) self.assertEqual(result.output.strip(), 'Nothing to do') def test_requeue(self): """rq requeue -u --all""" connection = Redis.from_url(self.redis_url) queue = Queue('requeue', connection=connection) registry = queue.failed_job_registry runner = CliRunner() job = queue.enqueue(div_by_zero) job2 = queue.enqueue(div_by_zero) job3 = queue.enqueue(div_by_zero) worker = Worker([queue], connection=connection) worker.work(burst=True) self.assertIn(job, registry) self.assertIn(job2, registry) self.assertIn(job3, registry) result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id]) self.assert_normal_execution(result) # Only the first specified job is requeued self.assertNotIn(job, registry) self.assertIn(job2, registry) self.assertIn(job3, registry) result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all']) self.assert_normal_execution(result) # With --all flag, all failed jobs are requeued self.assertNotIn(job2, registry) self.assertNotIn(job3, registry) def test_requeue_with_serializer(self): """rq requeue -u -S --all""" connection = Redis.from_url(self.redis_url) queue = Queue('requeue', connection=connection, serializer=JSONSerializer) registry = queue.failed_job_registry runner = CliRunner() job = queue.enqueue(div_by_zero) job2 = queue.enqueue(div_by_zero) job3 = queue.enqueue(div_by_zero) worker = Worker([queue], serializer=JSONSerializer, connection=connection) worker.work(burst=True) self.assertIn(job, registry) self.assertIn(job2, registry) self.assertIn(job3, registry) result = runner.invoke( main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id] ) self.assert_normal_execution(result) # Only the first specified job is requeued self.assertNotIn(job, registry) self.assertIn(job2, registry) self.assertIn(job3, registry) result = runner.invoke( main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all'], ) self.assert_normal_execution(result) # With --all flag, all failed jobs are requeued self.assertNotIn(job2, registry) self.assertNotIn(job3, registry) def test_info(self): """rq info -u """ runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url]) self.assert_normal_execution(result) self.assertIn('0 queues, 0 jobs total', result.output) queue = Queue(connection=self.connection) queue.enqueue(say_hello) result = runner.invoke(main, ['info', '-u', self.redis_url]) self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) def test_info_only_queues(self): """rq info -u --only-queues (-Q)""" runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-queues']) self.assert_normal_execution(result) self.assertIn('0 queues, 0 jobs total', result.output) queue = Queue(connection=self.connection) queue.enqueue(say_hello) result = runner.invoke(main, ['info', '-u', self.redis_url]) self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) def test_info_only_workers(self): """rq info -u --only-workers (-W)""" runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('0 workers, 0 queue', result.output) result = runner.invoke(main, ['info', '--by-queue', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('0 workers, 0 queue', result.output) worker = Worker(['default'], connection=self.connection) worker.register_birth() result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('1 workers, 0 queues', result.output) worker.register_death() queue = Queue(connection=self.connection) queue.enqueue(say_hello) result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('0 workers, 1 queues', result.output) foo_queue = Queue(name='foo', connection=self.connection) foo_queue.enqueue(say_hello) bar_queue = Queue(name='bar', connection=self.connection) bar_queue.enqueue(say_hello) worker_1 = Worker([foo_queue, bar_queue], connection=self.connection) worker_1.register_birth() worker_2 = Worker([foo_queue, bar_queue], connection=self.connection) worker_2.register_birth() worker_2.set_state(WorkerStatus.BUSY) result = runner.invoke(main, ['info', 'foo', 'bar', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('2 workers, 2 queues', result.output) result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) # Ensure both queues' workers are shown self.assertIn('foo:', result.output) self.assertIn('bar:', result.output) self.assertIn('2 workers, 2 queues', result.output) def test_worker(self): """rq worker -u -b""" runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assert_normal_execution(result) def test_worker_pid(self): """rq worker -u /tmp/..""" pid = self.tmpdir.join('rq.pid') runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--pid', str(pid)]) self.assertTrue(len(pid.read()) > 0) self.assert_normal_execution(result) def test_worker_with_scheduler(self): """rq worker -u --with-scheduler""" queue = Queue(connection=self.connection) queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) registry = ScheduledJobRegistry(queue=queue) runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assert_normal_execution(result) self.assertEqual(len(registry), 1) # 1 job still scheduled result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--with-scheduler']) self.assert_normal_execution(result) self.assertEqual(len(registry), 0) # Job has been enqueued def test_worker_logging_options(self): """--quiet and --verbose logging options are supported""" runner = CliRunner() args = ['worker', '-u', self.redis_url, '-b'] result = runner.invoke(main, args + ['--verbose']) self.assert_normal_execution(result) result = runner.invoke(main, args + ['--quiet']) self.assert_normal_execution(result) # --quiet and --verbose are mutually exclusive result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) def test_worker_dequeue_strategy(self): """--quiet and --verbose logging options are supported""" runner = CliRunner() args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random'] result = runner.invoke(main, args) self.assert_normal_execution(result) args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin'] result = runner.invoke(main, args) self.assert_normal_execution(result) args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong'] result = runner.invoke(main, args) self.assertEqual(result.exit_code, 1) def test_exception_handlers(self): """rq worker -u -b --exception-handler """ connection = Redis.from_url(self.redis_url) q = Queue('default', connection=connection) runner = CliRunner() # If exception handler is not given, no custom exception handler is run job = q.enqueue(div_by_zero) runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) registry = FailedJobRegistry(queue=q) self.assertTrue(job in registry) # If disable-default-exception-handler is given, job is not moved to FailedJobRegistry job = q.enqueue(div_by_zero) runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--disable-default-exception-handler']) registry = FailedJobRegistry(queue=q) self.assertFalse(job in registry) # Both default and custom exception handler is run job = q.enqueue(div_by_zero) runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.add_meta']) registry = FailedJobRegistry(queue=q) self.assertTrue(job in registry) job.refresh() self.assertEqual(job.meta, {'foo': 1}) # Only custom exception handler is run job = q.enqueue(div_by_zero) runner.invoke( main, [ 'worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.add_meta', '--disable-default-exception-handler', ], ) registry = FailedJobRegistry(queue=q) self.assertFalse(job in registry) job.refresh() self.assertEqual(job.meta, {'foo': 1}) def test_suspend_and_resume(self): """rq suspend -u rq worker -u -b rq resume -u """ runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url]) self.assert_normal_execution(result) result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assertEqual(result.exit_code, 1) self.assertEqual(result.output.strip(), 'RQ is currently suspended, to resume job execution run "rq resume"') result = runner.invoke(main, ['resume', '-u', self.redis_url]) self.assert_normal_execution(result) def test_suspend_with_ttl(self): """rq suspend -u --duration=2""" runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1]) self.assert_normal_execution(result) def test_suspend_with_invalid_ttl(self): """rq suspend -u --duration=0""" runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0]) self.assertEqual(result.exit_code, 1) self.assertIn('Duration must be an integer greater than 1', result.output) def test_serializer(self): """rq worker -u --serializer """ connection = Redis.from_url(self.redis_url) q = Queue('default', connection=connection, serializer=JSONSerializer) runner = CliRunner() job = q.enqueue(say_hello) runner.invoke(main, ['worker', '-u', self.redis_url, '--serializer rq.serializer.JSONSerializer']) self.assertIn(job.id, q.job_ids) def test_cli_enqueue(self): """rq enqueue -u tests.fixtures.say_hello""" queue = Queue(connection=self.connection) self.assertTrue(queue.is_empty()) runner = CliRunner() result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello']) self.assert_normal_execution(result) prefix = "Enqueued tests.fixtures.say_hello() with job-id '" suffix = "'.\n" self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) job_id = result.output[len(prefix) : -len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) worker = Worker(queue, connection=self.connection) worker.work(True) self.assertEqual(Job(job_id, connection=self.connection).result, 'Hi there, Stranger!') def test_cli_enqueue_with_serializer(self): """rq enqueue -u -S rq.serializers.JSONSerializer tests.fixtures.say_hello""" queue = Queue(connection=self.connection, serializer=JSONSerializer) self.assertTrue(queue.is_empty()) runner = CliRunner() result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello'] ) self.assert_normal_execution(result) prefix = "Enqueued tests.fixtures.say_hello() with job-id '" suffix = "'.\n" self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) job_id = result.output[len(prefix) : -len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) worker = Worker(queue, serializer=JSONSerializer, connection=self.connection) worker.work(True) self.assertEqual( Job(job_id, serializer=JSONSerializer, connection=self.connection).result, 'Hi there, Stranger!' ) def test_cli_enqueue_args(self): """rq enqueue -u tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def""" queue = Queue(connection=self.connection) self.assertTrue(queue.is_empty()) runner = CliRunner() result = runner.invoke( main, [ 'enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'hello', ':[1, {"key": "value"}]', ':@tests/test.json', '%1, 2', 'json:=[3.0, true]', 'nojson=abc', 'file=@tests/test.json', ], ) self.assert_normal_execution(result) job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii') worker = Worker(queue, connection=self.connection) worker.work(True) args, kwargs = Job(job_id, connection=self.connection).result self.assertEqual(args, ('hello', [1, {'key': 'value'}], {'test': True}, (1, 2))) self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{"test": true}\n'}) def test_cli_enqueue_schedule_in(self): """rq enqueue -u tests.fixtures.say_hello --schedule-in 1s""" queue = Queue(connection=self.connection) registry = ScheduledJobRegistry(queue=queue) worker = Worker(queue, connection=self.connection) scheduler = RQScheduler(queue, self.connection) self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 0) runner = CliRunner() result = runner.invoke( main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-in', '10s'] ) self.assert_normal_execution(result) scheduler.acquire_locks() scheduler.enqueue_scheduled_jobs() self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 1) self.assertFalse(worker.work(True)) sleep(11) scheduler.enqueue_scheduled_jobs() self.assertTrue(len(queue) == 1) self.assertTrue(len(registry) == 0) self.assertTrue(worker.work(True)) def test_cli_enqueue_schedule_at(self): """ rq enqueue -u tests.fixtures.say_hello --schedule-at 2021-01-01T00:00:00 rq enqueue -u tests.fixtures.say_hello --schedule-at 2100-01-01T00:00:00 """ queue = Queue(connection=self.connection) registry = ScheduledJobRegistry(queue=queue) worker = Worker(queue, connection=self.connection) scheduler = RQScheduler(queue, self.connection) self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 0) runner = CliRunner() result = runner.invoke( main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2021-01-01T00:00:00'] ) self.assert_normal_execution(result) scheduler.acquire_locks() self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 1) scheduler.enqueue_scheduled_jobs() self.assertTrue(len(queue) == 1) self.assertTrue(len(registry) == 0) self.assertTrue(worker.work(True)) self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 0) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2100-01-01T00:00:00'] ) self.assert_normal_execution(result) self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 1) scheduler.enqueue_scheduled_jobs() self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 1) self.assertFalse(worker.work(True)) def test_cli_enqueue_retry(self): """rq enqueue -u tests.fixtures.say_hello --retry-max 3 --retry-interval 10 --retry-interval 20 --retry-interval 40""" queue = Queue(connection=self.connection) self.assertTrue(queue.is_empty()) runner = CliRunner() result = runner.invoke( main, [ 'enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--retry-max', '3', '--retry-interval', '10', '--retry-interval', '20', '--retry-interval', '40', ], ) self.assert_normal_execution(result) job = Job.fetch( self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), connection=self.connection ) self.assertEqual(job.retries_left, 3) self.assertEqual(job.retry_intervals, [10, 20, 40]) def test_cli_enqueue_errors(self): """ rq enqueue -u tests.fixtures.echo :invalid_json rq enqueue -u tests.fixtures.echo %invalid_eval_statement rq enqueue -u tests.fixtures.echo key=value key=value rq enqueue -u tests.fixtures.echo --schedule-in 1s --schedule-at 2000-01-01T00:00:00 rq enqueue -u tests.fixtures.echo @not_existing_file """ runner = CliRunner() result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', ':invalid_json']) self.assertNotEqual(result.exit_code, 0) self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '%invalid_eval_statement'] ) self.assertNotEqual(result.exit_code, 0) self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value']) self.assertNotEqual(result.exit_code, 0) self.assertIn("You can't specify multiple values for the same keyword.", result.output) result = runner.invoke( main, [ 'enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '--schedule-in', '1s', '--schedule-at', '2000-01-01T00:00:00', ], ) self.assertNotEqual(result.exit_code, 0) self.assertIn("You can't specify both --schedule-in and --schedule-at", result.output) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file']) self.assertNotEqual(result.exit_code, 0) self.assertIn('Not found', result.output) def test_parse_schedule(self): """executes the rq.cli.helpers.parse_schedule function""" self.assertEqual(parse_schedule(None, '2000-01-23T23:45:01'), datetime(2000, 1, 23, 23, 45, 1)) start = datetime.now(timezone.utc) + timedelta(minutes=5) middle = parse_schedule('5m', None) end = datetime.now(timezone.utc) + timedelta(minutes=5) self.assertGreater(middle, start) self.assertLess(middle, end) def test_parse_function_arg(self): """executes the rq.cli.helpers.parse_function_arg function""" self.assertEqual(parse_function_arg('abc', 0), (None, 'abc')) self.assertEqual(parse_function_arg(':{"json": true}', 1), (None, {'json': True})) self.assertEqual(parse_function_arg('%1, 2', 2), (None, (1, 2))) self.assertEqual(parse_function_arg('key=value', 3), ('key', 'value')) self.assertEqual(parse_function_arg('jsonkey:=["json", "value"]', 4), ('jsonkey', ['json', 'value'])) self.assertEqual(parse_function_arg('evalkey%=1.2', 5), ('evalkey', 1.2)) self.assertEqual(parse_function_arg(':@tests/test.json', 6), (None, {'test': True})) self.assertEqual(parse_function_arg('@tests/test.json', 7), (None, '{"test": true}\n')) def test_cli_enqueue_doc_test(self): """tests the examples of the documentation""" runner = CliRunner() id = str(uuid4()) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc']) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), (['abc'], {})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}})) id = str(uuid4()) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%1, 2']) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([(1, 2)], {})) id = str(uuid4()) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%None']) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([None], {})) id = str(uuid4()) result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%True']) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([True], {})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'key': {'foo': True}})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {})) id = str(uuid4()) result = runner.invoke( main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json'] ) self.assert_normal_execution(result) job = Job.fetch(id, connection=self.connection) self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())})) class WorkerPoolCLITestCase(CLITestCase): def test_worker_pool_burst_and_num_workers(self): """rq worker-pool -u -b -n 3""" runner = CliRunner() result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '-n', '3']) self.assert_normal_execution(result) def test_serializer_and_queue_argument(self): """rq worker-pool foo bar -u -b""" queue = Queue('foo', connection=self.connection, serializer=JSONSerializer) job = queue.enqueue(say_hello, 'Hello') queue = Queue('bar', connection=self.connection, serializer=JSONSerializer) job_2 = queue.enqueue(say_hello, 'Hello') runner = CliRunner() runner.invoke( main, ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'], ) self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED) self.assertEqual(job_2.get_status(refresh=True), JobStatus.FINISHED) def test_worker_class_argument(self): """rq worker-pool -u -b --worker-class rq.Worker""" runner = CliRunner() result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.Worker']) self.assert_normal_execution(result) result = runner.invoke( main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.SimpleWorker'] ) self.assert_normal_execution(result) # This one fails because the worker class doesn't exist result = runner.invoke( main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.NonExistantWorker'] ) self.assertNotEqual(result.exit_code, 0) def test_job_class_argument(self): """rq worker-pool -u -b --job-class rq.job.Job""" runner = CliRunner() result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.Job']) self.assert_normal_execution(result) # This one fails because Job class doesn't exist result = runner.invoke( main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.NonExistantJob'] ) self.assertNotEqual(result.exit_code, 0) def test_worker_pool_logging_options(self): """--quiet and --verbose logging options are supported""" runner = CliRunner() args = ['worker-pool', '-u', self.redis_url, '-b'] result = runner.invoke(main, args + ['--verbose']) self.assert_normal_execution(result) result = runner.invoke(main, args + ['--quiet']) self.assert_normal_execution(result) # --quiet and --verbose are mutually exclusive result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0)