From 9a00b0eca6e79f0f24645cbf92f762e0d6bb8a6a Mon Sep 17 00:00:00 2001 From: RyanMTB Date: Wed, 20 May 2015 21:48:13 -0700 Subject: [PATCH] Updated Worker API --- rq/queue.py | 3 +++ rq/worker.py | 16 +++++++++++----- tests/test_worker.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 2187c7ed..bf5a1862 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -70,6 +70,9 @@ class Queue(object): def __len__(self): return self.count + def __iter__(self): + yield self + @property def key(self): """Returns the Redis key for this Queue.""" diff --git a/rq/worker.py b/rq/worker.py index fa3c49b5..39e8ce6d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -125,8 +125,7 @@ class Worker(object): if connection is None: connection = get_current_connection() self.connection = connection - if isinstance(queues, self.queue_class): - queues = [queues] + queues = self.process_queue_args(queues) self._name = name self.queues = queues self.validate_queues() @@ -160,11 +159,18 @@ class Worker(object): def validate_queues(self): """Sanity check for the given queues.""" - if not iterable(self.queues): - raise ValueError('Argument queues not iterable.') for queue in self.queues: if not isinstance(queue, self.queue_class): - raise NoQueueError('Give each worker at least one Queue.') + raise NoQueueError('{0} is not a queue'.format(queue)) + + def process_queue_args(self, queue_args): + """ allow for a string, a queue an iterable of strings + or an iterable of queues""" + if isinstance(queue_args, text_type): + return self.queue_class(name = queue_args) + else: + return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) + else queue_arg for queue_arg in queue_args] def queue_names(self): """Returns the queue names of this worker's queues.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 6f89f4af..c74d36a7 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -12,6 +12,7 @@ from tests.helpers import strip_microseconds from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq.compat import as_text +from rq.exceptions import NoQueueError from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend @@ -28,6 +29,40 @@ class TestWorker(RQTestCase): w = Worker([fooq, barq]) self.assertEquals(w.queues, [fooq, barq]) + def test_create_worker_args_single_queue(self): + """Test Worker creation with single queue instance arg""" + fooq = Queue('foo') + w = Worker(fooq) + self.assertEquals(w.queue_keys(), ['rq:queue:foo']) + + def test_create_worker_args_single_string(self): + """ Test Worker creation with single string arg""" + w = Worker('foo') + self.assertEquals(w.queue_keys(),['rq:queue:foo']) + + def test_create_worker_args_iterable_strings(self): + """ Test Worker creation with iterable of strings""" + w = Worker(['foo', 'bar']) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_args_iterable_queues(self): + """ Test Worker test worker creation + with an iterable of queue instance args""" + w = Worker(map(Queue, ['foo', 'bar'])) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_args_list_map(self): + """ Test Worker test worker creation + with a list of queue from map""" + w = Worker(list(map(Queue, ['foo', 'bar']))) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_raises_noqueue_error(self): + """ make sure raises noqueue error if a + a non string or queue is passed""" + with self.assertRaises(NoQueueError): + w = Worker([1]) + def test_work_and_quit(self): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar')