Merge branch 'selwin-persist-None-result'

This commit is contained in:
Vincent Driessen 2012-08-27 12:10:52 +02:00
commit 35761a0d71
5 changed files with 95 additions and 46 deletions

View File

@ -1,6 +1,7 @@
import importlib import importlib
import inspect import inspect
import times import times
from collections import namedtuple
from uuid import uuid4 from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError from cPickle import loads, dumps, UnpicklingError
from .connections import get_current_connection from .connections import get_current_connection
@ -8,9 +9,15 @@ from .exceptions import UnpickleError, NoSuchJobError
JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result', 'created_at', 'enqueued_at', 'connection', '_result', 'result',
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance',
'result_ttl']) 'result_ttl', '_status', 'status'])
def enum(name, *sequential, **named):
    """Build a simple enum-like class named *name*.

    Positional member names are numbered automatically starting at 0;
    keyword arguments map member names to explicit values.
    """
    members = dict((label, index) for index, label in enumerate(sequential))
    members.update(named)
    return type(name, (), members)
Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed')
def unpickle(pickled_string): def unpickle(pickled_string):
@ -48,11 +55,10 @@ def requeue_job(job_id, connection=None):
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None): result_ttl=None, status=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -74,12 +80,29 @@ class Job(object):
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job._status = status
return job return job
@property @property
def func_name(self): def func_name(self):
return self._func_name return self._func_name
@property
def status(self):
    """The job's status string as stored on the instance (may be None
    for jobs created before status tracking was added)."""
    return self._status
@property
def is_finished(self):
    """True when the job's status equals Status.FINISHED."""
    return self.status == Status.FINISHED
@property
def is_queued(self):
    """True when the job's status equals Status.QUEUED."""
    return self.status == Status.QUEUED
@property
def is_failed(self):
    """True when the job's status equals Status.FAILED."""
    return self.status == Status.FAILED
@property @property
def func(self): def func(self):
func_name = self.func_name func_name = self.func_name
@ -138,6 +161,7 @@ class Job(object):
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self._status = None
# Data access # Data access
@ -227,6 +251,7 @@ class Job(object):
self.exc_info = obj.get('exc_info') self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None # noqa
# Overwrite job's additional attrs (those not in JOB_ATTRS), if any # Overwrite job's additional attrs (those not in JOB_ATTRS), if any
additional_attrs = set(obj.keys()).difference(JOB_ATTRS) additional_attrs = set(obj.keys()).difference(JOB_ATTRS)
@ -258,6 +283,8 @@ class Job(object):
obj['timeout'] = self.timeout obj['timeout'] = self.timeout
if self.result_ttl is not None: if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl obj['result_ttl'] = self.result_ttl
if self._status is not None:
obj['status'] = self._status
""" """
Store additional attributes from job instance into Redis. This is done Store additional attributes from job instance into Redis. This is done
so that third party libraries using RQ can store additional data so that third party libraries using RQ can store additional data

View File

@ -1,6 +1,6 @@
import times import times
from .connections import resolve_connection from .connections import resolve_connection
from .job import Job from .job import Job, Status
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
from .compat import total_ordering from .compat import total_ordering
@ -115,7 +115,8 @@ class Queue(object):
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):

View File

@ -14,6 +14,7 @@ import logging
from cPickle import dumps from cPickle import dumps
from .queue import Queue, get_failed_queue from .queue import Queue, get_failed_queue
from .connections import get_current_connection from .connections import get_current_connection
from .job import Status
from .utils import make_colorizer from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_penalty_after from .timeouts import death_penalty_after
@ -385,10 +386,12 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails # use the same exc handling when pickling fails
pickled_rv = dumps(rv) pickled_rv = dumps(rv)
job._status = Status.FINISHED
except Exception as e: except Exception as e:
fq = self.failed_queue fq = self.failed_queue
self.log.exception(red(str(e))) self.log.exception(red(str(e)))
self.log.warning('Moving job to %s queue.' % fq.name) self.log.warning('Moving job to %s queue.' % fq.name)
job._status = Status.FAILED
fq.quarantine(job, exc_info=traceback.format_exc()) fq.quarantine(job, exc_info=traceback.format_exc())
return False return False
@ -398,23 +401,25 @@ class Worker(object):
else: else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
# Expire results # How long we persist the job result depends on the value of
has_result = rv is not None # result_ttl:
explicit_ttl_requested = job.result_ttl is not None # - If result_ttl is 0, cleanup the job immediately.
should_expire = has_result or explicit_ttl_requested # - If it's a positive number, set the job to expire in X seconds.
if should_expire: # - If result_ttl is negative, don't set an expiry to it (persist
# forever)
result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa
if result_ttl == 0:
job.delete()
self.log.info('Result discarded immediately.')
else:
p = self.connection.pipeline() p = self.connection.pipeline()
p.hset(job.key, 'result', pickled_rv) p.hset(job.key, 'result', pickled_rv)
p.hset(job.key, 'status', job._status)
if explicit_ttl_requested: if result_ttl > 0:
ttl = job.result_ttl p.expire(job.key, result_ttl)
self.log.info('Result is kept for %d seconds.' % result_ttl)
else: else:
ttl = self.default_result_ttl self.log.warning('Result will never expire, clean up result key manually.')
if ttl >= 0:
p.expire(job.key, ttl)
p.execute() p.execute()
else:
# Cleanup immediately
job.delete()
return True return True

