Added intermediate_queue.get_job_ids()

This commit is contained in:
Selwin Ong 2024-04-28 19:24:24 +07:00
parent 50467b4d58
commit 7d4c7eee30
2 changed files with 28 additions and 1 deletions

View File

@ -1,5 +1,5 @@
from datetime import datetime, timedelta, timezone
from typing import Optional
from typing import List, Optional
from redis import Redis
@ -75,3 +75,11 @@ class IntermediateQueue(object):
if not first_seen:
return False
return now() - first_seen > timedelta(minutes=1)
def get_job_ids(self) -> List[str]:
"""Returns the job IDs in the intermediate queue.
Returns:
List[str]: The job IDs
"""
return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)]

View File

@ -49,3 +49,22 @@ class TestWorker(RQTestCase):
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id))
def test_get_job_ids(self):
"""Dequeueing job from a single queue moves job to intermediate queue."""
queue = Queue('foo', connection=self.connection)
job_1 = queue.enqueue(say_hello)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
# Job ID is not in intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [])
job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn)
# After job is dequeued, the job ID is in the intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id])
# Test the blocking version
job_2 = queue.enqueue(say_hello)
job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn)
# After job is dequeued, the job ID is in the intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id])