diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py index e317bc73..0d3d42fb 100644 --- a/rq/intermediate_queue.py +++ b/rq/intermediate_queue.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta, timezone -from typing import Optional +from typing import List, Optional from redis import Redis @@ -75,3 +75,11 @@ class IntermediateQueue(object): if not first_seen: return False return now() - first_seen > timedelta(minutes=1) + + def get_job_ids(self) -> List[str]: + """Returns the job IDs in the intermediate queue. + + Returns: + List[str]: The job IDs + """ + return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)] diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index 52d66f50..04effc8b 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -49,3 +49,22 @@ class TestWorker(RQTestCase): two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id)) + + def test_get_job_ids(self): + """Dequeueing job from a single queue moves job to intermediate queue.""" + queue = Queue('foo', connection=self.connection) + job_1 = queue.enqueue(say_hello) + + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + + # Job ID is not in intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), []) + job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id]) + + # Test the blocking version + job_2 = queue.enqueue(say_hello) + job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id])