diff --git a/rq/queue.py b/rq/queue.py index 8890d276..e5072cbe 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -366,6 +366,8 @@ class Queue: else: # if pipeline comes from caller, re-raise to them raise + elif pipeline is not None: + pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job def enqueue_call(self, func, args=None, kwargs=None, timeout=None, diff --git a/tests/test_queue.py b/tests/test_queue.py index 9d005459..510c6714 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -539,6 +539,22 @@ class TestQueue(RQTestCase): self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED) + def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self): + """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline.""" + q = Queue() + with q.connection.pipeline() as pipe: + pipe.watch(b'fake_key') # Test watch then enqueue + job = q.enqueue_call(say_hello, pipeline=pipe) + self.assertEqual(q.job_ids, []) + self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED) + # Not in queue before execute, since passed in pipeline + self.assertEqual(len(q), 0) + # Make sure modifying key doesn't cause issues, if in multi mode won't fail + pipe.set(b'fake_key', b'fake_value') + pipe.execute() + # Only in registry after execute, since passed in pipeline + self.assertEqual(len(q), 1) + def test_enqueue_many_internal_pipeline(self): """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided (but at_front still applies)"""