From 317a58a3b5ae816fb980db7fff71d81cfab1e74d Mon Sep 17 00:00:00 2001 From: Goran Peretin Date: Tue, 17 Jul 2012 08:08:35 +0200 Subject: [PATCH 1/3] quarantine preserves job timeout --- rq/queue.py | 2 +- tests/test_queue.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 0f071390..ac5bd523 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -263,7 +263,7 @@ class FailedQueue(Queue): """ job.ended_at = times.now() job.exc_info = exc_info - return self.enqueue_job(job, set_meta_data=False) + return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False) def requeue(self, job_id): """Requeues the job with the given job ID.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index a75abf58..93a0c9bc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -217,3 +217,13 @@ class TestFailedQueue(RQTestCase): # Assert that we cannot requeue a job that's not on the failed queue with self.assertRaises(InvalidJobOperationError): get_failed_queue().requeue(job.id) + + def test_quarantine_preserves_timeout(self): + """Quarantine preserves job timeout.""" + job = Job.create(div_by_zero, 1, 2, 3) + job.origin = 'fake' + job.timeout = 200 + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + + self.assertEquals(job.timeout, 200) From 34d161eb11ca60631c9d7098d5c080d55ee77205 Mon Sep 17 00:00:00 2001 From: Goran Peretin Date: Tue, 17 Jul 2012 08:41:24 +0200 Subject: [PATCH 2/3] requeueing preserves job timeout --- rq/queue.py | 2 +- tests/test_queue.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index ac5bd523..a7e5dfe9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -280,4 +280,4 @@ class FailedQueue(Queue): job.exc_info = None q = Queue(job.origin, connection=self.connection) - q.enqueue_job(job) + q.enqueue_job(job, timeout=job.timeout) diff --git a/tests/test_queue.py b/tests/test_queue.py index 93a0c9bc..9f2c4dae 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -227,3 +227,15 @@ class TestFailedQueue(RQTestCase): get_failed_queue().quarantine(job, Exception('Some fake error')) self.assertEquals(job.timeout, 200) + + def test_requeueing_preserves_timeout(self): + """Requeueing preserves job timeout.""" + job = Job.create(div_by_zero, 1, 2, 3) + job.origin = 'fake' + job.timeout = 200 + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + get_failed_queue().requeue(job.id) + + job = Job.fetch(job.id) + self.assertEquals(job.timeout, 200) From 03bd49511dd6ce3849583281004ede00e07fb4b0 Mon Sep 17 00:00:00 2001 From: Omar Khan Date: Tue, 17 Jul 2012 11:48:41 +0100 Subject: [PATCH 3/3] Allow instance methods to be enqueued Only works for picklable instances --- rq/job.py | 19 ++++++++++++++++--- tests/fixtures.py | 9 +++++++++ tests/test_job.py | 16 ++++++++++++++-- tests/test_queue.py | 15 ++++++++++++++- 4 files changed, 53 insertions(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index 90c3a0e7..b14b7434 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,4 +1,5 @@ import importlib +import inspect import times from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError @@ -50,7 +51,11 @@ class Job(object): """ connection = kwargs.pop('connection', None) job = cls(connection=connection) - job._func_name = '%s.%s' % (func.__module__, func.__name__) + if inspect.ismethod(func): + job._instance = func.im_self + job._func_name = func.__name__ + else: + job._func_name = '%s.%s' % (func.__module__, func.__name__) job._args = args job._kwargs = kwargs job.description = job.get_call_string() @@ -66,10 +71,17 @@ class Job(object): if func_name is None: return None + if self.instance: + return getattr(self.instance, func_name) + module_name, func_name = func_name.rsplit('.', 1) module = importlib.import_module(module_name) return getattr(module, func_name) + @property + def instance(self): + return self._instance + @property def args(self): return self._args @@ -100,6 +112,7 @@ class Job(object): self._id = id self.created_at = times.now() self._func_name = None + self._instance = None self._args = None self._kwargs = None self.description = None @@ -141,7 +154,7 @@ class Job(object): def job_tuple(self): """Returns the job tuple that encodes the actual function call that this job represents.""" - return (self.func_name, self.args, self.kwargs) + return (self.func_name, self.instance, self.args, self.kwargs) @property def return_value(self): @@ -190,7 +203,7 @@ class Job(object): else: return times.to_universal(date_str) - self._func_name, self._args, self._kwargs = unpickle(data) + self._func_name, self._instance, self._args, self._kwargs = unpickle(data) self.created_at = to_date(created_at) self.origin = origin self.description = description diff --git a/tests/fixtures.py b/tests/fixtures.py index 36d0c8af..32003dec 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -39,3 +39,12 @@ def create_file(path): def create_file_after_timeout(path, timeout): time.sleep(timeout) create_file(path) + + +class Calculator(object): + """Test instance methods.""" + def __init__(self, denominator): + self.denominator = denominator + + def calculate(x, y): + return x * y / self.denominator diff --git a/tests/test_job.py b/tests/test_job.py index 453ed06f..18fe60f6 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,7 +1,7 @@ import times from datetime import datetime from tests import RQTestCase -from tests.fixtures import some_calculation, say_hello +from tests.fixtures import Calculator, some_calculation, say_hello from tests.helpers import strip_milliseconds from cPickle import loads from rq.job import Job @@ -19,6 +19,7 @@ class TestJob(RQTestCase): # ...and nothing else self.assertIsNone(job.func) + self.assertIsNone(job.instance) self.assertIsNone(job.args) self.assertIsNone(job.kwargs) self.assertIsNone(job.origin) @@ -35,6 +36,7 @@ class TestJob(RQTestCase): self.assertIsNotNone(job.id) self.assertIsNotNone(job.created_at) self.assertIsNotNone(job.description) + self.assertIsNone(job.instance) # Job data is set... self.assertEquals(job.func, some_calculation) @@ -46,6 +48,15 @@ class TestJob(RQTestCase): self.assertIsNone(job.enqueued_at) self.assertIsNone(job.return_value) + def test_create_instance_method_job(self): + """Creation of jobs for instance methods.""" + c = Calculator(2) + job = Job.create(c.calculate, 3, 4) + + # Job data is set + self.assertEquals(job.func, c.calculate) + self.assertEquals(job.instance, c) + self.assertEquals(job.args, (3, 4)) def test_save(self): # noqa """Storing jobs.""" @@ -64,7 +75,7 @@ class TestJob(RQTestCase): """Fetching jobs.""" # Prepare test self.testconn.hset('rq:job:some_id', 'data', - "(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa + "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") @@ -72,6 +83,7 @@ class TestJob(RQTestCase): job = Job.fetch('some_id') self.assertEquals(job.id, 'some_id') self.assertEquals(job.func_name, 'tests.fixtures.some_calculation') + self.assertIsNone(job.instance) self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) diff --git a/tests/test_queue.py b/tests/test_queue.py index a75abf58..b39b1960 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,5 +1,5 @@ from tests import RQTestCase -from tests.fixtures import say_hello, div_by_zero +from tests.fixtures import Calculator, say_hello, div_by_zero from rq import Queue, get_failed_queue from rq.job import Job from rq.exceptions import InvalidJobOperationError @@ -132,6 +132,19 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) + def test_dequeue_instance_method(self): + """Dequeueing instance method jobs from queues.""" + q = Queue() + c = Calculator(2) + result = q.enqueue(c.calculate, 3, 4) + + job = q.dequeue() + # The instance has been pickled and unpickled, so it is now a separate + # object. Test for equality using each object's __dict__ instead. + self.assertEquals(job.instance.__dict__, c.__dict__) + self.assertEquals(job.func.__name__, 'calculate') + self.assertEquals(job.args, (3, 4)) + def test_dequeue_ignores_nonexisting_jobs(self): """Dequeuing silently ignores non-existing jobs."""