Job timeouts are now handled by "worker.death_penalty_class".

This commit is contained in:
Selwin Ong 2014-04-04 17:16:33 +07:00
parent a55be12864
commit 7eb6c2ab9f
2 changed files with 17 additions and 4 deletions

View File

@ -8,7 +8,9 @@ class JobTimeoutException(Exception):
pass
class death_penalty_after(object):
class BaseDeathPenalty(object):
"""Base class to setup job timeouts."""
def __init__(self, timeout):
self._timeout = timeout
@ -31,6 +33,15 @@ class death_penalty_after(object):
# invoking context.
return False
def setup_death_penalty(self):
raise NotImplementedError()
def cancel_death_penalty(self):
raise NotImplementedError()
class UnixSignalDeathPenalty(BaseDeathPenalty):
def handle_death_penalty(self, signum, frame):
raise JobTimeoutException('Job exceeded maximum timeout '
'value (%d seconds).' % self._timeout)

View File

@ -18,7 +18,7 @@ from .job import Job, Status
from .utils import make_colorizer, utcnow, utcformat
from .logutils import setup_loghandlers
from .exceptions import NoQueueError, DequeueTimeout
from .timeouts import death_penalty_after
from .timeouts import UnixSignalDeathPenalty
from .version import VERSION
from rq.compat import text_type, as_text
@ -59,6 +59,8 @@ def signal_name(signum):
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers'
death_penalty_class = UnixSignalDeathPenalty
@classmethod
def all(cls, connection=None):
@ -462,7 +464,7 @@ class Worker(object):
with self.connection._pipeline() as pipeline:
try:
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT):
rv = job.perform()
# Pickle the result in the same try-except block since we need to