From b8305a818f8558266c8973b6680d2cb87b125f1d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 16:54:27 +0100 Subject: [PATCH] Safer, and shorter, version of the death penalty. This case protects against JobTimeoutExceptions being raised immediately after the job body has been (successfully) executed. Still, JobTimeoutExceptions pass through naturally, like any other exception, to be handled by the default exception handler that writes failed jobs to the failed queue. Timeouts therefore are reported like any other exception. --- rq/timeouts.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ rq/worker.py | 35 +++------------------------------- 2 files changed, 54 insertions(+), 32 deletions(-) create mode 100644 rq/timeouts.py diff --git a/rq/timeouts.py b/rq/timeouts.py new file mode 100644 index 00000000..83be1e6f --- /dev/null +++ b/rq/timeouts.py @@ -0,0 +1,51 @@ +import signal + + +class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed maximum + timeout value. + """ + pass + + +class death_pentalty_after(object): + def __init__(self, timeout): + self._timeout = timeout + + def __enter__(self): + self.setup_death_penalty() + + def __exit__(self, type, value, traceback): + # Always cancel immediately, since we're done + try: + self.cancel_death_penalty() + except JobTimeoutException: + # Weird case: we're done with the with body, but now the alarm is + # fired. We may safely ignore this situation and consider the + # body done. + pass + + # __exit__ may return True to supress further exception handling. We + # don't want to suppress any exceptions here, since all errors should + # just pass through, JobTimeoutException being handled as just one of + # them. + return False + + def handle_death_penalty(self, signum, frame): + raise JobTimeoutException('Job exceeded maximum timeout ' + 'value (%d seconds).' % self._timeout) + + def setup_death_penalty(self): + """Sets up an alarm signal and a signal handler that raises + a JobTimeoutException after the timeout amount (expressed in + seconds). + """ + signal.signal(signal.SIGALRM, self.handle_death_penalty) + signal.alarm(self._timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) diff --git a/rq/worker.py b/rq/worker.py index 71b887d3..142948a6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,6 +16,7 @@ from .queue import Queue, FailedQueue from .proxy import conn from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError +from .timeouts import death_pentalty_after green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -333,33 +334,6 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def raise_death_penalty_after(self, timeout): - """Sets up an alarm signal and a signal handler that raises - a JobTimeoutException after the given `timeout` amount (expressed - in seconds). - """ - - class JobTimeoutException(Exception): - """Raised when a job takes longer to complete than the allowed - maximum time. - """ - pass - - # Setup a timeout handler - def timeout_handler(signum, frame): - raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % timeout) - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(timeout) - - def cancel_death_penalty(self): - """Removes the death penalty alarm and puts back the system into - default signal handling. - """ - signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) - def perform_job(self, job): """Performs the actual work of a job. Will/should only be called inside the work horse's process. @@ -368,13 +342,10 @@ class Worker(object): job.func.__name__, job.origin, time.time())) - # Set up death penalty - self.raise_death_penalty_after(job.timeout or 180) try: - rv = job.perform() - self.cancel_death_penalty() + with death_pentalty_after(job.timeout or 180): + rv = job.perform() except Exception as e: - self.cancel_death_penalty() fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name)