2024-04-28 12:51:21 +00:00
|
|
|
import unittest
|
|
|
|
|
2024-04-27 12:54:20 +00:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2024-04-28 12:51:21 +00:00
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
from redis import Redis
|
2024-04-27 12:54:20 +00:00
|
|
|
|
2024-04-28 12:51:21 +00:00
|
|
|
from rq import Queue, Worker
|
2024-04-27 12:54:20 +00:00
|
|
|
from rq.intermediate_queue import IntermediateQueue
|
2024-04-28 12:51:21 +00:00
|
|
|
from rq.utils import get_version
|
2024-04-27 12:54:20 +00:00
|
|
|
from tests import RQTestCase
|
|
|
|
from tests.fixtures import say_hello
|
|
|
|
|
|
|
|
|
2024-04-28 12:51:21 +00:00
|
|
|
# NOTE: the version check below runs when the class is defined (i.e. at import
# time) and uses a default ``Redis()`` connection.
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
class TestIntermediateQueue(RQTestCase):
    """Tests for ``IntermediateQueue``: first-seen timestamps, cleanup checks and job ID tracking."""

    def test_set_first_seen(self):
        """Ensure that set_first_seen() only succeeds while no first seen key exists."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # set_first_seen() should only succeed the first time around
        self.assertTrue(intermediate_queue.set_first_seen(job.id))
        self.assertFalse(intermediate_queue.set_first_seen(job.id))

        # It should succeed again after deleting the key
        self.connection.delete(intermediate_queue.get_first_seen_key(job.id))
        self.assertTrue(intermediate_queue.set_first_seen(job.id))

    def test_get_first_seen(self):
        """Ensure that get_first_seen() returns None until set, then a recent timestamp."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # Nothing has been recorded for this job yet
        self.assertIsNone(intermediate_queue.get_first_seen(job.id))

        # Check first seen was set correctly
        intermediate_queue.set_first_seen(job.id)
        timestamp = intermediate_queue.get_first_seen(job.id)
        # Fail with a clear message instead of a TypeError if nothing was stored
        self.assertIsNotNone(timestamp)
        elapsed = datetime.now(tz=timezone.utc) - timestamp  # type: ignore
        self.assertLess(elapsed, timedelta(seconds=5))

    def test_should_be_cleaned_up(self):
        """Job in the intermediate queue should be cleaned up if it was seen more than 1 minute ago."""
        queue = Queue('foo', connection=self.connection)
        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
        job = queue.enqueue(say_hello)

        # Returns False if there's no first seen timestamp
        self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))

        # Returns False since first seen timestamp is less than 1 minute ago
        intermediate_queue.set_first_seen(job.id)
        self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))

        # Returns True once the first seen timestamp is overwritten with a
        # value from two minutes ago (the key is written with a 10 second expiry)
        first_seen_key = intermediate_queue.get_first_seen_key(job.id)
        two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
        self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
        self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id))

    def test_get_job_ids(self):
        """Dequeueing job from a single queue moves job to intermediate queue."""
        queue = Queue('foo', connection=self.connection)
        job_1 = queue.enqueue(say_hello)

        intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)

        # Ensure that the intermediate queue is empty
        self.connection.delete(intermediate_queue.key)

        # Job ID is not in intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [])

        # Bind the result to fresh names so the local `queue` is not rebound
        dequeued_job, _ = Queue.dequeue_any([queue], timeout=None, connection=self.connection)
        self.assertEqual(dequeued_job.id, job_1.id)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id])

        # Test the blocking version
        job_2 = queue.enqueue(say_hello)
        dequeued_job, _ = Queue.dequeue_any([queue], timeout=1, connection=self.connection)
        self.assertEqual(dequeued_job.id, job_2.id)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id])

        # After job_1.id is removed, only job_2.id is in the intermediate queue
        intermediate_queue.remove(job_1.id)
        self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id])
|
|
|
|
|
|
|
|
# def test_cleanup_intermediate_queue(self):
|
|
|
|
# """Ensure jobs stuck in the intermediate queue are cleaned up."""
|
|
|
|
# queue = Queue('foo', connection=self.connection)
|
|
|
|
# job = queue.enqueue(say_hello)
|
|
|
|
|
|
|
|
# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
|
|
|
|
# self.connection.delete(intermediate_queue.key)
|
|
|
|
|
|
|
|
# # If job execution fails after it's dequeued, job should be in the intermediate queue
|
|
|
|
# # and its status is still QUEUED
|
|
|
|
# with patch.object(Worker, 'execute_job'):
|
|
|
|
# worker = Worker(queue, connection=self.testconn)
|
|
|
|
# worker.work(burst=True)
|
|
|
|
|
|
|
|
# self.assertEqual(job.get_status(), 'queued')
|
|
|
|
# self.assertFalse(job.id in queue.get_job_ids())
|
|
|
|
# self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
|
|
|
|
# # After cleaning up the intermediate queue, job status should be `failed`
|
|
|
|
# # and job is also removed from the intermediate queue
|
|
|
|
# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
|
|
|
|
# intermediate_queue.remove(job.id)
|
|
|
|
# self.assertEqual(job.get_status(), 'failed')
|
|
|
|
# self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
|