diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index cdeea4e1..55be1c80 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -1,5 +1,4 @@ import unittest - from datetime import datetime, timedelta, timezone from unittest.mock import patch @@ -7,6 +6,7 @@ from redis import Redis from rq import Queue, Worker from rq.intermediate_queue import IntermediateQueue +from rq.maintenance import clean_intermediate_queue from rq.utils import get_version from tests import RQTestCase from tests.fixtures import say_hello @@ -84,26 +84,48 @@ class TestIntermediateQueue(RQTestCase): intermediate_queue.remove(job_1.id) self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id]) - # def test_cleanup_intermediate_queue(self): - # """Ensure jobs stuck in the intermediate queue are cleaned up.""" - # queue = Queue('foo', connection=self.connection) - # job = queue.enqueue(say_hello) + def test_cleanup_intermediate_queue(self): + """Ensure jobs stuck in the intermediate queue are cleaned up.""" + queue = Queue('foo', connection=self.connection) + job = queue.enqueue(say_hello) - # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) - # self.connection.delete(intermediate_queue.key) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + self.connection.delete(intermediate_queue.key) - # # If job execution fails after it's dequeued, job should be in the intermediate queue - # # and it's status is still QUEUED - # with patch.object(Worker, 'execute_job'): - # worker = Worker(queue, connection=self.testconn) - # worker.work(burst=True) + # If job execution fails after it's dequeued, job should be in the intermediate queue + # and it's status is still QUEUED + with patch.object(Worker, 'execute_job'): + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) - # self.assertEqual(job.get_status(), 'queued') - # self.assertFalse(job.id in queue.get_job_ids()) - # self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) - # # After cleaning up the intermediate queue, job status should be `failed` - # # and job is also removed from the intermediate queue - # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) - # intermediate_queue.remove(job.id) - # self.assertEqual(job.get_status(), 'failed') - # self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) + # If worker.execute_job() does nothing, job status should be `queued` + # even though it's not in the queue, but it should be in the intermediate queue + self.assertEqual(job.get_status(), 'queued') + self.assertFalse(job.id in queue.get_job_ids()) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + clean_intermediate_queue(worker, queue) + # After clean_intermediate_queue is called, the job should be marked as seen, + # but since it's been less than 1 minute, it should not be cleaned up + self.assertIsNotNone(intermediate_queue.get_first_seen(job.id)) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + self.assertEqual(job.get_status(), 'failed') + + job = queue.enqueue(say_hello) + worker.work(burst=True) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If job is gone, it should be immediately removed from the intermediate queue + job.delete() + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) diff --git a/tests/test_maintenance.py b/tests/test_maintenance.py deleted file mode 100644 index 8cef010d..00000000 --- a/tests/test_maintenance.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest -from unittest.mock import patch - -from redis import Redis - -from rq.job import JobStatus -from rq.maintenance import clean_intermediate_queue -from rq.queue import Queue -from rq.utils import get_version -from rq.worker import Worker -from tests import RQTestCase -from tests.fixtures import say_hello - - -class MaintenanceTestCase(RQTestCase): - @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') - def test_cleanup_intermediate_queue(self): - """Ensure jobs stuck in the intermediate queue are cleaned up.""" - queue = Queue('foo', connection=self.testconn) - job = queue.enqueue(say_hello) - - # If job execution fails after it's dequeued, job should be in the intermediate queue - # # and it's status is still QUEUED - with patch.object(Worker, 'execute_job'): - # mocked.execute_job.side_effect = Exception() - worker = Worker(queue, connection=self.testconn) - worker.work(burst=True) - - self.assertEqual(job.get_status(), JobStatus.QUEUED) - self.assertFalse(job.id in queue.get_job_ids()) - self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) - # After cleaning up the intermediate queue, job status should be `FAILED` - # and job is also removed from the intermediate queue - clean_intermediate_queue(worker, queue) - self.assertEqual(job.get_status(), JobStatus.FAILED) - self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))