mirror of https://github.com/rq/rq.git
Done implementing clean_intermediate_queue()
This commit is contained in:
parent
aae428500c
commit
a7f19d966f
|
@ -1,5 +1,4 @@
|
|||
import unittest
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import patch
|
||||
|
||||
|
@ -7,6 +6,7 @@ from redis import Redis
|
|||
|
||||
from rq import Queue, Worker
|
||||
from rq.intermediate_queue import IntermediateQueue
|
||||
from rq.maintenance import clean_intermediate_queue
|
||||
from rq.utils import get_version
|
||||
from tests import RQTestCase
|
||||
from tests.fixtures import say_hello
|
||||
|
@ -84,26 +84,48 @@ class TestIntermediateQueue(RQTestCase):
|
|||
intermediate_queue.remove(job_1.id)
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id])
|
||||
|
||||
# def test_cleanup_intermediate_queue(self):
|
||||
# """Ensure jobs stuck in the intermediate queue are cleaned up."""
|
||||
# queue = Queue('foo', connection=self.connection)
|
||||
# job = queue.enqueue(say_hello)
|
||||
def test_cleanup_intermediate_queue(self):
|
||||
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
|
||||
queue = Queue('foo', connection=self.connection)
|
||||
job = queue.enqueue(say_hello)
|
||||
|
||||
# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
|
||||
# self.connection.delete(intermediate_queue.key)
|
||||
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
|
||||
self.connection.delete(intermediate_queue.key)
|
||||
|
||||
# # If job execution fails after it's dequeued, job should be in the intermediate queue
|
||||
# # and it's status is still QUEUED
|
||||
# with patch.object(Worker, 'execute_job'):
|
||||
# worker = Worker(queue, connection=self.testconn)
|
||||
# worker.work(burst=True)
|
||||
# If job execution fails after it's dequeued, job should be in the intermediate queue
|
||||
# and it's status is still QUEUED
|
||||
with patch.object(Worker, 'execute_job'):
|
||||
worker = Worker(queue, connection=self.testconn)
|
||||
worker.work(burst=True)
|
||||
|
||||
# self.assertEqual(job.get_status(), 'queued')
|
||||
# self.assertFalse(job.id in queue.get_job_ids())
|
||||
# self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
|
||||
# # After cleaning up the intermediate queue, job status should be `failed`
|
||||
# # and job is also removed from the intermediate queue
|
||||
# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
|
||||
# intermediate_queue.remove(job.id)
|
||||
# self.assertEqual(job.get_status(), 'failed')
|
||||
# self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
|
||||
# If worker.execute_job() does nothing, job status should be `queued`
|
||||
# even though it's not in the queue, but it should be in the intermediate queue
|
||||
self.assertEqual(job.get_status(), 'queued')
|
||||
self.assertFalse(job.id in queue.get_job_ids())
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
|
||||
|
||||
self.assertIsNone(intermediate_queue.get_first_seen(job.id))
|
||||
clean_intermediate_queue(worker, queue)
|
||||
# After clean_intermediate_queue is called, the job should be marked as seen,
|
||||
# but since it's been less than 1 minute, it should not be cleaned up
|
||||
self.assertIsNotNone(intermediate_queue.get_first_seen(job.id))
|
||||
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
|
||||
|
||||
# If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up
|
||||
first_seen_key = intermediate_queue.get_first_seen_key(job.id)
|
||||
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
|
||||
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
|
||||
|
||||
clean_intermediate_queue(worker, queue)
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [])
|
||||
self.assertEqual(job.get_status(), 'failed')
|
||||
|
||||
job = queue.enqueue(say_hello)
|
||||
worker.work(burst=True)
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
|
||||
|
||||
# If job is gone, it should be immediately removed from the intermediate queue
|
||||
job.delete()
|
||||
clean_intermediate_queue(worker, queue)
|
||||
self.assertEqual(intermediate_queue.get_job_ids(), [])
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from redis import Redis
|
||||
|
||||
from rq.job import JobStatus
|
||||
from rq.maintenance import clean_intermediate_queue
|
||||
from rq.queue import Queue
|
||||
from rq.utils import get_version
|
||||
from rq.worker import Worker
|
||||
from tests import RQTestCase
|
||||
from tests.fixtures import say_hello
|
||||
|
||||
|
||||
class MaintenanceTestCase(RQTestCase):
|
||||
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
|
||||
def test_cleanup_intermediate_queue(self):
|
||||
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
|
||||
queue = Queue('foo', connection=self.testconn)
|
||||
job = queue.enqueue(say_hello)
|
||||
|
||||
# If job execution fails after it's dequeued, job should be in the intermediate queue
|
||||
# # and it's status is still QUEUED
|
||||
with patch.object(Worker, 'execute_job'):
|
||||
# mocked.execute_job.side_effect = Exception()
|
||||
worker = Worker(queue, connection=self.testconn)
|
||||
worker.work(burst=True)
|
||||
|
||||
self.assertEqual(job.get_status(), JobStatus.QUEUED)
|
||||
self.assertFalse(job.id in queue.get_job_ids())
|
||||
self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
|
||||
# After cleaning up the intermediate queue, job status should be `FAILED`
|
||||
# and job is also removed from the intermediate queue
|
||||
clean_intermediate_queue(worker, queue)
|
||||
self.assertEqual(job.get_status(), JobStatus.FAILED)
|
||||
self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
|
Loading…
Reference in New Issue