From 2de949126fb0213c3654051d49fafc84e1e54193 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 3 Mar 2024 20:06:02 +0700 Subject: [PATCH 01/11] Added worker_pool.get_worker_process() --- rq/worker_pool.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/rq/worker_pool.py b/rq/worker_pool.py index eb93065c..92a7f2a1 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -141,19 +141,15 @@ class WorkerPool: for i in range(delta): self.start_worker(burst=self._burst, _sleep=self._sleep) - def start_worker( + def get_worker_process( self, - count: Optional[int] = None, - burst: bool = True, + name: str, + burst: bool, _sleep: float = 0, logging_level: str = "INFO", - ): - """ - Starts a worker and adds the data to worker_datas. - * sleep: waits for X seconds before creating worker, for testing purposes - """ - name = uuid4().hex - process = Process( + ) -> Process: + """Returns the worker process""" + return Process( target=run_worker, args=(name, self._queue_names, self._connection_class, self._pool_class, self._pool_kwargs), kwargs={ @@ -166,6 +162,20 @@ class WorkerPool: }, name=f'Worker {name} (WorkerPool {self.name})', ) + + def start_worker( + self, + count: Optional[int] = None, + burst: bool = True, + _sleep: float = 0, + logging_level: str = "INFO", + ): + """ + Starts a worker and adds the data to worker_datas. + * sleep: waits for X seconds before creating worker, for testing purposes + """ + name = uuid4().hex + process = self.get_worker_process(name, burst=burst, _sleep=_sleep, logging_level=logging_level) process.start() worker_data = WorkerData(name=name, pid=process.pid, process=process) # type: ignore self.worker_dict[name] = worker_data From 1f8e031a46a4947b65c9b862a5bdf809a1b63d10 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 9 Mar 2024 09:31:58 +0700 Subject: [PATCH 02/11] Bump version to 1.16.1 --- CHANGES.md | 3 +++ pyproject.toml | 1 + rq/version.py | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index c1b5ec30..24ace27a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,6 @@ +### RQ 1.16.1 (2024-03-09) +* Added `worker_pool.get_worker_process()` to make `WorkerPool` easier to extend. Thanks @selwin! + ### RQ 1.16 (2024-02-24) * Added a way for jobs to wait for latest result `job.latest_result(timeout=60)`. Thanks @ajnisbet! * Fixed an issue where `stopped_callback` is not respected when job is enqueued via `enqueue_many()`. Thanks @eswolinsky3241! diff --git a/pyproject.toml b/pyproject.toml index 30509d67..bbf21e16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ maintainers = [ {name = "Selwin Ong"}, ] authors = [ + { name = "Selwin Ong", email = "selwin.ong@gmail.com" }, { name = "Vincent Driessen", email = "vincent@3rdcloud.com" }, ] requires-python = ">=3.7" diff --git a/rq/version.py b/rq/version.py index a51469e2..01f10492 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '1.16.0' +VERSION = '1.16.1' From 673b70dad09673e3129ef565941a0403c057d427 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 13 Apr 2024 14:38:53 +0700 Subject: [PATCH 03/11] Fix test_clean_large_registry --- tests/test_worker_registration.py | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py index 26ee6174..6f0df801 100644 --- a/tests/test_worker_registration.py +++ b/tests/test_worker_registration.py @@ -85,25 +85,19 @@ class TestWorkerRegistry(RQTestCase): clean_registry() splits invalid_keys into multiple lists for set removal to avoid sending more than redis can receive """ - MAX_WORKERS = 41 - MAX_KEYS = 37 - # srem is called twice per invalid key batch: once for WORKERS_BY_QUEUE_KEY; once for REDIS_WORKER_KEYS + worker_count = 11 + MAX_KEYS = 6 SREM_CALL_COUNT = 2 - queue = Queue(name='foo') - for i in range(MAX_WORKERS): - worker = Worker([queue]) + queue = Queue(name='foo', connection=self.connection) + for i in range(worker_count): + worker = Worker([queue], connection=self.connection) register(worker) - with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), patch.object( - queue.connection, 'pipeline', wraps=queue.connection.pipeline - ) as pipeline_mock: - # clean_worker_registry creates a pipeline with a context manager. Configure the mock using the context - # manager entry method __enter__ - pipeline_mock.return_value.__enter__.return_value.srem.return_value = None - pipeline_mock.return_value.__enter__.return_value.execute.return_value = [0] * MAX_WORKERS - + # Since we registered 11 workers and set the maximum keys to be deleted in each command to 6, + # `srem` command should be called a total of 4 times. + # `srem` is called twice per invalid key group; once for WORKERS_BY_QUEUE_KEY and once for REDIS_WORKER_KEYS + with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), patch('redis.client.Pipeline.srem') as mock: clean_worker_registry(queue) - - expected_call_count = (ceildiv(MAX_WORKERS, MAX_KEYS)) * SREM_CALL_COUNT - self.assertEqual(pipeline_mock.return_value.__enter__.return_value.srem.call_count, expected_call_count) + expected_call_count = (ceildiv(worker_count, MAX_KEYS)) * SREM_CALL_COUNT + self.assertEqual(mock.call_count, expected_call_count) From 1068956bad4c5ae21309349726ce24ca9122a8df Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 27 Apr 2024 15:24:50 +0700 Subject: [PATCH 04/11] Use sentry-sdk<2 for tests --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index bbf21e16..967d99b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ dependencies = [ "pytest", "pytest-cov", "ruff", - "sentry-sdk", + "sentry-sdk<2", "tox", ] [tool.hatch.envs.test.scripts] From 50467b4d582f650526efd3622636c702cddee06a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 27 Apr 2024 19:54:20 +0700 Subject: [PATCH 05/11] Made IntermediateQueue class --- rq/intermediate_queue.py | 77 ++++++++++++++++++++++++++++++++ rq/queue.py | 20 +++------ tests/test_intermediate_queue.py | 51 +++++++++++++++++++++ 3 files changed, 133 insertions(+), 15 deletions(-) create mode 100644 rq/intermediate_queue.py create mode 100644 tests/test_intermediate_queue.py diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py new file mode 100644 index 00000000..e317bc73 --- /dev/null +++ b/rq/intermediate_queue.py @@ -0,0 +1,77 @@ +from datetime import datetime, timedelta, timezone +from typing import Optional + +from redis import Redis + +from rq.utils import now + + +class IntermediateQueue(object): + + def __init__(self, queue_key: str, connection: Redis): + self.queue_key = queue_key + self.key = self.get_intermediate_queue_key(queue_key) + self.connection = connection + + @classmethod + def get_intermediate_queue_key(cls, queue_key: str) -> str: + """Returns the intermediate queue key for a given queue key. + + Args: + key (str): The queue key + + Returns: + str: The intermediate queue key + """ + return f'{queue_key}:intermediate' + + def get_first_seen_key(self, job_id: str) -> str: + """Returns the first seen key for a given job ID. + + Args: + job_id (str): The job ID + + Returns: + str: The first seen key + """ + return f'{self.key}:first_seen:{job_id}' + + def set_first_seen(self, job_id: str) -> bool: + """Sets the first seen timestamp for a job. + + Args: + job_id (str): The job ID + timestamp (float): The timestamp + """ + # TODO: job_id should be changed to execution ID in 2.0 + return bool(self.connection.set(self.get_first_seen_key(job_id), now().timestamp(), nx=True, ex=3600 * 24)) + + def get_first_seen(self, job_id: str) -> Optional[datetime]: + """Returns the first seen timestamp for a job. + + Args: + job_id (str): The job ID + + Returns: + Optional[datetime]: The timestamp + """ + timestamp = self.connection.get(self.get_first_seen_key(job_id)) + if timestamp: + return datetime.fromtimestamp(float(timestamp), tz=timezone.utc) + return None + + def should_be_cleaned_up(self, job_id: str) -> bool: + """Returns whether a job should be cleaned up. + A job in intermediate queue should be cleaned up if it has been there for more than 1 minute. + + Args: + job_id (str): The job ID + + Returns: + bool: Whether the job should be cleaned up + """ + # TODO: should be changed to execution ID in 2.0 + first_seen = self.get_first_seen(job_id) + if not first_seen: + return False + return now() - first_seen > timedelta(minutes=1) diff --git a/rq/queue.py b/rq/queue.py index 2dbecbda..1212cf71 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,6 +22,7 @@ from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .dependency import Dependency from .exceptions import DequeueTimeout, NoSuchJobError +from .intermediate_queue import IntermediateQueue from .job import Callback, Job, JobStatus from .logutils import blue, green from .serializers import resolve_serializer @@ -141,18 +142,6 @@ class Queue: death_penalty_class=death_penalty_class, ) - @classmethod - def get_intermediate_queue_key(cls, key: str) -> str: - """Returns the intermediate queue key for a given queue key. - - Args: - key (str): The queue key - - Returns: - str: The intermediate queue key - """ - return f'{key}:intermediate' - def __init__( self, name: str = 'default', @@ -227,7 +216,7 @@ class Queue: @property def intermediate_queue_key(self): """Returns the Redis key for intermediate queue.""" - return self.get_intermediate_queue_key(self._key) + return IntermediateQueue.get_intermediate_queue_key(self._key) @property def registry_cleaning_key(self): @@ -1310,18 +1299,19 @@ class Queue: """Similar to lpop, but accepts only a single queue key and immediately pushes the result to an intermediate queue. """ + intermediate_queue = IntermediateQueue(queue_key, connection) if timeout is not None: # blocking variant if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') colored_queue = green(queue_key) logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}") - result = connection.blmove(queue_key, cls.get_intermediate_queue_key(queue_key), timeout) + result = connection.blmove(queue_key, intermediate_queue.key, timeout) if result is None: logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}") raise DequeueTimeout(timeout, queue_key) return queue_key, result else: # non-blocking variant - result = connection.lmove(queue_key, cls.get_intermediate_queue_key(queue_key)) + result = connection.lmove(queue_key, intermediate_queue.key) if result is not None: return queue_key, result return None diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py new file mode 100644 index 00000000..52d66f50 --- /dev/null +++ b/tests/test_intermediate_queue.py @@ -0,0 +1,51 @@ +from datetime import datetime, timedelta, timezone + +from rq import Queue +from rq.intermediate_queue import IntermediateQueue +from tests import RQTestCase +from tests.fixtures import say_hello + + +class TestWorker(RQTestCase): + def test_set_first_seen(self): + """Ensure that the first_seen attribute is set correctly.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + # set_first_seen() should only succeed the first time around + self.assertTrue(intermediate_queue.set_first_seen(job.id)) + self.assertFalse(intermediate_queue.set_first_seen(job.id)) + # It should succeed again after deleting the key + self.connection.delete(intermediate_queue.get_first_seen_key(job.id)) + self.assertTrue(intermediate_queue.set_first_seen(job.id)) + + def test_get_first_seen(self): + """Ensure that the first_seen attribute is set correctly.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + + # Check first seen was set correctly + intermediate_queue.set_first_seen(job.id) + timestamp = intermediate_queue.get_first_seen(job.id) + self.assertTrue(datetime.now(tz=timezone.utc) - timestamp < timedelta(seconds=5)) # type: ignore + + def test_should_be_cleaned_up(self): + """Job in the intermediate queue should be cleaned up if it was seen more than 1 minute ago.""" + queue = Queue('foo', connection=self.connection) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + job = queue.enqueue(say_hello) + + # Returns False if there's no first seen timestamp + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + # Returns False since first seen timestamp is less than 1 minute ago + intermediate_queue.set_first_seen(job.id) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id)) From 7d4c7eee30e87b3a228db44698a1a9ff03af8192 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2024 19:24:24 +0700 Subject: [PATCH 06/11] Added intermediate_queue.get_job_ids() --- rq/intermediate_queue.py | 10 +++++++++- tests/test_intermediate_queue.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py index e317bc73..0d3d42fb 100644 --- a/rq/intermediate_queue.py +++ b/rq/intermediate_queue.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta, timezone -from typing import Optional +from typing import List, Optional from redis import Redis @@ -75,3 +75,11 @@ class IntermediateQueue(object): if not first_seen: return False return now() - first_seen > timedelta(minutes=1) + + def get_job_ids(self) -> List[str]: + """Returns the job IDs in the intermediate queue. + + Returns: + List[str]: The job IDs + """ + return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)] diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index 52d66f50..04effc8b 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -49,3 +49,22 @@ class TestWorker(RQTestCase): two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) self.assertTrue(intermediate_queue.should_be_cleaned_up(job.id)) + + def test_get_job_ids(self): + """Dequeueing job from a single queue moves job to intermediate queue.""" + queue = Queue('foo', connection=self.connection) + job_1 = queue.enqueue(say_hello) + + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + + # Job ID is not in intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), []) + job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id]) + + # Test the blocking version + job_2 = queue.enqueue(say_hello) + job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn) + # After job is dequeued, the job ID is in the intermediate queue + self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id]) From 3ca348049fed751e6c40cef4904dbb3673508e10 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2024 19:37:53 +0700 Subject: [PATCH 07/11] Ensure intermediate_queue is empty before running tests --- tests/test_intermediate_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index 04effc8b..feaf1bf6 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -57,6 +57,9 @@ class TestWorker(RQTestCase): intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + # Ensure that the intermediate queue is empty + self.connection.delete(intermediate_queue.key) + # Job ID is not in intermediate queue self.assertEqual(intermediate_queue.get_job_ids(), []) job, queue = Queue.dequeue_any([queue], timeout=None, connection=self.testconn) From aae428500c9278f373dbecd86abc44d33f8b38c9 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2024 19:51:21 +0700 Subject: [PATCH 08/11] Skip intermediate queue tests if Redis < 6.2.0 --- rq/intermediate_queue.py | 8 ++++++ rq/maintenance.py | 24 ++++++++++++++---- tests/test_intermediate_queue.py | 42 +++++++++++++++++++++++++++++--- 3 files changed, 66 insertions(+), 8 deletions(-) diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py index 0d3d42fb..ae93e2f4 100644 --- a/rq/intermediate_queue.py +++ b/rq/intermediate_queue.py @@ -83,3 +83,11 @@ class IntermediateQueue(object): List[str]: The job IDs """ return [job_id.decode() for job_id in self.connection.lrange(self.key, 0, -1)] + + def remove(self, job_id: str) -> None: + """Removes a job from the intermediate queue. + + Args: + job_id (str): The job ID + """ + self.connection.lrem(self.key, 1, job_id) diff --git a/rq/maintenance.py b/rq/maintenance.py index 5c9d7cd2..61c5b67a 100644 --- a/rq/maintenance.py +++ b/rq/maintenance.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING +from .intermediate_queue import IntermediateQueue from .queue import Queue -from .utils import as_text if TYPE_CHECKING: from .worker import BaseWorker @@ -17,10 +17,24 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None: We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry. """ - job_ids = [as_text(job_id) for job_id in queue.connection.lrange(queue.intermediate_queue_key, 0, -1)] + intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection) + job_ids = intermediate_queue.get_job_ids() + for job_id in job_ids: + job = queue.fetch_job(job_id) + if job_id not in queue.started_job_registry: - job = queue.fetch_job(job_id) - if job: + + if not job: + # If the job doesn't exist in the queue, we can safely remove it from the intermediate queue. + intermediate_queue.remove(job_id) + continue + + # If this is the first time we've seen this job, do nothing. + # `set_first_seen` will return `True` if the key was set, `False` if it already existed. + if intermediate_queue.set_first_seen(job_id): + continue + + if intermediate_queue.should_be_cleaned_up(job_id): worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') - queue.connection.lrem(queue.intermediate_queue_key, 1, job_id) + intermediate_queue.remove(job_id) diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index feaf1bf6..cdeea4e1 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -1,12 +1,20 @@ -from datetime import datetime, timedelta, timezone +import unittest -from rq import Queue +from datetime import datetime, timedelta, timezone +from unittest.mock import patch + +from redis import Redis + +from rq import Queue, Worker from rq.intermediate_queue import IntermediateQueue +from rq.utils import get_version from tests import RQTestCase from tests.fixtures import say_hello -class TestWorker(RQTestCase): +@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') +class TestIntermediateQueue(RQTestCase): + def test_set_first_seen(self): """Ensure that the first_seen attribute is set correctly.""" queue = Queue('foo', connection=self.connection) @@ -71,3 +79,31 @@ class TestWorker(RQTestCase): job, queue = Queue.dequeue_any([queue], timeout=1, connection=self.testconn) # After job is dequeued, the job ID is in the intermediate queue self.assertEqual(intermediate_queue.get_job_ids(), [job_1.id, job_2.id]) + + # After job_1.id is removed, only job_2.id is in the intermediate queue + intermediate_queue.remove(job_1.id) + self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id]) + + # def test_cleanup_intermediate_queue(self): + # """Ensure jobs stuck in the intermediate queue are cleaned up.""" + # queue = Queue('foo', connection=self.connection) + # job = queue.enqueue(say_hello) + + # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + # self.connection.delete(intermediate_queue.key) + + # # If job execution fails after it's dequeued, job should be in the intermediate queue + # # and it's status is still QUEUED + # with patch.object(Worker, 'execute_job'): + # worker = Worker(queue, connection=self.testconn) + # worker.work(burst=True) + + # self.assertEqual(job.get_status(), 'queued') + # self.assertFalse(job.id in queue.get_job_ids()) + # self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) + # # After cleaning up the intermediate queue, job status should be `failed` + # # and job is also removed from the intermediate queue + # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + # intermediate_queue.remove(job.id) + # self.assertEqual(job.get_status(), 'failed') + # self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) From a7f19d966f9de54d0ac873fab5eaf0ea424a2fad Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2024 20:32:30 +0700 Subject: [PATCH 09/11] Done implementing clean_intermediate_queue() --- tests/test_intermediate_queue.py | 64 +++++++++++++++++++++----------- tests/test_maintenance.py | 36 ------------------ 2 files changed, 43 insertions(+), 57 deletions(-) delete mode 100644 tests/test_maintenance.py diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index cdeea4e1..55be1c80 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -1,5 +1,4 @@ import unittest - from datetime import datetime, timedelta, timezone from unittest.mock import patch @@ -7,6 +6,7 @@ from redis import Redis from rq import Queue, Worker from rq.intermediate_queue import IntermediateQueue +from rq.maintenance import clean_intermediate_queue from rq.utils import get_version from tests import RQTestCase from tests.fixtures import say_hello @@ -84,26 +84,48 @@ class TestIntermediateQueue(RQTestCase): intermediate_queue.remove(job_1.id) self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id]) - # def test_cleanup_intermediate_queue(self): - # """Ensure jobs stuck in the intermediate queue are cleaned up.""" - # queue = Queue('foo', connection=self.connection) - # job = queue.enqueue(say_hello) + def test_cleanup_intermediate_queue(self): + """Ensure jobs stuck in the intermediate queue are cleaned up.""" + queue = Queue('foo', connection=self.connection) + job = queue.enqueue(say_hello) - # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) - # self.connection.delete(intermediate_queue.key) + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + self.connection.delete(intermediate_queue.key) - # # If job execution fails after it's dequeued, job should be in the intermediate queue - # # and it's status is still QUEUED - # with patch.object(Worker, 'execute_job'): - # worker = Worker(queue, connection=self.testconn) - # worker.work(burst=True) + # If job execution fails after it's dequeued, job should be in the intermediate queue + # and it's status is still QUEUED + with patch.object(Worker, 'execute_job'): + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) - # self.assertEqual(job.get_status(), 'queued') - # self.assertFalse(job.id in queue.get_job_ids()) - # self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) - # # After cleaning up the intermediate queue, job status should be `failed` - # # and job is also removed from the intermediate queue - # intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) - # intermediate_queue.remove(job.id) - # self.assertEqual(job.get_status(), 'failed') - # self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id)) + # If worker.execute_job() does nothing, job status should be `queued` + # even though it's not in the queue, but it should be in the intermediate queue + self.assertEqual(job.get_status(), 'queued') + self.assertFalse(job.id in queue.get_job_ids()) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + clean_intermediate_queue(worker, queue) + # After clean_intermediate_queue is called, the job should be marked as seen, + # but since it's been less than 1 minute, it should not be cleaned up + self.assertIsNotNone(intermediate_queue.get_first_seen(job.id)) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + self.assertEqual(job.get_status(), 'failed') + + job = queue.enqueue(say_hello) + worker.work(burst=True) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If job is gone, it should be immediately removed from the intermediate queue + job.delete() + clean_intermediate_queue(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) diff --git a/tests/test_maintenance.py b/tests/test_maintenance.py deleted file mode 100644 index 8cef010d..00000000 --- a/tests/test_maintenance.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest -from unittest.mock import patch - -from redis import Redis - -from rq.job import JobStatus -from rq.maintenance import clean_intermediate_queue -from rq.queue import Queue -from rq.utils import get_version -from rq.worker import Worker -from tests import RQTestCase -from tests.fixtures import say_hello - - -class MaintenanceTestCase(RQTestCase): - @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') - def test_cleanup_intermediate_queue(self): - """Ensure jobs stuck in the intermediate queue are cleaned up.""" - queue = Queue('foo', connection=self.testconn) - job = queue.enqueue(say_hello) - - # If job execution fails after it's dequeued, job should be in the intermediate queue - # # and it's status is still QUEUED - with patch.object(Worker, 'execute_job'): - # mocked.execute_job.side_effect = Exception() - worker = Worker(queue, connection=self.testconn) - worker.work(burst=True) - - self.assertEqual(job.get_status(), JobStatus.QUEUED) - self.assertFalse(job.id in queue.get_job_ids()) - self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) - # After cleaning up the intermediate queue, job status should be `FAILED` - # and job is also removed from the intermediate queue - clean_intermediate_queue(worker, queue) - self.assertEqual(job.get_status(), JobStatus.FAILED) - self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) From e2d144d7727b5efac34389e8b054969b4ea76271 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2024 20:48:07 +0700 Subject: [PATCH 10/11] Moved intermediate queue cleaning functionality to intermediate_queue.cleanup() --- rq/intermediate_queue.py | 28 ++++++++++++++++++- rq/maintenance.py | 27 +++++------------- rq/queue.py | 5 ++++ rq/worker.py | 3 +- tests/test_intermediate_queue.py | 48 +++++++++++++++++++++++++++++++- 5 files changed, 87 insertions(+), 24 deletions(-) diff --git a/rq/intermediate_queue.py b/rq/intermediate_queue.py index ae93e2f4..5bdb057d 100644 --- a/rq/intermediate_queue.py +++ b/rq/intermediate_queue.py @@ -1,10 +1,14 @@ from datetime import datetime, timedelta, timezone -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from redis import Redis from rq.utils import now +if TYPE_CHECKING: + from .queue import Queue + from .worker import BaseWorker + class IntermediateQueue(object): @@ -91,3 +95,25 @@ class IntermediateQueue(object): job_id (str): The job ID """ self.connection.lrem(self.key, 1, job_id) + + def cleanup(self, worker: 'BaseWorker', queue: 'Queue') -> None: + job_ids = self.get_job_ids() + + for job_id in job_ids: + job = queue.fetch_job(job_id) + + if job_id not in queue.started_job_registry: + + if not job: + # If the job doesn't exist in the queue, we can safely remove it from the intermediate queue. + self.remove(job_id) + continue + + # If this is the first time we've seen this job, do nothing. + # `set_first_seen` will return `True` if the key was set, `False` if it already existed. + if self.set_first_seen(job_id): + continue + + if self.should_be_cleaned_up(job_id): + worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') + self.remove(job_id) diff --git a/rq/maintenance.py b/rq/maintenance.py index 61c5b67a..52ed4749 100644 --- a/rq/maintenance.py +++ b/rq/maintenance.py @@ -1,3 +1,5 @@ +import warnings + from typing import TYPE_CHECKING from .intermediate_queue import IntermediateQueue @@ -17,24 +19,9 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None: We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry. """ + warnings.warn( + "clean_intermediate_queue is deprecated. Use IntermediateQueue.cleanup instead.", + DeprecationWarning, + ) intermediate_queue = IntermediateQueue(queue.key, connection=queue.connection) - job_ids = intermediate_queue.get_job_ids() - - for job_id in job_ids: - job = queue.fetch_job(job_id) - - if job_id not in queue.started_job_registry: - - if not job: - # If the job doesn't exist in the queue, we can safely remove it from the intermediate queue. - intermediate_queue.remove(job_id) - continue - - # If this is the first time we've seen this job, do nothing. - # `set_first_seen` will return `True` if the key was set, `False` if it already existed. - if intermediate_queue.set_first_seen(job_id): - continue - - if intermediate_queue.should_be_cleaned_up(job_id): - worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') - intermediate_queue.remove(job_id) + intermediate_queue.cleanup(worker, queue) diff --git a/rq/queue.py b/rq/queue.py index 1212cf71..b4263500 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -218,6 +218,11 @@ class Queue: """Returns the Redis key for intermediate queue.""" return IntermediateQueue.get_intermediate_queue_key(self._key) + @property + def intermediate_queue(self) -> IntermediateQueue: + """Returns the IntermediateQueue instance for this Queue.""" + return IntermediateQueue(self.key, connection=self.connection) + @property def registry_cleaning_key(self): """Redis key used to indicate this queue has been cleaned.""" diff --git a/rq/worker.py b/rq/worker.py index 8fa185a4..bf43d739 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -48,7 +48,6 @@ from .defaults import ( from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException from .job import Job, JobStatus from .logutils import blue, green, setup_loghandlers, yellow -from .maintenance import clean_intermediate_queue from .queue import Queue from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler @@ -317,7 +316,7 @@ class BaseWorker: self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue) worker_registration.clean_worker_registry(queue) - clean_intermediate_queue(self, queue) + queue.intermediate_queue.cleanup(self, queue) queue.release_maintenance_lock() self.last_cleaned_at = utcnow() diff --git a/tests/test_intermediate_queue.py b/tests/test_intermediate_queue.py index 55be1c80..02843bce 100644 --- a/tests/test_intermediate_queue.py +++ b/tests/test_intermediate_queue.py @@ -84,7 +84,7 @@ class TestIntermediateQueue(RQTestCase): intermediate_queue.remove(job_1.id) self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id]) - def test_cleanup_intermediate_queue(self): + def test_cleanup_intermediate_queue_in_maintenance(self): """Ensure jobs stuck in the intermediate queue are cleaned up.""" queue = Queue('foo', connection=self.connection) job = queue.enqueue(say_hello) @@ -129,3 +129,49 @@ class TestIntermediateQueue(RQTestCase): job.delete() clean_intermediate_queue(worker, queue) self.assertEqual(intermediate_queue.get_job_ids(), []) + + def test_cleanup_intermediate_queue(self): + """Ensure jobs stuck in the intermediate queue are cleaned up.""" + queue = Queue('foo', connection=self.connection) + job = queue.enqueue(say_hello) + + intermediate_queue = IntermediateQueue(queue.key, connection=self.connection) + self.connection.delete(intermediate_queue.key) + + # If job execution fails after it's dequeued, job should be in the intermediate queue + # and it's status is still QUEUED + with patch.object(Worker, 'execute_job'): + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + + # If worker.execute_job() does nothing, job status should be `queued` + # even though it's not in the queue, but it should be in the intermediate queue + self.assertEqual(job.get_status(), 'queued') + self.assertFalse(job.id in queue.get_job_ids()) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + self.assertIsNone(intermediate_queue.get_first_seen(job.id)) + intermediate_queue.cleanup(worker, queue) + # After clean_intermediate_queue is called, the job should be marked as seen, + # but since it's been less than 1 minute, it should not be cleaned up + self.assertIsNotNone(intermediate_queue.get_first_seen(job.id)) + self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id)) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up + first_seen_key = intermediate_queue.get_first_seen_key(job.id) + two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2) + self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10) + + intermediate_queue.cleanup(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) + self.assertEqual(job.get_status(), 'failed') + + job = queue.enqueue(say_hello) + worker.work(burst=True) + self.assertEqual(intermediate_queue.get_job_ids(), [job.id]) + + # If job is gone, it should be immediately removed from the intermediate queue + job.delete() + intermediate_queue.cleanup(worker, queue) + self.assertEqual(intermediate_queue.get_job_ids(), []) From 0e15b2a942b8d724e938b04c030161422942d1ea Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 1 May 2024 14:12:08 +0700 Subject: [PATCH 11/11] Bump version to 1.16.2 --- CHANGES.md | 3 +++ rq/version.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 24ace27a..7d7ff9b7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,6 @@ +### RQ 1.16.2 (2024-05-01) +* Fixed a bug that may cause jobs from intermediate queue to be moved to FailedJobRegistry. Thanks @selwin! + ### RQ 1.16.1 (2024-03-09) * Added `worker_pool.get_worker_process()` to make `WorkerPool` easier to extend. Thanks @selwin! diff --git a/rq/version.py b/rq/version.py index 01f10492..44a3256c 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '1.16.1' +VERSION = '1.16.2'