job.cancel() should remove itself from queue.

This commit is contained in:
Selwin Ong 2014-07-15 09:59:55 +07:00
parent 052d0df4bf
commit 638211df20
5 changed files with 17 additions and 11 deletions

View File

@ -447,9 +447,14 @@ class Job(object):
cancellation. Technically, this call is (currently) the same as just cancellation. Technically, this call is (currently) the same as just
deleting the job hash. deleting the job hash.
""" """
from .queue import Queue
pipeline = self.connection._pipeline() pipeline = self.connection._pipeline()
self.delete(pipeline=pipeline) self.delete(pipeline=pipeline)
pipeline.delete(self.dependents_key) pipeline.delete(self.dependents_key)
if self.origin:
queue = Queue(name=self.origin, connection=self.connection)
queue.remove(self, pipeline=pipeline)
pipeline.execute() pipeline.execute()
def delete(self, pipeline=None): def delete(self, pipeline=None):

View File

@ -130,9 +130,13 @@ class Queue(object):
"""Returns a count of all messages in the queue.""" """Returns a count of all messages in the queue."""
return self.connection.llen(self.key) return self.connection.llen(self.key)
def remove(self, job_or_id): def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None:
pipeline.lrem(self.key, 0, job_id)
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 0, job_id)
def compact(self): def compact(self):

View File

@ -321,9 +321,12 @@ class TestJob(RQTestCase):
def test_cancel(self): def test_cancel(self):
"""job.cancel() deletes itself & dependents mapping from Redis.""" """job.cancel() deletes itself & dependents mapping from Redis."""
job = Job.create(func=say_hello) queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
job2 = Job.create(func=say_hello, depends_on=job) job2 = Job.create(func=say_hello, depends_on=job)
job2.register_dependency() job2.register_dependency()
job.cancel() job.cancel()
self.assertFalse(self.testconn.exists(job.key)) self.assertFalse(self.testconn.exists(job.key))
self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertFalse(self.testconn.exists(job.dependents_key))
self.assertNotIn(job.id, queue.get_job_ids())

View File

@ -90,16 +90,12 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
def test_compact(self): def test_compact(self):
"""Compacting queueus.""" """Queue.compact() removes non-existing jobs."""
q = Queue() q = Queue()
q.enqueue(say_hello, 'Alice') q.enqueue(say_hello, 'Alice')
bob = q.enqueue(say_hello, 'Bob')
q.enqueue(say_hello, 'Charlie') q.enqueue(say_hello, 'Charlie')
debrah = q.enqueue(say_hello, 'Debrah') self.testconn.lpush(q.key, '1', '2')
bob.cancel()
debrah.cancel()
self.assertEquals(q.count, 4) self.assertEquals(q.count, 4)

View File

@ -155,9 +155,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(create_file, SENTINEL_FILE) job = q.enqueue(create_file, SENTINEL_FILE)
# Here, we cancel the job, so the sentinel file may not be created # Here, we cancel the job, so the sentinel file may not be created
assert q.count == 1 self.testconn.delete(job.key)
job.cancel()
assert q.count == 1
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)