diff --git a/rq/exceptions.py b/rq/exceptions.py index 00583d7d..6269e811 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,5 +1,2 @@ class NoQueueError(Exception): pass - -class NoMoreWorkError(Exception): - pass diff --git a/rq/queue.py b/rq/queue.py index eeac2e72..793fb391 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,6 @@ import uuid from pickle import loads, dumps from .proxy import conn -from .exceptions import NoMoreWorkError class DelayedResult(object): def __init__(self, key): @@ -17,15 +16,44 @@ class DelayedResult(object): self._rv = loads(rv) return self._rv +class Job(object): + """A Job is just a convenient datastructure to pass around job (meta) data. + """ -def to_queue_key(queue_name): - return 'rq:%s' % (queue_name,) + @classmethod + def unpickle(cls, pickle_data): + job_tuple = loads(pickle_data) + return Job(job_tuple) + + def __init__(self, job_tuple, origin=None): + self.func, self.args, self.kwargs, self.rv_key = job_tuple + self.origin = origin + + def perform(self): + """Invokes the job function with the job arguments. + """ + return self.func(*self.args, **self.kwargs) class Queue(object): + redis_queue_namespace_prefix = 'rq:' + + @classmethod + def from_queue_key(cls, queue_key): + """Returns a Queue instance, based on the naming conventions for naming + the internal Redis keys. Can be used to reverse-lookup Queues by their + Redis keys. + """ + prefix = cls.redis_queue_namespace_prefix + if not queue_key.startswith(prefix): + raise ValueError('Not a valid RQ queue key: %s' % (queue_key,)) + name = queue_key[len(prefix):] + return Queue(name) + def __init__(self, name='default'): + prefix = self.redis_queue_namespace_prefix self.name = name - self._key = to_queue_key(name) + self._key = '%s%s' % (prefix, name) @property def key(self): @@ -52,31 +80,43 @@ class Queue(object): return DelayedResult(rv_key) def dequeue(self): - s = conn.lpop(self.key) - return loads(s) + blob = conn.lpop(self.key) + if blob is None: + return None + job = Job.unpickle(blob) + job.origin = self + return job @classmethod - def _dequeue_any(cls, queues): - # Redis' BLPOP command takes multiple queue arguments, but LPOP can - # only take a single queue. Therefore, we need to loop over all - # queues manually, in order, and return None if no more work is - # available - for queue in queues: - value = conn.lpop(queue) - if value is not None: - return (queue, value) + def _lpop_any(cls, queue_keys): + """Helper method. You should not call this directly. + + Redis' BLPOP command takes multiple queue arguments, but LPOP can only + take a single queue. Therefore, we need to loop over all queues + manually, in order, and return None if no more work is available. + """ + for queue_key in queue_keys: + blob = conn.lpop(queue_key) + if blob is not None: + return (queue_key, blob) return None @classmethod def dequeue_any(cls, queues, blocking): + queue_keys = map(lambda q: q.key, queues) if blocking: - queue, msg = conn.blpop(queues) + queue_key, blob = conn.blpop(queue_keys) else: - value = cls._dequeue_any(queues) - if value is None: - raise NoMoreWorkError('No more work.') - queue, msg = value - return (queue, msg) + redis_result = cls._lpop_any(queue_keys) + if redis_result is None: + return None + queue_key, blob = redis_result + + job = Job.unpickle(blob) + queue = Queue.from_queue_key(queue_key) + job.origin = queue + return job + def __str__(self): return self.name diff --git a/rq/worker.py b/rq/worker.py index 07fdcca9..9ce92e58 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -3,14 +3,14 @@ import os import random import time import procname +from pickle import dumps try: from logbook import Logger except ImportError: from logging import Logger -from pickle import loads, dumps from .queue import Queue from .proxy import conn -from .exceptions import NoMoreWorkError, NoQueueError +from .exceptions import NoQueueError def iterable(x): return hasattr(x, '__iter__') @@ -59,13 +59,12 @@ class Worker(object): did_work = False while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - try: - wait_for_job = not quit_when_done - queue, msg = Queue.dequeue_any(self.queues, wait_for_job) - did_work = True - except NoMoreWorkError: + wait_for_job = not quit_when_done + job = Queue.dequeue_any(self.queues, wait_for_job) + if job is None: break - self.fork_and_perform_job(queue, msg) + did_work = True + self.fork_and_perform_job(job) return did_work def work_forever(self): @@ -74,7 +73,7 @@ class Worker(object): def work(self): return self._work(True) - def fork_and_perform_job(self, queue, msg): + def fork_and_perform_job(self, job): child_pid = os.fork() if child_pid == 0: random.seed() @@ -82,7 +81,7 @@ class Worker(object): try: self.procline('Processing work since %d' % (time.time(),)) self._working = True - self.perform_job(queue, msg) + self.perform_job(job) except Exception, e: self.log.exception(e) sys.exit(1) @@ -92,11 +91,10 @@ class Worker(object): os.waitpid(child_pid, 0) self._working = False - def perform_job(self, queue, msg): - func, args, kwargs, rv_key = loads(msg) - self.procline('Processing %s from %s since %s' % (func.__name__, queue, time.time())) + def perform_job(self, job): + self.procline('Processing %s from %s since %s' % (job.func.__name__, job.origin.name, time.time())) try: - rv = func(*args, **kwargs) + rv = job.perform() except Exception, e: rv = e self.log.exception(e) @@ -107,6 +105,6 @@ class Worker(object): self.log.info('Job ended normally without result') if rv is not None: p = conn.pipeline() - p.set(rv_key, dumps(rv)) - p.expire(rv_key, self.rv_ttl) + p.set(job.rv_key, dumps(rv)) + p.expire(job.rv_key, self.rv_ttl) p.execute() diff --git a/tests/test_rq.py b/tests/test_rq.py index d48936f0..fb8dced4 100644 --- a/tests/test_rq.py +++ b/tests/test_rq.py @@ -2,7 +2,7 @@ import unittest from pickle import loads from blinker import signal from redis import Redis -from rq import conn, Queue +from rq import conn, Queue, Worker # Test data def testjob(name=None): @@ -85,10 +85,55 @@ class TestQueue(RQTestCase): q.enqueue(testjob, 'Rick', foo='bar') # Pull it off the queue (normally, a worker would do this) - f, args, kwargs, rv_key = q.dequeue() - self.assertEquals(f, testjob) - self.assertEquals(args[0], 'Rick') - self.assertEquals(kwargs['foo'], 'bar') + job = q.dequeue() + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, q) + self.assertEquals(job.args[0], 'Rick') + self.assertEquals(job.kwargs['foo'], 'bar') + + + def test_dequeue_any(self): + """Fetching work from any given queue.""" + fooq = Queue('foo') + barq = Queue('bar') + + self.assertEquals(Queue.dequeue_any([fooq, barq], False), None) + + # Enqueue a single item + barq.enqueue(testjob) + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + + # Enqueue items on both queues + barq.enqueue(testjob, 'for Bar') + fooq.enqueue(testjob, 'for Foo') + + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, fooq) + self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.') + + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, barq) + self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') + + +class TestWorker(RQTestCase): + def test_create_worker(self): + """Worker creation.""" + fooq, barq = Queue('foo'), Queue('bar') + w = Worker([fooq, barq]) + self.assertEquals(w.queues, [fooq, barq]) + + def test_work_and_quit(self): + """Worker processes work, then quits.""" + fooq, barq = Queue('foo'), Queue('bar') + w = Worker([fooq, barq]) + self.assertEquals(w.work(), False, 'Did not expect any work on the queue.') + + fooq.enqueue(testjob, name='Frank') + self.assertEquals(w.work(), True, 'Expected at least some work done.') if __name__ == '__main__':