Add option to enqueue a job's dependents when canceling (#1549)

* Add option to enqueue a jobs dependents when canceling

* Address @selwin's review
This commit is contained in:
Josh Cohen 2021-08-26 08:16:12 -04:00 committed by GitHub
parent b80045d615
commit bac58f24ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 18 deletions

View File

@ -55,9 +55,12 @@ Any job ID that is encountered by a worker for which no job hash is found in
Redis is simply ignored. This makes it easy to cancel jobs by simply removing
the job hash. In Python:
```python
from rq import cancel_job
cancel_job('2eafc1e6-48c2-464b-a0ff-88fd199d039c')
```
Note that it is irrelevant on which queue the job resides. When a worker
eventually pops the job ID from the queue and notes that the Job hash does not
exist (anymore), it simply discards the job ID and continues with the next.

View File

@ -171,6 +171,17 @@ Canceling a job will remove:
Note that `job.cancel()` does **not** delete the job itself from Redis. If you want to
delete the job from Redis and reclaim memory, use `job.delete()`.
Note: if you want to enqueue the dependents of the job you
are trying to cancel use the following:
```python
from rq import cancel_job
cancel_job(
'2eafc1e6-48c2-464b-a0ff-88fd199d039c',
enqueue_dependents=True
)
```
## Job / Queue Creation with Custom Serializer
When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments.

View File

@ -16,6 +16,8 @@ from enum import Enum
from functools import partial
from uuid import uuid4
from redis import WatchError
from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import DeserializationError, NoSuchJobError
@ -46,11 +48,11 @@ class JobStatus(str, Enum):
UNEVALUATED = object()
def cancel_job(job_id, connection=None, serializer=None):
def cancel_job(job_id, connection=None, serializer=None, enqueue_dependents=False):
"""Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later).
"""
Job.fetch(job_id, connection=connection, serializer=serializer).cancel()
Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents)
def get_current_job(connection=None, job_class=None):
@ -676,32 +678,56 @@ class Job:
meta = self.serializer.dumps(self.meta)
self.connection.hset(self.key, 'meta', meta)
def cancel(self, pipeline=None):
def cancel(self, pipeline=None, enqueue_dependents=False):
"""Cancels the given job, which will prevent the job from ever being
ran (or inspected).
This method merely exists as a high-level API call to cancel jobs
without worrying about the internals required to implement job
cancellation.
You can enqueue the jobs dependents optionally,
Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
"""
pipeline = pipeline or self.connection.pipeline()
if self.origin:
from .registry import CanceledJobRegistry
from .queue import Queue
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
q.remove(self, pipeline=pipeline)
from .registry import CanceledJobRegistry
from .queue import Queue
pipe = pipeline or self.connection.pipeline()
while True:
try:
q = Queue(
name=self.origin,
connection=self.connection,
job_class=self.__class__,
serializer=self.serializer
)
if enqueue_dependents:
# Only WATCH if no pipeline passed, otherwise caller is responsible
if pipeline is None:
pipe.watch(self.dependents_key)
q.enqueue_dependents(self, pipeline=pipeline)
q.remove(self, pipeline=pipe)
self.set_status(JobStatus.CANCELED, pipeline=pipeline)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
registry = CanceledJobRegistry(
self.origin,
self.connection,
job_class=self.__class__,
serializer=self.serializer
)
registry.add(self, pipeline=pipeline)
pipeline.execute()
registry = CanceledJobRegistry(
self.origin,
self.connection,
job_class=self.__class__,
serializer=self.serializer
)
registry.add(self, pipeline=pipe)
if pipeline is None:
pipe.execute()
break
except WatchError:
if pipeline is None:
continue
else:
# if the pipeline comes from the caller, we re-raise the
# exception as it it the responsibility of the caller to
# handle it
raise
def requeue(self):
"""Requeues job."""

View File

@ -812,6 +812,54 @@ class TestJob(RQTestCase):
job.delete()
self.assertNotIn(job, registry)
def test_create_and_cancel_job_enqueue_dependents(self):
"""Ensure job.cancel() works properly with enqueue_dependents=True"""
queue = Queue(connection=self.testconn)
dependency = queue.enqueue(fixtures.say_hello)
dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
cancel_job(dependency.id, enqueue_dependents=True)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(0, len(queue.deferred_job_registry))
registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
self.assertIn(dependency, registry)
self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
self.assertIn(dependent, queue.get_jobs())
self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
# If job is deleted, it's also removed from CanceledJobRegistry
dependency.delete()
self.assertNotIn(dependency, registry)
def test_create_and_cancel_job_enqueue_dependents_with_pipeline(self):
"""Ensure job.cancel() works properly with enqueue_dependents=True"""
queue = Queue(connection=self.testconn)
dependency = queue.enqueue(fixtures.say_hello)
dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
self.testconn.set('some:key', b'some:value')
with self.testconn.pipeline() as pipe:
pipe.watch('some:key')
self.assertEqual(self.testconn.get('some:key'), b'some:value')
dependency.cancel(pipeline=pipe, enqueue_dependents=True)
pipe.set('some:key', b'some:other:value')
pipe.execute()
self.assertEqual(self.testconn.get('some:key'), b'some:other:value')
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(0, len(queue.deferred_job_registry))
registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
self.assertIn(dependency, registry)
self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
self.assertIn(dependent, queue.get_jobs())
self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
# If job is deleted, it's also removed from CanceledJobRegistry
dependency.delete()
self.assertNotIn(dependency, registry)
def test_create_and_cancel_job_with_serializer(self):
"""test creating and using cancel_job (with serializer) deletes job properly"""
queue = Queue(connection=self.testconn, serializer=JSONSerializer)