From 442b389b970e9cd6843a58d82c7b9da9d1c6305b Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 25 Aug 2012 17:58:15 +0700 Subject: [PATCH 1/6] Job returning None as result are now persisted correctly. Job status can now be checked via ``status`` property which should return either "queued", "finished" or "failed". --- rq/job.py | 19 ++++++++++++++++--- rq/queue.py | 3 ++- rq/worker.py | 30 ++++++++++++++--------------- tests/test_queue.py | 6 ++++++ tests/test_worker.py | 45 ++++++++++++++++++++++---------------------- 5 files changed, 61 insertions(+), 42 deletions(-) diff --git a/rq/job.py b/rq/job.py index 7170f7ab..bf556615 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,6 +1,7 @@ import importlib import inspect import times +from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError from .connections import get_current_connection @@ -8,9 +9,11 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', - 'created_at', 'enqueued_at', 'connection', '_result', + 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', - 'result_ttl']) + 'result_ttl', '_status', 'status']) +Status = namedtuple('Status', ('queued', 'finished', 'failed')) +STATUS = Status(queued='queued', finished='finished', failed='failed') def unpickle(pickled_string): @@ -48,11 +51,12 @@ def requeue_job(job_id, connection=None): class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + STATUS = STATUS # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None): + result_ttl=None, status=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -74,12 +78,17 @@ class Job(object): job._kwargs = kwargs job.description = job.get_call_string() job.result_ttl = result_ttl + job._status = status return job @property def func_name(self): return self._func_name + @property + def status(self): + return self._status + @property def func(self): func_name = self.func_name @@ -138,6 +147,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self._status = None # Data access @@ -227,6 +237,7 @@ class Job(object): self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self._status = obj.get('status') if obj.get('status') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -258,6 +269,8 @@ class Job(object): obj['timeout'] = self.timeout if self.result_ttl is not None: obj['result_ttl'] = self.result_ttl + if self._status is not None: + obj['status'] = self._status """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index 0ff58616..f9cfff0c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -115,7 +115,8 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) + job = Job.create(func, args, kwargs, connection=self.connection, + result_ttl=result_ttl, status=Job.STATUS.queued) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 389ca770..4ff7888b 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -384,10 +384,12 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails pickled_rv = dumps(rv) + job._status = job.STATUS.finished except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) + job._status = job.STATUS.failed fq.quarantine(job, exc_info=traceback.format_exc()) return False @@ -397,23 +399,21 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - # Expire results - has_result = rv is not None - explicit_ttl_requested = job.result_ttl is not None - should_expire = has_result or explicit_ttl_requested - if should_expire: + """ + How long we persist the job result depends on the value of result_ttl: + - If result_ttl is 0, cleanup the job immediately. + - If it's a positive number, set the job to expire in X seconds. + - If result_ttl is negative, don't set an expiry to it (persist forever) + """ + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl + if result_ttl == 0: + job.delete() + else: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - - if explicit_ttl_requested: - ttl = job.result_ttl - else: - ttl = self.default_result_ttl - if ttl >= 0: - p.expire(job.key, ttl) + p.hset(job.key, 'status', job._status) + if result_ttl > 0: + p.expire(job.key, result_ttl) p.execute() - else: - # Cleanup immediately - job.delete() return True diff --git a/tests/test_queue.py b/tests/test_queue.py index fe503188..3d455c2e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -205,6 +205,12 @@ class TestQueue(RQTestCase): None) self.assertEquals(q.count, 0) + def test_enqueue_sets_status(self): + """Enqueueing a job sets its status to "queued".""" + q = Queue() + job = q.enqueue(say_hello) + self.assertEqual(job.status, Job.STATUS.queued) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 5e46a13c..91d1589e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -123,29 +123,6 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEquals(os.path.exists(SENTINEL_FILE), False) - def test_cleaning_up_of_jobs(self): - """Jobs get cleaned up after successful execution.""" - q = Queue() - job_with_rv = q.enqueue(say_hello, 'Franklin') - job_without_rv = q.enqueue(do_nothing) - - # Job hashes exists - self.assertEquals(self.testconn.type(job_with_rv.key), 'hash') - self.assertEquals(self.testconn.type(job_without_rv.key), 'hash') - - # Execute the job - w = Worker([q]) - w.work(burst=True) - - # First, assert that the job executed successfully - assert self.testconn.hget(job_with_rv.key, 'exc_info') is None - assert self.testconn.hget(job_without_rv.key, 'exc_info') is None - - # Jobs with results expire after a certain TTL, while jobs without - # results are immediately removed - assert self.testconn.ttl(job_with_rv.key) > 0 - assert not self.testconn.exists(job_without_rv.key) - @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -187,3 +164,25 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) self.assertEqual(self.testconn.ttl(job.key), None) + + # Job with result_ttl = 0 gets deleted immediately + job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.get(job.key), None) + + def test_worker_sets_job_status(self): + """Ensure that worker correctly sets job status.""" + q = Queue() + w = Worker([q]) + + job = q.enqueue(say_hello) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, job.STATUS.finished) + + # Failed jobs should set status to "failed" + job = q.enqueue(div_by_zero, args=(1,)) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, job.STATUS.failed) \ No newline at end of file From 42243042917d8a8c74f0774d70c228585781809a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 27 Aug 2012 09:40:59 +0200 Subject: [PATCH 2/6] I like this implementation of an 'enum' better. --- rq/job.py | 10 ++++++---- rq/queue.py | 4 ++-- rq/worker.py | 9 +++++---- tests/test_queue.py | 6 +++--- tests/test_worker.py | 10 +++++----- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/rq/job.py b/rq/job.py index bf556615..5681e66c 100644 --- a/rq/job.py +++ b/rq/job.py @@ -12,8 +12,12 @@ JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'result_ttl', '_status', 'status']) -Status = namedtuple('Status', ('queued', 'finished', 'failed')) -STATUS = Status(queued='queued', finished='finished', failed='failed') + +def enum(name, *sequential, **named): + values = dict(zip(sequential, range(len(sequential))), **named) + return type(name, (), values) + +Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed') def unpickle(pickled_string): @@ -51,8 +55,6 @@ def requeue_job(job_id, connection=None): class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ - STATUS = STATUS - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, diff --git a/rq/queue.py b/rq/queue.py index f9cfff0c..6dca8e17 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,6 +1,6 @@ import times from .connections import resolve_connection -from .job import Job +from .job import Job, Status from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -116,7 +116,7 @@ class Queue(object): """ timeout = timeout or self._default_timeout job = Job.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Job.STATUS.queued) + result_ttl=result_ttl, status=Status.QUEUED) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 4ff7888b..3ba1d6c2 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,6 +14,7 @@ import logging from cPickle import dumps from .queue import Queue, get_failed_queue from .connections import get_current_connection +from .job import Status from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after @@ -384,12 +385,12 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails pickled_rv = dumps(rv) - job._status = job.STATUS.finished + job._status = Status.FINISHED except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) - job._status = job.STATUS.failed + job._status = Status.FAILED fq.quarantine(job, exc_info=traceback.format_exc()) return False @@ -403,9 +404,9 @@ class Worker(object): How long we persist the job result depends on the value of result_ttl: - If result_ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist forever) + - If result_ttl is negative, don't set an expiry to it (persist forever) """ - result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl if result_ttl == 0: job.delete() else: diff --git a/tests/test_queue.py b/tests/test_queue.py index 3d455c2e..4a88f652 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,7 +1,7 @@ from tests import RQTestCase from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation from rq import Queue, get_failed_queue -from rq.job import Job +from rq.job import Job, Status from rq.exceptions import InvalidJobOperationError @@ -209,7 +209,7 @@ class TestQueue(RQTestCase): """Enqueueing a job sets its status to "queued".""" q = Queue() job = q.enqueue(say_hello) - self.assertEqual(job.status, Job.STATUS.queued) + self.assertEqual(job.status, Status.QUEUED) class TestFailedQueue(RQTestCase): @@ -271,4 +271,4 @@ class TestFailedQueue(RQTestCase): """Executes a job immediately if async=False.""" q = Queue(async=False) job = q.enqueue(some_calculation, args=(2, 3)) - self.assertEqual(job.return_value, 6) \ No newline at end of file + self.assertEqual(job.return_value, 6) diff --git a/tests/test_worker.py b/tests/test_worker.py index 91d1589e..dfe5261f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ create_file_after_timeout from tests.helpers import strip_milliseconds from rq import Queue, Worker, get_failed_queue -from rq.job import Job +from rq.job import Job, Status class TestWorker(RQTestCase): @@ -175,14 +175,14 @@ class TestWorker(RQTestCase): """Ensure that worker correctly sets job status.""" q = Queue() w = Worker([q]) - + job = q.enqueue(say_hello) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, job.STATUS.finished) - + self.assertEqual(job.status, Status.FINISHED) + # Failed jobs should set status to "failed" job = q.enqueue(div_by_zero, args=(1,)) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, job.STATUS.failed) \ No newline at end of file + self.assertEqual(job.status, Status.FAILED) From 9549b34d602a73300200560559d79049f73c3aa5 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 27 Aug 2012 09:47:09 +0200 Subject: [PATCH 3/6] Add convenience accessor properties for status. --- rq/job.py | 16 ++++++++++++++++ tests/test_worker.py | 18 ++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/rq/job.py b/rq/job.py index 5681e66c..6df4b079 100644 --- a/rq/job.py +++ b/rq/job.py @@ -91,6 +91,22 @@ class Job(object): def status(self): return self._status + @property + def is_finished(self): + return self.status == Status.FINISHED + + @property + def is_queued(self): + return self.status == Status.QUEUED + + @property + def is_failed(self): + return self.status == Status.FAILED + + @property + def is_done(self): + return self._status is not None and not self.is_queued + @property def func(self): func_name = self.func_name diff --git a/tests/test_worker.py b/tests/test_worker.py index dfe5261f..9bab2857 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -177,12 +177,26 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue(say_hello) + self.assertEqual(job.status, 'queued') + self.assertEqual(job.is_queued, True) + self.assertEqual(job.is_finished, False) + self.assertEqual(job.is_failed, False) + self.assertEqual(job.is_done, False) + w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, Status.FINISHED) + self.assertEqual(job.status, 'finished') + self.assertEqual(job.is_queued, False) + self.assertEqual(job.is_finished, True) + self.assertEqual(job.is_failed, False) + self.assertEqual(job.is_done, True) # Failed jobs should set status to "failed" job = q.enqueue(div_by_zero, args=(1,)) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, Status.FAILED) + self.assertEqual(job.status, 'failed') + self.assertEqual(job.is_queued, False) + self.assertEqual(job.is_finished, False) + self.assertEqual(job.is_failed, True) + self.assertEqual(job.is_done, True) From 4b797fbf43e3e81f777fdb4ccfd69712059c9598 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 27 Aug 2012 09:58:59 +0200 Subject: [PATCH 4/6] Don't use strings, but comments. --- rq/worker.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3ba1d6c2..7002e93a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -400,13 +400,13 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - """ - How long we persist the job result depends on the value of result_ttl: - - If result_ttl is 0, cleanup the job immediately. - - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist forever) - """ - result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl + # How long we persist the job result depends on the value of + # result_ttl: + # - If result_ttl is 0, cleanup the job immediately. + # - If it's a positive number, set the job to expire in X seconds. + # - If result_ttl is negative, don't set an expiry to it (persist + # forever) + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa if result_ttl == 0: job.delete() else: From bc7e32bae8fdedeeb6cfb7baa95db1534f562631 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 27 Aug 2012 09:59:18 +0200 Subject: [PATCH 5/6] Add expiry info to the worker log. --- rq/worker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 7002e93a..d41162de 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -409,12 +409,16 @@ class Worker(object): result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa if result_ttl == 0: job.delete() + self.log.info('Result discarded immediately.') else: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) p.hset(job.key, 'status', job._status) if result_ttl > 0: p.expire(job.key, result_ttl) + self.log.info('Result is kept for %d seconds.' % result_ttl) + else: + self.log.warning('Result will never expire, clean up result key manually.') p.execute() return True From 6b0ebe9ceb6f8b0ce53cfaf15d59acf87af12e5a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 27 Aug 2012 12:10:19 +0200 Subject: [PATCH 6/6] Remove is_done property. It is too similar to is_finished. --- rq/job.py | 4 ---- tests/test_worker.py | 3 --- 2 files changed, 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index 6df4b079..da78253b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -103,10 +103,6 @@ class Job(object): def is_failed(self): return self.status == Status.FAILED - @property - def is_done(self): - return self._status is not None and not self.is_queued - @property def func(self): func_name = self.func_name diff --git a/tests/test_worker.py b/tests/test_worker.py index 9bab2857..67cf6e3d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -181,7 +181,6 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_queued, True) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, False) - self.assertEqual(job.is_done, False) w.work(burst=True) job = Job.fetch(job.id) @@ -189,7 +188,6 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, True) self.assertEqual(job.is_failed, False) - self.assertEqual(job.is_done, True) # Failed jobs should set status to "failed" job = q.enqueue(div_by_zero, args=(1,)) @@ -199,4 +197,3 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, True) - self.assertEqual(job.is_done, True)