diff --git a/rq/queue.py b/rq/queue.py index a87f709a..1694268c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -68,6 +68,9 @@ class Queue(object): job_class = import_attribute(job_class) self.job_class = job_class + def __len__(self): + return self.count + @property def key(self): """Returns the Redis key for this Queue.""" diff --git a/rq/registry.py b/rq/registry.py index afa7b5bb..c59bf88f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -4,23 +4,29 @@ from .queue import FailedQueue from .utils import current_timestamp -class StartedJobRegistry: +class BaseRegistry(object): """ - Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. - StartedJobRegistry contains job keys that are currently being executed. - Each key is scored by job's expiration time (datetime started + timeout). + Base implementation of job registry, implemented in Redis sorted set. Each job + is stored as a key in the registry, scored by expiration time (unix timestamp). - Jobs are added to registry right before they are executed and removed - right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". + Jobs with scores are lower than current time is considered "expired" and + should be cleaned up. """ def __init__(self, name='default', connection=None): self.name = name - self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) + def __len__(self): + """Returns the number of jobs in this registry""" + return self.count + + @property + def count(self): + """Returns the number of jobs in this registry""" + self.cleanup() + return self.connection.zcard(self.key) + def add(self, job, timeout, pipeline=None): """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout @@ -40,11 +46,28 @@ class StartedJobRegistry: def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" - self.move_expired_jobs_to_failed_queue() + self.cleanup() return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] - def move_expired_jobs_to_failed_queue(self): + +class StartedJobRegistry(BaseRegistry): + """ + Registry of currently executing jobs. Each queue maintains a + StartedJobRegistry. Jobs in this registry are ones that are currently + being executed. + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name='default', connection=None): + super(StartedJobRegistry, self).__init__(name, connection) + self.key = 'rq:wip:%s' % name + + def cleanup(self): """Remove expired jobs from registry and add them to FailedQueue.""" job_ids = self.get_expired_job_ids() @@ -53,6 +76,22 @@ class StartedJobRegistry: with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.zremrangebyscore(self.key, 0, current_timestamp()) pipeline.execute() return job_ids + + +class FinishedJobRegistry(BaseRegistry): + """ + Registry of jobs that have been completed. Jobs are added to this + registry after they have successfully completed for monitoring purposes. + """ + + def __init__(self, name='default', connection=None): + super(FinishedJobRegistry, self).__init__(name, connection) + self.key = 'rq:finished:%s' % name + + def cleanup(self): + """Remove expired jobs from registry.""" + self.connection.zremrangebyscore(self.key, 0, current_timestamp()) diff --git a/rq/worker.py b/rq/worker.py index fbf981d1..48e22a97 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .registry import StartedJobRegistry +from .registry import FinishedJobRegistry, StartedJobRegistry try: from procname import setprocname @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - registry = StartedJobRegistry(job.origin, self.connection) + started_job_registry = StartedJobRegistry(job.origin) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -513,14 +513,18 @@ class Worker(object): job.ended_at = utcnow() job._status = Status.FINISHED job.save(pipeline=pipeline) + + finished_job_registry = FinishedJobRegistry(job.origin) + finished_job_registry.add(job, result_ttl, pipeline) + job.cleanup(result_ttl, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) return False diff --git a/tests/test_queue.py b/tests/test_queue.py index c60b5107..e61568ef 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -101,11 +101,13 @@ class TestQueue(RQTestCase): q.enqueue(say_hello, 'Charlie') self.testconn.lpush(q.key, '1', '2') - self.assertEquals(q.count, 4) + self.assertEqual(q.count, 4) + self.assertEqual(len(q), 4) q.compact() - self.assertEquals(q.count, 2) + self.assertEqual(q.count, 2) + self.assertEqual(len(q), 2) def test_enqueue(self): """Enqueueing job onto queues.""" diff --git a/tests/test_job_started_registry.py b/tests/test_registry.py similarity index 56% rename from tests/test_job_started_registry.py rename to tests/test_registry.py index addb1db2..ce4a3456 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_registry.py @@ -5,16 +5,16 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import StartedJobRegistry +from rq.registry import FinishedJobRegistry, StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello -class TestQueue(RQTestCase): +class TestRegistry(RQTestCase): def setUp(self): - super(TestQueue, self).setUp() + super(TestRegistry, self).setUp() self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): @@ -33,8 +33,9 @@ class TestQueue(RQTestCase): def test_get_job_ids(self): """Getting job ids from StartedJobRegistry.""" - self.testconn.zadd(self.registry.key, 1, 'foo') - self.testconn.zadd(self.registry.key, 10, 'bar') + timestamp = current_timestamp() + self.testconn.zadd(self.registry.key, timestamp + 10, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 20, 'bar') self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): @@ -51,8 +52,9 @@ class TestQueue(RQTestCase): failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) self.testconn.zadd(self.registry.key, 1, 'foo') - self.registry.move_expired_jobs_to_failed_queue() + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) def test_job_execution(self): """Job is removed from StartedJobRegistry after execution.""" @@ -76,3 +78,43 @@ class TestQueue(RQTestCase): worker.perform_job(job) self.assertNotIn(job.id, registry.get_job_ids()) + + def test_get_job_count(self): + """StartedJobRegistry returns the right number of job count.""" + timestamp = current_timestamp() + 10 + self.testconn.zadd(self.registry.key, timestamp, 'foo') + self.testconn.zadd(self.registry.key, timestamp, 'bar') + self.assertEqual(self.registry.count, 2) + self.assertEqual(len(self.registry), 2) + + +class TestFinishedJobRegistry(RQTestCase): + + def setUp(self): + super(TestFinishedJobRegistry, self).setUp() + self.registry = FinishedJobRegistry(connection=self.testconn) + + def test_cleanup(self): + """Finished job registry removes expired jobs.""" + timestamp = current_timestamp() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + + self.registry.cleanup() + self.assertEqual(self.registry.get_job_ids(), ['bar']) + + def test_jobs_are_put_in_registry(self): + """Completed jobs are added to FinishedJobRegistry.""" + self.assertEqual(self.registry.get_job_ids(), []) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + # Completed jobs are put in FinishedJobRegistry + job = queue.enqueue(say_hello) + worker.perform_job(job) + self.assertEqual(self.registry.get_job_ids(), [job.id]) + + # Failed jobs are not put in FinishedJobRegistry + failed_job = queue.enqueue(div_by_zero) + worker.perform_job(failed_job) + self.assertEqual(self.registry.get_job_ids(), [job.id])