diff --git a/CHANGES.md b/CHANGES.md index 5c7242f1..46d2fe29 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,9 @@ Breaking Changes: * `Queue.all()` requires `connection` argument. * `@job` decorator now requires `connection` argument. +### RQ 1.16.2 (2024-05-01) +* Fixed a bug that may cause jobs from the intermediate queue to be moved to FailedJobRegistry. Thanks @selwin! + ### RQ 1.16.1 (2024-03-09) * Added `worker_pool.get_worker_process()` to make `WorkerPool` easier to extend. Thanks @selwin! diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py new file mode 100644 index 00000000..5bdb057d --- /dev/null +++ b/rq/intermediate_queue.py @@ -0,0 +1,119 @@ +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, List, Optional + +from redis import Redis + +from rq.utils import now + +if TYPE_CHECKING: + from .queue import Queue + from .worker import BaseWorker + + +class IntermediateQueue(object): + + def __init__(self, queue_key: str, connection: Redis): + self.queue_key = queue_key + self.key = self.get_intermediate_queue_key(queue_key) + self.connection = connection + + @classmethod + def get_intermediate_queue_key(cls, queue_key: str) -> str: + """Returns the intermediate queue key for a given queue key. + + Args: + queue_key (str): The queue key + + Returns: + str: The intermediate queue key + """ + return f'{queue_key}:intermediate' + + def get_first_seen_key(self, job_id: str) -> str: + """Returns the first seen key for a given job ID. + + Args: + job_id (str): The job ID + + Returns: + str: The first seen key + """ + return f'{self.key}:first_seen:{job_id}' + + def set_first_seen(self, job_id: str) -> bool: + """Sets the first seen timestamp for a job. 
+ + Args: + job_id (str): The job ID + timestamp (float): The timestamp + """ + # TODO: job_id should be changed to execution ID in 2.0 + return bool(self.connection.set(self.get_first_seen_key(job_id), now().timestamp(), nx=True, ex=3600 * 24)) + + def get_first_seen(self, job_id: str) -> Optional[datetime]: + """Returns the first seen timestamp for a job. + + Args: + job_id (str): The job ID + + Returns: + Optional[datetime]: The timestamp + """ + timestamp = self.connection.get(self.get_first_seen_key(job_id)) + if timestamp: + return datetime.fromtimestamp(float(timestamp), tz=timezone.utc) + return None + + def should_be_cleaned_up(self, job_id: str) -> bool: + """Returns whether a job should be cleaned up. + A job in intermediate queue should be cleaned up if it has been there for more than 1 minute. + + Args: + job_id (str): The job ID + + Returns: + bool: Whether the job should be cleaned up + """ + # TODO: should be changed to execution ID in 2.0 + first_seen = self.get_first_seen(job_id) + if not first_seen: + return False + return now() - first_seen > timedelta(minutes=1) + + def get_job_ids(self) -> List[str]: + """Returns the job IDs in the intermediate queue. + + Returns: + List[str]: The job IDs + """ + return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)] + + def remove(self, job_id: str) -> None: + """Removes a job from the intermediate queue. + + Args: + job_id (str): The job ID + """ + self.connection.lrem(self.key, 1, job_id) + + def cleanup(self, worker: 'BaseWorker', queue: 'Queue') -> None: + job_ids = self.get_job_ids() + + for job_id in job_ids: + job = queue.fetch_job(job_id) + + if job_id not in queue.started_job_registry: + + if not job: + # If the job doesn't exist in the queue, we can safely remove it from the intermediate queue. + self.remove(job_id) + continue + + # If this is the first time we've seen this job, do nothing. 
+ # `set_first_seen` will return `True` if the key was set, `False` if it already existed. + if self.set_first_seen(job_id): + continue + + if self.should_be_cleaned_up(job_id): + worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') + self.remove(job_id) diff --git a/rq/maintenance.py b/rq/maintenance.py index 5c9d7cd2..52ed4749 100644 --- a/rq/maintenance.py +++ b/rq/maintenance.py @@ -1,7 +1,9 @@ +import warnings + from typing import TYPE_CHECKING +from .intermediate_queue import IntermediateQueue from .queue import Queue -from .utils import as_text if TYPE_CHECKING: from .worker import BaseWorker @@ -17,10 +19,9 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None: We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry. """ - job_ids = [as_text(job_id) for job_id in queue.connection.lrange(queue.intermediate_queue_key, 0, -1)] - for job_id in job_ids: - if job_id not in queue.started_job_registry: - job = queue.fetch_job(job_id) - if job: - worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') - queue.connection.lrem(queue.intermediate_queue_key, 1, job_id) + warnings.warn( + "clean_intermediate_queue is deprecated. 
Use IntermediateQueue.cleanup instead.", + DeprecationWarning, + ) + intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection) + intermediate_queue.cleanup(worker, queue) diff --git a/rq/queue.py b/rq/queue.py index 4586b72b..277dcef1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from .defaults import DEFAULT_RESULT_TTL from .dependency import Dependency from .exceptions import DequeueTimeout, NoSuchJobError +from .intermediate_queue import IntermediateQueue from .job import Callback, Job, JobStatus from .logutils import blue, green from .serializers import resolve_serializer @@ -139,18 +140,6 @@ class Queue: death_penalty_class=death_penalty_class, ) - @classmethod - def get_intermediate_queue_key(cls, key: str) -> str: - """Returns the intermediate queue key for a given queue key. - - Args: - key (str): The queue key - - Returns: - str: The intermediate queue key - """ - return f'{key}:intermediate' - def __init__( self, name: str = 'default', @@ -227,7 +216,12 @@ class Queue: @property def intermediate_queue_key(self): """Returns the Redis key for intermediate queue.""" - return self.get_intermediate_queue_key(self._key) + return IntermediateQueue.get_intermediate_queue_key(self._key) + + @property + def intermediate_queue(self) -> IntermediateQueue: + """Returns the IntermediateQueue instance for this Queue.""" + return IntermediateQueue(self.key, connection=self.connection) @property def registry_cleaning_key(self): @@ -1316,18 +1310,19 @@ class Queue: """Similar to lpop, but accepts only a single queue key and immediately pushes the result to an intermediate queue. """ + intermediate_queue = IntermediateQueue(queue_key, connection) if timeout is not None: # blocking variant if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. 
Please pick a timeout value > 0') colored_queue = green(queue_key) logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}") - result = connection.blmove(queue_key, cls.get_intermediate_queue_key(queue_key), timeout) + result = connection.blmove(queue_key, intermediate_queue.key, timeout) if result is None: logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}") raise DequeueTimeout(timeout, queue_key) return queue_key, result else: # non-blocking variant - result = connection.lmove(queue_key, cls.get_intermediate_queue_key(queue_key)) + result = connection.lmove(queue_key, intermediate_queue.key) if result is not None: return queue_key, result return None diff --git a/rq/version.py b/rq/version.py index 01f10492..44a3256c 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '1.16.1' +VERSION = '1.16.2' diff --git a/rq/worker.py b/rq/worker.py index 3624b953..f792d1b9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -49,7 +49,6 @@ from .executions import Execution from .group import Group from .job import Job, JobStatus from .logutils import blue, green, setup_loghandlers, yellow -from .maintenance import clean_intermediate_queue from .queue import Queue from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler @@ -450,7 +449,7 @@ class BaseWorker: self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue, self._exc_handlers) worker_registration.clean_worker_registry(queue) - clean_intermediate_queue(self, queue) + queue.intermediate_queue.cleanup(self, queue) queue.release_maintenance_lock() self.last_cleaned_at = now() diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py new file mode 100644 index 00000000..02843bce --- /dev/null +++ b/tests/test_intermediate_queue.py @@ -0,0 +1,177 @@ +import unittest +from datetime import datetime, timedelta, timezone +from unittest.mock import patch + +from redis import Redis + 
+from rq import Queue, Worker +from rq.intermediate_queue import IntermediateQueue +from rq.maintenance import clean_intermediate_queue +from rq.utils import get_version +from tests import RQTestCase +from tests.fixtures import say_hello + + +@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') +class TestIntermediateQueue(RQTestCase): + + def test_set_first_seen(self): + """Ensure that the first_seen attribute is set correctly.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + # set_first_seen() should only succeed the first time around + self.assertTrue(intermediate_queue.set_first_seen(job.id)) + self.assertFalse(intermediate_queue.set_first_seen(job.id)) + # It should succeed again after deleting the key + self.connection.delete(intermediate_queue.get_first_seen_key(job.id)) + self.assertTrue(intermediate_queue.set_first_seen(job.id)) + + def test_get_first_seen(self): + """Ensure that the first_seen attribute is set correctly.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + + # Check first seen was set correctly + intermediate_queue.set_first_seen(job.id) + timestamp = intermediate_queue.get_first_seen(job.id) + self.assertTrue(datetime.now(tz=timezone.utc) - timestamp < timedelta(seconds=5)) # type: ignore + + def test_should_be_cleaned_up(self): + """Job in the intermediate queue should be cleaned up if it was seen more than 1 minute ago.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + # Returns False if there's no first seen timestamp + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + # 
Returns False since first seen timestamp is less than 1 minute ago + intermediate_queue.set_first_seen(job.id) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id)) + + def test_get_job_ids(self): + """Dequeueing job from a single queue moves job to intermediate queue.""" + queue = Queue('foo', connection=self.connection) + job_1 = queue.enqueue(say_hello) + + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + + # Ensure that the intermediate queue is empty + self.connection.delete(intermediate_queue.key) + + # Job ID is not in intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), []) + job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id]) + + # Test the blocking version + job_2 = queue.enqueue(say_hello) + job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id]) + + # After job_1.id is removed, only job_2.id is in the intermediate queue + intermediate_queue.remove(job_1.id) + self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id]) + + def test_cleanup_intermediate_queue_in_maintenance(self): + """Ensure jobs stuck in the intermediate queue are cleaned up.""" + queue = Queue('foo', connection=self.connection) + job = queue.enqueue(say_hello) + + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + self.connection.delete(intermediate_queue.key) + + # If job execution fails after it's 
dequeued, job should be in the intermediate queue + # and its status is still QUEUED + with patch.object(Worker, 'execute_job'): + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + + # If worker.execute_job() does nothing, job status should be `queued` + # even though it's not in the queue, but it should be in the intermediate queue + self.assertEqual(job.get_status(), 'queued') + self.assertFalse(job.id in queue.get_job_ids()) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + clean_intermediate_queue(worker, queue) + # After clean_intermediate_queue is called, the job should be marked as seen, + # but since it's been less than 1 minute, it should not be cleaned up + self.assertIsNotNone(intermediate_queue.get_first_seen(job.id)) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + self.assertEqual(job.get_status(), 'failed') + + job = queue.enqueue(say_hello) + worker.work(burst=True) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If job is gone, it should be immediately removed from the intermediate queue + job.delete() + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + + def test_cleanup_intermediate_queue(self): + """Ensure jobs stuck in the intermediate queue are cleaned up.""" + queue = Queue('foo', connection=self.connection) + job = queue.enqueue(say_hello) + + intermediate_queue = 
IntermediateQueue(queue.key, connection=self.connection) + self.connection.delete(intermediate_queue.key) + + # If job execution fails after it's dequeued, job should be in the intermediate queue + # and its status is still QUEUED + with patch.object(Worker, 'execute_job'): + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + + # If worker.execute_job() does nothing, job status should be `queued` + # even though it's not in the queue, but it should be in the intermediate queue + self.assertEqual(job.get_status(), 'queued') + self.assertFalse(job.id in queue.get_job_ids()) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + intermediate_queue.cleanup(worker, queue) + # After intermediate_queue.cleanup() is called, the job should be marked as seen, + # but since it's been less than 1 minute, it should not be cleaned up + self.assertIsNotNone(intermediate_queue.get_first_seen(job.id)) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + + intermediate_queue.cleanup(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + self.assertEqual(job.get_status(), 'failed') + + job = queue.enqueue(say_hello) + worker.work(burst=True) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If job is gone, it should be immediately removed from the intermediate queue + job.delete() + intermediate_queue.cleanup(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) diff --git a/tests/test_maintenance.py b/tests/test_maintenance.py deleted file mode 100644 index 
3f955159..00000000 --- a/tests/test_maintenance.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest -from unittest.mock import patch - -from redis import Redis - -from rq.job import JobStatus -from rq.maintenance import clean_intermediate_queue -from rq.queue import Queue -from rq.utils import get_version -from rq.worker import Worker -from tests import RQTestCase -from tests.fixtures import say_hello - - -class MaintenanceTestCase(RQTestCase): - @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') - def test_cleanup_intermediate_queue(self): - """Ensure jobs stuck in the intermediate queue are cleaned up.""" - queue = Queue('foo', connection=self.connection) - job = queue.enqueue(say_hello) - - # If job execution fails after it's dequeued, job should be in the intermediate queue - # # and it's status is still QUEUED - with patch.object(Worker, 'execute_job'): - # mocked.execute_job.side_effect = Exception() - worker = Worker(queue, connection=self.connection) - worker.work(burst=True) - - self.assertEqual(job.get_status(), JobStatus.QUEUED) - self.assertFalse(job.id in queue.get_job_ids()) - self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) - # After cleaning up the intermediate queue, job status should be `FAILED` - # and job is also removed from the intermediate queue - clean_intermediate_queue(worker, queue) - self.assertEqual(job.get_status(), JobStatus.FAILED) - self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id))