mirror of https://github.com/rq/rq.git
Moved some logic from worker.perform_job() to job.cleanup().
This commit is contained in:
parent
d5f1740c3d
commit
85e9014296
28
rq/job.py
28
rq/job.py
|
@ -287,9 +287,10 @@ class Job(object):
|
|||
self._status = obj.get('status') if obj.get('status') else None
|
||||
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
|
||||
|
||||
def save(self):
|
||||
def save(self, pipeline=None):
|
||||
"""Persists the current job instance to its corresponding Redis key."""
|
||||
key = self.key
|
||||
connection = pipeline if pipeline is not None else self.connection
|
||||
|
||||
obj = {}
|
||||
obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')
|
||||
|
@ -317,7 +318,7 @@ class Job(object):
|
|||
if self.meta:
|
||||
obj['meta'] = dumps(self.meta)
|
||||
|
||||
self.connection.hmset(key, obj)
|
||||
connection.hmset(key, obj)
|
||||
|
||||
def cancel(self):
|
||||
"""Cancels the given job, which will prevent the job from ever being
|
||||
|
@ -346,6 +347,13 @@ class Job(object):
|
|||
return self._result
|
||||
|
||||
|
||||
def get_ttl(self, default_ttl=None):
|
||||
"""Returns ttl for a job that determines how long a job and its result
|
||||
will be persisted. In the future, this method will also be responsible
|
||||
for determining ttl for repeated jobs.
|
||||
"""
|
||||
return default_ttl if self.result_ttl is None else self.result_ttl
|
||||
|
||||
# Representation
|
||||
def get_call_string(self): # noqa
|
||||
"""Returns a string representation of the call, formatted as a regular
|
||||
|
@ -359,6 +367,22 @@ class Job(object):
|
|||
args = ', '.join(arg_list)
|
||||
return '%s(%s)' % (self.func_name, args)
|
||||
|
||||
def cleanup(self, ttl=None, pipeline=None):
|
||||
"""Prepare job for eventual deletion (if needed). This method is usually
|
||||
called after successful execution. How long we persist the job and its
|
||||
result depends on the value of result_ttl:
|
||||
- If result_ttl is 0, cleanup the job immediately.
|
||||
- If it's a positive number, set the job to expire in X seconds.
|
||||
- If result_ttl is negative, don't set an expiry to it (persist
|
||||
forever)
|
||||
"""
|
||||
if ttl == 0:
|
||||
self.cancel()
|
||||
elif ttl > 0:
|
||||
connection = pipeline if pipeline is not None else self.connection
|
||||
connection.expire(self.key, ttl)
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return '<Job %s: %s>' % (self.id, self.description)
|
||||
|
||||
|
|
31
rq/worker.py
31
rq/worker.py
|
@ -412,9 +412,17 @@ class Worker(object):
|
|||
|
||||
# Pickle the result in the same try-except block since we need to
|
||||
# use the same exc handling when pickling fails
|
||||
pickled_rv = dumps(rv)
|
||||
job._result = rv
|
||||
job._status = Status.FINISHED
|
||||
job.ended_at = times.now()
|
||||
|
||||
result_ttl = job.get_ttl(self.default_result_ttl)
|
||||
pipeline = self.connection.pipeline()
|
||||
if result_ttl != 0:
|
||||
job.save(pipeline=pipeline)
|
||||
job.cleanup(result_ttl, pipeline=pipeline)
|
||||
pipeline.execute()
|
||||
|
||||
except:
|
||||
# Use the public setter here, to immediately update Redis
|
||||
job.status = Status.FAILED
|
||||
|
@ -426,27 +434,12 @@ class Worker(object):
|
|||
else:
|
||||
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
|
||||
|
||||
# How long we persist the job result depends on the value of
|
||||
# result_ttl:
|
||||
# - If result_ttl is 0, cleanup the job immediately.
|
||||
# - If it's a positive number, set the job to expire in X seconds.
|
||||
# - If result_ttl is negative, don't set an expiry to it (persist
|
||||
# forever)
|
||||
result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa
|
||||
if result_ttl == 0:
|
||||
job.delete()
|
||||
self.log.info('Result discarded immediately.')
|
||||
elif result_ttl > 0:
|
||||
self.log.info('Result is kept for %d seconds.' % result_ttl)
|
||||
else:
|
||||
p = self.connection.pipeline()
|
||||
p.hset(job.key, 'result', pickled_rv)
|
||||
p.hset(job.key, 'status', job._status)
|
||||
p.hset(job.key, 'ended_at', times.format(job.ended_at, 'UTC'))
|
||||
if result_ttl > 0:
|
||||
p.expire(job.key, result_ttl)
|
||||
self.log.info('Result is kept for %d seconds.' % result_ttl)
|
||||
else:
|
||||
self.log.warning('Result will never expire, clean up result key manually.')
|
||||
p.execute()
|
||||
self.log.warning('Result will never expire, clean up result key manually.')
|
||||
|
||||
return True
|
||||
|
||||
|
|
|
@ -219,3 +219,33 @@ class TestJob(RQTestCase):
|
|||
id = job.perform()
|
||||
self.assertEqual(job.id, id)
|
||||
self.assertEqual(job.func, access_self)
|
||||
|
||||
def test_get_ttl(self):
|
||||
"""Getting job TTL."""
|
||||
job_ttl = 1
|
||||
default_ttl = 2
|
||||
job = Job.create(func=say_hello, result_ttl=job_ttl)
|
||||
job.save()
|
||||
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl)
|
||||
self.assertEqual(job.get_ttl(), job_ttl)
|
||||
job = Job.create(func=say_hello)
|
||||
job.save()
|
||||
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
|
||||
self.assertEqual(job.get_ttl(), None)
|
||||
|
||||
def test_cleanup(self):
|
||||
"""Test that jobs and results are expired properly."""
|
||||
job = Job.create(func=say_hello)
|
||||
job.save()
|
||||
|
||||
# Jobs with negative TTLs don't expire
|
||||
job.cleanup(ttl=-1)
|
||||
self.assertEqual(self.testconn.ttl(job.key), -1)
|
||||
|
||||
# Jobs with positive TTLs are eventually deleted
|
||||
job.cleanup(ttl=100)
|
||||
self.assertEqual(self.testconn.ttl(job.key), 100)
|
||||
|
||||
# Jobs with 0 TTL are immediately deleted
|
||||
job.cleanup(ttl=0)
|
||||
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
|
||||
|
|
Loading…
Reference in New Issue