diff --git a/rq/job.py b/rq/job.py index c91c7811..0939eed3 100644 --- a/rq/job.py +++ b/rq/job.py @@ -143,10 +143,10 @@ class Job(object): """ key = self.key properties = ['data', 'created_at', 'origin', 'description', - 'enqueued_at', 'ended_at', 'result', 'exc_info'] + 'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout'] data, created_at, origin, description, \ enqueued_at, ended_at, result, \ - exc_info = conn.hmget(key, properties) + exc_info, timeout = conn.hmget(key, properties) if data is None: raise NoSuchJobError('No such job: %s' % (key,)) @@ -164,6 +164,10 @@ class Job(object): self.ended_at = to_date(ended_at) self._result = result self.exc_info = exc_info + if timeout is None: + self.timeout = None + else: + self.timeout = int(timeout) def save(self): """Persists the current job instance to its corresponding Redis key.""" @@ -186,6 +190,8 @@ class Job(object): obj['result'] = self._result if self.exc_info is not None: obj['exc_info'] = self.exc_info + if self.timeout is not None: + obj['timeout'] = self.timeout conn.hmset(key, obj) diff --git a/rq/queue.py b/rq/queue.py index 7ec1efe9..f783be74 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -32,10 +32,11 @@ class Queue(object): name = queue_key[len(prefix):] return Queue(name) - def __init__(self, name='default'): + def __init__(self, name='default', default_timeout=None): prefix = self.redis_queue_namespace_prefix self.name = name self._key = '%s%s' % (prefix, name) + self._default_timeout = default_timeout @property def key(self): @@ -99,24 +100,38 @@ class Queue(object): Expects the function to call, along with the arguments and keyword arguments. + + The special keyword `timeout` is reserved for `enqueue()` itself and + it won't be passed to the actual job function. """ if f.__module__ == '__main__': raise ValueError( 'Functions from the __main__ module cannot be processed ' 'by workers.') + timeout = kwargs.pop('timeout', None) job = Job.create(f, *args, **kwargs) - return self.enqueue_job(job) + return self.enqueue_job(job, timeout=timeout) - def enqueue_job(self, job, set_meta_data=True): + def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. + When the `timeout` argument is sent, it will overrides the default + timeout value of 180 seconds. `timeout` may either be a string or + integer. + If the `set_meta_data` argument is `True` (default), it will update the properties `origin` and `enqueued_at`. """ if set_meta_data: job.origin = self.name job.enqueued_at = times.now() + + if timeout: + job.timeout = timeout # _timeout_in_seconds(timeout) + else: + job.timeout = 180 # default + job.save() self.push_job_id(job.id) return job diff --git a/rq/worker.py b/rq/worker.py index 25830aa7..71b887d3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,4 +1,3 @@ -import sys import os import errno import random @@ -295,14 +294,14 @@ class Worker(object): return did_perform_work def fork_and_perform_job(self, job): + """Spawns a work horse to perform the actual work and passes it a job. + The worker will wait for the work horse and make sure it executes + within the given timeout bounds, or will end the work horse with + SIGALRM. + """ child_pid = os.fork() if child_pid == 0: - self._is_horse = True - random.seed() - self.log = Logger('horse') - - success = self.perform_job(job) - sys.exit(int(not success)) + self.main_work_horse(job) else: self._horse_pid = child_pid self.procline('Forked %d at %d' % (child_pid, time.time())) @@ -320,13 +319,62 @@ class Worker(object): if e.errno != errno.EINTR: raise + def main_work_horse(self, job): + """This is the entry point of the newly spawned work horse.""" + # After fork()'ing, always assure we are generating random sequences + # that are different from the worker. + random.seed() + self._is_horse = True + self.log = Logger('horse') + + success = self.perform_job(job) + + # os._exit() is the way to exit from childs after a fork(), in + # constrast to the regular sys.exit() + os._exit(int(not success)) + + def raise_death_penalty_after(self, timeout): + """Sets up an alarm signal and a signal handler that raises + a JobTimeoutException after the given `timeout` amount (expressed + in seconds). + """ + + class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed + maximum time. + """ + pass + + # Setup a timeout handler + def timeout_handler(signum, frame): + raise JobTimeoutException('Job exceeded maximum timeout ' + 'value (%d seconds).' % timeout) + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ self.procline('Processing %s from %s since %s' % ( job.func.__name__, job.origin, time.time())) + + # Set up death penalty + self.raise_death_penalty_after(job.timeout or 180) try: rv = job.perform() + self.cancel_death_penalty() except Exception as e: + self.cancel_death_penalty() fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name)