CHECKPOINT: Handle failing and unreadable jobs.

Failing (or unreadable) jobs are correctly put on the failure queue by
the worker now.
This commit is contained in:
Vincent Driessen 2012-02-08 15:18:24 +01:00
parent b1650cb9b9
commit f516f8df2e
4 changed files with 25 additions and 16 deletions

View File

@ -116,7 +116,9 @@ class Queue(object):
try:
job = Job.fetch(job_id)
except NoSuchJobError as e:
return None
# Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively
return self.dequeue()
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
@ -143,11 +145,13 @@ class Queue(object):
try:
job = Job.fetch(job_id)
except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore)
return None
# Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively
return cls.dequeue_any(queues, blocking)
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
e.job_id = job_id
e.queue = queue
raise e
job.origin = queue

View File

@ -259,9 +259,7 @@ class Worker(object):
self.log.debug('Data follows:')
self.log.debug(e.raw_data)
self.log.debug('End of unreadable data.')
fq = self.failure_queue
fq._push(e.raw_data)
self.failure_queue.push_job_id(e.job_id)
continue
if job is None:
@ -286,6 +284,13 @@ class Worker(object):
self.perform_job(job)
except Exception as e:
self.log.exception(e)
# Store the exception information...
job.exc_info = e
job.save()
# ...and put the job on the failure queue
self.failure_queue.push_job_id(job.id)
sys.exit(1)
sys.exit(0)
else:

View File

@ -1,8 +1,6 @@
from tests import RQTestCase
from tests import testjob
from pickle import dumps
from rq import Queue
from rq.exceptions import UnpickleError
class TestQueue(RQTestCase):
@ -67,7 +65,6 @@ class TestQueue(RQTestCase):
# ...and assert the queue count when down
self.assertEquals(q.count, 0)
def test_dequeue(self):
"""Dequeueing jobs from queues."""
# Set up
@ -92,10 +89,14 @@ class TestQueue(RQTestCase):
q = Queue()
uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
q.push_job_id(uuid)
q.push_job_id(uuid)
result = q.enqueue(testjob, 'Nick', foo='bar')
q.push_job_id(uuid)
# Dequeue simply ignores the missing job and returns None
self.assertEquals(q.count, 1)
self.assertEquals(q.dequeue(), None)
self.assertEquals(q.count, 4)
self.assertEquals(q.dequeue().id, result.id)
self.assertIsNone(q.dequeue())
self.assertEquals(q.count, 0)
def test_dequeue_any(self):

View File

@ -21,7 +21,7 @@ class TestWorker(RQTestCase):
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
def test_work_is_unreadable(self):
"""Worker ignores unreadable job."""
"""Unreadable jobs are put on the failure queue."""
q = Queue()
failure_q = Queue('failure')
@ -39,7 +39,7 @@ class TestWorker(RQTestCase):
# We use the low-level internal function to enqueue any data (bypassing
# validity checks)
q.push_job_id(invalid_data)
q.push_job_id(job.id)
self.assertEquals(q.count, 1)
@ -51,7 +51,7 @@ class TestWorker(RQTestCase):
self.assertEquals(failure_q.count, 1)
def test_work_fails(self):
"""Worker processes failing job."""
"""Failing jobs are put on the failure queue."""
q = Queue()
failure_q = Queue('failure')
@ -59,12 +59,11 @@ class TestWorker(RQTestCase):
self.assertEquals(q.count, 0)
q.enqueue(failing_job)
self.assertEquals(q.count, 1)
w = Worker([q])
w.work(burst=True) # should silently pass
self.assertEquals(q.count, 0)
self.assertEquals(failure_q.count, 1)