mirror of https://github.com/rq/rq.git
Merge branch 'jchia-master'
Conflicts: rq/job.py rq/queue.py
This commit is contained in:
commit
7fdd115e28
|
@ -69,7 +69,7 @@ class Job(object):
|
||||||
# Job construction
|
# Job construction
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, func, args=None, kwargs=None, connection=None,
|
def create(cls, func, args=None, kwargs=None, connection=None,
|
||||||
result_ttl=None, status=None, description=None, depends_on=None):
|
result_ttl=None, status=None, description=None, depends_on=None, timeout=None):
|
||||||
"""Creates a new Job instance for the given function, arguments, and
|
"""Creates a new Job instance for the given function, arguments, and
|
||||||
keyword arguments.
|
keyword arguments.
|
||||||
"""
|
"""
|
||||||
|
@ -91,6 +91,7 @@ class Job(object):
|
||||||
job._kwargs = kwargs
|
job._kwargs = kwargs
|
||||||
job.description = description or job.get_call_string()
|
job.description = description or job.get_call_string()
|
||||||
job.result_ttl = result_ttl
|
job.result_ttl = result_ttl
|
||||||
|
job.timeout = timeout
|
||||||
job._status = status
|
job._status = status
|
||||||
# dependency could be job instance or id
|
# dependency could be job instance or id
|
||||||
if depends_on is not None:
|
if depends_on is not None:
|
||||||
|
|
21
rq/queue.py
21
rq/queue.py
|
@ -22,6 +22,7 @@ def compact(lst):
|
||||||
|
|
||||||
@total_ordering
|
@total_ordering
|
||||||
class Queue(object):
|
class Queue(object):
|
||||||
|
DEFAULT_TIMEOUT = 180 # Default timeout seconds.
|
||||||
redis_queue_namespace_prefix = 'rq:queue:'
|
redis_queue_namespace_prefix = 'rq:queue:'
|
||||||
redis_queues_keys = 'rq:queues'
|
redis_queues_keys = 'rq:queues'
|
||||||
|
|
||||||
|
@ -153,7 +154,7 @@ class Queue(object):
|
||||||
# TODO: job with dependency shouldn't have "queued" as status
|
# TODO: job with dependency shouldn't have "queued" as status
|
||||||
job = Job.create(func, args, kwargs, connection=self.connection,
|
job = Job.create(func, args, kwargs, connection=self.connection,
|
||||||
result_ttl=result_ttl, status=Status.QUEUED,
|
result_ttl=result_ttl, status=Status.QUEUED,
|
||||||
description=description, depends_on=depends_on)
|
description=description, depends_on=depends_on, timeout=timeout)
|
||||||
|
|
||||||
# If job depends on an unfinished job, register itself on it's
|
# If job depends on an unfinished job, register itself on it's
|
||||||
# parent's dependents instead of enqueueing it.
|
# parent's dependents instead of enqueueing it.
|
||||||
|
@ -172,7 +173,7 @@ class Queue(object):
|
||||||
except WatchError:
|
except WatchError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return self.enqueue_job(job, timeout=timeout)
|
return self.enqueue_job(job)
|
||||||
|
|
||||||
def enqueue(self, f, *args, **kwargs):
|
def enqueue(self, f, *args, **kwargs):
|
||||||
"""Creates a job to represent the delayed function call and enqueues
|
"""Creates a job to represent the delayed function call and enqueues
|
||||||
|
@ -211,13 +212,9 @@ class Queue(object):
|
||||||
timeout=timeout, result_ttl=result_ttl,
|
timeout=timeout, result_ttl=result_ttl,
|
||||||
description=description, depends_on=depends_on)
|
description=description, depends_on=depends_on)
|
||||||
|
|
||||||
def enqueue_job(self, job, timeout=None, set_meta_data=True):
|
def enqueue_job(self, job, set_meta_data=True):
|
||||||
"""Enqueues a job for delayed execution.
|
"""Enqueues a job for delayed execution.
|
||||||
|
|
||||||
When the `timeout` argument is sent, it will overrides the default
|
|
||||||
timeout value of 180 seconds. `timeout` may either be a string or
|
|
||||||
integer.
|
|
||||||
|
|
||||||
If the `set_meta_data` argument is `True` (default), it will update
|
If the `set_meta_data` argument is `True` (default), it will update
|
||||||
the properties `origin` and `enqueued_at`.
|
the properties `origin` and `enqueued_at`.
|
||||||
|
|
||||||
|
@ -230,10 +227,8 @@ class Queue(object):
|
||||||
job.origin = self.name
|
job.origin = self.name
|
||||||
job.enqueued_at = times.now()
|
job.enqueued_at = times.now()
|
||||||
|
|
||||||
if timeout:
|
if job.timeout is None:
|
||||||
job.timeout = timeout # _timeout_in_seconds(timeout)
|
job.timeout = self.DEFAULT_TIMEOUT
|
||||||
else:
|
|
||||||
job.timeout = 180 # default
|
|
||||||
job.save()
|
job.save()
|
||||||
|
|
||||||
if self._async:
|
if self._async:
|
||||||
|
@ -379,7 +374,7 @@ class FailedQueue(Queue):
|
||||||
"""
|
"""
|
||||||
job.ended_at = times.now()
|
job.ended_at = times.now()
|
||||||
job.exc_info = exc_info
|
job.exc_info = exc_info
|
||||||
return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False)
|
return self.enqueue_job(job, set_meta_data=False)
|
||||||
|
|
||||||
def requeue(self, job_id):
|
def requeue(self, job_id):
|
||||||
"""Requeues the job with the given job ID."""
|
"""Requeues the job with the given job ID."""
|
||||||
|
@ -397,4 +392,4 @@ class FailedQueue(Queue):
|
||||||
job.status = Status.QUEUED
|
job.status = Status.QUEUED
|
||||||
job.exc_info = None
|
job.exc_info = None
|
||||||
q = Queue(job.origin, connection=self.connection)
|
q = Queue(job.origin, connection=self.connection)
|
||||||
q.enqueue_job(job, timeout=job.timeout)
|
q.enqueue_job(job)
|
||||||
|
|
|
@ -323,7 +323,7 @@ class Worker(object):
|
||||||
self.log.info('%s: %s (%s)' % (green(queue.name),
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
||||||
blue(job.description), job.id))
|
blue(job.description), job.id))
|
||||||
|
|
||||||
self.connection.expire(self.key, (job.timeout or 180) + 60)
|
self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
|
||||||
self.fork_and_perform_job(job)
|
self.fork_and_perform_job(job)
|
||||||
self.connection.expire(self.key, self.default_worker_ttl)
|
self.connection.expire(self.key, self.default_worker_ttl)
|
||||||
if job.status == 'finished':
|
if job.status == 'finished':
|
||||||
|
@ -404,7 +404,7 @@ class Worker(object):
|
||||||
job.origin, time.time()))
|
job.origin, time.time()))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with death_penalty_after(job.timeout or 180):
|
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
|
||||||
rv = job.perform()
|
rv = job.perform()
|
||||||
|
|
||||||
# Pickle the result in the same try-except block since we need to
|
# Pickle the result in the same try-except block since we need to
|
||||||
|
|
|
@ -308,7 +308,7 @@ class TestQueue(RQTestCase):
|
||||||
self.assertFalse(self.testconn.exists(parent_job.dependents_key))
|
self.assertFalse(self.testconn.exists(parent_job.dependents_key))
|
||||||
|
|
||||||
def test_enqueue_job_with_dependency(self):
|
def test_enqueue_job_with_dependency(self):
|
||||||
"""Jobs are enqueued only when their dependencies are finished"""
|
"""Jobs are enqueued only when their dependencies are finished."""
|
||||||
# Job with unfinished dependency is not immediately enqueued
|
# Job with unfinished dependency is not immediately enqueued
|
||||||
parent_job = Job.create(func=say_hello)
|
parent_job = Job.create(func=say_hello)
|
||||||
q = Queue()
|
q = Queue()
|
||||||
|
@ -320,6 +320,23 @@ class TestQueue(RQTestCase):
|
||||||
parent_job.save()
|
parent_job.save()
|
||||||
job = q.enqueue_call(say_hello, depends_on=parent_job)
|
job = q.enqueue_call(say_hello, depends_on=parent_job)
|
||||||
self.assertEqual(q.job_ids, [job.id])
|
self.assertEqual(q.job_ids, [job.id])
|
||||||
|
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
|
||||||
|
|
||||||
|
def test_enqueue_job_with_dependency_and_timeout(self):
|
||||||
|
"""Jobs still know their specified timeout after being scheduled as a dependency."""
|
||||||
|
# Job with unfinished dependency is not immediately enqueued
|
||||||
|
parent_job = Job.create(func=say_hello)
|
||||||
|
q = Queue()
|
||||||
|
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
|
||||||
|
self.assertEqual(q.job_ids, [])
|
||||||
|
self.assertEqual(job.timeout, 123)
|
||||||
|
|
||||||
|
# Jobs dependent on finished jobs are immediately enqueued
|
||||||
|
parent_job.status = 'finished'
|
||||||
|
parent_job.save()
|
||||||
|
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
|
||||||
|
self.assertEqual(q.job_ids, [job.id])
|
||||||
|
self.assertEqual(job.timeout, 123)
|
||||||
|
|
||||||
|
|
||||||
class TestFailedQueue(RQTestCase):
|
class TestFailedQueue(RQTestCase):
|
||||||
|
|
Loading…
Reference in New Issue