Throw DequeueError when reading unprocessable data from queue.

This commit is contained in:
Vincent Driessen 2012-01-27 15:15:13 +01:00
parent aa2c9e85eb
commit 210477c2ab
3 changed files with 42 additions and 3 deletions

View File

@ -1,2 +1,5 @@
class NoQueueError(Exception):
pass
class DequeueError(Exception):
pass

View File

@ -2,6 +2,7 @@ import uuid
from functools import total_ordering
from pickle import loads, dumps
from .proxy import conn
from .exceptions import DequeueError
class DelayedResult(object):
"""Proxy object that is returned as a result of `Queue.enqueue()` calls.
@ -44,8 +45,11 @@ class Job(object):
@classmethod
def unpickle(cls, pickle_data):
"""Constructs a Job instance form the given pickle'd job tuple data."""
job_tuple = loads(pickle_data)
return Job(job_tuple)
try:
job_tuple = loads(pickle_data)
return Job(job_tuple)
except (AttributeError, ValueError, IndexError):
raise DequeueError('Could not decode job tuple.')
def __init__(self, job_tuple, origin=None):
self.func, self.args, self.kwargs, self.rv_key = job_tuple

View File

@ -1,8 +1,9 @@
import unittest
from pickle import loads
from pickle import loads, dumps
from redis import Redis
from logbook import NullHandler
from rq import conn, Queue, Worker
from rq.exceptions import DequeueError
# Test data
def testjob(name=None):
@ -135,6 +136,37 @@ class TestQueue(RQTestCase):
self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
def test_dequeue_unpicklable_data(self):
"""Error handling of invalid pickle data."""
# Push non-pickle data on the queue
q = Queue('foo')
blob = 'this is nothing like pickled data'
self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError):
q.dequeue() # error occurs when perform()'ing
# Push value pickle data, but not representing a job tuple
q = Queue('foo')
blob = dumps('this is not a job tuple')
self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError):
q.dequeue() # error occurs when perform()'ing
# Push slightly incorrect pickled data onto the queue (simulate
# a function that can't be imported from the worker)
q = Queue('foo')
job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
blob = job_tuple.replace('testjob', 'fooobar')
self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError):
q.dequeue() # error occurs when dequeue()'ing
class TestWorker(RQTestCase):
def test_create_worker(self):
"""Worker creation."""