diff --git a/__init__.py b/__init__.py index 19cee287..96b8c9e4 100644 --- a/__init__.py +++ b/__init__.py @@ -1,21 +1,40 @@ -from rdb import conn +import uuid from pickle import loads, dumps +from rdb import conn + +def to_queue_key(queue_name): + return 'rq:%s' % (queue_name,) + +class DelayedResult(object): + def __init__(self, key): + self.key = key + self._rv = None + + @property + def return_value(self): + if self._rv is None: + rv = conn.get(self.key) + if rv is not None: + # cache the result + self._rv = loads(rv) + return self._rv + + +class task(object): + def __init__(self, queue='normal'): + self.queue = queue + + def __call__(self, f): + def delay(*args, **kwargs): + queue_key = to_queue_key(self.queue) + key = '%s:result:%s' % (queue_key, str(uuid.uuid4())) + if f.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + s = dumps((f, key, args, kwargs)) + conn.rpush(queue_key, s) + return DelayedResult(key) + f.delay = delay + return f + -def queue_daemon(app, queue_keys, rv_ttl=500): - """Simple implementation of a Redis queue worker, based on - http://flask.pocoo.org/snippets/73/ - Will listen endlessly on the given queue keys. - """ - while True: - msg = conn.blpop(queue_keys) - func, key, args, kwargs = loads(msg[1]) - try: - rv = func(*args, **kwargs) - except Exception, e: - rv = e - if rv is not None: - p = conn.pipeline() - conn.set(key, dumps(rv)) - conn.expire(key, rv_ttl) - p.execute() diff --git a/daemon.py b/daemon.py new file mode 100644 index 00000000..2c888a46 --- /dev/null +++ b/daemon.py @@ -0,0 +1,17 @@ +from logbook import Logger +from .worker import Worker + +def run_daemon(queue_keys, rv_ttl=500): + """Simple implementation of a Redis queue worker, based on + http://flask.pocoo.org/snippets/73/ + + Will listen endlessly on the given queue keys. + """ + worker = Worker(queue_keys, rv_ttl) + + log = Logger('worker') + log.info('Listening for messages on Redis queues:') + for key in queue_keys: + log.info('- %s' % (key,)) + + worker.work() diff --git a/worker.py b/worker.py new file mode 100644 index 00000000..47456420 --- /dev/null +++ b/worker.py @@ -0,0 +1,78 @@ +import sys +import os +import random +import time +import procname +from logbook import Logger +from pickle import loads, dumps +from rdb import conn +from . import to_queue_key + +class NoQueueError(Exception): pass + +class Worker(object): + def __init__(self, queue_names, rv_ttl=500): + self.queue_names = queue_names + self.rv_ttl = rv_ttl + self._working = False + self.log = Logger('worker') + self.validate_queues() + + def validate_queues(self): + if not self.queue_names: + raise NoQueueError('Give each worker at least one queue.') + + @property + def queue_keys(self): + return map(to_queue_key, self.queue_names) + + def is_idle(self): + return not self.is_working() + + def is_working(self): + return self._working + + @property + def pid(self): + return os.getpid() + + def procline(self, message): + self.log.debug(message) + procname.setprocname('rq: %s' % (message,)) + + def work(self): + while True: + self.procline('Waiting on %s' % (', '.join(self.queue_names),)) + queue, msg = conn.blpop(self.queue_keys) + self.fork_and_perform_task(queue, msg) + + def fork_and_perform_task(self, queue, msg): + child_pid = os.fork() + if child_pid == 0: + random.seed() + self.log = Logger('horse') + try: + self.procline('Processing work since %d' % (time.time(),)) + self._working = True + self.perform_task(queue, msg) + except Exception, e: + self.log.exception(e) + sys.exit(1) + sys.exit(0) + else: + self.procline('Forked %d at %d' % (child_pid, time.time())) + os.waitpid(child_pid, 0) + self._working = False + + def perform_task(self, queue, msg): + func, key, args, kwargs = loads(msg) + self.procline('Processing %s from %s since %s' % (func.__name__, queue, time.time())) + try: + rv = func(*args, **kwargs) + except Exception, e: + rv = e + if rv is not None: + p = conn.pipeline() + conn.set(key, dumps(rv)) + conn.expire(key, self.rv_ttl) + p.execute()