Ensure pipeline in multi mode after dep setup (#1498)

This commit is contained in:
Josh Cohen 2021-06-21 23:30:33 -04:00 committed by GitHub
parent 456743b225
commit 591f11bcc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 0 deletions

View File

@ -366,6 +366,8 @@ class Queue:
else:
# if pipeline comes from caller, re-raise to them
raise
elif pipeline is not None:
pipeline.multi() # Ensure pipeline in multi mode before returning to caller
return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,

View File

@ -539,6 +539,22 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self):
"""Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
q = Queue()
with q.connection.pipeline() as pipe:
pipe.watch(b'fake_key') # Test watch then enqueue
job = q.enqueue_call(say_hello, pipeline=pipe)
self.assertEqual(q.job_ids, [])
self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
# Not in queue before execute, since passed in pipeline
self.assertEqual(len(q), 0)
# Make sure modifying key doesn't cause issues, if in multi mode won't fail
pipe.set(b'fake_key', b'fake_value')
pipe.execute()
# Only in registry after execute, since passed in pipeline
self.assertEqual(len(q), 1)
def test_enqueue_many_internal_pipeline(self):
"""Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
(but at_front still applies)"""