diff --git a/rq/job.py b/rq/job.py index a1687380..a10c340b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -69,7 +69,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None): + result_ttl=None, status=None, description=None, dependency=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -92,6 +92,9 @@ class Job(object): job.description = description or job.get_call_string() job.result_ttl = result_ttl job._status = status + # dependency could be job instance or id + if dependency is not None: + job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency return job @property @@ -124,6 +127,20 @@ class Job(object): def is_started(self): return self.status == Status.STARTED + @property + def dependency(self): + """Returns a job's dependency. To avoid repeated Redis fetches, we cache + job.dependency as job._dependency. 
+ """ + if self._dependency_id is None: + return None + if hasattr(self, '_dependency'): + return self._dependency + job = Job.fetch(self._dependency_id, connection=self.connection) + job.refresh() + self._dependency = job + return job + @property def func(self): func_name = self.func_name @@ -190,6 +207,7 @@ class Job(object): self.timeout = None self.result_ttl = None self._status = None + self._dependency_id = None self.meta = {} @@ -213,11 +231,21 @@ class Job(object): """The Redis key that is used to store job hash under.""" return b'rq:job:' + job_id.encode('utf-8') + @classmethod + def waitlist_key_for(cls, job_id): + """The Redis key that is used to store job hash under.""" + return 'rq:job:%s:waitlist' % (job_id,) + @property def key(self): """The Redis key that is used to store job hash under.""" return self.key_for(self.id) + @property + def waitlist_key(self): + """The Redis key that is used to store job hash under.""" + return self.waitlist_key_for(self.id) + @property # noqa def job_tuple(self): """Returns the job tuple that encodes the actual function call that @@ -288,8 +316,9 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) + self._dependency_id = as_text(obj.get('dependency_id', None)) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def dump(self): @@ -317,6 +346,8 @@ class Job(object): obj['result_ttl'] = self.result_ttl if self._status is not None: obj['status'] = self._status + if self._dependency_id is not None: + obj['dependency_id'] = self._dependency_id if self.meta: obj['meta'] = dumps(self.meta) @@ -390,6 +421,18 @@ 
class Job(object): connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) + def register_dependency(self): + """Jobs may have a waitlist. Jobs in this waitlist are enqueued + only if the dependency job is successfully performed. We maintain this + waitlist in Redis, with a key that looks something like: + + rq:job:job_id:waitlist = ['job_id_1', 'job_id_2'] + + This method puts the job on its dependency's waitlist. + """ + # TODO: This can probably be pipelined + self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id) + def __str__(self): return '<Job %s: %s>' % (self.id, self.description) @@ -401,5 +444,4 @@ class Job(object): def __hash__(self): return hash(self.id) - _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index e5b3b04f..e9f3c7f9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -3,10 +3,13 @@ import uuid from .connections import resolve_connection from .job import Job, Status -from .exceptions import (NoSuchJobError, UnpickleError, - InvalidJobOperationError, DequeueTimeout) + +from .exceptions import (DequeueTimeout, InvalidJobOperationError, + NoSuchJobError, UnpickleError) from .compat import total_ordering, string_types, as_text + +from redis import WatchError + def get_failed_queue(connection=None): """Returns a handle to the special failed queue."""
""" timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, description=description, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED) + + # TODO: job with dependency shouldn't have "queued" as status + job = Job.create(func, args, kwargs, connection=self.connection, + result_ttl=result_ttl, status=Status.QUEUED, + description=description, dependency=after) + + # If job depends on an unfinished job, register itself on it's + # parent's waitlist instead of enqueueing it. + # If WatchError is raised in the process, that means something else is + # modifying the dependency. In this case we simply retry + if after is not None: + with self.connection.pipeline() as pipe: + while True: + try: + pipe.watch(after.key) + if after.status != Status.FINISHED: + job.register_dependency() + job.save() + return job + break + except WatchError: + continue + return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -168,16 +193,19 @@ class Queue(object): timeout = None description = None result_ttl = None - if 'args' in kwargs or 'kwargs' in kwargs: + after = None + if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) args = kwargs.pop('args', None) result_ttl = kwargs.pop('result_ttl', None) + after = kwargs.pop('after', None) kwargs = kwargs.pop('kwargs', None) - return self.enqueue_call(func=f, args=args, kwargs=kwargs, description=description, - timeout=timeout, result_ttl=result_ttl) + return self.enqueue_call(func=f, args=args, kwargs=kwargs, + timeout=timeout, result_ttl=result_ttl, + description=description, after=after) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. 
@@ -208,6 +236,16 @@ class Queue(object): job.save() return job + def enqueue_waitlist(self, job): + """Enqueues all jobs in the waitlist and clears it""" + # TODO: can probably be pipelined + while True: + job_id = as_text(self.connection.lpop(job.waitlist_key)) + if job_id is None: + break + waitlisted_job = Job.fetch(job_id, connection=self.connection) + self.enqueue_job(waitlisted_job) + def pop_job_id(self): """Pops a given job ID from this Redis queue.""" return as_text(self.connection.lpop(self.key)) diff --git a/rq/worker.py b/rq/worker.py index 64f6501b..d350373a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -327,6 +327,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) + if job.status == 'finished': + queue.enqueue_waitlist(job) did_perform_work = True finally: diff --git a/tests/test_job.py b/tests/test_job.py index d25f60ff..bc70d5e4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,6 +7,7 @@ try: from cPickle import loads except ImportError: from pickle import loads +from rq.compat import as_text from rq.job import Job, get_current_job from rq.exceptions import NoSuchJobError, UnpickleError from rq.queue import Queue @@ -134,6 +135,22 @@ class TestJob(RQTestCase): sorted(self.testconn.hkeys(job.key)), [b'created_at', b'data', b'description']) + def test_persistence_of_parent_job(self): + """Storing jobs with parent job, either instance or key.""" + parent_job = Job.create(func=some_calculation) + parent_job.save() + job = Job.create(func=some_calculation, dependency=parent_job) + job.save() + stored_job = Job.fetch(job.id) + self.assertEqual(stored_job._dependency_id, parent_job.id) + self.assertEqual(stored_job.dependency, parent_job) + + job = Job.create(func=some_calculation, dependency=parent_job.id) + job.save() + stored_job = Job.fetch(job.id) + self.assertEqual(stored_job._dependency_id, parent_job.id) + 
self.assertEqual(stored_job.dependency, parent_job) + def test_store_then_fetch(self): """Store, then fetch.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -265,3 +282,11 @@ class TestJob(RQTestCase): # Jobs with 0 TTL are immediately deleted job.cleanup(ttl=0) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) + + def test_register_dependency(self): + """Test that jobs updates the correct job waitlist""" + job = Job.create(func=say_hello) + job._dependency_id = 'id' + job.save() + job.register_dependency() + self.assertEqual(as_text(self.testconn.lpop('rq:job:id:waitlist')), job.id) diff --git a/tests/test_queue.py b/tests/test_queue.py index a6dabb2d..521cfd3b 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -251,6 +251,38 @@ class TestQueue(RQTestCase): job = q.enqueue(say_hello) self.assertEqual(job.status, Status.QUEUED) + def test_enqueue_waitlist(self): + """Enqueueing a waitlist pushes all jobs in waitlist to queue""" + q = Queue() + parent_job = Job.create(func=say_hello) + parent_job.save() + job_1 = Job.create(func=say_hello, dependency=parent_job) + job_1.save() + job_1.register_dependency() + job_2 = Job.create(func=say_hello, dependency=parent_job) + job_2.save() + job_2.register_dependency() + + # After waitlist is enqueued, job_1 and job_2 should be in queue + self.assertEqual(q.job_ids, []) + q.enqueue_waitlist(parent_job) + self.assertEqual(q.job_ids, [job_1.id, job_2.id]) + self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) + + def test_enqueue_job_with_dependency(self): + """Jobs are enqueued only when their dependencies are finished""" + # Job with unfinished dependency is not immediately enqueued + parent_job = Job.create(func=say_hello) + q = Queue() + q.enqueue_call(say_hello, after=parent_job) + self.assertEqual(q.job_ids, []) + + # Jobs dependent on finished jobs are immediately enqueued + parent_job.status = 'finished' + parent_job.save() + job = 
q.enqueue_call(say_hello, after=parent_job) + self.assertEqual(q.job_ids, [job.id]) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index d1b26326..db359753 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -234,3 +234,19 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, True) + + def test_job_dependency(self): + """Enqueue waitlisted jobs only if their parents don't fail""" + q = Queue() + w = Worker([q]) + parent_job = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, after=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, 'finished') + + parent_job = q.enqueue(div_by_zero) + job = q.enqueue_call(say_hello, after=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertNotEqual(job.status, 'finished')