diff --git a/rq/worker.py b/rq/worker.py index fa3c49b5..a4cd5290 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -20,7 +20,7 @@ from .exceptions import DequeueTimeout, NoQueueError from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue -from .registry import FinishedJobRegistry, StartedJobRegistry +from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse @@ -146,6 +146,7 @@ class Worker(object): self._stopped = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + self.maintenance_date = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -646,6 +647,12 @@ class Worker(object): """The hash does not take the database/connection into account""" return hash(self.name) + def clean_registries(self): + """Runs maintenance jobs on each Queue's registries.""" + for queue in self.queues: + clean_registries(queue) + self.maintenance_date = utcnow() + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 6f89f4af..961e96b9 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -401,3 +401,22 @@ class TestWorker(RQTestCase): death_date = w.death_date self.assertIsNotNone(death_date) self.assertEquals(type(death_date).__name__, 'datetime') + + def test_clean_queue_registries(self): + """worker.clean_registries sets maintenance_date and cleans registries.""" + foo_queue = Queue('foo', connection=self.testconn) + foo_registry = StartedJobRegistry('foo', connection=self.testconn) + self.testconn.zadd(foo_registry.key, 1, 'foo') + self.assertEqual(self.testconn.zcard(foo_registry.key), 1) + + bar_queue = Queue('bar', connection=self.testconn) + bar_registry = StartedJobRegistry('bar', connection=self.testconn) + self.testconn.zadd(bar_registry.key, 1, 'bar') + self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + + worker = Worker([foo_queue, bar_queue]) + self.assertEqual(worker.maintenance_date, None) + worker.clean_registries() + self.assertNotEqual(worker.maintenance_date, None) + self.assertEqual(self.testconn.zcard(foo_registry.key), 0) + self.assertEqual(self.testconn.zcard(bar_registry.key), 0)