From 724c844378774260c8c8cb9216017336ee7d6f3d Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 28 Apr 2016 10:04:12 +0800 Subject: [PATCH] Don't call job.cancel if job has finished --- rq/job.py | 9 +++++---- rq/worker.py | 2 +- tests/test_worker.py | 6 ++++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index 54a0f83f..eceb7711 100644 --- a/rq/job.py +++ b/rq/job.py @@ -480,9 +480,10 @@ class Job(object): queue.remove(self, pipeline=pipeline) pipeline.execute() - def delete(self, pipeline=None): + def delete(self, pipeline=None, remove_from_queue=True): """Cancels the job and deletes the job hash from Redis.""" - self.cancel() + if remove_from_queue: + self.cancel() connection = pipeline if pipeline is not None else self.connection connection.delete(self.key) connection.delete(self.dependents_key) @@ -530,7 +531,7 @@ class Job(object): return '{0}({1})'.format(self.func_name, args) - def cleanup(self, ttl=None, pipeline=None): + def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its result depends on the value of ttl: @@ -540,7 +541,7 @@ class Job(object): forever) """ if ttl == 0: - self.delete() + self.delete(remove_from_queue=remove_from_queue) elif not ttl: return elif ttl > 0: diff --git a/rq/worker.py b/rq/worker.py index 67e8ea7c..5d4e74f3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -605,7 +605,7 @@ class Worker(object): finished_job_registry = FinishedJobRegistry(job.origin, self.connection) finished_job_registry.add(job, result_ttl, pipeline) - job.cleanup(result_ttl, pipeline=pipeline) + job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() diff --git a/tests/test_worker.py b/tests/test_worker.py index bae0f6fb..f135ee59 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -273,20 +273,26 @@ class TestWorker(RQTestCase): q = Queue() job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) + self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertNotEqual(self.testconn._ttl(job.key), 0) + self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) + self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertEqual(self.testconn._ttl(job.key), -1) + self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) w = Worker([q]) + self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertEqual(self.testconn.get(job.key), None) + self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) def test_worker_sets_job_status(self): """Ensure that worker correctly sets job status."""