Fix putting jobs on the failure queue when they fail.

Vincent Driessen 2012-02-10 17:19:30 +01:00
parent 7c903e45ef
commit e05acfedce
4 changed files with 59 additions and 27 deletions
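Taken together, the four changes make a failed job survive as a first-class object: it keeps its id and original metadata, gains a formatted traceback in exc_info, and ends up on the 'failure' queue. A minimal sketch of the intended round trip, using names that appear in this diff (Queue, Worker, Job.fetch, the 'failure' queue); the exact import paths, the Redis connection setup, and an importable failing_job module are assumptions:

    from rq import Queue, Worker
    from rq.job import Job
    # Assumed: a module-level function that raises. Functions defined in
    # __main__ are rejected by enqueue(), as the ValueError below shows.
    from myapp.jobs import failing_job

    q = Queue()
    failure_q = Queue('failure')

    job = q.enqueue(failing_job)        # stamps job.origin and job.enqueued_at
    Worker([q]).work(burst=True)        # the job raises inside the work horse

    failed = Job.fetch(job.id)          # same id, now parked on the failure queue
    assert failure_q.count == 1
    assert failed.exc_info is not None  # formatted traceback, see the worker hunks below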


@@ -83,11 +83,15 @@ class Queue(object):
             raise ValueError('Functions from the __main__ module cannot be processed by workers.')
 
         job = Job.for_call(f, *args, **kwargs)
+        return self.enqueue_job(job)
+
+    def enqueue_job(self, job):
+        """Enqueues a job for delayed execution."""
         job.origin = self.name
         job.enqueued_at = times.now()
         job.save()
         self.push_job_id(job.id)
-        return Job(job.id)
+        return job
 
     def requeue(self, job):
         """Requeues an existing (typically a failed job) onto the queue."""

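The new Queue.enqueue_job() hook above is the piece both enqueue() and the failure path can now share: it stamps origin and enqueued_at, saves the job, pushes its id, and returns the very instance it was given instead of a bare Job(job.id) wrapper. A small usage sketch; some_func stands in for any importable callable, and connection setup is omitted:

    from rq import Queue
    from rq.job import Job
    from myapp.jobs import some_func    # assumed importable callable

    q = Queue()
    job = Job.for_call(some_func, 1, 2, key='value')

    returned = q.enqueue_job(job)
    assert returned is job              # same object, not a fresh Job(job.id)
    assert job.origin == q.name         # stamped by enqueue_job()
    assert job.enqueued_at is not None  # stamped via times.now()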

@@ -7,6 +7,7 @@ import times
 import procname
 import socket
 import signal
+import traceback
 from pickle import dumps
 try:
     from logbook import Logger
@@ -284,19 +285,9 @@ class Worker(object):
             self._is_horse = True
             random.seed()
             self.log = Logger('horse')
 
-            try:
-                self.perform_job(job)
-            except Exception as e:
-                self.log.exception(e)
-
-                # Store the exception information...
-                job.exc_info = e
-                job.save()
-                # ...and put the job on the failure queue
-                self.failure_queue.push_job_id(job.id)
-                sys.exit(1)
-            sys.exit(0)
+            success = self.perform_job(job)
+            sys.exit(int(not success))
         else:
             self._horse_pid = child_pid
             self.procline('Forked %d at %d' % (child_pid, time.time()))
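The work horse now reports success purely through its process exit code: perform_job() returns True or False and sys.exit(int(not success)) maps that to 0 or 1, so all failure bookkeeping can live inside perform_job() itself. A generic sketch of that convention, not RQ's actual fork code, assuming a POSIX fork:

    import os
    import sys

    def run_in_child(fn):
        """Run fn() in a forked child and report success via the exit status."""
        pid = os.fork()
        if pid == 0:                    # child: the "work horse"
            ok = fn()
            sys.exit(int(not ok))       # True -> exit 0, False -> exit 1
        _, status = os.waitpid(pid, 0)  # parent: wait for the horse to finish
        return os.WEXITSTATUS(status) == 0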
@@ -317,29 +308,35 @@ class Worker(object):
     def perform_job(self, job):
         self.procline('Processing %s from %s since %s' % (
             job.func.__name__,
-            job.origin.name, time.time()))
+            job.origin, time.time()))
         msg = 'Got job %s from %s' % (
             job.description,
-            job.origin.name)
+            job.origin)
         self.log.info(msg)
         try:
             rv = job.perform()
         except Exception as e:
-            rv = e
-            self.log.exception(e)
             fq = self.failure_queue
+            self.log.exception(e)
             self.log.warning('Moving job to %s queue.' % (fq.name,))
             # Store the exception information...
             job.ended_at = times.now()
-            job.exc_info = e
-            fq._push(job.pickle())
+            job.exc_info = traceback.format_exc()
+
+            # ------ REFACTOR THIS -------------------------
+            job.save()
+            # ...and put the job on the failure queue
+            fq.push_job_id(job.id)
+            # ------ UNTIL HERE ----------------------------
+            # (should be as easy as fq.enqueue(job) or so)
+
+            return False
         else:
-            if rv is not None:
-                self.log.info('Job result = %s' % (rv,))
-            else:
-                self.log.info('Job ended normally without result')
+            self.log.info('Job OK, result = %s' % (rv,))
+
         if rv is not None:
             p = conn.pipeline()
-            p.set(job.rv_key, dumps(rv))
-            p.expire(job.rv_key, self.rv_ttl)
+            p.set(job.result, dumps(rv))
+            p.expire(job.result, self.rv_ttl)
             p.execute()
+        return True

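Switching exc_info from the exception object to traceback.format_exc() stores the failure as a plain string: it goes into Redis without any pickling concerns and keeps the full traceback, which neither repr(e) nor a pickled exception reliably preserves. A standalone illustration of what gets stored:

    import traceback

    def capture():
        try:
            {}['missing']               # provoke a KeyError
        except KeyError:
            # Inside an except block, format_exc() renders the active
            # exception and its traceback as an ordinary string.
            return traceback.format_exc()

    print(capture())
    # Traceback (most recent call last):
    #   ...
    # KeyError: 'missing'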

@@ -1,6 +1,7 @@
 from tests import RQTestCase
 from tests import testjob
 from rq import Queue
+from rq.job import Job
 
 
 class TestQueue(RQTestCase):
@@ -50,6 +51,22 @@ class TestQueue(RQTestCase):
         self.assertEquals(self.testconn.llen(q_key), 1)
         self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
 
+    def test_enqueue_sets_metadata(self):
+        """Enqueueing job onto queues modifies meta data."""
+        q = Queue()
+        job = Job.for_call(testjob, 'Nick', foo='bar')
+
+        # Preconditions
+        self.assertIsNone(job.origin)
+        self.assertIsNone(job.enqueued_at)
+
+        # Action
+        q.enqueue_job(job)
+
+        # Postconditions
+        self.assertEquals(job.origin, q.name)
+        self.assertIsNotNone(job.enqueued_at)
+
     def test_pop_job_id(self):
         """Popping job IDs from queues."""

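The llen/lrange assertions above work because a queue is backed by a plain Redis list of job ids: push_job_id() appends an id, and q_key is that list's key. A hedged sketch of the same invariant against a bare redis-py client; the 'rq:queue:default' key name and the use of RPUSH are assumptions about this era of RQ, not taken from the diff:

    import redis

    r = redis.Redis()
    q_key = 'rq:queue:default'          # assumed naming scheme for Queue()
    r.delete(q_key)

    r.rpush(q_key, 'some-job-id')       # roughly what push_job_id() boils down to
    assert r.llen(q_key) == 1
    assert r.lrange(q_key, 0, -1)[0] in (b'some-job-id', 'some-job-id')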

@@ -56,15 +56,29 @@ class TestWorker(RQTestCase):
         q = Queue()
         failure_q = Queue('failure')
 
+        # Preconditions
         self.assertEquals(failure_q.count, 0)
         self.assertEquals(q.count, 0)
 
-        q.enqueue(failing_job)
+        # Action
+        job = q.enqueue(failing_job)
         self.assertEquals(q.count, 1)
 
+        # keep for later
+        enqueued_at_date = strip_milliseconds(job.enqueued_at)
+
         w = Worker([q])
         w.work(burst=True)  # should silently pass
 
+        # Postconditions
         self.assertEquals(q.count, 0)
         self.assertEquals(failure_q.count, 1)
+
+        # Check the job
+        job = Job.fetch(job.id)
+        self.assertEquals(job.origin, q.name)
+
+        # should be the original enqueued_at date, not the date of enqueueing to the failure queue
+        self.assertEquals(job.enqueued_at, enqueued_at_date)
+        self.assertIsNotNone(job.exc_info)  # should contain exc_info
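The enqueued_at comparison goes through strip_milliseconds() because the timestamp makes a round trip through Redis, where sub-second precision is not guaranteed to survive. The helper's real definition lives in the test package and is not part of this diff; a hypothetical stand-in, purely to show the intent:

    def strip_milliseconds(date):
        # Hypothetical: drop sub-second precision so two datetimes that only
        # differ in microseconds still compare equal after (de)serialization.
        return date.replace(microsecond=0)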