Initial attempt at job timeouts.

This commit is contained in:
Vincent Driessen 2012-02-22 16:54:02 +01:00
parent fb587297f6
commit 8a856e79ea
3 changed files with 81 additions and 12 deletions

View File

@ -143,10 +143,10 @@ class Job(object):
"""
key = self.key
properties = ['data', 'created_at', 'origin', 'description',
'enqueued_at', 'ended_at', 'result', 'exc_info']
'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout']
data, created_at, origin, description, \
enqueued_at, ended_at, result, \
exc_info = conn.hmget(key, properties)
exc_info, timeout = conn.hmget(key, properties)
if data is None:
raise NoSuchJobError('No such job: %s' % (key,))
@ -164,6 +164,10 @@ class Job(object):
self.ended_at = to_date(ended_at)
self._result = result
self.exc_info = exc_info
if timeout is None:
self.timeout = None
else:
self.timeout = int(timeout)
def save(self):
"""Persists the current job instance to its corresponding Redis key."""
@ -186,6 +190,8 @@ class Job(object):
obj['result'] = self._result
if self.exc_info is not None:
obj['exc_info'] = self.exc_info
if self.timeout is not None:
obj['timeout'] = self.timeout
conn.hmset(key, obj)

View File

@ -32,10 +32,11 @@ class Queue(object):
name = queue_key[len(prefix):]
return Queue(name)
def __init__(self, name='default', default_timeout=None):
    """Create a queue handle for `name`.

    `default_timeout`, when given, is remembered so it can serve as the
    fallback timeout for jobs enqueued on this queue.
    """
    self.name = name
    # The Redis key is the namespace prefix followed by the queue name.
    self._key = self.redis_queue_namespace_prefix + name
    self._default_timeout = default_timeout
@property
def key(self):
@ -99,24 +100,38 @@ class Queue(object):
Expects the function to call, along with the arguments and keyword
arguments.
The special keyword `timeout` is reserved for `enqueue()` itself and
it won't be passed to the actual job function.
"""
if f.__module__ == '__main__':
raise ValueError(
'Functions from the __main__ module cannot be processed '
'by workers.')
timeout = kwargs.pop('timeout', None)
job = Job.create(f, *args, **kwargs)
return self.enqueue_job(job)
return self.enqueue_job(job, timeout=timeout)
def enqueue_job(self, job, timeout=None, set_meta_data=True):
    """Enqueues a job for delayed execution.

    When the `timeout` argument is sent, it overrides the queue's
    default timeout (or 180 seconds when no queue default is set).
    `timeout` may either be a string or integer.

    If the `set_meta_data` argument is `True` (default), it will update
    the properties `origin` and `enqueued_at`.
    """
    if set_meta_data:
        job.origin = self.name
        job.enqueued_at = times.now()

    # Resolution order: explicit per-job timeout, then the queue-wide
    # default given to __init__ (previously stored but never used),
    # then the hard 180-second fallback.  NOTE(review): a falsy value
    # (e.g. 0) falls through to the next default, as before.
    job.timeout = timeout or self._default_timeout or 180

    job.save()
    self.push_job_id(job.id)
    return job

View File

@ -1,4 +1,3 @@
import sys
import os
import errno
import random
@ -295,14 +294,14 @@ class Worker(object):
return did_perform_work
def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
child_pid = os.fork()
if child_pid == 0:
self._is_horse = True
random.seed()
self.log = Logger('horse')
success = self.perform_job(job)
sys.exit(int(not success))
self.main_work_horse(job)
else:
self._horse_pid = child_pid
self.procline('Forked %d at %d' % (child_pid, time.time()))
@ -320,13 +319,62 @@ class Worker(object):
if e.errno != errno.EINTR:
raise
def main_work_horse(self, job):
    """This is the entry point of the newly spawned work horse."""
    # Re-seed so the forked child generates random sequences that
    # differ from the parent worker's.
    random.seed()

    self._is_horse = True
    self.log = Logger('horse')

    did_succeed = self.perform_job(job)

    # os._exit() is the correct way to leave a fork()ed child; the
    # regular sys.exit() would unwind back into the parent's stack.
    os._exit(0 if did_succeed else 1)
def raise_death_penalty_after(self, timeout):
    """Sets up an alarm signal and a signal handler that raises
    a JobTimeoutException after the given `timeout` amount (expressed
    in seconds).
    """
    class JobTimeoutException(Exception):
        """Raised when a job takes longer to complete than the allowed
        maximum time.
        """
        pass

    def handle_alarm(signum, frame):
        # Raised inside whatever frame is executing when SIGALRM fires.
        raise JobTimeoutException('Job exceeded maximum timeout '
                                  'value (%d seconds).' % timeout)

    # Install the handler, then arm the alarm clock.
    signal.signal(signal.SIGALRM, handle_alarm)
    signal.alarm(timeout)
def cancel_death_penalty(self):
    """Removes the death penalty alarm and puts back the system into
    default signal handling.

    NOTE(review): this restores SIG_DFL rather than whatever handler
    was installed before raise_death_penalty_after() ran — confirm no
    other SIGALRM handler is expected to survive.
    """
    # Disarm the pending alarm first so it cannot fire between these
    # two calls.
    signal.alarm(0)
    signal.signal(signal.SIGALRM, signal.SIG_DFL)
def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.procline('Processing %s from %s since %s' % (
job.func.__name__,
job.origin, time.time()))
# Set up death penalty
self.raise_death_penalty_after(job.timeout or 180)
try:
rv = job.perform()
self.cancel_death_penalty()
except Exception as e:
self.cancel_death_penalty()
fq = self.failed_queue
self.log.exception(red(str(e)))
self.log.warning('Moving job to %s queue.' % fq.name)