From 6494233058fa56e2314a179bbf9ac1df1feaeb2e Mon Sep 17 00:00:00 2001 From: Tomas Hanacek Date: Mon, 7 Apr 2014 19:10:29 +0200 Subject: [PATCH] rqworker default arguments of socket, worker_ttl, results_ttl bugfix --- rq/scripts/__init__.py | 2 +- rq/worker.py | 31 +++++++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index bfbb3a13..cab1f976 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,7 +48,7 @@ def setup_default_arguments(args, settings): args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost']) args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379])) - args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), False]) + args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), None]) args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0]) args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')]) diff --git a/rq/worker.py b/rq/worker.py index 05b4fa0d..11df4074 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -100,8 +100,8 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=DEFAULT_RESULT_TTL, connection=None, - exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa + default_result_ttl=None, connection=None, + exc_handler=None, default_worker_ttl=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -111,8 +111,15 @@ class Worker(object): self.queues = queues self.validate_queues() self._exc_handlers = [] + + if default_result_ttl is None: + default_result_ttl = DEFAULT_RESULT_TTL self.default_result_ttl = default_result_ttl + + if default_worker_ttl is None: + default_worker_ttl = DEFAULT_WORKER_TTL self.default_worker_ttl = default_worker_ttl + self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -334,7 +341,7 @@ class Worker(object): if self.stopped: self.log.info('Stopping on request.') break - + timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -359,21 +366,21 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): result = None qnames = self.queue_names() - - self.set_state('idle') + + self.set_state('idle') self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') self.log.info('*** Listening on %s...' % green(', '.join(qnames))) - + while True: self.heartbeat() - + try: result = Queue.dequeue_any(self.queues, timeout, connection=self.connection) if result is not None: - job, queue = result + job, queue = result self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) @@ -455,9 +462,9 @@ class Worker(object): """ self.set_state('busy') - self.set_current_job_id(job.id) + self.set_current_job_id(job.id) self.heartbeat((job.timeout or 180) + 60) - + self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) @@ -470,14 +477,14 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails job._result = rv - + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - + pipeline.execute() except Exception: