Jobs can return Retry object to retry execution (#2159)

* Jobs can return a Retry object

* Fix test on Redis < 5

* Fix ruff warnings

* Minor housekeeping

* More minor fixes

* More typing improvements

* More fixes

* Fix typing
Selwin Ong 2024-12-01 15:58:05 +07:00 committed by GitHub
parent 26a3577443
commit fc86e9ab5e
6 changed files with 255 additions and 26 deletions
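
In practice, the change lets a job opt into retries at runtime by returning a Retry object instead of raising. A minimal sketch of the intended usage, based on the diff below (the function name, failure condition, and interval values are illustrative):

    import random

    from redis import Redis
    from rq import Queue, Retry

    def flaky_task():
        # Illustrative failure condition: any job may ask the worker for
        # another attempt by returning a Retry object instead of raising.
        if random.random() < 0.5:
            return Retry(max=3, interval=[10, 30, 60])
        return 'done'

    queue = Queue(connection=Redis())
    job = queue.enqueue(flaky_task)
    # A worker that picks this up requeues the job immediately when the
    # interval is 0, or parks it in the ScheduledJobRegistry otherwise.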

View File

@ -194,6 +194,7 @@ class Job:
self.meta: Dict[str, Any] = {}
self.serializer = resolve_serializer(serializer)
self.retries_left: Optional[int] = None
self.number_of_retries: Optional[int] = None
self.retry_intervals: Optional[List[int]] = None
self.redis_server_version: Optional[Tuple[int, int, int]] = None
self.last_heartbeat: Optional[datetime] = None
@ -999,6 +1000,7 @@ class Job:
except Exception: # depends on the serializer
self.meta = {'unserialized': obj.get('meta', {})}
self.number_of_retries = int(obj['number_of_retries']) if obj.get('number_of_retries') else None
self.retries_left = int(obj['retries_left']) if obj.get('retries_left') else None
if obj.get('retry_intervals'):
self.retry_intervals = json.loads(obj['retry_intervals'].decode())
@ -1049,6 +1051,9 @@ class Job:
'group_id': self.group_id or '',
}
if self.number_of_retries is not None:
obj['number_of_retries'] = self.number_of_retries
if self.retries_left is not None:
obj['retries_left'] = self.retries_left
if self.retry_intervals is not None:
@ -1524,6 +1529,13 @@ class Job:
Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline)
def _handle_retry_result(self, queue: 'Queue', pipeline: 'Pipeline'):
if self.supports_redis_streams:
from .results import Result
Result.create_retried(self, self.failure_ttl, pipeline=pipeline)
self.number_of_retries = 1 if not self.number_of_retries else self.number_of_retries + 1
queue._enqueue_job(self, pipeline=pipeline)
def get_retry_interval(self) -> int:
"""Returns the desired retry interval.
If number of retries is bigger than length of intervals, the first
@ -1539,6 +1551,10 @@ class Job:
index = max(number_of_intervals - self.retries_left, 0)
return self.retry_intervals[index]
@property
def should_retry(self) -> bool:
return (self.retries_left is not None and self.retries_left > 0)
def retry(self, queue: 'Queue', pipeline: 'Pipeline'):
"""Requeue or schedule this job for execution.
If the `retry_interval` was set on the job itself,
@ -1692,6 +1708,33 @@ class Retry:
self.max = max
self.intervals = intervals
@classmethod
def get_interval(cls, count: int, intervals: Union[int, List[int], None]) -> int:
"""Returns the appropriate retry interval based on retry count and intervals.
If intervals is an integer, returns that value directly.
If intervals is a list and the retry count is bigger than the length of intervals,
the last value in the list will be used.
Args:
count (int): The current retry count
intervals (Union[int, List[int]]): Either a single interval value or list of intervals to use
Returns:
retry_interval (int): The appropriate retry interval
"""
# If intervals is an integer, return it directly
if isinstance(intervals, int):
return intervals
# If intervals is an empty list or None, return 0
if not intervals:
return 0
# Calculate appropriate interval from list
number_of_intervals = len(intervals)
index = min(number_of_intervals - 1, count)
return intervals[index]
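
To make the lookup concrete: the count is clamped to the last index, so once the retry count runs past the end of the list the final interval keeps being reused. A few illustrative calls, mirroring the tests further down:

    Retry.get_interval(0, [5, 30, 120])  # returns 5 (first attempt)
    Retry.get_interval(2, [5, 30, 120])  # returns 120 (third attempt)
    Retry.get_interval(9, [5, 30, 120])  # returns 120 (clamped to the last value)
    Retry.get_interval(9, 15)            # returns 15 (a single int is used as-is)
    Retry.get_interval(9, None)          # returns 0 (no interval configured)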
class Callback:
def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None):

View File

@ -68,7 +68,7 @@ class BaseRegistry:
and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
)
def __contains__(self, item: Union[str, 'Job']) -> bool:
def __contains__(self, item: Any) -> bool:
"""
Returns a boolean indicating registry contains the given
job instance or job id.
@ -497,17 +497,17 @@ class ScheduledJobRegistry(BaseRegistry):
score: Any = timestamp if timestamp is not None else current_timestamp()
return connection.zremrangebyscore(self.key, 0, score)
def get_jobs_to_schedule(self, timestamp: Optional[datetime] = None, chunk_size: int = 1000) -> List[str]:
def get_jobs_to_schedule(self, timestamp: Optional[int] = None, chunk_size: int = 1000) -> List[str]:
"""Get's a list of job IDs that should be scheduled.
Args:
timestamp (Optional[datetime], optional): _description_. Defaults to None.
timestamp (Optional[int]): _description_. Defaults to None.
chunk_size (int, optional): _description_. Defaults to 1000.
Returns:
jobs (List[str]): A list of Job ids
"""
score: Any = timestamp if timestamp is not None else current_timestamp()
score: int = timestamp if timestamp is not None else current_timestamp()
jobs_to_schedule = self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)
return [as_text(job_id) for job_id in jobs_to_schedule]
@ -523,7 +523,7 @@ class ScheduledJobRegistry(BaseRegistry):
Returns:
datetime (datetime): The scheduled time as datetime object
"""
if isinstance(job_or_id, self.job_class):
if isinstance(job_or_id, Job):
job_id = job_or_id.id
else:
job_id = job_or_id

View File

@ -21,6 +21,7 @@ class Result:
SUCCESSFUL = 1
FAILED = 2
STOPPED = 3
RETRIED = 4
def __init__(
self,
@ -55,7 +56,7 @@ class Result:
return bool(self.id)
@classmethod
def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None):
def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None) -> 'Result':
result = cls(
job_id=job.id,
type=type,
@ -68,7 +69,7 @@ class Result:
return result
@classmethod
def create_failure(cls, job, ttl, exc_string, pipeline=None):
def create_failure(cls, job, ttl, exc_string, pipeline=None) -> 'Result':
result = cls(
job_id=job.id,
type=cls.Type.FAILED,
@ -79,6 +80,17 @@ class Result:
result.save(ttl=ttl, pipeline=pipeline)
return result
@classmethod
def create_retried(cls, job, ttl, pipeline=None) -> 'Result':
result = cls(
job_id=job.id,
type=cls.Type.RETRIED,
connection=job.connection,
serializer=job.serializer,
)
result.save(ttl=ttl, pipeline=pipeline)
return result
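
With the new RETRIED type, callers can tell whether a job's most recent run ended in a retry rather than a success or failure. A hedged sketch, assuming a Redis server new enough for result streams and a `job` handle like the one from the enqueue sketch near the top:

    from rq.results import Result

    latest = job.latest_result()
    if latest is not None and latest.type == Result.Type.RETRIED:
        print('last run asked for another attempt')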
@classmethod
def all(cls, job: Job, serializer=None):
"""Returns all results for job"""

View File

@ -10,7 +10,7 @@ import sys
import time
import traceback
import warnings
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from random import shuffle
from types import FrameType
@ -47,7 +47,7 @@ from .defaults import (
from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException
from .executions import Execution
from .group import Group
from .job import Job, JobStatus
from .job import Job, JobStatus, Retry
from .logutils import blue, green, setup_loghandlers, yellow
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
@ -649,6 +649,7 @@ class BaseWorker:
"""Cleans up the execution of a job.
It will remove the job from the `StartedJobRegistry` and delete the Execution object.
"""
self.log.debug('Cleaning up execution of job %s', job.id)
started_job_registry = StartedJobRegistry(
job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
)
@ -706,7 +707,7 @@ class BaseWorker:
# check whether a job was stopped intentionally and set the job
# status appropriately if it was this job.
job_is_stopped = self._stopped_job_id == job.id
retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
retry = job.should_retry and not job_is_stopped
if job_is_stopped:
job.set_status(JobStatus.STOPPED, pipeline=pipeline)
@ -716,9 +717,6 @@ class BaseWorker:
if not retry:
job.set_status(JobStatus.FAILED, pipeline=pipeline)
# started_job_registry.remove(job, pipeline=pipeline)
# self.execution.delete(pipeline=pipeline) # type: ignore
# self.set_current_job_id(None, pipeline=pipeline)
self.cleanup_execution(job, pipeline=pipeline)
if not self.disable_default_exception_handler and not retry:
@ -1492,6 +1490,57 @@ class Worker(BaseWorker):
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_retry(self, job: 'Job', queue: 'Queue', retry: Retry, started_job_registry: StartedJobRegistry):
"""Handles the retry of certain job.
It will remove the job from the `StartedJobRegistry` and requeue or reschedule the job.
Args:
job (Job): The job that will be retried.
queue (Queue): The queue the job will be retried on.
retry (Retry): The Retry object returned by the job.
started_job_registry (StartedJobRegistry): The started job registry.
"""
self.log.debug('Handling retry of job %s', job.id)
# Check if job has exceeded max retries
if job.number_of_retries and job.number_of_retries >= retry.max:
# If max retries exceeded, treat as failure
self.log.warning('Job %s has exceeded maximum retry attempts (%d)', job.id, retry.max)
exc_string = f'Job failed after {retry.max} retry attempts'
self.handle_job_failure(job, queue=queue, exc_string=exc_string)
return
# Calculate retry interval based on retry count
retry_interval = Retry.get_interval(job.number_of_retries or 0, retry.intervals)
with self.connection.pipeline() as pipeline:
self.increment_failed_job_count(pipeline=pipeline)
self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore
if retry_interval > 0:
# Schedule job for later if there's an interval
scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
job.set_status(JobStatus.SCHEDULED, pipeline=pipeline)
queue.schedule_job(job, scheduled_time, pipeline=pipeline)
self.log.debug(
'Job %s: scheduled for retry at %s, %s attempts remaining',
job.id,
scheduled_time,
retry.max - (job.number_of_retries or 0),
)
else:
self.log.debug(
'Job %s: enqueued for retry, %s attempts remaining',
job.id,
retry.max - (job.number_of_retries or 0),
)
job._handle_retry_result(queue=queue, pipeline=pipeline)
self.cleanup_execution(job, pipeline=pipeline)
pipeline.execute()
self.log.debug('Finished handling retry of job %s', job.id)
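
One operational note implied by this flow: when an interval applies, the retried job lands in the ScheduledJobRegistry, so something has to move it back onto the queue once the interval elapses. A sketch under that assumption, reusing the illustrative `flaky_task` from the earlier example:

    from redis import Redis
    from rq import Queue, Worker

    queue = Queue(connection=Redis())
    queue.enqueue(flaky_task)  # returns Retry(max=3, interval=[10, 30, 60]) on failure

    worker = Worker([queue], connection=queue.connection)
    # with_scheduler=True lets this worker also promote scheduled retries back
    # onto the queue; without it, a separate scheduler process has to do that.
    worker.work(with_scheduler=True)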
def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
"""Handles the successful execution of certain job.
It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
@ -1581,17 +1630,25 @@ class Worker(BaseWorker):
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
self.log.debug('Performing Job %s ...', job.id)
rv = job.perform()
return_value = job.perform()
self.log.debug('Finished performing Job %s', job.id)
self.handle_execution_ended(job, queue, job.success_callback_timeout)
# Pickle the result in the same try-except block since we need
# to use the same exc handling when pickling fails
job._result = rv
job._result = return_value
job.execute_success_callback(self.death_penalty_class, rv)
if isinstance(return_value, Retry):
# Retry the job
self.log.debug('Job %s returns a Retry object', job.id)
self.handle_job_retry(
job=job, queue=queue, retry=return_value, started_job_registry=started_job_registry
)
return True
else:
job.execute_success_callback(self.death_penalty_class, return_value)
self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
except: # NOQA
self.log.debug('Job %s raised an exception.', job.id)
job._status = JobStatus.FAILED
@ -1616,8 +1673,8 @@ class Worker(BaseWorker):
return False
self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
if rv is not None:
self.log.debug('Result: %r', yellow(as_text(str(rv))))
if return_value is not None:
self.log.debug('Result: %r', yellow(str(return_value)))
if self.log_result_lifespan:
result_ttl = job.get_result_ttl(self.default_result_ttl)
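
Also worth noting from the perform_job change above: when a job returns Retry, the success callback is skipped, so an on_success hook only fires for a genuinely successful run. A small sketch under that assumption (the callback name is illustrative):

    def report_success(job, connection, result, *args, **kwargs):
        # Only invoked when the job returns something other than Retry.
        print(f'{job.id} finished with {result!r}')

    job = queue.enqueue(flaky_task, on_success=report_success)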

View File

@ -1,17 +1,23 @@
from datetime import datetime, timedelta, timezone
from rq import Queue
from rq.job import Job, JobStatus, Retry
from rq.queue import Queue
from rq.registry import FailedJobRegistry, StartedJobRegistry
from rq.worker import Worker
from tests import RQTestCase, fixtures
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
def return_retry(max: int = 1, interval: int = 0):
return Retry(max=max, interval=interval)
class TestRetry(RQTestCase):
"""Tests from test_retry.py"""
def test_persistence_of_retry_data(self):
"""Retry related data is stored and restored properly"""
job = Job.create(func=fixtures.some_calculation, connection=self.connection)
job = Job.create(func=say_hello, connection=self.connection)
job.retries_left = 3
job.retry_intervals = [1, 2, 3]
job.save()
@ -43,7 +49,7 @@ class TestRetry(RQTestCase):
def test_get_retry_interval(self):
"""get_retry_interval() returns the right retry interval"""
job = Job.create(func=fixtures.say_hello, connection=self.connection)
job = Job.create(func=say_hello, connection=self.connection)
# Handle case where self.retry_intervals is None
job.retries_left = 2
@ -67,7 +73,7 @@ class TestRetry(RQTestCase):
self.assertEqual(job.get_retry_interval(), 3)
def test_job_retry(self):
"""Test job.retry() works properly"""
"""job.retry() works properly"""
queue = Queue(connection=self.connection)
retry = Retry(max=3, interval=5)
job = queue.enqueue(div_by_zero, retry=retry)
@ -85,7 +91,6 @@ class TestRetry(RQTestCase):
with self.connection.pipeline() as pipeline:
job.retry(queue, pipeline)
pipeline.execute()
self.assertEqual(job.retries_left, 2)
@ -136,3 +141,115 @@ class TestRetry(RQTestCase):
self.assertEqual(len(queue), 2)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertIn(job, failed_job_registry)
def test_retry_get_interval(self):
"""Retry.get_interval() returns the right retry interval"""
self.assertEqual(Retry.get_interval(0, [1, 2, 3]), 1)
self.assertEqual(Retry.get_interval(1, [1, 2, 3]), 2)
self.assertEqual(Retry.get_interval(3, [1, 2, 3]), 3)
self.assertEqual(Retry.get_interval(4, [1, 2, 3]), 3)
self.assertEqual(Retry.get_interval(5, [1, 2, 3]), 3)
# Handle case where interval is None
self.assertEqual(Retry.get_interval(1, None), 0)
self.assertEqual(Retry.get_interval(2, None), 0)
# Handle case where interval is a single integer
self.assertEqual(Retry.get_interval(1, 3), 3)
self.assertEqual(Retry.get_interval(2, 3), 3)
class TestWorkerRetry(RQTestCase):
"""Tests from test_job_retry.py"""
def test_retry(self):
"""Worker processes retry correctly when job returns Retry"""
queue = Queue(connection=self.connection)
job = queue.enqueue(return_retry)
worker = Worker([queue], connection=self.connection)
worker.work(max_jobs=1)
# A result with type `RETRIED` should be created
# skip on Redis < 5
if job.supports_redis_streams:
result = job.latest_result()
self.assertEqual(result.type, result.Type.RETRIED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_job_handle_retry(self):
"""job._handle_retry_result() increments job.number_of_retries"""
queue = Queue(connection=self.connection)
job = queue.enqueue(return_retry)
pipeline = self.connection.pipeline()
# job._handle_retry_result() should increment job.number_of_retries
job._handle_retry_result(queue, pipeline)
pipeline.execute()
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.number_of_retries, 1)
pipeline = self.connection.pipeline()
job._handle_retry_result(queue, pipeline)
pipeline.execute()
self.assertEqual(job.number_of_retries, 2)
def test_worker_handles_max_retry(self):
"""Job fails after maximum retries are exhausted"""
queue = Queue(connection=self.connection)
job = queue.enqueue(return_retry, max=2)
worker = Worker([queue], connection=self.connection)
worker.work(max_jobs=1)
# A result with type `RETRIED` should be created,
# job should be back in the queue
if job.supports_redis_streams:
result = job.latest_result()
self.assertEqual(result.type, result.Type.RETRIED)
self.assertIn(job.id, queue.get_job_ids())
# Second retry
worker.work(max_jobs=1)
if job.supports_redis_streams:
result = job.latest_result()
self.assertEqual(result.type, result.Type.RETRIED)
self.assertIn(job.id, queue.get_job_ids())
# Third execution would fail since max number of retries is 2
worker.work(max_jobs=1)
if job.supports_redis_streams:
result = job.latest_result()
self.assertEqual(result.type, result.Type.FAILED)
self.assertNotIn(job.id, queue.get_job_ids())
def test_worker_handles_retry_interval(self):
"""Worker handles retry with interval correctly"""
queue = Queue(connection=self.connection)
job = queue.enqueue(return_retry, max=1, interval=10)
worker = Worker([queue], connection=self.connection)
worker.work(max_jobs=1)
now = datetime.now(timezone.utc)
# Job should be scheduled for retry
self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
self.assertNotIn(job.id, queue.get_job_ids())
registry = queue.scheduled_job_registry
self.assertIn(job.id, registry)
scheduled_time = registry.get_scheduled_time(job)
# Ensure that job is scheduled roughly 5 seconds from now
self.assertTrue(now + timedelta(seconds=7) < scheduled_time < now + timedelta(seconds=13))
job = queue.enqueue(return_retry, max=1, interval=30)
worker = Worker([queue], connection=self.connection)
worker.work(max_jobs=1)
now = datetime.now(timezone.utc)
# Job should be scheduled for retry
self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
self.assertNotIn(job.id, queue.get_job_ids())
registry = queue.scheduled_job_registry
self.assertIn(job.id, registry)
scheduled_time = registry.get_scheduled_time(job)
# Ensure that job is scheduled roughly 5 seconds from now
self.assertTrue(now + timedelta(seconds=27) < scheduled_time < now + timedelta(seconds=33))

View File

@ -329,7 +329,7 @@ class TestWorker(RQTestCase):
worker = Worker(queues=[queue], connection=self.connection)
worker.work(burst=True, with_scheduler=True)
self.assertIsNotNone(worker.scheduler)
assert worker.scheduler
self.assertIsNone(self.connection.get(worker.scheduler.get_locking_key('default')))
@mock.patch.object(RQScheduler, 'acquire_locks')