diff --git a/rq/job.py b/rq/job.py index ebdc45c3..91867ca7 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1569,7 +1569,7 @@ class Job: registry = DeferredJobRegistry( self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer ) - registry.add(self, pipeline=pipeline) + registry.add(self, pipeline=pipeline, ttl=self.ttl) connection = pipeline if pipeline is not None else self.connection diff --git a/rq/registry.py b/rq/registry.py index 101b4f24..2c668042 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -398,11 +398,43 @@ class DeferredJobRegistry(BaseRegistry): key_template = 'rq:deferred:{0}' - def cleanup(self): - """This method is only here to prevent errors because this method is - automatically called by `count()` and `get_job_ids()` methods - implemented in BaseRegistry.""" - pass + def cleanup(self, timestamp=None): + """Remove expired jobs from registry and add them to FailedJobRegistry. + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. Removed jobs are added to the failed job registry. + """ + score = timestamp if timestamp is not None else current_timestamp() + job_ids = self.get_expired_job_ids(score) + + if job_ids: + failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer) + + with self.connection.pipeline() as pipeline: + for job_id in job_ids: + try: + job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) + except NoSuchJobError: + continue + + job.set_status(JobStatus.FAILED, pipeline=pipeline) + exc_info = "Expired in DeferredJobRegistry, moved to FailedJobRegistry at %s" % datetime.now() + failed_job_registry.add(job, job.failure_ttl, exc_info, pipeline, True) + + pipeline.zremrangebyscore(self.key, 0, score) + pipeline.execute() + + return job_ids + + def add(self, job, ttl=None, pipeline=None, xx=False): + """ + Adds a job to a registry with expiry time of now + ttl. + Defaults to -1 (never expire). + """ + if ttl is None: + ttl = -1 + + return super(DeferredJobRegistry, self).add(job, ttl, pipeline, xx) class ScheduledJobRegistry(BaseRegistry): @@ -501,7 +533,7 @@ class CanceledJobRegistry(BaseRegistry): def clean_registries(queue: 'Queue', exception_handlers: list = None): - """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue. + """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry, and DeferredJobRegistry of a queue. Args: queue (Queue): The queue to clean @@ -517,3 +549,7 @@ def clean_registries(queue: 'Queue', exception_handlers: list = None): FailedJobRegistry( name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer ).cleanup() + + DeferredJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ).cleanup() diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 7271ea45..aabf227f 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -1,5 +1,6 @@ from rq import Queue, SimpleWorker, Worker from rq.job import Dependency, Job, JobStatus +from rq.utils import current_timestamp from tests import RQTestCase from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello @@ -154,6 +155,20 @@ class TestDependencies(RQTestCase): self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED) + def test_enqueue_job_dependency_sets_ttl(self): + """Ensures that the TTL of jobs in the deferred queue is set""" + q = Queue(connection=self.connection) + parent_job = Job.create(say_hello, connection=self.connection) + parent_job.save() + + timestamp = current_timestamp() + ttl = 5 + job = Job.create(say_hello, connection=self.connection, depends_on=parent_job, ttl=ttl) + q.enqueue_job(job) + score = self.connection.zscore(q.deferred_job_registry.key, job.id) + self.assertLess(score, timestamp + ttl + 2) + self.assertGreater(score, timestamp + ttl - 2) + def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.connection) diff --git a/tests/test_registry.py b/tests/test_registry.py index 797d7b6d..fe3593c6 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -402,6 +402,24 @@ class TestDeferredRegistry(RQTestCase): job_ids = [as_text(job_id) for job_id in self.connection.zrange(self.registry.key, 0, -1)] self.assertEqual(job_ids, [job.id]) + def test_add_with_deferred_ttl(self): + """Job TTL defaults to +inf""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + + key = self.registry.key + + self.registry.add(job) + score = self.connection.zscore(key, job.id) + self.assertEqual(score, float("inf")) + + timestamp = current_timestamp() + ttl = 5 + self.registry.add(job, ttl=ttl) + score = self.connection.zscore(key, job.id) + self.assertLess(score, timestamp + ttl + 2) + self.assertGreater(score, timestamp + ttl - 2) + def test_register_dependency(self): """Ensure job creation and deletion works with DeferredJobRegistry.""" queue = Queue(connection=self.connection) @@ -415,6 +433,38 @@ class TestDeferredRegistry(RQTestCase): job2.delete() self.assertEqual(registry.get_job_ids(), []) + def test_cleanup_supports_deleted_jobs(self): + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + self.registry.add(job, ttl=10) + + self.assertEqual(self.registry.count, 1) + job.delete(remove_from_queue=False) + self.assertEqual(self.registry.count, 1) + + self.registry.cleanup(current_timestamp() + 100) + self.assertEqual(self.registry.count, 0) + + def test_cleanup_moves_jobs_to_failed_job_registry(self): + """Moving expired jobs to FailedJobRegistry.""" + queue = Queue(connection=self.connection) + failed_job_registry = FailedJobRegistry(connection=self.connection) + job = queue.enqueue(say_hello) + + self.connection.zadd(self.registry.key, {job.id: 2}) + + # Job has not been moved to FailedJobRegistry + self.registry.cleanup(1) + self.assertNotIn(job, failed_job_registry) + self.assertIn(job, self.registry) + + self.registry.cleanup() + self.assertIn(job.id, failed_job_registry) + self.assertNotIn(job, self.registry) + job.refresh() + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertTrue(job.exc_info) # explanation is written to exc_info + class TestFailedJobRegistry(RQTestCase): def test_default_failure_ttl(self):