mirror of https://github.com/rq/rq.git
commit
6144ec0786
48
rq/job.py
48
rq/job.py
|
@ -69,7 +69,7 @@ class Job(object):
|
|||
# Job construction
|
||||
@classmethod
|
||||
def create(cls, func, args=None, kwargs=None, connection=None,
|
||||
result_ttl=None, status=None, description=None):
|
||||
result_ttl=None, status=None, description=None, dependency=None):
|
||||
"""Creates a new Job instance for the given function, arguments, and
|
||||
keyword arguments.
|
||||
"""
|
||||
|
@ -92,6 +92,9 @@ class Job(object):
|
|||
job.description = description or job.get_call_string()
|
||||
job.result_ttl = result_ttl
|
||||
job._status = status
|
||||
# dependency could be job instance or id
|
||||
if dependency is not None:
|
||||
job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency
|
||||
return job
|
||||
|
||||
@property
|
||||
|
@ -124,6 +127,20 @@ class Job(object):
|
|||
def is_started(self):
|
||||
return self.status == Status.STARTED
|
||||
|
||||
@property
|
||||
def dependency(self):
|
||||
"""Returns a job's dependency. To avoid repeated Redis fetches, we cache
|
||||
job.dependency as job._dependency.
|
||||
"""
|
||||
if self._dependency_id is None:
|
||||
return None
|
||||
if hasattr(self, '_dependency'):
|
||||
return self._dependency
|
||||
job = Job.fetch(self._dependency_id, connection=self.connection)
|
||||
job.refresh()
|
||||
self._dependency = job
|
||||
return job
|
||||
|
||||
@property
|
||||
def func(self):
|
||||
func_name = self.func_name
|
||||
|
@ -190,6 +207,7 @@ class Job(object):
|
|||
self.timeout = None
|
||||
self.result_ttl = None
|
||||
self._status = None
|
||||
self._dependency_id = None
|
||||
self.meta = {}
|
||||
|
||||
|
||||
|
@ -213,11 +231,21 @@ class Job(object):
|
|||
"""The Redis key that is used to store job hash under."""
|
||||
return b'rq:job:' + job_id.encode('utf-8')
|
||||
|
||||
@classmethod
|
||||
def waitlist_key_for(cls, job_id):
|
||||
"""The Redis key that is used to store job hash under."""
|
||||
return 'rq:job:%s:waitlist' % (job_id,)
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
"""The Redis key that is used to store job hash under."""
|
||||
return self.key_for(self.id)
|
||||
|
||||
@property
|
||||
def waitlist_key(self):
|
||||
"""The Redis key that is used to store job hash under."""
|
||||
return self.waitlist_key_for(self.id)
|
||||
|
||||
@property # noqa
|
||||
def job_tuple(self):
|
||||
"""Returns the job tuple that encodes the actual function call that
|
||||
|
@ -288,8 +316,9 @@ class Job(object):
|
|||
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
|
||||
self.exc_info = obj.get('exc_info')
|
||||
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
|
||||
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
|
||||
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
|
||||
self._status = as_text(obj.get('status') if obj.get('status') else None)
|
||||
self._dependency_id = as_text(obj.get('dependency_id', None))
|
||||
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
|
||||
|
||||
def dump(self):
|
||||
|
@ -317,6 +346,8 @@ class Job(object):
|
|||
obj['result_ttl'] = self.result_ttl
|
||||
if self._status is not None:
|
||||
obj['status'] = self._status
|
||||
if self._dependency_id is not None:
|
||||
obj['dependency_id'] = self._dependency_id
|
||||
if self.meta:
|
||||
obj['meta'] = dumps(self.meta)
|
||||
|
||||
|
@ -390,6 +421,18 @@ class Job(object):
|
|||
connection = pipeline if pipeline is not None else self.connection
|
||||
connection.expire(self.key, ttl)
|
||||
|
||||
def register_dependency(self):
|
||||
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued
|
||||
only if the dependency job is successfully performed. We maintain this
|
||||
waitlist in Redis, with key that looks something like:
|
||||
|
||||
rq:job:job_id:waitlist = ['job_id_1', 'job_id_2']
|
||||
|
||||
This method puts the job on it's dependency's waitlist.
|
||||
"""
|
||||
# TODO: This can probably be pipelined
|
||||
self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id)
|
||||
|
||||
def __str__(self):
|
||||
return '<Job %s: %s>' % (self.id, self.description)
|
||||
|
||||
|
@ -401,5 +444,4 @@ class Job(object):
|
|||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
|
||||
_job_stack = LocalStack()
|
||||
|
|
54
rq/queue.py
54
rq/queue.py
|
@ -3,10 +3,13 @@ import uuid
|
|||
|
||||
from .connections import resolve_connection
|
||||
from .job import Job, Status
|
||||
from .exceptions import (NoSuchJobError, UnpickleError,
|
||||
InvalidJobOperationError, DequeueTimeout)
|
||||
|
||||
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
|
||||
NoSuchJobError, UnpickleError)
|
||||
from .compat import total_ordering, string_types, as_text
|
||||
|
||||
from redis import WatchError
|
||||
|
||||
|
||||
def get_failed_queue(connection=None):
|
||||
"""Returns a handle to the special failed queue."""
|
||||
|
@ -131,8 +134,9 @@ class Queue(object):
|
|||
"""Pushes a job ID on the corresponding Redis queue."""
|
||||
self.connection.rpush(self.key, job_id)
|
||||
|
||||
|
||||
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
|
||||
result_ttl=None, description=None):
|
||||
result_ttl=None, description=None, after=None):
|
||||
"""Creates a job to represent the delayed function call and enqueues
|
||||
it.
|
||||
|
||||
|
@ -141,8 +145,29 @@ class Queue(object):
|
|||
contain options for RQ itself.
|
||||
"""
|
||||
timeout = timeout or self._default_timeout
|
||||
job = Job.create(func, args, kwargs, description=description, connection=self.connection,
|
||||
result_ttl=result_ttl, status=Status.QUEUED)
|
||||
|
||||
# TODO: job with dependency shouldn't have "queued" as status
|
||||
job = Job.create(func, args, kwargs, connection=self.connection,
|
||||
result_ttl=result_ttl, status=Status.QUEUED,
|
||||
description=description, dependency=after)
|
||||
|
||||
# If job depends on an unfinished job, register itself on it's
|
||||
# parent's waitlist instead of enqueueing it.
|
||||
# If WatchError is raised in the process, that means something else is
|
||||
# modifying the dependency. In this case we simply retry
|
||||
if after is not None:
|
||||
with self.connection.pipeline() as pipe:
|
||||
while True:
|
||||
try:
|
||||
pipe.watch(after.key)
|
||||
if after.status != Status.FINISHED:
|
||||
job.register_dependency()
|
||||
job.save()
|
||||
return job
|
||||
break
|
||||
except WatchError:
|
||||
continue
|
||||
|
||||
return self.enqueue_job(job, timeout=timeout)
|
||||
|
||||
def enqueue(self, f, *args, **kwargs):
|
||||
|
@ -168,16 +193,19 @@ class Queue(object):
|
|||
timeout = None
|
||||
description = None
|
||||
result_ttl = None
|
||||
if 'args' in kwargs or 'kwargs' in kwargs:
|
||||
after = None
|
||||
if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs:
|
||||
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
|
||||
timeout = kwargs.pop('timeout', None)
|
||||
description = kwargs.pop('description', None)
|
||||
args = kwargs.pop('args', None)
|
||||
result_ttl = kwargs.pop('result_ttl', None)
|
||||
after = kwargs.pop('after', None)
|
||||
kwargs = kwargs.pop('kwargs', None)
|
||||
|
||||
return self.enqueue_call(func=f, args=args, kwargs=kwargs, description=description,
|
||||
timeout=timeout, result_ttl=result_ttl)
|
||||
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
|
||||
timeout=timeout, result_ttl=result_ttl,
|
||||
description=description, after=after)
|
||||
|
||||
def enqueue_job(self, job, timeout=None, set_meta_data=True):
|
||||
"""Enqueues a job for delayed execution.
|
||||
|
@ -208,6 +236,16 @@ class Queue(object):
|
|||
job.save()
|
||||
return job
|
||||
|
||||
def enqueue_waitlist(self, job):
|
||||
"""Enqueues all jobs in the waitlist and clears it"""
|
||||
# TODO: can probably be pipelined
|
||||
while True:
|
||||
job_id = as_text(self.connection.lpop(job.waitlist_key))
|
||||
if job_id is None:
|
||||
break
|
||||
waitlisted_job = Job.fetch(job_id, connection=self.connection)
|
||||
self.enqueue_job(waitlisted_job)
|
||||
|
||||
def pop_job_id(self):
|
||||
"""Pops a given job ID from this Redis queue."""
|
||||
return as_text(self.connection.lpop(self.key))
|
||||
|
|
|
@ -327,6 +327,8 @@ class Worker(object):
|
|||
self.connection.expire(self.key, (job.timeout or 180) + 60)
|
||||
self.fork_and_perform_job(job)
|
||||
self.connection.expire(self.key, self.default_worker_ttl)
|
||||
if job.status == 'finished':
|
||||
queue.enqueue_waitlist(job)
|
||||
|
||||
did_perform_work = True
|
||||
finally:
|
||||
|
|
|
@ -7,6 +7,7 @@ try:
|
|||
from cPickle import loads
|
||||
except ImportError:
|
||||
from pickle import loads
|
||||
from rq.compat import as_text
|
||||
from rq.job import Job, get_current_job
|
||||
from rq.exceptions import NoSuchJobError, UnpickleError
|
||||
from rq.queue import Queue
|
||||
|
@ -134,6 +135,22 @@ class TestJob(RQTestCase):
|
|||
sorted(self.testconn.hkeys(job.key)),
|
||||
[b'created_at', b'data', b'description'])
|
||||
|
||||
def test_persistence_of_parent_job(self):
|
||||
"""Storing jobs with parent job, either instance or key."""
|
||||
parent_job = Job.create(func=some_calculation)
|
||||
parent_job.save()
|
||||
job = Job.create(func=some_calculation, dependency=parent_job)
|
||||
job.save()
|
||||
stored_job = Job.fetch(job.id)
|
||||
self.assertEqual(stored_job._dependency_id, parent_job.id)
|
||||
self.assertEqual(stored_job.dependency, parent_job)
|
||||
|
||||
job = Job.create(func=some_calculation, dependency=parent_job.id)
|
||||
job.save()
|
||||
stored_job = Job.fetch(job.id)
|
||||
self.assertEqual(stored_job._dependency_id, parent_job.id)
|
||||
self.assertEqual(stored_job.dependency, parent_job)
|
||||
|
||||
def test_store_then_fetch(self):
|
||||
"""Store, then fetch."""
|
||||
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
|
||||
|
@ -265,3 +282,11 @@ class TestJob(RQTestCase):
|
|||
# Jobs with 0 TTL are immediately deleted
|
||||
job.cleanup(ttl=0)
|
||||
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
|
||||
|
||||
def test_register_dependency(self):
|
||||
"""Test that jobs updates the correct job waitlist"""
|
||||
job = Job.create(func=say_hello)
|
||||
job._dependency_id = 'id'
|
||||
job.save()
|
||||
job.register_dependency()
|
||||
self.assertEqual(as_text(self.testconn.lpop('rq:job:id:waitlist')), job.id)
|
||||
|
|
|
@ -251,6 +251,38 @@ class TestQueue(RQTestCase):
|
|||
job = q.enqueue(say_hello)
|
||||
self.assertEqual(job.status, Status.QUEUED)
|
||||
|
||||
def test_enqueue_waitlist(self):
|
||||
"""Enqueueing a waitlist pushes all jobs in waitlist to queue"""
|
||||
q = Queue()
|
||||
parent_job = Job.create(func=say_hello)
|
||||
parent_job.save()
|
||||
job_1 = Job.create(func=say_hello, dependency=parent_job)
|
||||
job_1.save()
|
||||
job_1.register_dependency()
|
||||
job_2 = Job.create(func=say_hello, dependency=parent_job)
|
||||
job_2.save()
|
||||
job_2.register_dependency()
|
||||
|
||||
# After waitlist is enqueued, job_1 and job_2 should be in queue
|
||||
self.assertEqual(q.job_ids, [])
|
||||
q.enqueue_waitlist(parent_job)
|
||||
self.assertEqual(q.job_ids, [job_1.id, job_2.id])
|
||||
self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
|
||||
|
||||
def test_enqueue_job_with_dependency(self):
|
||||
"""Jobs are enqueued only when their dependencies are finished"""
|
||||
# Job with unfinished dependency is not immediately enqueued
|
||||
parent_job = Job.create(func=say_hello)
|
||||
q = Queue()
|
||||
q.enqueue_call(say_hello, after=parent_job)
|
||||
self.assertEqual(q.job_ids, [])
|
||||
|
||||
# Jobs dependent on finished jobs are immediately enqueued
|
||||
parent_job.status = 'finished'
|
||||
parent_job.save()
|
||||
job = q.enqueue_call(say_hello, after=parent_job)
|
||||
self.assertEqual(q.job_ids, [job.id])
|
||||
|
||||
|
||||
class TestFailedQueue(RQTestCase):
|
||||
def test_requeue_job(self):
|
||||
|
|
|
@ -234,3 +234,19 @@ class TestWorker(RQTestCase):
|
|||
self.assertEqual(job.is_queued, False)
|
||||
self.assertEqual(job.is_finished, False)
|
||||
self.assertEqual(job.is_failed, True)
|
||||
|
||||
def test_job_dependency(self):
|
||||
"""Enqueue waitlisted jobs only if their parents don't fail"""
|
||||
q = Queue()
|
||||
w = Worker([q])
|
||||
parent_job = q.enqueue(say_hello)
|
||||
job = q.enqueue_call(say_hello, after=parent_job)
|
||||
w.work(burst=True)
|
||||
job = Job.fetch(job.id)
|
||||
self.assertEqual(job.status, 'finished')
|
||||
|
||||
parent_job = q.enqueue(div_by_zero)
|
||||
job = q.enqueue_call(say_hello, after=parent_job)
|
||||
w.work(burst=True)
|
||||
job = Job.fetch(job.id)
|
||||
self.assertNotEqual(job.status, 'finished')
|
||||
|
|
Loading…
Reference in New Issue