diff --git a/rq/job.py b/rq/job.py index 6c084ac0..e94f5128 100644 --- a/rq/job.py +++ b/rq/job.py @@ -160,6 +160,15 @@ class Job(object): job.refresh() return job + @classmethod + def safe_fetch(cls, id, connection=None): + """Fetches a persisted job from its corresponding Redis key, but does + not instantiate it, making it impossible to get UnpickleErrors. + """ + job = cls(id, connection=connection) + job.refresh(safe=True) + return job + def __init__(self, id=None, connection=None): self.connection = resolve_connection(connection) self._id = id @@ -240,7 +249,7 @@ class Job(object): # Persistence - def refresh(self): # noqa + def refresh(self, safe=False): # noqa """Overwrite the current instance's properties with the values in the corresponding Redis key. @@ -257,7 +266,12 @@ class Job(object): else: return times.to_universal(date_str) - self._func_name, self._instance, self._args, self._kwargs = unpickle(obj.get('data')) # noqa + self.data = obj.get('data') + try: + self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data) + except UnpickleError: + if not safe: + raise self.created_at = to_date(obj.get('created_at')) self.origin = obj.get('origin') self.description = obj.get('description') diff --git a/rq/queue.py b/rq/queue.py index f130e5be..b962b097 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -73,7 +73,7 @@ class Queue(object): """Returns a list of all (valid) jobs in the queue.""" def safe_fetch(job_id): try: - job = Job.fetch(job_id, connection=self.connection) + job = Job.safe_fetch(job_id, connection=self.connection) except NoSuchJobError: return None except UnpickleError: diff --git a/rq/worker.py b/rq/worker.py index ee3c3177..99a37469 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -19,7 +19,7 @@ except ImportError: from logging import Logger from .queue import Queue, get_failed_queue from .connections import get_current_connection -from .job import Status +from .job import Job, Status from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after @@ -309,13 +309,8 @@ class Worker(object): except StopRequested: break except UnpickleError as e: - msg = '*** Ignoring unpickleable data on %s.' % \ - green(e.queue.name) - self.log.warning(msg) - self.log.debug('Data follows:') - self.log.debug(e.raw_data) - self.log.debug('End of unreadable data.') - self.failed_queue.push_job_id(e.job_id) + job = Job.safe_fetch(e.job_id) + self.handle_exception(job, *sys.exc_info()) continue self.state = 'busy'