diff --git a/rq/job.py b/rq/job.py index fd8d3026..919d9ca2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -194,6 +194,7 @@ class Job: self.meta: Dict[str, Any] = {} self.serializer = resolve_serializer(serializer) self.retries_left: Optional[int] = None + self.number_of_retries: Optional[int] = None self.retry_intervals: Optional[List[int]] = None self.redis_server_version: Optional[Tuple[int, int, int]] = None self.last_heartbeat: Optional[datetime] = None @@ -999,6 +1000,7 @@ class Job: except Exception: # depends on the serializer self.meta = {'unserialized': obj.get('meta', {})} + self.number_of_retries = int(obj['number_of_retries']) if obj.get('number_of_retries') else None self.retries_left = int(obj['retries_left']) if obj.get('retries_left') else None if obj.get('retry_intervals'): self.retry_intervals = json.loads(obj['retry_intervals'].decode()) @@ -1049,6 +1051,9 @@ class Job: 'group_id': self.group_id or '', } + if self.number_of_retries is not None: + obj['number_of_retries'] = self.number_of_retries + if self.retries_left is not None: obj['retries_left'] = self.retries_left if self.retry_intervals is not None: @@ -1524,6 +1529,13 @@ class Job: Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) + def _handle_retry_result(self, queue: 'Queue', pipeline: 'Pipeline'): + if self.supports_redis_streams: + from .results import Result + Result.create_retried(self, self.failure_ttl, pipeline=pipeline) + self.number_of_retries = 1 if not self.number_of_retries else self.number_of_retries + 1 + queue._enqueue_job(self, pipeline=pipeline) + def get_retry_interval(self) -> int: """Returns the desired retry interval. 
If number of retries is bigger than length of intervals, the first @@ -1539,6 +1551,10 @@ class Job: index = max(number_of_intervals - self.retries_left, 0) return self.retry_intervals[index] + @property + def should_retry(self) -> bool: + return (self.retries_left is not None and self.retries_left > 0) + def retry(self, queue: 'Queue', pipeline: 'Pipeline'): """Requeue or schedule this job for execution. If the the `retry_interval` was set on the job itself, @@ -1692,6 +1708,33 @@ class Retry: self.max = max self.intervals = intervals + @classmethod + def get_interval(cls, count: int, intervals: Union[int, List[int], None]) -> int: + """Returns the appropriate retry interval based on retry count and intervals. + If intervals is an integer, returns that value directly. + If intervals is a list and retry count is bigger than length of intervals, + the last value in the list will be used. + + Args: + count (int): The current retry count + intervals (Union[int, List[int], None]): Either a single interval value or list of intervals to use; None or an empty list yields 0 + + Returns: + retry_interval (int): The appropriate retry interval + """ + # If intervals is an integer, return it directly + if isinstance(intervals, int): + return intervals + + # If intervals is an empty list or None, return 0 + if not intervals: + return 0 + + # Calculate appropriate interval from list + number_of_intervals = len(intervals) + index = min(number_of_intervals - 1, count) + return intervals[index] + class Callback: def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None): diff --git a/rq/registry.py b/rq/registry.py index 620850b7..eb0fcd0b 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -68,7 +68,7 @@ class BaseRegistry: and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs ) - def __contains__(self, item: Union[str, 'Job']) -> bool: + def __contains__(self, item: Any) -> bool: """ Returns a boolean indicating registry contains the 
given job instance or job id. @@ -497,17 +497,17 @@ class ScheduledJobRegistry(BaseRegistry): score: Any = timestamp if timestamp is not None else current_timestamp() return connection.zremrangebyscore(self.key, 0, score) - def get_jobs_to_schedule(self, timestamp: Optional[datetime] = None, chunk_size: int = 1000) -> List[str]: + def get_jobs_to_schedule(self, timestamp: Optional[int] = None, chunk_size: int = 1000) -> List[str]: """Get's a list of job IDs that should be scheduled. Args: - timestamp (Optional[datetime], optional): _description_. Defaults to None. + timestamp (Optional[int]): _description_. Defaults to None. chunk_size (int, optional): _description_. Defaults to 1000. Returns: jobs (List[str]): A list of Job ids """ - score: Any = timestamp if timestamp is not None else current_timestamp() + score: int = timestamp if timestamp is not None else current_timestamp() jobs_to_schedule = self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size) return [as_text(job_id) for job_id in jobs_to_schedule] @@ -523,7 +523,7 @@ class ScheduledJobRegistry(BaseRegistry): Returns: datetime (datetime): The scheduled time as datetime object """ - if isinstance(job_or_id, self.job_class): + if isinstance(job_or_id, Job): job_id = job_or_id.id else: job_id = job_or_id diff --git a/rq/results.py b/rq/results.py index d719a8bd..c22cc849 100644 --- a/rq/results.py +++ b/rq/results.py @@ -21,6 +21,7 @@ class Result: SUCCESSFUL = 1 FAILED = 2 STOPPED = 3 + RETRIED = 4 def __init__( self, @@ -55,7 +56,7 @@ class Result: return bool(self.id) @classmethod - def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None): + def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None) -> 'Result': result = cls( job_id=job.id, type=type, @@ -68,7 +69,7 @@ class Result: return result @classmethod - def create_failure(cls, job, ttl, exc_string, pipeline=None): + def create_failure(cls, job, ttl, exc_string, pipeline=None) -> 
'Result': result = cls( job_id=job.id, type=cls.Type.FAILED, @@ -79,6 +80,17 @@ class Result: result.save(ttl=ttl, pipeline=pipeline) return result + @classmethod + def create_retried(cls, job, ttl, pipeline=None) -> 'Result': + result = cls( + job_id=job.id, + type=cls.Type.RETRIED, + connection=job.connection, + serializer=job.serializer, + ) + result.save(ttl=ttl, pipeline=pipeline) + return result + @classmethod def all(cls, job: Job, serializer=None): """Returns all results for job""" diff --git a/rq/worker.py b/rq/worker.py index 6b760268..4f257307 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -10,7 +10,7 @@ import sys import time import traceback import warnings -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from random import shuffle from types import FrameType @@ -47,7 +47,7 @@ from .defaults import ( from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException from .executions import Execution from .group import Group -from .job import Job, JobStatus +from .job import Job, JobStatus, Retry from .logutils import blue, green, setup_loghandlers, yellow from .queue import Queue from .registry import StartedJobRegistry, clean_registries @@ -649,6 +649,7 @@ class BaseWorker: """Cleans up the execution of a job. It will remove the job from the `StartedJobRegistry` and delete the Execution object. """ + self.log.debug('Cleaning up execution of job %s', job.id) started_job_registry = StartedJobRegistry( job.origin, self.connection, job_class=self.job_class, serializer=self.serializer ) @@ -706,7 +707,7 @@ class BaseWorker: # check whether a job was stopped intentionally and set the job # status appropriately if it was this job. 
job_is_stopped = self._stopped_job_id == job.id - retry = job.retries_left and job.retries_left > 0 and not job_is_stopped + retry = job.should_retry and not job_is_stopped if job_is_stopped: job.set_status(JobStatus.STOPPED, pipeline=pipeline) @@ -716,9 +717,6 @@ class BaseWorker: if not retry: job.set_status(JobStatus.FAILED, pipeline=pipeline) - # started_job_registry.remove(job, pipeline=pipeline) - # self.execution.delete(pipeline=pipeline) # type: ignore - # self.set_current_job_id(None, pipeline=pipeline) self.cleanup_execution(job, pipeline=pipeline) if not self.disable_default_exception_handler and not retry: @@ -1492,6 +1490,57 @@ class Worker(BaseWorker): msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) + def handle_job_retry(self, job: 'Job', queue: 'Queue', retry: Retry, started_job_registry: StartedJobRegistry): + """Handles the retry of certain job. + It will remove the job from the `StartedJobRegistry` and requeue or reschedule the job. + + Args: + job (Job): The job that will be retried. 
+ queue (Queue): The queue + started_job_registry (StartedJobRegistry): The started registry + """ + self.log.debug('Handling retry of job %s', job.id) + + # Check if job has exceeded max retries + if job.number_of_retries and job.number_of_retries >= retry.max: + # If max retries exceeded, treat as failure + self.log.warning('Job %s has exceeded maximum retry attempts (%d)', job.id, retry.max) + exc_string = f'Job failed after {retry.max} retry attempts' + self.handle_job_failure(job, queue=queue, exc_string=exc_string) + return + + # Calculate retry interval based on retry count + retry_interval = Retry.get_interval(job.number_of_retries or 0, retry.intervals) + + with self.connection.pipeline() as pipeline: + self.increment_failed_job_count(pipeline=pipeline) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore + + if retry_interval > 0: + # Schedule job for later if there's an interval + scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=retry_interval) + job.set_status(JobStatus.SCHEDULED, pipeline=pipeline) + queue.schedule_job(job, scheduled_time, pipeline=pipeline) + self.log.debug( + 'Job %s: scheduled for retry at %s, %s attempts remaining', + job.id, + scheduled_time, + retry.max - (job.number_of_retries or 0), + ) + else: + self.log.debug( + 'Job %s: enqueued for retry, %s attempts remaining', + job.id, + retry.max - (job.number_of_retries or 0), + ) + job._handle_retry_result(queue=queue, pipeline=pipeline) + + self.cleanup_execution(job, pipeline=pipeline) + + pipeline.execute() + + self.log.debug('Finished handling retry of job %s', job.id) + def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry): """Handles the successful execution of certain job. 
It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`, @@ -1581,17 +1630,25 @@ class Worker(BaseWorker): timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): self.log.debug('Performing Job %s ...', job.id) - rv = job.perform() + return_value = job.perform() self.log.debug('Finished performing Job %s', job.id) self.handle_execution_ended(job, queue, job.success_callback_timeout) # Pickle the result in the same try-except block since we need # to use the same exc handling when pickling fails - job._result = rv + job._result = return_value - job.execute_success_callback(self.death_penalty_class, rv) + if isinstance(return_value, Retry): + # Retry the job + self.log.debug('Job %s returns a Retry object', job.id) + self.handle_job_retry( + job=job, queue=queue, retry=return_value, started_job_registry=started_job_registry + ) + return True + else: + job.execute_success_callback(self.death_penalty_class, return_value) + self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) - self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA self.log.debug('Job %s raised an exception.', job.id) job._status = JobStatus.FAILED @@ -1616,8 +1673,8 @@ class Worker(BaseWorker): return False self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id) - if rv is not None: - self.log.debug('Result: %r', yellow(as_text(str(rv)))) + if return_value is not None: + self.log.debug('Result: %r', yellow(str(return_value))) if self.log_result_lifespan: result_ttl = job.get_result_ttl(self.default_result_ttl) diff --git a/tests/test_retry.py b/tests/test_retry.py index 2482dc3d..27580610 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -1,17 +1,23 @@ from datetime import datetime, timedelta, timezone +from rq import Queue from rq.job import Job, JobStatus, Retry -from 
rq.queue import Queue from rq.registry import FailedJobRegistry, StartedJobRegistry from rq.worker import Worker -from tests import RQTestCase, fixtures +from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello +def return_retry(max: int = 1, interval: int = 0): + return Retry(max=max, interval=interval) + + class TestRetry(RQTestCase): + """Tests from test_retry.py""" + def test_persistence_of_retry_data(self): """Retry related data is stored and restored properly""" - job = Job.create(func=fixtures.some_calculation, connection=self.connection) + job = Job.create(func=say_hello, connection=self.connection) job.retries_left = 3 job.retry_intervals = [1, 2, 3] job.save() @@ -43,7 +49,7 @@ class TestRetry(RQTestCase): def test_get_retry_interval(self): """get_retry_interval() returns the right retry interval""" - job = Job.create(func=fixtures.say_hello, connection=self.connection) + job = Job.create(func=say_hello, connection=self.connection) # Handle case where self.retry_intervals is None job.retries_left = 2 @@ -67,7 +73,7 @@ class TestRetry(RQTestCase): self.assertEqual(job.get_retry_interval(), 3) def test_job_retry(self): - """Test job.retry() works properly""" + """job.retry() works properly""" queue = Queue(connection=self.connection) retry = Retry(max=3, interval=5) job = queue.enqueue(div_by_zero, retry=retry) @@ -85,7 +91,6 @@ class TestRetry(RQTestCase): with self.connection.pipeline() as pipeline: job.retry(queue, pipeline) - pipeline.execute() self.assertEqual(job.retries_left, 2) @@ -136,3 +141,115 @@ class TestRetry(RQTestCase): self.assertEqual(len(queue), 2) self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertIn(job, failed_job_registry) + + def test_retry_get_interval(self): + """Retry.get_interval() returns the right retry interval""" + self.assertEqual(Retry.get_interval(0, [1, 2, 3]), 1) + self.assertEqual(Retry.get_interval(1, [1, 2, 3]), 2) + self.assertEqual(Retry.get_interval(3, [1, 2, 3]), 3) + 
self.assertEqual(Retry.get_interval(4, [1, 2, 3]), 3) + self.assertEqual(Retry.get_interval(5, [1, 2, 3]), 3) + + # Handle case where interval is None + self.assertEqual(Retry.get_interval(1, None), 0) + self.assertEqual(Retry.get_interval(2, None), 0) + + # Handle case where interval is a single integer + self.assertEqual(Retry.get_interval(1, 3), 3) + self.assertEqual(Retry.get_interval(2, 3), 3) + + +class TestWorkerRetry(RQTestCase): + """Tests from test_job_retry.py""" + + def test_retry(self): + """Worker processes retry correctly when job returns Retry""" + queue = Queue(connection=self.connection) + job = queue.enqueue(return_retry) + worker = Worker([queue], connection=self.connection) + worker.work(max_jobs=1) + + # A result with type `RETRIED` should be created + # skip on Redis < 5 + if job.supports_redis_streams: + result = job.latest_result() + self.assertEqual(result.type, result.Type.RETRIED) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + def test_job_handle_retry(self): + """job._handle_retry_result() increments job.number_of_retries""" + queue = Queue(connection=self.connection) + job = queue.enqueue(return_retry) + pipeline = self.connection.pipeline() + + # job._handle_retry_result() should increment job.number_of_retries + job._handle_retry_result(queue, pipeline) + pipeline.execute() + job = Job.fetch(job.id, connection=self.connection) + self.assertEqual(job.number_of_retries, 1) + + pipeline = self.connection.pipeline() + job._handle_retry_result(queue, pipeline) + pipeline.execute() + self.assertEqual(job.number_of_retries, 2) + + def test_worker_handles_max_retry(self): + """Job fails after maximum retries are exhausted""" + queue = Queue(connection=self.connection) + job = queue.enqueue(return_retry, max=2) + worker = Worker([queue], connection=self.connection) + worker.work(max_jobs=1) + + # A result with type `RETRIED` should be created, + # job should be back in the queue + if job.supports_redis_streams: + result = 
job.latest_result() + self.assertEqual(result.type, result.Type.RETRIED) + self.assertIn(job.id, queue.get_job_ids()) + + # Second retry + worker.work(max_jobs=1) + if job.supports_redis_streams: + result = job.latest_result() + self.assertEqual(result.type, result.Type.RETRIED) + self.assertIn(job.id, queue.get_job_ids()) + + # Third execution would fail since max number of retries is 2 + worker.work(max_jobs=1) + if job.supports_redis_streams: + result = job.latest_result() + self.assertEqual(result.type, result.Type.FAILED) + self.assertNotIn(job.id, queue.get_job_ids()) + + def test_worker_handles_retry_interval(self): + """Worker handles retry with interval correctly""" + queue = Queue(connection=self.connection) + job = queue.enqueue(return_retry, max=1, interval=10) + worker = Worker([queue], connection=self.connection) + worker.work(max_jobs=1) + + now = datetime.now(timezone.utc) + # Job should be scheduled for retry + self.assertEqual(job.get_status(), JobStatus.SCHEDULED) + self.assertNotIn(job.id, queue.get_job_ids()) + registry = queue.scheduled_job_registry + self.assertIn(job.id, registry) + + scheduled_time = registry.get_scheduled_time(job) + # Ensure that job is scheduled roughly 10 seconds from now + self.assertTrue(now + timedelta(seconds=7) < scheduled_time < now + timedelta(seconds=13)) + + job = queue.enqueue(return_retry, max=1, interval=30) + worker = Worker([queue], connection=self.connection) + worker.work(max_jobs=1) + + now = datetime.now(timezone.utc) + # Job should be scheduled for retry + self.assertEqual(job.get_status(), JobStatus.SCHEDULED) + self.assertNotIn(job.id, queue.get_job_ids()) + registry = queue.scheduled_job_registry + self.assertIn(job.id, registry) + + scheduled_time = registry.get_scheduled_time(job) + # Ensure that job is scheduled roughly 30 seconds from now + self.assertTrue(now + timedelta(seconds=27) < scheduled_time < now + timedelta(seconds=33)) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py 
index f1673d26..7f2d338b 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -329,7 +329,7 @@ class TestWorker(RQTestCase): worker = Worker(queues=[queue], connection=self.connection) worker.work(burst=True, with_scheduler=True) - self.assertIsNotNone(worker.scheduler) + assert worker.scheduler self.assertIsNone(self.connection.get(worker.scheduler.get_locking_key('default'))) @mock.patch.object(RQScheduler, 'acquire_locks')