Renamed `Pool(num_workers=2) to `Pool(size=2)`

This commit is contained in:
Selwin Ong 2023-04-25 19:56:09 +07:00
parent bf7d0d74e0
commit a1306f89ad
3 changed files with 12 additions and 12 deletions

View File

@ -503,7 +503,7 @@ def worker_pool(
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
pool = Pool(queue_names, connection=cli_config.connection, num_workers=2)
pool = Pool(queue_names, connection=cli_config.connection, size=2)
pool.start(burst=burst, logging_level=logging_level)
# Should we configure Sentry?

View File

@ -34,8 +34,8 @@ class Pool:
STARTED = 2
STOPPED = 3
def __init__(self, queues: List[Union[str, Queue]], connection: Redis, num_workers: int = 1, *args, **kwargs):
self.num_workers: int = num_workers
def __init__(self, queues: List[Union[str, Queue]], connection: Redis, size: int = 1, *args, **kwargs):
self.size: int = size
self._workers: List[Worker] = []
setup_loghandlers('INFO', DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, name=__name__)
self.log: logging.Logger = logging.getLogger(__name__)
@ -138,10 +138,10 @@ class Pool:
"""
self.log.debug('Checking worker processes')
self.reap_workers()
# If we have less number of workers than num_workers,
# If we have less number of workers than size,
# respawn the difference
if respawn and self.status != self.Status.STOPPED:
delta = self.num_workers - len(self.worker_dict)
delta = self.size - len(self.worker_dict)
if delta:
for i in range(delta):
self.start_worker(burst=self._burst, _sleep=self._sleep)
@ -174,8 +174,8 @@ class Pool:
Run the workers
* sleep: waits for X seconds before creating worker, only for testing purposes
"""
self.log.debug(f'Spawning {self.num_workers} workers')
for i in range(self.num_workers):
self.log.debug(f'Spawning {self.size} workers')
for i in range(self.size):
self.start_worker(i + 1, burst=burst, _sleep=_sleep, logging_level=logging_level)
def stop_worker(self, worker_data: WorkerData, sig=signal.SIGINT):

View File

@ -27,14 +27,14 @@ class TestWorkerPool(RQTestCase):
# def test_spawn_workers(self):
# """Test spawning workers"""
# pool = Pool(['default', 'foo'], connection=self.connection, num_workers=2)
# pool = Pool(['default', 'foo'], connection=self.connection, size=2)
# pool.start_workers(burst=False)
# self.assertEqual(len(pool.worker_dict.keys()), 2)
# pool.stop_workers()
def test_check_workers(self):
"""Test check_workers()"""
pool = Pool(['default'], connection=self.connection, num_workers=2)
pool = Pool(['default'], connection=self.connection, size=2)
pool.start_workers(burst=False)
# There should be two workers
@ -56,7 +56,7 @@ class TestWorkerPool(RQTestCase):
def test_reap_workers(self):
"""Dead workers are removed from worker_dict"""
pool = Pool(['default'], connection=self.connection, num_workers=2)
pool = Pool(['default'], connection=self.connection, size=2)
pool.start_workers(burst=False)
# There should be two workers
@ -73,7 +73,7 @@ class TestWorkerPool(RQTestCase):
def test_start(self):
"""Test start()"""
pool = Pool(['default'], connection=self.connection, num_workers=2)
pool = Pool(['default'], connection=self.connection, size=2)
p = Process(target=wait_and_send_shutdown_signal, args=(os.getpid(), 0.5))
p.start()
@ -87,7 +87,7 @@ class TestWorkerPool(RQTestCase):
"""If two shutdown signals are sent within one second, only the first one is processed"""
# Send two shutdown signals within one second while the worker is
# working on a long running job. The job should still complete (not killed)
pool = Pool(['foo'], connection=self.connection, num_workers=2)
pool = Pool(['foo'], connection=self.connection, size=2)
process_1 = Process(target=wait_and_send_shutdown_signal, args=(os.getpid(), 0.5))
process_1.start()