diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 1571dbb7..5e660b57 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -21,17 +21,17 @@ else: """Class decorator that fills in missing ordering methods""" convert = { '__lt__': [('__gt__', lambda self, other: other < self), - ('__le__', lambda self, other: not other < self), - ('__ge__', lambda self, other: not self < other)], + ('__le__', lambda self, other: not other < self), + ('__ge__', lambda self, other: not self < other)], '__le__': [('__ge__', lambda self, other: other <= self), - ('__lt__', lambda self, other: not other <= self), - ('__gt__', lambda self, other: not self <= other)], + ('__lt__', lambda self, other: not other <= self), + ('__gt__', lambda self, other: not self <= other)], '__gt__': [('__lt__', lambda self, other: other > self), - ('__ge__', lambda self, other: not other > self), - ('__le__', lambda self, other: not self > other)], + ('__ge__', lambda self, other: not other > self), + ('__le__', lambda self, other: not self > other)], '__ge__': [('__le__', lambda self, other: other >= self), - ('__gt__', lambda self, other: not other >= self), - ('__lt__', lambda self, other: not self >= other)] + ('__gt__', lambda self, other: not other >= self), + ('__lt__', lambda self, other: not self >= other)] } roots = set(dir(cls)) & set(convert) if not roots: diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 84b9ef9e..5608e631 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -9,13 +9,13 @@ def register_sentry(client, worker): """ def send_to_sentry(job, *exc_info): client.captureException( - exc_info=exc_info, - extra={ - 'job_id': job.id, - 'func': job.func_name, - 'args': job.args, - 'kwargs': job.kwargs, - 'description': job.description, - }) + exc_info=exc_info, + extra={ + 'job_id': job.id, + 'func': job.func_name, + 'args': job.args, + 'kwargs': job.kwargs, + 'description': job.description, + }) worker.push_exc_handler(send_to_sentry) diff --git a/rq/job.py b/rq/job.py index 045b57a3..29562d6a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,15 +4,18 @@ from __future__ import (absolute_import, division, print_function, import inspect from uuid import uuid4 + +from rq.compat import as_text, decode_redis_hash, string_types, text_type + +from .connections import resolve_connection +from .exceptions import NoSuchJobError, UnpickleError +from .local import LocalStack +from .utils import import_attribute, utcformat, utcnow, utcparse + try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa from pickle import loads, dumps, UnpicklingError # noqa -from .local import LocalStack -from .connections import resolve_connection -from .exceptions import UnpickleError, NoSuchJobError -from .utils import import_attribute, utcnow, utcformat, utcparse -from rq.compat import text_type, decode_redis_hash, as_text, string_types def enum(name, *sequential, **named): diff --git a/rq/logutils.py b/rq/logutils.py index 40c1db2c..0da1d9a4 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -28,7 +28,7 @@ def setup_loghandlers(level=None): "handlers": { "console": { "level": "DEBUG", - #"class": "logging.StreamHandler", + # "class": "logging.StreamHandler", "class": "rq.utils.ColorizingStreamHandler", "formatter": "console", "exclude": ["%(asctime)s"], diff --git a/rq/queue.py b/rq/queue.py index 3b7bec39..cc7fd13e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -148,12 +148,10 @@ class Queue(object): if Job.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - - def push_job_id(self, job_id): # noqa + def push_job_id(self, job_id): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): """Creates a job to represent the delayed function call and enqueues @@ -213,10 +211,10 @@ class Queue(object): description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) - + if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa - args = kwargs.pop('args', None) + args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, @@ -347,7 +345,6 @@ class Queue(object): raise e return job, queue - # Total ordering defition (the rest of the required Python methods are # auto-generated by the @total_ordering decorator) def __eq__(self, other): # noqa diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py index 97bb5def..5c87fbb3 100755 --- a/rq/scripts/rqgenload.py +++ b/rq/scripts/rqgenload.py @@ -14,6 +14,7 @@ def parse_args(): opts, args = parser.parse_args() return (opts, args, parser) + def main(): import sys sys.path.insert(0, '.') @@ -25,12 +26,12 @@ def main(): queues = ('default', 'high', 'low') sample_calls = [ - (dummy.do_nothing, [], {}), - (dummy.sleep, [1], {}), - (dummy.fib, [8], {}), # normal result - (dummy.fib, [24], {}), # takes pretty long - (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc - (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) + (dummy.do_nothing, [], {}), + (dummy.sleep, [1], {}), + (dummy.fib, [8], {}), # normal result + (dummy.fib, [24], {}), # takes pretty long + (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc + (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) ] for i in range(opts.count): @@ -40,28 +41,27 @@ def main(): q = Queue(random.choice(queues)) q.enqueue(f, *args, **kwargs) - #q = Queue('foo') - #q.enqueue(do_nothing) - #q.enqueue(sleep, 3) - #q = Queue('bar') - #q.enqueue(yield_stuff) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) + # q = Queue('foo') + # q.enqueue(do_nothing) + # q.enqueue(sleep, 3) + # q = Queue('bar') + # q.enqueue(yield_stuff) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) if __name__ == '__main__': main() - diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 2f6af7ab..48d318a4 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -111,7 +111,7 @@ def show_workers(args): queues = dict([(q, []) for q in qs]) for w in ws: for q in w.queues: - if not q in queues: + if q not in queues: continue queues[q].append(w) diff --git a/rq/timeouts.py b/rq/timeouts.py index ae0fd48e..afe385d4 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -14,7 +14,7 @@ class JobTimeoutException(Exception): class BaseDeathPenalty(object): """Base class to setup job timeouts.""" - + def __init__(self, timeout): self._timeout = timeout @@ -45,7 +45,7 @@ class BaseDeathPenalty(object): class UnixSignalDeathPenalty(BaseDeathPenalty): - + def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' 'value (%d seconds).' % self._timeout) @@ -63,4 +63,4 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): default signal handling. """ signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) \ No newline at end of file + signal.signal(signal.SIGALRM, signal.SIG_DFL) diff --git a/rq/worker.py b/rq/worker.py index 7bc7113e..a44f4123 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -68,7 +68,6 @@ class Worker(object): redis_workers_keys = 'rq:workers' death_penalty_class = UnixSignalDeathPenalty - @classmethod def all(cls, connection=None): """Returns an iterable of all Workers. @@ -140,8 +139,7 @@ class Worker(object): if exc_handler is not None: self.push_exc_handler(exc_handler) - - def validate_queues(self): # noqa + def validate_queues(self): """Sanity check for the given queues.""" if not iterable(self.queues): raise ValueError('Argument queues not iterable.') @@ -157,8 +155,7 @@ class Worker(object): """Returns the Redis keys representing this worker's queues.""" return map(lambda q: q.key, self.queues) - - @property # noqa + @property def name(self): """Returns the name of the worker, under which it is registered to the monitoring system. @@ -201,8 +198,7 @@ class Worker(object): """ setprocname('rq: %s' % (message,)) - - def register_birth(self): # noqa + def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s' % (self.name,)) if self.connection.exists(self.key) and \ @@ -326,8 +322,7 @@ class Worker(object): signal.signal(signal.SIGINT, request_stop) signal.signal(signal.SIGTERM, request_stop) - - def work(self, burst=False): # noqa + def work(self, burst=False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all diff --git a/tests/__init__.py b/tests/__init__.py index e704eaaf..93f11b38 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -79,5 +79,5 @@ class RQTestCase(unittest.TestCase): # Pop the connection to Redis testconn = pop_connection() - assert testconn == cls.testconn, 'Wow, something really nasty ' \ - 'happened to the Redis connection stack. Check your setup.' + assert testconn == cls.testconn, \ + 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' diff --git a/tests/helpers.py b/tests/helpers.py index 3ecf0dd7..1475cf0a 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -7,4 +7,3 @@ from datetime import timedelta def strip_microseconds(date): return date - timedelta(microseconds=date.microsecond) - diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 91c53e9d..afb008fc 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -44,7 +44,7 @@ class TestDecorator(RQTestCase): """Ensure that passing in result_ttl to the decorator sets the result_ttl on the job """ - #Ensure default + # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL) diff --git a/tests/test_queue.py b/tests/test_queue.py index a999b5ec..1d467a65 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -23,8 +23,7 @@ class TestQueue(RQTestCase): q = Queue() self.assertEquals(q.name, 'default') - - def test_equality(self): # noqa + def test_equality(self): """Mathematical equality of queues.""" q1 = Queue('foo') q2 = Queue('foo') @@ -35,8 +34,7 @@ class TestQueue(RQTestCase): self.assertNotEquals(q1, q3) self.assertNotEquals(q2, q3) - - def test_empty_queue(self): # noqa + def test_empty_queue(self): """Emptying queues.""" q = Queue('example') @@ -109,8 +107,7 @@ class TestQueue(RQTestCase): self.assertEquals(q.count, 2) - - def test_enqueue(self): # noqa + def test_enqueue(self): """Enqueueing job onto queues.""" q = Queue() self.assertEquals(q.is_empty(), True) @@ -142,8 +139,7 @@ class TestQueue(RQTestCase): self.assertEquals(job.origin, q.name) self.assertIsNotNone(job.enqueued_at) - - def test_pop_job_id(self): # noqa + def test_pop_job_id(self): """Popping job IDs from queues.""" # Set up q = Queue() @@ -269,8 +265,8 @@ class TestQueue(RQTestCase): def test_enqueue_explicit_args(self): """enqueue() works for both implicit/explicit args.""" q = Queue() - - # Implicit args/kwargs mode + + # Implicit args/kwargs mode job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz') self.assertEqual(job.timeout, 1) self.assertEqual(job.result_ttl, 1) @@ -292,7 +288,6 @@ class TestQueue(RQTestCase): ((1,), {'timeout': 1, 'result_ttl': 1}) ) - def test_all_queues(self): """All queues""" q1 = Queue('first-queue') diff --git a/tests/test_worker.py b/tests/test_worker.py index 12554426..27e85bb9 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -26,16 +26,16 @@ class TestWorker(RQTestCase): fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) self.assertEquals(w.work(burst=True), False, - 'Did not expect any work on the queue.') + 'Did not expect any work on the queue.') fooq.enqueue(say_hello, name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') def test_worker_ttl(self): """Worker ttl.""" w = Worker([]) - w.register_birth() # ugly: our test should only call public APIs + w.register_birth() # ugly: our test should only call public APIs [worker_key] = self.testconn.smembers(Worker.redis_workers_keys) self.assertIsNotNone(self.testconn.ttl(worker_key)) w.register_death() @@ -46,7 +46,7 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue('tests.fixtures.say_hello', name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Frank!') def test_work_is_unreadable(self): @@ -175,10 +175,9 @@ class TestWorker(RQTestCase): w = Worker([q]) # Put it on the queue with a timeout value - res = q.enqueue( - create_file_after_timeout, - args=(sentinel_file, 4), - timeout=1) + res = q.enqueue(create_file_after_timeout, + args=(sentinel_file, 4), + timeout=1) try: os.unlink(sentinel_file)