Allow to get job count and job IDs without cleanup (#2133)

Prior to this commit it was not possible to get a number of jobs in a
registry or list job IDs without triggering a cleanup.
Calling cleanup caused issues for:
 * Monitoring tools (see https://github.com/rq/rq/pull/2104) and
 * Cases with high number of jobs in the registries (see
   https://github.com/rq/rq/pull/2003).
 * Cases where a failure callback is registered and but the
   `get_job_ids` is called not from the main thread.

In this commit:
 1. A new `BaseRegistry.get_job_count` method is added. It is a side
    effect free version of `BaseRegistry.count` and runs in O(1).
 2. A `cleanup` parameter added to `get_job_ids` that allows to avoid
    clean up if it is set to `False`.

Resolves https://github.com/rq/rq/pull/2104.
Resolves https://github.com/rq/rq/pull/2003.
This commit is contained in:
Anton Daneyko 2024-10-17 06:14:54 +02:00 committed by GitHub
parent b1620da009
commit f4283afe68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 6 deletions

View File

@ -83,12 +83,24 @@ class BaseRegistry:
@property @property
def count(self) -> int: def count(self) -> int:
"""Returns the number of jobs in this registry """Returns the number of jobs in this registry after running cleanup
Returns: Returns:
int: _description_ int: _description_
""" """
self.cleanup() return self.get_job_count(cleanup=True)
def get_job_count(self, cleanup=True) -> int:
"""Returns the number of jobs in this registry after optional cleanup.
Args:
cleanup (bool, optional): _description_. Defaults to True.
Returns:
int: _description_
"""
if cleanup:
self.cleanup()
return self.connection.zcard(self.key) return self.connection.zcard(self.key)
def add(self, job: 'Job', ttl=0, pipeline: Optional['Pipeline'] = None, xx: bool = False) -> int: def add(self, job: 'Job', ttl=0, pipeline: Optional['Pipeline'] = None, xx: bool = False) -> int:
@ -153,18 +165,20 @@ class BaseRegistry:
expired_jobs = self.connection.zrangebyscore(self.key, 0, score) expired_jobs = self.connection.zrangebyscore(self.key, 0, score)
return [as_text(job_id) for job_id in expired_jobs] return [as_text(job_id) for job_id in expired_jobs]
def get_job_ids(self, start: int = 0, end: int = -1, desc: bool = False): def get_job_ids(self, start: int = 0, end: int = -1, desc: bool = False, cleanup: bool = True) -> List[str]:
"""Returns list of all job ids. """Returns list of all job ids.
Args: Args:
start (int, optional): _description_. Defaults to 0. start (int, optional): _description_. Defaults to 0.
end (int, optional): _description_. Defaults to -1. end (int, optional): _description_. Defaults to -1.
desc (bool, optional): _description_. Defaults to False. desc (bool, optional): _description_. Defaults to False.
cleanup (bool, optional): _description_. Defaults to True.
Returns: Returns:
_type_: _description_ _type_: _description_
""" """
self.cleanup() if cleanup:
self.cleanup()
return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end, desc=desc)] return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end, desc=desc)]
def get_queue(self): def get_queue(self):

View File

@ -131,10 +131,19 @@ class TestRegistry(RQTestCase):
def test_get_job_ids(self): def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry.""" """Getting job ids from StartedJobRegistry."""
timestamp = current_timestamp() timestamp = current_timestamp()
self.connection.zadd(self.registry.key, {'will-be-cleaned-up': 1})
self.connection.zadd(self.registry.key, {'foo': timestamp + 10}) self.connection.zadd(self.registry.key, {'foo': timestamp + 10})
self.connection.zadd(self.registry.key, {'bar': timestamp + 20}) self.connection.zadd(self.registry.key, {'bar': timestamp + 20})
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
def test_get_job_ids_does_not_cleanup(self):
"""Getting job ids from StartedJobRegistry without a cleanup."""
timestamp = current_timestamp()
self.connection.zadd(self.registry.key, {'will-be-returned-despite-outdated': 1})
self.connection.zadd(self.registry.key, {'foo': timestamp + 10})
self.connection.zadd(self.registry.key, {'bar': timestamp + 20})
self.assertEqual(self.registry.get_job_ids(cleanup=False), ['will-be-returned-despite-outdated', 'foo', 'bar'])
def test_get_expired_job_ids(self): def test_get_expired_job_ids(self):
"""Getting expired job ids form StartedJobRegistry.""" """Getting expired job ids form StartedJobRegistry."""
timestamp = current_timestamp() timestamp = current_timestamp()
@ -242,15 +251,28 @@ class TestRegistry(RQTestCase):
pipeline.execute() pipeline.execute()
self.assertNotIn(execution.composite_key, registry.get_job_ids()) self.assertNotIn(execution.composite_key, registry.get_job_ids())
def test_get_job_count(self): def test_count(self):
"""StartedJobRegistry returns the right number of job count.""" """StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10 timestamp = current_timestamp() + 10
self.connection.zadd(self.registry.key, {'will-be-cleaned-up': 1})
self.connection.zadd(self.registry.key, {'foo': timestamp}) self.connection.zadd(self.registry.key, {'foo': timestamp})
self.connection.zadd(self.registry.key, {'bar': timestamp}) self.connection.zadd(self.registry.key, {'bar': timestamp})
self.assertEqual(self.registry.count, 2) self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2) self.assertEqual(len(self.registry), 2)
# Make sure def test_get_job_count(self):
"""Ensure cleanup is not called and does not affect the reported number of jobs.
Note, the original motivation to stop calling cleanup was to make the count operation O(1) to allow usage of
monitoring tools and avoid side effects of failure callbacks that cleanup triggers.
"""
timestamp = current_timestamp() + 10
self.connection.zadd(self.registry.key, {'will-be-counted-despite-outdated': 1})
self.connection.zadd(self.registry.key, {'foo': timestamp})
self.connection.zadd(self.registry.key, {'bar': timestamp})
with mock.patch.object(self.registry, 'cleanup') as mock_cleanup:
self.assertEqual(self.registry.get_job_count(cleanup=False), 3)
mock_cleanup.assert_not_called()
def test_clean_registries(self): def test_clean_registries(self):
"""clean_registries() cleans Started and Finished job registries.""" """clean_registries() cleans Started and Finished job registries."""