Safe fetching a deleted job removes the deleted job from queue.

This commit is contained in:
Selwin Ong 2013-02-16 18:50:37 +07:00
parent 501a3870e1
commit c987569650
2 changed files with 34 additions and 2 deletions

View File

@ -76,6 +76,7 @@ class Queue(object):
try:
job = Job.safe_fetch(job_id, connection=self.connection)
except NoSuchJobError:
self.remove(job_id)
return None
except UnpickleError:
return None
@ -88,6 +89,11 @@ class Queue(object):
"""Returns a count of all messages in the queue."""
return self.connection.llen(self.key)
def remove(self, job_or_id):
"""Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, Job) else job_or_id
return self.connection._lrem(self.key, 0, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
@ -316,11 +322,11 @@ class FailedQueue(Queue):
job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing)
self.connection._lrem(self.key, 0, job_id)
self.remove(job_id)
return
# Delete it from the failed queue (raise an error if that failed)
if self.connection._lrem(self.key, 0, job.id) == 0:
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.status = Status.QUEUED

View File

@ -50,6 +50,32 @@ class TestQueue(RQTestCase):
self.testconn.rpush('rq:queue:example', 'sentinel message')
self.assertEquals(q.is_empty(), False)
def test_remove(self):
"""Ensure queue.remove properly removes Job from queue."""
q = Queue('example')
job = q.enqueue(say_hello)
self.assertIn(job.id, q.job_ids)
q.remove(job)
self.assertNotIn(job.id, q.job_ids)
job = q.enqueue(say_hello)
self.assertIn(job.id, q.job_ids)
q.remove(job.id)
self.assertNotIn(job.id, q.job_ids)
def test_jobs(self):
"""Getting jobs out of a queue."""
q = Queue('example')
self.assertEqual(q.jobs, [])
job = q.enqueue(say_hello)
self.assertEqual(q.jobs, [job])
# Fetching a deleted removes it from queue
job.delete()
self.assertEqual(q.job_ids, [job.id])
q.jobs
self.assertEqual(q.job_ids, [])
def test_compact(self):
"""Compacting queueus."""
q = Queue()