diff --git a/rq/exceptions.py b/rq/exceptions.py index 793bcc50..4d2cb6a1 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -2,6 +2,10 @@ class NoSuchJobError(Exception): pass +class InvalidJobOperationError(Exception): + pass + + class NoQueueError(Exception): pass diff --git a/rq/queue.py b/rq/queue.py index 9bc09e85..27497872 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -2,7 +2,7 @@ import times from functools import total_ordering from .proxy import conn from .job import Job -from .exceptions import NoSuchJobError, UnpickleError +from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError def compact(lst): @@ -121,10 +121,6 @@ class Queue(object): self.push_job_id(job.id) return job - def requeue(self, job): - """Requeues an existing (typically a failed job) onto the queue.""" - raise NotImplementedError('Implement this') - def pop_job_id(self): """Pops a given job ID from this Redis queue.""" return conn.lpop(self.key) @@ -238,3 +234,15 @@ class FailedQueue(Queue): job.ended_at = times.now() job.exc_info = exc_info return self.enqueue_job(job, set_meta_data=False) + + def requeue(self, job_id): + """Requeues the given job ID.""" + job = Job.fetch(job_id) + + # Delete it from the FailedQueue (raise an error if that failed) + if conn.lrem(self.key, job.id) == 0: + raise InvalidJobOperationError('Cannot requeue non-failed jobs.') + + job.exc_info = None + q = Queue(job.origin) + q.enqueue_job(job) diff --git a/tests/test_job.py b/tests/test_job.py index a065c787..78e2002e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -152,3 +152,4 @@ class TestJob(RQTestCase): self.testconn.hset(job.key, 'data', unimportable_data) with self.assertRaises(UnpickleError): job.refresh() + diff --git a/tests/test_queue.py b/tests/test_queue.py index 6b7329ff..4401aae9 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,6 +1,7 @@ from tests import RQTestCase -from tests import testjob +from tests import testjob, failing_job from rq import Queue, FailedQueue, Job +from rq.exceptions import InvalidJobOperationError class TestQueue(RQTestCase): @@ -189,3 +190,29 @@ class TestQueue(RQTestCase): self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None) self.assertEquals(q.count, 0) + + +class TestFailedQueue(RQTestCase): + def test_requeue_job(self): + """Requeueing existing jobs.""" + job = Job.for_call(failing_job, 1, 2, 3) + job.origin = 'fake' + job.save() + FailedQueue().quarantine(job, Exception('Some fake error')) + + self.assertItemsEqual(Queue.all(), [FailedQueue()]) + self.assertEquals(FailedQueue().count, 1) + + FailedQueue().requeue(job.id) + + self.assertEquals(FailedQueue().count, 0) + self.assertEquals(Queue('fake').count, 1) + + def test_requeue_nonfailed_job_fails(self): + """Requeueing non-failed jobs raises error.""" + q = Queue() + job = q.enqueue(testjob, 'Nick', foo='bar') + + # Assert that we cannot requeue a job that's not on the failed queue + with self.assertRaises(InvalidJobOperationError): + FailedQueue().requeue(job.id)