Add at_front for scheduled/enqueue_at jobs (#1743)

* Add at_front for scheduled/enqueue_at jobs

* Add test to at_front
This commit is contained in:
gabriels1234 2022-12-28 18:57:34 -05:00 committed by GitHub
parent 5f7ed6970d
commit 50d8d72928
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 2 deletions

View File

@ -527,7 +527,8 @@ class Queue:
failure_ttl=failure_ttl, description=description, failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, depends_on=depends_on, job_id=job_id, meta=meta, retry=retry,
on_success=on_success, on_failure=on_failure) on_success=on_success, on_failure=on_failure)
if at_front:
job.enqueue_at_front = True
return self.schedule_job(job, datetime, pipeline=pipeline) return self.schedule_job(job, datetime, pipeline=pipeline)
def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None): def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None):

View File

@ -162,7 +162,7 @@ class RQScheduler:
) )
for job in jobs: for job in jobs:
if job is not None: if job is not None:
queue.enqueue_job(job, pipeline=pipeline) queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
registry.remove(job, pipeline=pipeline) registry.remove(job, pipeline=pipeline)
pipeline.execute() pipeline.execute()
self._status = self.Status.STARTED self._status = self.Status.STARTED

View File

@ -348,6 +348,33 @@ class TestQueue(RQTestCase):
self.assertEqual(len(queue), 1) self.assertEqual(len(queue), 1)
self.assertEqual(len(registry), 0) self.assertEqual(len(registry), 0)
def test_enqueue_at_at_front(self):
"""queue.enqueue_at() accepts at_front argument. When true, job will be put at position 0
of the queue when the time comes for the job to be scheduled"""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry
# job_first should be enqueued first
job_first = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
# job_second will be enqueued second, but "at_front"
job_second = queue.enqueue_at(datetime(2019, 1, 2, tzinfo=timezone.utc), say_hello, at_front=True)
self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 2)
# enqueue_at set job status to "scheduled"
self.assertTrue(job_first.get_status() == 'scheduled')
self.assertTrue(job_second.get_status() == 'scheduled')
# After enqueue_scheduled_jobs() is called, the registry is empty
# and job is enqueued
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 2)
self.assertEqual(len(registry), 0)
self.assertEqual(0, queue.get_job_position(job_second.id))
self.assertEqual(1, queue.get_job_position(job_first.id))
def test_enqueue_in(self): def test_enqueue_in(self):
"""queue.enqueue_in() schedules job correctly""" """queue.enqueue_in() schedules job correctly"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)