rqworker default arguments of socket, worker_ttl, results_ttl bugfix

This commit is contained in:
Tomas Hanacek 2014-04-07 19:10:29 +02:00
parent 35d839f4e2
commit 6494233058
2 changed files with 20 additions and 13 deletions

View File

@ -48,7 +48,7 @@ def setup_default_arguments(args, settings):
args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost'])
args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379]))
args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), False])
args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), None])
args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0])
args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')])

View File

@ -100,8 +100,8 @@ class Worker(object):
return worker
def __init__(self, queues, name=None,
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
default_result_ttl=None, connection=None,
exc_handler=None, default_worker_ttl=None): # noqa
if connection is None:
connection = get_current_connection()
self.connection = connection
@ -111,8 +111,15 @@ class Worker(object):
self.queues = queues
self.validate_queues()
self._exc_handlers = []
if default_result_ttl is None:
default_result_ttl = DEFAULT_RESULT_TTL
self.default_result_ttl = default_result_ttl
if default_worker_ttl is None:
default_worker_ttl = DEFAULT_WORKER_TTL
self.default_worker_ttl = default_worker_ttl
self._state = 'starting'
self._is_horse = False
self._horse_pid = 0
@ -334,7 +341,7 @@ class Worker(object):
if self.stopped:
self.log.info('Stopping on request.')
break
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
result = self.dequeue_job_and_maintain_ttl(timeout)
@ -359,21 +366,21 @@ class Worker(object):
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
self.set_state('idle')
self.set_state('idle')
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
while True:
self.heartbeat()
try:
result = Queue.dequeue_any(self.queues, timeout,
connection=self.connection)
if result is not None:
job, queue = result
job, queue = result
self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id))
@ -455,9 +462,9 @@ class Worker(object):
"""
self.set_state('busy')
self.set_current_job_id(job.id)
self.set_current_job_id(job.id)
self.heartbeat((job.timeout or 180) + 60)
self.procline('Processing %s from %s since %s' % (
job.func_name,
job.origin, time.time()))
@ -470,14 +477,14 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
job._result = rv
self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl)
if result_ttl != 0:
job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
except Exception: