diff --git a/rq/job.py b/rq/job.py index 7170f7ab..da78253b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,6 +1,7 @@ import importlib import inspect import times +from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError from .connections import get_current_connection @@ -8,9 +9,15 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', - 'created_at', 'enqueued_at', 'connection', '_result', + 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', - 'result_ttl']) + 'result_ttl', '_status', 'status']) + +def enum(name, *sequential, **named): + values = dict(zip(sequential, range(len(sequential))), **named) + return type(name, (), values) + +Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed') def unpickle(pickled_string): @@ -48,11 +55,10 @@ def requeue_job(job_id, connection=None): class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None): + result_ttl=None, status=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -74,12 +80,29 @@ class Job(object): job._kwargs = kwargs job.description = job.get_call_string() job.result_ttl = result_ttl + job._status = status return job @property def func_name(self): return self._func_name + @property + def status(self): + return self._status + + @property + def is_finished(self): + return self.status == Status.FINISHED + + @property + def is_queued(self): + return self.status == Status.QUEUED + + @property + def is_failed(self): + return self.status == Status.FAILED + @property def func(self): func_name = self.func_name @@ -138,6 +161,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self._status = None # Data access @@ -227,6 +251,7 @@ class Job(object): self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self._status = obj.get('status') if obj.get('status') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -258,6 +283,8 @@ class Job(object): obj['timeout'] = self.timeout if self.result_ttl is not None: obj['result_ttl'] = self.result_ttl + if self._status is not None: + obj['status'] = self._status """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index 0ff58616..6dca8e17 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,6 +1,6 @@ import times from .connections import resolve_connection -from .job import Job +from .job import Job, Status from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -115,7 +115,8 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) + job = Job.create(func, args, kwargs, connection=self.connection, + result_ttl=result_ttl, status=Status.QUEUED) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 1ea226f1..18e52f86 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,6 +14,7 @@ import logging from cPickle import dumps from .queue import Queue, get_failed_queue from .connections import get_current_connection +from .job import Status from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after @@ -385,10 +386,12 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails pickled_rv = dumps(rv) + job._status = Status.FINISHED except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) + job._status = Status.FAILED fq.quarantine(job, exc_info=traceback.format_exc()) return False @@ -398,23 +401,25 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - # Expire results - has_result = rv is not None - explicit_ttl_requested = job.result_ttl is not None - should_expire = has_result or explicit_ttl_requested - if should_expire: + # How long we persist the job result depends on the value of + # result_ttl: + # - If result_ttl is 0, cleanup the job immediately. + # - If it's a positive number, set the job to expire in X seconds. + # - If result_ttl is negative, don't set an expiry to it (persist + # forever) + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa + if result_ttl == 0: + job.delete() + self.log.info('Result discarded immediately.') + else: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - - if explicit_ttl_requested: - ttl = job.result_ttl + p.hset(job.key, 'status', job._status) + if result_ttl > 0: + p.expire(job.key, result_ttl) + self.log.info('Result is kept for %d seconds.' % result_ttl) else: - ttl = self.default_result_ttl - if ttl >= 0: - p.expire(job.key, ttl) + self.log.warning('Result will never expire, clean up result key manually.') p.execute() - else: - # Cleanup immediately - job.delete() return True diff --git a/tests/test_queue.py b/tests/test_queue.py index fe503188..4a88f652 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,7 +1,7 @@ from tests import RQTestCase from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation from rq import Queue, get_failed_queue -from rq.job import Job +from rq.job import Job, Status from rq.exceptions import InvalidJobOperationError @@ -205,6 +205,12 @@ class TestQueue(RQTestCase): None) self.assertEquals(q.count, 0) + def test_enqueue_sets_status(self): + """Enqueueing a job sets its status to "queued".""" + q = Queue() + job = q.enqueue(say_hello) + self.assertEqual(job.status, Status.QUEUED) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): @@ -265,4 +271,4 @@ class TestFailedQueue(RQTestCase): """Executes a job immediately if async=False.""" q = Queue(async=False) job = q.enqueue(some_calculation, args=(2, 3)) - self.assertEqual(job.return_value, 6) \ No newline at end of file + self.assertEqual(job.return_value, 6) diff --git a/tests/test_worker.py b/tests/test_worker.py index 5e46a13c..67cf6e3d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ create_file_after_timeout from tests.helpers import strip_milliseconds from rq import Queue, Worker, get_failed_queue -from rq.job import Job +from rq.job import Job, Status class TestWorker(RQTestCase): @@ -123,29 +123,6 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEquals(os.path.exists(SENTINEL_FILE), False) - def test_cleaning_up_of_jobs(self): - """Jobs get cleaned up after successful execution.""" - q = Queue() - job_with_rv = q.enqueue(say_hello, 'Franklin') - job_without_rv = q.enqueue(do_nothing) - - # Job hashes exists - self.assertEquals(self.testconn.type(job_with_rv.key), 'hash') - self.assertEquals(self.testconn.type(job_without_rv.key), 'hash') - - # Execute the job - w = Worker([q]) - w.work(burst=True) - - # First, assert that the job executed successfully - assert self.testconn.hget(job_with_rv.key, 'exc_info') is None - assert self.testconn.hget(job_without_rv.key, 'exc_info') is None - - # Jobs with results expire after a certain TTL, while jobs without - # results are immediately removed - assert self.testconn.ttl(job_with_rv.key) > 0 - assert not self.testconn.exists(job_without_rv.key) - @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -187,3 +164,36 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) self.assertEqual(self.testconn.ttl(job.key), None) + + # Job with result_ttl = 0 gets deleted immediately + job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.get(job.key), None) + + def test_worker_sets_job_status(self): + """Ensure that worker correctly sets job status.""" + q = Queue() + w = Worker([q]) + + job = q.enqueue(say_hello) + self.assertEqual(job.status, 'queued') + self.assertEqual(job.is_queued, True) + self.assertEqual(job.is_finished, False) + self.assertEqual(job.is_failed, False) + + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, 'finished') + self.assertEqual(job.is_queued, False) + self.assertEqual(job.is_finished, True) + self.assertEqual(job.is_failed, False) + + # Failed jobs should set status to "failed" + job = q.enqueue(div_by_zero, args=(1,)) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, 'failed') + self.assertEqual(job.is_queued, False) + self.assertEqual(job.is_finished, False) + self.assertEqual(job.is_failed, True)