From a1306f89ad0d8686c6bde447bff75e2f71f0733b Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 25 Apr 2023 19:56:09 +0700 Subject: [PATCH] Renamed `Pool(num_workers=2) to `Pool(size=2)` --- rq/cli/cli.py | 2 +- rq/worker_pool.py | 12 ++++++------ tests/test_worker_pool.py | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 07532e10..e8abc44b 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -503,7 +503,7 @@ def worker_pool( setup_loghandlers_from_args(verbose, quiet, date_format, log_format) - pool = Pool(queue_names, connection=cli_config.connection, num_workers=2) + pool = Pool(queue_names, connection=cli_config.connection, size=2) pool.start(burst=burst, logging_level=logging_level) # Should we configure Sentry? diff --git a/rq/worker_pool.py b/rq/worker_pool.py index 221b7d7b..939b4f33 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -34,8 +34,8 @@ class Pool: STARTED = 2 STOPPED = 3 - def __init__(self, queues: List[Union[str, Queue]], connection: Redis, num_workers: int = 1, *args, **kwargs): - self.num_workers: int = num_workers + def __init__(self, queues: List[Union[str, Queue]], connection: Redis, size: int = 1, *args, **kwargs): + self.size: int = size self._workers: List[Worker] = [] setup_loghandlers('INFO', DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, name=__name__) self.log: logging.Logger = logging.getLogger(__name__) @@ -138,10 +138,10 @@ class Pool: """ self.log.debug('Checking worker processes') self.reap_workers() - # If we have less number of workers than num_workers, + # If we have less number of workers than size, # respawn the difference if respawn and self.status != self.Status.STOPPED: - delta = self.num_workers - len(self.worker_dict) + delta = self.size - len(self.worker_dict) if delta: for i in range(delta): self.start_worker(burst=self._burst, _sleep=self._sleep) @@ -174,8 +174,8 @@ class Pool: Run the workers * sleep: waits for X seconds before creating worker, only for testing purposes """ - self.log.debug(f'Spawning {self.num_workers} workers') - for i in range(self.num_workers): + self.log.debug(f'Spawning {self.size} workers') + for i in range(self.size): self.start_worker(i + 1, burst=burst, _sleep=_sleep, logging_level=logging_level) def stop_worker(self, worker_data: WorkerData, sig=signal.SIGINT): diff --git a/tests/test_worker_pool.py b/tests/test_worker_pool.py index e1d92824..7adff537 100644 --- a/tests/test_worker_pool.py +++ b/tests/test_worker_pool.py @@ -27,14 +27,14 @@ class TestWorkerPool(RQTestCase): # def test_spawn_workers(self): # """Test spawning workers""" - # pool = Pool(['default', 'foo'], connection=self.connection, num_workers=2) + # pool = Pool(['default', 'foo'], connection=self.connection, size=2) # pool.start_workers(burst=False) # self.assertEqual(len(pool.worker_dict.keys()), 2) # pool.stop_workers() def test_check_workers(self): """Test check_workers()""" - pool = Pool(['default'], connection=self.connection, num_workers=2) + pool = Pool(['default'], connection=self.connection, size=2) pool.start_workers(burst=False) # There should be two workers @@ -56,7 +56,7 @@ class TestWorkerPool(RQTestCase): def test_reap_workers(self): """Dead workers are removed from worker_dict""" - pool = Pool(['default'], connection=self.connection, num_workers=2) + pool = Pool(['default'], connection=self.connection, size=2) pool.start_workers(burst=False) # There should be two workers @@ -73,7 +73,7 @@ class TestWorkerPool(RQTestCase): def test_start(self): """Test start()""" - pool = Pool(['default'], connection=self.connection, num_workers=2) + pool = Pool(['default'], connection=self.connection, size=2) p = Process(target=wait_and_send_shutdown_signal, args=(os.getpid(), 0.5)) p.start() @@ -87,7 +87,7 @@ class TestWorkerPool(RQTestCase): """If two shutdown signals are sent within one second, only the first one is processed""" # Send two shutdown signals within one second while the worker is # working on a long running job. The job should still complete (not killed) - pool = Pool(['foo'], connection=self.connection, num_workers=2) + pool = Pool(['foo'], connection=self.connection, size=2) process_1 = Process(target=wait_and_send_shutdown_signal, args=(os.getpid(), 0.5)) process_1.start()