Merge pull request #691 from amyangfei/delete_job_fix

Don't call job.cancel if job has finished
This commit is contained in:
Selwin Ong 2016-05-05 06:34:59 +07:00
commit 779a1683c7
3 changed files with 12 additions and 5 deletions

View File

@ -480,9 +480,10 @@ class Job(object):
queue.remove(self, pipeline=pipeline)
pipeline.execute()
def delete(self, pipeline=None):
def delete(self, pipeline=None, remove_from_queue=True):
"""Cancels the job and deletes the job hash from Redis."""
self.cancel()
if remove_from_queue:
self.cancel()
connection = pipeline if pipeline is not None else self.connection
connection.delete(self.key)
connection.delete(self.dependents_key)
@ -530,7 +531,7 @@ class Job(object):
return '{0}({1})'.format(self.func_name, args)
def cleanup(self, ttl=None, pipeline=None):
def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True):
"""Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its
result depends on the value of ttl:
@ -540,7 +541,7 @@ class Job(object):
forever)
"""
if ttl == 0:
self.delete()
self.delete(remove_from_queue=remove_from_queue)
elif not ttl:
return
elif ttl > 0:

View File

@ -605,7 +605,7 @@ class Worker(object):
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()

View File

@ -273,20 +273,26 @@ class TestWorker(RQTestCase):
q = Queue()
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertNotEqual(self.testconn._ttl(job.key), 0)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn._ttl(job.key), -1)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn.get(job.key), None)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
def test_worker_sets_job_status(self):
"""Ensure that worker correctly sets job status."""