mirror of https://github.com/rq/rq.git
Safer, and shorter, version of the death penalty.
This case protects against JobTimeoutExceptions being raised immediately after the job body has been (successfully) executed. Still, JobTimeoutExceptions pass through naturally, like any other exception, to be handled by the default exception handler that writes failed jobs to the failed queue. Timeouts therefore are reported like any other exception.
This commit is contained in:
parent
8a856e79ea
commit
b8305a818f
|
@ -0,0 +1,51 @@
|
|||
import signal
|
||||
|
||||
|
||||
class JobTimeoutException(Exception):
|
||||
"""Raised when a job takes longer to complete than the allowed maximum
|
||||
timeout value.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class death_pentalty_after(object):
|
||||
def __init__(self, timeout):
|
||||
self._timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
self.setup_death_penalty()
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
# Always cancel immediately, since we're done
|
||||
try:
|
||||
self.cancel_death_penalty()
|
||||
except JobTimeoutException:
|
||||
# Weird case: we're done with the with body, but now the alarm is
|
||||
# fired. We may safely ignore this situation and consider the
|
||||
# body done.
|
||||
pass
|
||||
|
||||
# __exit__ may return True to supress further exception handling. We
|
||||
# don't want to suppress any exceptions here, since all errors should
|
||||
# just pass through, JobTimeoutException being handled as just one of
|
||||
# them.
|
||||
return False
|
||||
|
||||
def handle_death_penalty(self, signum, frame):
|
||||
raise JobTimeoutException('Job exceeded maximum timeout '
|
||||
'value (%d seconds).' % self._timeout)
|
||||
|
||||
def setup_death_penalty(self):
|
||||
"""Sets up an alarm signal and a signal handler that raises
|
||||
a JobTimeoutException after the timeout amount (expressed in
|
||||
seconds).
|
||||
"""
|
||||
signal.signal(signal.SIGALRM, self.handle_death_penalty)
|
||||
signal.alarm(self._timeout)
|
||||
|
||||
def cancel_death_penalty(self):
|
||||
"""Removes the death penalty alarm and puts back the system into
|
||||
default signal handling.
|
||||
"""
|
||||
signal.alarm(0)
|
||||
signal.signal(signal.SIGALRM, signal.SIG_DFL)
|
35
rq/worker.py
35
rq/worker.py
|
@ -16,6 +16,7 @@ from .queue import Queue, FailedQueue
|
|||
from .proxy import conn
|
||||
from .utils import make_colorizer
|
||||
from .exceptions import NoQueueError, UnpickleError
|
||||
from .timeouts import death_pentalty_after
|
||||
|
||||
green = make_colorizer('darkgreen')
|
||||
yellow = make_colorizer('darkyellow')
|
||||
|
@ -333,33 +334,6 @@ class Worker(object):
|
|||
# constrast to the regular sys.exit()
|
||||
os._exit(int(not success))
|
||||
|
||||
def raise_death_penalty_after(self, timeout):
|
||||
"""Sets up an alarm signal and a signal handler that raises
|
||||
a JobTimeoutException after the given `timeout` amount (expressed
|
||||
in seconds).
|
||||
"""
|
||||
|
||||
class JobTimeoutException(Exception):
|
||||
"""Raised when a job takes longer to complete than the allowed
|
||||
maximum time.
|
||||
"""
|
||||
pass
|
||||
|
||||
# Setup a timeout handler
|
||||
def timeout_handler(signum, frame):
|
||||
raise JobTimeoutException('Job exceeded maximum timeout '
|
||||
'value (%d seconds).' % timeout)
|
||||
|
||||
signal.signal(signal.SIGALRM, timeout_handler)
|
||||
signal.alarm(timeout)
|
||||
|
||||
def cancel_death_penalty(self):
|
||||
"""Removes the death penalty alarm and puts back the system into
|
||||
default signal handling.
|
||||
"""
|
||||
signal.alarm(0)
|
||||
signal.signal(signal.SIGALRM, signal.SIG_DFL)
|
||||
|
||||
def perform_job(self, job):
|
||||
"""Performs the actual work of a job. Will/should only be called
|
||||
inside the work horse's process.
|
||||
|
@ -368,13 +342,10 @@ class Worker(object):
|
|||
job.func.__name__,
|
||||
job.origin, time.time()))
|
||||
|
||||
# Set up death penalty
|
||||
self.raise_death_penalty_after(job.timeout or 180)
|
||||
try:
|
||||
rv = job.perform()
|
||||
self.cancel_death_penalty()
|
||||
with death_pentalty_after(job.timeout or 180):
|
||||
rv = job.perform()
|
||||
except Exception as e:
|
||||
self.cancel_death_penalty()
|
||||
fq = self.failed_queue
|
||||
self.log.exception(red(str(e)))
|
||||
self.log.warning('Moving job to %s queue.' % fq.name)
|
||||
|
|
Loading…
Reference in New Issue