diff --git a/rq/exceptions.py b/rq/exceptions.py index 9d66d75b..0b885f76 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -2,5 +2,7 @@ class NoQueueError(Exception): pass class UnpickleError(Exception): - pass + def __init__(self, message, raw_data): + super(UnpickleError, self).__init__(message) + self.raw_data = raw_data diff --git a/rq/job.py b/rq/job.py index 07318e82..3e67a76d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -15,7 +15,7 @@ class Job(object): assert isinstance(unpickled_obj, Job) return unpickled_obj except (AssertionError, AttributeError, IndexError, TypeError): - raise UnpickleError('Could not unpickle Job.') + raise UnpickleError('Could not unpickle Job.', pickle_data) def __init__(self, func, *args, **kwargs): self._id = unicode(uuid4()) diff --git a/rq/queue.py b/rq/queue.py index 9440f4fa..447139dd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -3,6 +3,7 @@ from functools import total_ordering from pickle import loads from .proxy import conn from .job import Job +from .exceptions import UnpickleError class DelayedResult(object): @@ -103,6 +104,10 @@ class Queue(object): job.origin = self.name return job + def _push(self, pickled_job): + """Enqueues a pickled_job on the corresponding Redis queue.""" + conn.rpush(self.key, pickled_job) + def enqueue(self, f, *args, **kwargs): """Enqueues a function call for delayed execution. @@ -111,7 +116,7 @@ class Queue(object): """ job = self._create_job(f, *args, **kwargs) job.enqueued_at = datetime.utcnow() - conn.rpush(self.key, job.pickle()) + self._push(job.pickle()) return DelayedResult(job.rv_key) def requeue(self, job): @@ -126,7 +131,13 @@ class Queue(object): blob = conn.lpop(self.key) if blob is None: return None - job = Job.unpickle(blob) + try: + job = Job.unpickle(blob) + except UnpickleError as e: + # Attach queue information on the exception for improved error + # reporting + e.queue = self + raise e job.origin = self return job @@ -162,8 +173,14 @@ class Queue(object): return None queue_key, blob = redis_result - job = Job.unpickle(blob) queue = Queue.from_queue_key(queue_key) + try: + job = Job.unpickle(blob) + except UnpickleError as e: + # Attach queue information on the exception for improved error + # reporting + e.queue = queue + raise e job.origin = queue return job diff --git a/rq/worker.py b/rq/worker.py index c78bd11a..49cdd97b 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,7 +14,7 @@ except ImportError: from logging import Logger from .queue import Queue from .proxy import conn -from .exceptions import NoQueueError +from .exceptions import NoQueueError, UnpickleError def iterable(x): return hasattr(x, '__iter__') @@ -34,6 +34,7 @@ def signal_name(signum): except KeyError: return 'SIG_UNKNOWN' + class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' @@ -249,7 +250,18 @@ class Worker(object): self.procline('Listening on %s' % (','.join(qnames))) self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) wait_for_job = not burst - job = Queue.dequeue_any(self.queues, wait_for_job) + try: + job = Queue.dequeue_any(self.queues, wait_for_job) + except UnpickleError as e: + self.log.warning('*** Ignoring unpickleable data on %s.', e.queue) + self.log.debug('Data follows:') + self.log.debug(e.raw_data) + self.log.debug('End of unreadable data.') + + q = Queue('failure') + q._push(e.raw_data) + continue + if job is None: break self.state = 'busy' diff --git a/tests/__init__.py b/tests/__init__.py index 1ecf83b8..31096846 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -9,6 +9,10 @@ def testjob(name=None): name = 'Stranger' return 'Hi there, %s!' % (name,) +def failing_job(x): + # Will throw a division-by-zero error + return x / 0 + class RQTestCase(unittest.TestCase): """Base class to inherit test cases from for RQ. diff --git a/tests/test_queue.py b/tests/test_queue.py index 415f58de..c7cbc2e1 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -48,6 +48,7 @@ class TestQueue(RQTestCase): self.assertEquals(q.empty, False) self.assertQueueContains(q, testjob) + def test_dequeue(self): """Fetching work from specific queue.""" q = Queue('foo') @@ -60,7 +61,6 @@ class TestQueue(RQTestCase): self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.kwargs['foo'], 'bar') - def test_dequeue_any(self): """Fetching work from any given queue.""" fooq = Queue('foo') @@ -87,7 +87,6 @@ class TestQueue(RQTestCase): self.assertEquals(job.origin, barq) self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') - def test_dequeue_unpicklable_data(self): """Error handling of invalid pickle data.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index d1eec997..182c3ea3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,6 +1,7 @@ from tests import RQTestCase -from tests import testjob +from tests import testjob, failing_job from rq import Queue, Worker +from rq.job import Job class TestWorker(RQTestCase): @@ -19,4 +20,33 @@ class TestWorker(RQTestCase): fooq.enqueue(testjob, name='Frank') self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') + def test_work_is_unreadable(self): + """Worker processes unreadable job.""" + q = Queue() + failure_q = Queue('failure') + + self.assertEquals(failure_q.count, 0) + self.assertEquals(q.count, 0) + + # NOTE: We have to fake this enqueueing for this test case. + # What we're simulating here is a call to a function that is not + # importable from the worker process. + job = Job(failing_job, 3) + pickled_job = job.pickle() + invalid_data = pickled_job.replace( + 'failing_job', 'nonexisting_job') + + # We use the low-level internal function to enqueue any data (bypassing + # validity checks) + q._push(invalid_data) + + self.assertEquals(q.count, 1) + + # All set, we're going to process it + w = Worker([q]) + w.work(burst=True) # should silently pass + self.assertEquals(q.count, 0) + + self.assertEquals(failure_q.count, 1) +