View File

@ -1,7 +1,7 @@
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation
from rq import Queue, get_failed_queue from rq import Queue, get_failed_queue
from rq.job import Job from rq.job import Job, Status
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
@ -205,6 +205,12 @@ class TestQueue(RQTestCase):
None) None)
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
def test_enqueue_sets_status(self):
    """Enqueueing a job sets its status to "queued"."""
    queue = Queue()
    enqueued_job = queue.enqueue(say_hello)
    self.assertEqual(enqueued_job.status, Status.QUEUED)
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):
@ -265,4 +271,4 @@ class TestFailedQueue(RQTestCase):
"""Executes a job immediately if async=False.""" """Executes a job immediately if async=False."""
q = Queue(async=False) q = Queue(async=False)
job = q.enqueue(some_calculation, args=(2, 3)) job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6) self.assertEqual(job.return_value, 6)

View File

@ -4,7 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout create_file_after_timeout
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from rq import Queue, Worker, get_failed_queue from rq import Queue, Worker, get_failed_queue
from rq.job import Job from rq.job import Job, Status
class TestWorker(RQTestCase): class TestWorker(RQTestCase):
@ -123,29 +123,6 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution # Should not have created evidence of execution
self.assertEquals(os.path.exists(SENTINEL_FILE), False) self.assertEquals(os.path.exists(SENTINEL_FILE), False)
def test_cleaning_up_of_jobs(self):
    """Jobs get cleaned up after successful execution."""
    queue = Queue()
    result_job = queue.enqueue(say_hello, 'Franklin')
    void_job = queue.enqueue(do_nothing)

    # Both job hashes should exist in Redis before the worker runs
    self.assertEquals(self.testconn.type(result_job.key), 'hash')
    self.assertEquals(self.testconn.type(void_job.key), 'hash')

    # Execute both jobs
    worker = Worker([queue])
    worker.work(burst=True)

    # First, assert that the jobs executed successfully
    assert self.testconn.hget(result_job.key, 'exc_info') is None
    assert self.testconn.hget(void_job.key, 'exc_info') is None

    # Jobs with results expire after a certain TTL, while jobs without
    # results are immediately removed
    assert self.testconn.ttl(result_job.key) > 0
    assert not self.testconn.exists(void_job.key)
@slow # noqa @slow # noqa
def test_timeouts(self): def test_timeouts(self):
"""Worker kills jobs after timeout.""" """Worker kills jobs after timeout."""
@ -187,3 +164,36 @@ class TestWorker(RQTestCase):
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), None) self.assertEqual(self.testconn.ttl(job.key), None)
# Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
w = Worker([q])
w.work(burst=True)
self.assertEqual(self.testconn.get(job.key), None)
def test_worker_sets_job_status(self):
    """Ensure that worker correctly sets job status."""
    queue = Queue()
    worker = Worker([queue])

    # A freshly enqueued job reports the "queued" status
    job = queue.enqueue(say_hello)
    self.assertEqual(job.status, 'queued')
    self.assertEqual(job.is_queued, True)
    self.assertEqual(job.is_finished, False)
    self.assertEqual(job.is_failed, False)

    # After successful execution, the persisted status is "finished"
    worker.work(burst=True)
    job = Job.fetch(job.id)
    self.assertEqual(job.status, 'finished')
    self.assertEqual(job.is_queued, False)
    self.assertEqual(job.is_finished, True)
    self.assertEqual(job.is_failed, False)

    # Failed jobs should set status to "failed"
    job = queue.enqueue(div_by_zero, args=(1,))
    worker.work(burst=True)
    job = Job.fetch(job.id)
    self.assertEqual(job.status, 'failed')
    self.assertEqual(job.is_queued, False)
    self.assertEqual(job.is_finished, False)
    self.assertEqual(job.is_failed, True)