diff --git a/rq/timeouts.py b/rq/timeouts.py new file mode 100644 index 00000000..83be1e6f --- /dev/null +++ b/rq/timeouts.py @@ -0,0 +1,51 @@ +import signal + + +class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed maximum + timeout value. + """ + pass + + +class death_pentalty_after(object): + def __init__(self, timeout): + self._timeout = timeout + + def __enter__(self): + self.setup_death_penalty() + + def __exit__(self, type, value, traceback): + # Always cancel immediately, since we're done + try: + self.cancel_death_penalty() + except JobTimeoutException: + # Weird case: we're done with the with body, but now the alarm is + # fired. We may safely ignore this situation and consider the + # body done. + pass + + # __exit__ may return True to supress further exception handling. We + # don't want to suppress any exceptions here, since all errors should + # just pass through, JobTimeoutException being handled as just one of + # them. + return False + + def handle_death_penalty(self, signum, frame): + raise JobTimeoutException('Job exceeded maximum timeout ' + 'value (%d seconds).' % self._timeout) + + def setup_death_penalty(self): + """Sets up an alarm signal and a signal handler that raises + a JobTimeoutException after the timeout amount (expressed in + seconds). + """ + signal.signal(signal.SIGALRM, self.handle_death_penalty) + signal.alarm(self._timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) diff --git a/rq/worker.py b/rq/worker.py index 71b887d3..142948a6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,6 +16,7 @@ from .queue import Queue, FailedQueue from .proxy import conn from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError +from .timeouts import death_pentalty_after green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -333,33 +334,6 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def raise_death_penalty_after(self, timeout): - """Sets up an alarm signal and a signal handler that raises - a JobTimeoutException after the given `timeout` amount (expressed - in seconds). - """ - - class JobTimeoutException(Exception): - """Raised when a job takes longer to complete than the allowed - maximum time. - """ - pass - - # Setup a timeout handler - def timeout_handler(signum, frame): - raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % timeout) - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(timeout) - - def cancel_death_penalty(self): - """Removes the death penalty alarm and puts back the system into - default signal handling. - """ - signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) - def perform_job(self, job): """Performs the actual work of a job. Will/should only be called inside the work horse's process. @@ -368,13 +342,10 @@ class Worker(object): job.func.__name__, job.origin, time.time())) - # Set up death penalty - self.raise_death_penalty_after(job.timeout or 180) try: - rv = job.perform() - self.cancel_death_penalty() + with death_pentalty_after(job.timeout or 180): + rv = job.perform() except Exception as e: - self.cancel_death_penalty() fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name)