mirror of https://github.com/rq/rq.git
Merge pull request #330 from selwin/death-penalty
Job timeouts are now handled by "worker.death_penalty_class"
This commit is contained in:
commit
35d839f4e2
|
@ -8,7 +8,9 @@ class JobTimeoutException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class death_penalty_after(object):
|
class BaseDeathPenalty(object):
|
||||||
|
"""Base class to setup job timeouts."""
|
||||||
|
|
||||||
def __init__(self, timeout):
|
def __init__(self, timeout):
|
||||||
self._timeout = timeout
|
self._timeout = timeout
|
||||||
|
|
||||||
|
@ -31,6 +33,15 @@ class death_penalty_after(object):
|
||||||
# invoking context.
|
# invoking context.
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def setup_death_penalty(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def cancel_death_penalty(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
|
class UnixSignalDeathPenalty(BaseDeathPenalty):
|
||||||
|
|
||||||
def handle_death_penalty(self, signum, frame):
|
def handle_death_penalty(self, signum, frame):
|
||||||
raise JobTimeoutException('Job exceeded maximum timeout '
|
raise JobTimeoutException('Job exceeded maximum timeout '
|
||||||
'value (%d seconds).' % self._timeout)
|
'value (%d seconds).' % self._timeout)
|
||||||
|
@ -48,4 +59,4 @@ class death_penalty_after(object):
|
||||||
default signal handling.
|
default signal handling.
|
||||||
"""
|
"""
|
||||||
signal.alarm(0)
|
signal.alarm(0)
|
||||||
signal.signal(signal.SIGALRM, signal.SIG_DFL)
|
signal.signal(signal.SIGALRM, signal.SIG_DFL)
|
|
@ -18,7 +18,7 @@ from .job import Job, Status
|
||||||
from .utils import make_colorizer, utcnow, utcformat
|
from .utils import make_colorizer, utcnow, utcformat
|
||||||
from .logutils import setup_loghandlers
|
from .logutils import setup_loghandlers
|
||||||
from .exceptions import NoQueueError, DequeueTimeout
|
from .exceptions import NoQueueError, DequeueTimeout
|
||||||
from .timeouts import death_penalty_after
|
from .timeouts import UnixSignalDeathPenalty
|
||||||
from .version import VERSION
|
from .version import VERSION
|
||||||
from rq.compat import text_type, as_text
|
from rq.compat import text_type, as_text
|
||||||
|
|
||||||
|
@ -59,6 +59,8 @@ def signal_name(signum):
|
||||||
class Worker(object):
|
class Worker(object):
|
||||||
redis_worker_namespace_prefix = 'rq:worker:'
|
redis_worker_namespace_prefix = 'rq:worker:'
|
||||||
redis_workers_keys = 'rq:workers'
|
redis_workers_keys = 'rq:workers'
|
||||||
|
death_penalty_class = UnixSignalDeathPenalty
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def all(cls, connection=None):
|
def all(cls, connection=None):
|
||||||
|
@ -462,7 +464,7 @@ class Worker(object):
|
||||||
|
|
||||||
with self.connection._pipeline() as pipeline:
|
with self.connection._pipeline() as pipeline:
|
||||||
try:
|
try:
|
||||||
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
|
with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT):
|
||||||
rv = job.perform()
|
rv = job.perform()
|
||||||
|
|
||||||
# Pickle the result in the same try-except block since we need to
|
# Pickle the result in the same try-except block since we need to
|
||||||
|
|
Loading…
Reference in New Issue