Merge branch 'KanbanSolutions-use-pipeline-context'

This commit is contained in:
Vincent Driessen 2014-01-23 10:41:06 +01:00
commit 9ad7da684c
1 changed files with 19 additions and 19 deletions

View File

@ -416,28 +416,28 @@ class Worker(object):
job.func_name,
job.origin, time.time()))
try:
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
rv = job.perform()
with self.connection._pipeline() as pipeline:
try:
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
rv = job.perform()
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
job._result = rv
job._status = Status.FINISHED
job.ended_at = utcnow()
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
job._result = rv
job._status = Status.FINISHED
job.ended_at = utcnow()
result_ttl = job.get_ttl(self.default_result_ttl)
pipeline = self.connection._pipeline()
if result_ttl != 0:
job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
result_ttl = job.get_ttl(self.default_result_ttl)
if result_ttl != 0:
job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
except:
# Use the public setter here, to immediately update Redis
job.status = Status.FAILED
self.handle_exception(job, *sys.exc_info())
return False
except Exception:
# Use the public setter here, to immediately update Redis
job.status = Status.FAILED
self.handle_exception(job, *sys.exc_info())
return False
if rv is None:
self.log.info('Job OK')