diff --git a/tests/test_commands.py b/tests/test_commands.py index 500906a3..786fc26e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,17 +1,44 @@ -from rq.serializers import JSONSerializer import time from multiprocessing import Process +from redis import Redis + from tests import RQTestCase from tests.fixtures import long_running_job from rq import Queue, Worker from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command from rq.exceptions import InvalidJobOperation, NoSuchJobError +from rq.serializers import JSONSerializer from rq.worker import WorkerStatus +def _send_shutdown_command(worker_name, connection_kwargs): + time.sleep(0.25) + send_shutdown_command(Redis(**connection_kwargs), worker_name) + + +def _send_kill_horse_command(worker_name, connection_kwargs): + """Waits 0.25 seconds before sending kill-horse command""" + time.sleep(0.25) + send_kill_horse_command(Redis(**connection_kwargs), worker_name) + + +def start_work(queue_name, worker_name, connection_kwargs): + worker = Worker(queue_name, name=worker_name, + connection=Redis(**connection_kwargs)) + worker.work() + + +def start_work_burst(queue_name, worker_name, connection_kwargs): + worker = Worker(queue_name, name=worker_name, + connection=Redis(**connection_kwargs), + serializer=JSONSerializer) + worker.work(burst=True) + + + class TestCommands(RQTestCase): def test_shutdown_command(self): @@ -19,11 +46,9 @@ class TestCommands(RQTestCase): connection = self.testconn worker = Worker('foo', connection=connection) - def _send_shutdown_command(): - time.sleep(0.25) - send_shutdown_command(connection, worker.name) - - p = Process(target=_send_shutdown_command) + p = Process(target=_send_shutdown_command, + args=(worker.name, + connection.connection_pool.connection_kwargs.copy())) p.start() worker.work() p.join(1) @@ -35,22 +60,18 @@ class TestCommands(RQTestCase): job = queue.enqueue(long_running_job, 4) worker = Worker('foo', connection=connection) - def _send_kill_horse_command(): - """Waits 0.25 seconds before sending kill-horse command""" - time.sleep(0.25) - send_kill_horse_command(connection, worker.name) - - p = Process(target=_send_kill_horse_command) + p = Process(target=_send_kill_horse_command, + args=(worker.name, + connection.connection_pool.connection_kwargs.copy())) p.start() worker.work(burst=True) p.join(1) job.refresh() self.assertTrue(job.id in queue.failed_job_registry) - def start_work(): - worker.work() - - p = Process(target=start_work) + p = Process(target=start_work, + args=('foo', worker.name, + connection.connection_pool.connection_kwargs.copy())) p.start() p.join(2) @@ -76,10 +97,9 @@ class TestCommands(RQTestCase): with self.assertRaises(NoSuchJobError): send_stop_job_command(connection, job_id='1', serializer=JSONSerializer) - def start_work(): - worker.work(burst=True) - - p = Process(target=start_work) + p = Process(target=start_work_burst, + args=('foo', worker.name, + connection.connection_pool.connection_kwargs.copy())) p.start() p.join(1)