rq/tests/test_intermediate_queue.py

74 lines
3.5 KiB
Python
Raw Normal View History

2024-04-27 12:54:20 +00:00
from datetime import datetime, timedelta, timezone
from rq import Queue
from rq.intermediate_queue import IntermediateQueue
from tests import RQTestCase
from tests.fixtures import say_hello
class TestWorker(RQTestCase):
    """Tests for ``IntermediateQueue``: first-seen bookkeeping and job ID listing.

    NOTE(review): the class name is kept as-is so existing test selection by
    name keeps working, even though the subject under test is
    ``IntermediateQueue``.
    """

    def test_set_first_seen(self):
        """Ensure that the first_seen attribute is set correctly."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # set_first_seen() should only succeed the first time around
        self.assertTrue(intermediate_queue.set_first_seen(job.id))
        self.assertFalse(intermediate_queue.set_first_seen(job.id))

        # It should succeed again after deleting the key
        self.connection.delete(intermediate_queue.get_first_seen_key(job.id))
        self.assertTrue(intermediate_queue.set_first_seen(job.id))

    def test_get_first_seen(self):
        """Ensure that get_first_seen() returns None until set, then a recent timestamp."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # Nothing has been recorded for this job yet
        self.assertIsNone(intermediate_queue.get_first_seen(job.id))

        # Check first seen was set correctly
        intermediate_queue.set_first_seen(job.id)
        timestamp = intermediate_queue.get_first_seen(job.id)

        # Fail with a clear message (rather than a TypeError in the
        # subtraction below) if the timestamp was not stored.
        self.assertIsNotNone(timestamp)
        # assertLess reports both operands on failure, unlike assertTrue(a < b)
        elapsed = datetime.now(tz=timezone.utc) - timestamp  # type: ignore
        self.assertLess(elapsed, timedelta(seconds=5))

    def test_should_be_cleaned_up(self):
        """Job in the intermediate queue should be cleaned up if it was seen more than 1 minute ago."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # Returns False if there's no first seen timestamp
        self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))

        # Returns False since first seen timestamp is less than 1 minute ago
        intermediate_queue.set_first_seen(job.id)
        self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))

        # Overwrite the stored timestamp with one from two minutes ago; the
        # short expiry keeps the key from lingering after the test.
        first_seen_key = intermediate_queue.get_first_seen_key(job.id)
        two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
        self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
        self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id))

    def test_get_job_ids(self):
        """Dequeueing job from a single queue moves job to intermediate queue."""
        queue = Queue('foo', connection=self.connection)
        job_1 = queue.enqueue(say_hello)

        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)

        # Ensure that the intermediate queue is empty
        self.connection.delete(intermediate_queue.key)

        # Job ID is not in intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [])

        # Dequeue through the same connection the queue was created with,
        # matching the rest of this test case.
        job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.connection)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id])

        # Test the blocking version
        job_2 = queue.enqueue(say_hello)
        job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.connection)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id])