Merge pull request #531 from RyanMTB/updated_worker_api

updated worker api see issue 255
This commit is contained in:
Selwin Ong 2015-05-23 10:19:16 +07:00
commit 779737f1c9
3 changed files with 49 additions and 5 deletions

View File

@ -70,6 +70,9 @@ class Queue(object):
def __len__(self):
return self.count
def __iter__(self):
yield self
@property
def key(self):
"""Returns the Redis key for this Queue."""

View File

@ -125,8 +125,7 @@ class Worker(object):
if connection is None:
connection = get_current_connection()
self.connection = connection
if isinstance(queues, self.queue_class):
queues = [queues]
queues = self.process_queue_args(queues)
self._name = name
self.queues = queues
self.validate_queues()
@ -160,11 +159,18 @@ class Worker(object):
def validate_queues(self):
"""Sanity check for the given queues."""
if not iterable(self.queues):
raise ValueError('Argument queues not iterable.')
for queue in self.queues:
if not isinstance(queue, self.queue_class):
raise NoQueueError('Give each worker at least one Queue.')
raise NoQueueError('{0} is not a queue'.format(queue))
def process_queue_args(self, queue_args):
""" allow for a string, a queue an iterable of strings
or an iterable of queues"""
if isinstance(queue_args, text_type):
return self.queue_class(name = queue_args)
else:
return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type)
else queue_arg for queue_arg in queue_args]
def queue_names(self):
"""Returns the queue names of this worker's queues."""

View File

@ -12,6 +12,7 @@ from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker
from rq.compat import as_text
from rq.exceptions import NoQueueError
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
@ -28,6 +29,40 @@ class TestWorker(RQTestCase):
w = Worker([fooq, barq])
self.assertEquals(w.queues, [fooq, barq])
def test_create_worker_args_single_queue(self):
"""Test Worker creation with single queue instance arg"""
fooq = Queue('foo')
w = Worker(fooq)
self.assertEquals(w.queue_keys(), ['rq:queue:foo'])
def test_create_worker_args_single_string(self):
""" Test Worker creation with single string arg"""
w = Worker('foo')
self.assertEquals(w.queue_keys(),['rq:queue:foo'])
def test_create_worker_args_iterable_strings(self):
""" Test Worker creation with iterable of strings"""
w = Worker(['foo', 'bar'])
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_args_iterable_queues(self):
""" Test Worker test worker creation
with an iterable of queue instance args"""
w = Worker(map(Queue, ['foo', 'bar']))
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_args_list_map(self):
""" Test Worker test worker creation
with a list of queue from map"""
w = Worker(list(map(Queue, ['foo', 'bar'])))
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_raises_noqueue_error(self):
""" make sure raises noqueue error if a
a non string or queue is passed"""
with self.assertRaises(NoQueueError):
w = Worker([1])
def test_work_and_quit(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar')