Merge branch 'v1'

This commit is contained in:
Selwin Ong 2024-05-01 14:20:29 +07:00
commit b52457afb6
8 changed files with 320 additions and 62 deletions

View File

@ -11,6 +11,9 @@ Breaking Changes:
* `Queue.all()` requires `connection` argument.
* `@job` decorator now requires `connection` argument.
### RQ 1.16.2 (2024-05-01)
* Fixed a bug that may cause jobs from intermediate queue to be moved to FailedJobRegistry. Thanks @selwin!
### RQ 1.16.1 (2024-03-09)
* Added `worker_pool.get_worker_process()` to make `WorkerPool` easier to extend. Thanks @selwin!

119
rq/intermediate_queue.py Normal file
View File

@ -0,0 +1,119 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, List, Optional
from redis import Redis
from rq.utils import now
if TYPE_CHECKING:
from .queue import Queue
from .worker import BaseWorker
class IntermediateQueue(object):
def __init__(self, queue_key: str, connection: Redis):
self.queue_key = queue_key
self.key = self.get_intermediate_queue_key(queue_key)
self.connection = connection
@classmethod
def get_intermediate_queue_key(cls, queue_key: str) -> str:
"""Returns the intermediate queue key for a given queue key.
Args:
key (str): The queue key
Returns:
str: The intermediate queue key
"""
return f'{queue_key}:intermediate'
def get_first_seen_key(self, job_id: str) -> str:
"""Returns the first seen key for a given job ID.
Args:
job_id (str): The job ID
Returns:
str: The first seen key
"""
return f'{self.key}:first_seen:{job_id}'
def set_first_seen(self, job_id: str) -> bool:
"""Sets the first seen timestamp for a job.
Args:
job_id (str): The job ID
timestamp (float): The timestamp
"""
# TODO: job_id should be changed to execution ID in 2.0
return bool(self.connection.set(self.get_first_seen_key(job_id), now().timestamp(), nx=True, ex=3600 * 24))
def get_first_seen(self, job_id: str) -> Optional[datetime]:
"""Returns the first seen timestamp for a job.
Args:
job_id (str): The job ID
Returns:
Optional[datetime]: The timestamp
"""
timestamp = self.connection.get(self.get_first_seen_key(job_id))
if timestamp:
return datetime.fromtimestamp(float(timestamp), tz=timezone.utc)
return None
def should_be_cleaned_up(self, job_id: str) -> bool:
"""Returns whether a job should be cleaned up.
A job in intermediate queue should be cleaned up if it has been there for more than 1 minute.
Args:
job_id (str): The job ID
Returns:
bool: Whether the job should be cleaned up
"""
# TODO: should be changed to execution ID in 2.0
first_seen = self.get_first_seen(job_id)
if not first_seen:
return False
return now() - first_seen > timedelta(minutes=1)
def get_job_ids(self) -> List[str]:
"""Returns the job IDs in the intermediate queue.
Returns:
List[str]: The job IDs
"""
return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)]
def remove(self, job_id: str) -> None:
"""Removes a job from the intermediate queue.
Args:
job_id (str): The job ID
"""
self.connection.lrem(self.key, 1, job_id)
def cleanup(self, worker: 'BaseWorker', queue: 'Queue') -> None:
job_ids = self.get_job_ids()
for job_id in job_ids:
job = queue.fetch_job(job_id)
if job_id not in queue.started_job_registry:
if not job:
# If the job doesn't exist in the queue, we can safely remove it from the intermediate queue.
self.remove(job_id)
continue
# If this is the first time we've seen this job, do nothing.
# `set_first_seen` will return `True` if the key was set, `False` if it already existed.
if self.set_first_seen(job_id):
continue
if self.should_be_cleaned_up(job_id):
worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.')
self.remove(job_id)

View File

@ -1,7 +1,9 @@
import warnings
from typing import TYPE_CHECKING
from .intermediate_queue import IntermediateQueue
from .queue import Queue
from .utils import as_text
if TYPE_CHECKING:
from .worker import BaseWorker
@ -17,10 +19,9 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry.
"""
job_ids = [as_text(job_id) for job_id in queue.connection.lrange(queue.intermediate_queue_key, 0, -1)]
for job_id in job_ids:
if job_id not in queue.started_job_registry:
job = queue.fetch_job(job_id)
if job:
worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.')
queue.connection.lrem(queue.intermediate_queue_key, 1, job_id)
warnings.warn(
"clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.",
DeprecationWarning,
)
intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection)
intermediate_queue.cleanup(worker, queue)

View File

@ -21,6 +21,7 @@ if TYPE_CHECKING:
from .defaults import DEFAULT_RESULT_TTL
from .dependency import Dependency
from .exceptions import DequeueTimeout, NoSuchJobError
from .intermediate_queue import IntermediateQueue
from .job import Callback, Job, JobStatus
from .logutils import blue, green
from .serializers import resolve_serializer
@ -139,18 +140,6 @@ class Queue:
death_penalty_class=death_penalty_class,
)
@classmethod
def get_intermediate_queue_key(cls, key: str) -> str:
"""Returns the intermediate queue key for a given queue key.
Args:
key (str): The queue key
Returns:
str: The intermediate queue key
"""
return f'{key}:intermediate'
def __init__(
self,
name: str = 'default',
@ -227,7 +216,12 @@ class Queue:
@property
def intermediate_queue_key(self):
"""Returns the Redis key for intermediate queue."""
return self.get_intermediate_queue_key(self._key)
return IntermediateQueue.get_intermediate_queue_key(self._key)
@property
def intermediate_queue(self) -> IntermediateQueue:
"""Returns the IntermediateQueue instance for this Queue."""
return IntermediateQueue(self.key, connection=self.connection)
@property
def registry_cleaning_key(self):
@ -1316,18 +1310,19 @@ class Queue:
"""Similar to lpop, but accepts only a single queue key and immediately pushes
the result to an intermediate queue.
"""
intermediate_queue = IntermediateQueue(queue_key, connection)
if timeout is not None: # blocking variant
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
colored_queue = green(queue_key)
logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}")
result = connection.blmove(queue_key, cls.get_intermediate_queue_key(queue_key), timeout)
result = connection.blmove(queue_key, intermediate_queue.key, timeout)
if result is None:
logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}")
raise DequeueTimeout(timeout, queue_key)
return queue_key, result
else: # non-blocking variant
result = connection.lmove(queue_key, cls.get_intermediate_queue_key(queue_key))
result = connection.lmove(queue_key, intermediate_queue.key)
if result is not None:
return queue_key, result
return None

View File

@ -1 +1 @@
VERSION = '1.16.1'
VERSION = '1.16.2'

View File

@ -49,7 +49,6 @@ from .executions import Execution
from .group import Group
from .job import Job, JobStatus
from .logutils import blue, green, setup_loghandlers, yellow
from .maintenance import clean_intermediate_queue
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
@ -450,7 +449,7 @@ class BaseWorker:
self.log.info('Cleaning registries for queue: %s', queue.name)
clean_registries(queue, self._exc_handlers)
worker_registration.clean_worker_registry(queue)
clean_intermediate_queue(self, queue)
queue.intermediate_queue.cleanup(self, queue)
queue.release_maintenance_lock()
self.last_cleaned_at = now()

View File

@ -0,0 +1,177 @@
import unittest
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from redis import Redis
from rq import Queue, Worker
from rq.intermediate_queue import IntermediateQueue
from rq.maintenance import clean_intermediate_queue
from rq.utils import get_version
from tests import RQTestCase
from tests.fixtures import say_hello
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
class TestIntermediateQueue(RQTestCase):
def test_set_first_seen(self):
"""Ensure that the first_seen attribute is set correctly."""
queue = Queue('foo', connection=self.connection)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
job = queue.enqueue(say_hello)
# set_first_seen() should only succeed the first time around
self.assertTrue(intermediate_queue.set_first_seen(job.id))
self.assertFalse(intermediate_queue.set_first_seen(job.id))
# It should succeed again after deleting the key
self.connection.delete(intermediate_queue.get_first_seen_key(job.id))
self.assertTrue(intermediate_queue.set_first_seen(job.id))
def test_get_first_seen(self):
"""Ensure that the first_seen attribute is set correctly."""
queue = Queue('foo', connection=self.connection)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
job = queue.enqueue(say_hello)
self.assertIsNone(intermediate_queue.get_first_seen(job.id))
# Check first seen was set correctly
intermediate_queue.set_first_seen(job.id)
timestamp = intermediate_queue.get_first_seen(job.id)
self.assertTrue(datetime.now(tz=timezone.utc) - timestamp < timedelta(seconds=5)) # type: ignore
def test_should_be_cleaned_up(self):
"""Job in the intermediate queue should be cleaned up if it was seen more than 1 minute ago."""
queue = Queue('foo', connection=self.connection)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
job = queue.enqueue(say_hello)
# Returns False if there's no first seen timestamp
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
# Returns False since first seen timestamp is less than 1 minute ago
intermediate_queue.set_first_seen(job.id)
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
first_seen_key = intermediate_queue.get_first_seen_key(job.id)
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id))
def test_get_job_ids(self):
"""Dequeueing job from a single queue moves job to intermediate queue."""
queue = Queue('foo', connection=self.connection)
job_1 = queue.enqueue(say_hello)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
# Ensure that the intermediate queue is empty
self.connection.delete(intermediate_queue.key)
# Job ID is not in intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [])
job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn)
# After job is dequeued, the job ID is in the intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id])
# Test the blocking version
job_2 = queue.enqueue(say_hello)
job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn)
# After job is dequeued, the job ID is in the intermediate queue
self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id])
# After job_1.id is removed, only job_2.id is in the intermediate queue
intermediate_queue.remove(job_1.id)
self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id])
def test_cleanup_intermediate_queue_in_maintenance(self):
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
queue = Queue('foo', connection=self.connection)
job = queue.enqueue(say_hello)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
self.connection.delete(intermediate_queue.key)
# If job execution fails after it's dequeued, job should be in the intermediate queue
# and it's status is still QUEUED
with patch.object(Worker, 'execute_job'):
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)
# If worker.execute_job() does nothing, job status should be `queued`
# even though it's not in the queue, but it should be in the intermediate queue
self.assertEqual(job.get_status(), 'queued')
self.assertFalse(job.id in queue.get_job_ids())
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
self.assertIsNone(intermediate_queue.get_first_seen(job.id))
clean_intermediate_queue(worker, queue)
# After clean_intermediate_queue is called, the job should be marked as seen,
# but since it's been less than 1 minute, it should not be cleaned up
self.assertIsNotNone(intermediate_queue.get_first_seen(job.id))
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
# If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up
first_seen_key = intermediate_queue.get_first_seen_key(job.id)
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
clean_intermediate_queue(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])
self.assertEqual(job.get_status(), 'failed')
job = queue.enqueue(say_hello)
worker.work(burst=True)
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
# If job is gone, it should be immediately removed from the intermediate queue
job.delete()
clean_intermediate_queue(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])
def test_cleanup_intermediate_queue(self):
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
queue = Queue('foo', connection=self.connection)
job = queue.enqueue(say_hello)
intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
self.connection.delete(intermediate_queue.key)
# If job execution fails after it's dequeued, job should be in the intermediate queue
# and it's status is still QUEUED
with patch.object(Worker, 'execute_job'):
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)
# If worker.execute_job() does nothing, job status should be `queued`
# even though it's not in the queue, but it should be in the intermediate queue
self.assertEqual(job.get_status(), 'queued')
self.assertFalse(job.id in queue.get_job_ids())
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
self.assertIsNone(intermediate_queue.get_first_seen(job.id))
intermediate_queue.cleanup(worker, queue)
# After clean_intermediate_queue is called, the job should be marked as seen,
# but since it's been less than 1 minute, it should not be cleaned up
self.assertIsNotNone(intermediate_queue.get_first_seen(job.id))
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
# If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up
first_seen_key = intermediate_queue.get_first_seen_key(job.id)
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)
intermediate_queue.cleanup(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])
self.assertEqual(job.get_status(), 'failed')
job = queue.enqueue(say_hello)
worker.work(burst=True)
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])
# If job is gone, it should be immediately removed from the intermediate queue
job.delete()
intermediate_queue.cleanup(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])

View File

@ -1,36 +0,0 @@
import unittest
from unittest.mock import patch
from redis import Redis
from rq.job import JobStatus
from rq.maintenance import clean_intermediate_queue
from rq.queue import Queue
from rq.utils import get_version
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import say_hello
class MaintenanceTestCase(RQTestCase):
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
def test_cleanup_intermediate_queue(self):
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
queue = Queue('foo', connection=self.connection)
job = queue.enqueue(say_hello)
# If job execution fails after it's dequeued, job should be in the intermediate queue
# # and it's status is still QUEUED
with patch.object(Worker, 'execute_job'):
# mocked.execute_job.side_effect = Exception()
worker = Worker(queue, connection=self.connection)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertFalse(job.id in queue.get_job_ids())
self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
# After cleaning up the intermediate queue, job status should be `FAILED`
# and job is also removed from the intermediate queue
clean_intermediate_queue(worker, queue)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id))