From f4283afe686c6f5dc2f563a02d17d961806de83b Mon Sep 17 00:00:00 2001 From: Anton Daneyko <153308121+anton-daneyko-ultramarin@users.noreply.github.com> Date: Thu, 17 Oct 2024 06:14:54 +0200 Subject: [PATCH] Allow to get job count and job IDs without cleanup (#2133) Prior to this commit it was not possible to get the number of jobs in a registry or list job IDs without triggering a cleanup. Calling cleanup caused issues for: * Monitoring tools (see https://github.com/rq/rq/pull/2104). * Cases with a high number of jobs in the registries (see https://github.com/rq/rq/pull/2003). * Cases where a failure callback is registered but `get_job_ids` is not called from the main thread. In this commit: 1. A new `BaseRegistry.get_job_count` method is added. When called with `cleanup=False`, it is a side-effect-free version of `BaseRegistry.count` and runs in O(1). 2. A `cleanup` parameter is added to `get_job_ids`; setting it to `False` skips the cleanup. Resolves https://github.com/rq/rq/pull/2104. Resolves https://github.com/rq/rq/pull/2003. --- rq/registry.py | 22 ++++++++++++++++++---- tests/test_registry.py | 26 ++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 217b5708..921868b6 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -83,12 +83,24 @@ class BaseRegistry: @property def count(self) -> int: - """Returns the number of jobs in this registry + """Returns the number of jobs in this registry after running cleanup Returns: int: _description_ """ - self.cleanup() + return self.get_job_count(cleanup=True) + + def get_job_count(self, cleanup=True) -> int: + """Returns the number of jobs in this registry after optional cleanup. + + Args: + cleanup (bool, optional): _description_. Defaults to True. 
+ + Returns: + int: _description_ + """ + if cleanup: + self.cleanup() return self.connection.zcard(self.key) def add(self, job: 'Job', ttl=0, pipeline: Optional['Pipeline'] = None, xx: bool = False) -> int: @@ -153,18 +165,20 @@ class BaseRegistry: expired_jobs = self.connection.zrangebyscore(self.key, 0, score) return [as_text(job_id) for job_id in expired_jobs] - def get_job_ids(self, start: int = 0, end: int = -1, desc: bool = False): + def get_job_ids(self, start: int = 0, end: int = -1, desc: bool = False, cleanup: bool = True) -> List[str]: """Returns list of all job ids. Args: start (int, optional): _description_. Defaults to 0. end (int, optional): _description_. Defaults to -1. desc (bool, optional): _description_. Defaults to False. + cleanup (bool, optional): _description_. Defaults to True. Returns: _type_: _description_ """ - self.cleanup() + if cleanup: + self.cleanup() return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end, desc=desc)] def get_queue(self): diff --git a/tests/test_registry.py b/tests/test_registry.py index fe3593c6..7a2d1b07 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -131,10 +131,19 @@ class TestRegistry(RQTestCase): def test_get_job_ids(self): """Getting job ids from StartedJobRegistry.""" timestamp = current_timestamp() + self.connection.zadd(self.registry.key, {'will-be-cleaned-up': 1}) self.connection.zadd(self.registry.key, {'foo': timestamp + 10}) self.connection.zadd(self.registry.key, {'bar': timestamp + 20}) self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) + def test_get_job_ids_does_not_cleanup(self): + """Getting job ids from StartedJobRegistry without a cleanup.""" + timestamp = current_timestamp() + self.connection.zadd(self.registry.key, {'will-be-returned-despite-outdated': 1}) + self.connection.zadd(self.registry.key, {'foo': timestamp + 10}) + self.connection.zadd(self.registry.key, {'bar': timestamp + 20}) + 
self.assertEqual(self.registry.get_job_ids(cleanup=False), ['will-be-returned-despite-outdated', 'foo', 'bar']) + def test_get_expired_job_ids(self): """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() @@ -242,15 +251,28 @@ class TestRegistry(RQTestCase): pipeline.execute() self.assertNotIn(execution.composite_key, registry.get_job_ids()) - def test_get_job_count(self): + def test_count(self): """StartedJobRegistry returns the right number of job count.""" timestamp = current_timestamp() + 10 + self.connection.zadd(self.registry.key, {'will-be-cleaned-up': 1}) self.connection.zadd(self.registry.key, {'foo': timestamp}) self.connection.zadd(self.registry.key, {'bar': timestamp}) self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) - # Make sure + def test_get_job_count(self): + """Ensure cleanup is not called and does not affect the reported number of jobs. + + Note, the original motivation to stop calling cleanup was to make the count operation O(1) to allow usage of + monitoring tools and avoid side effects of failure callbacks that cleanup triggers. + """ + timestamp = current_timestamp() + 10 + self.connection.zadd(self.registry.key, {'will-be-counted-despite-outdated': 1}) + self.connection.zadd(self.registry.key, {'foo': timestamp}) + self.connection.zadd(self.registry.key, {'bar': timestamp}) + with mock.patch.object(self.registry, 'cleanup') as mock_cleanup: + self.assertEqual(self.registry.get_job_count(cleanup=False), 3) + mock_cleanup.assert_not_called() def test_clean_registries(self): """clean_registries() cleans Started and Finished job registries."""