diff --git a/rq/queue.py b/rq/queue.py index 6628565c..853cd2a6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -527,7 +527,8 @@ class Queue: failure_ttl=failure_ttl, description=description, depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure) - + if at_front: + job.enqueue_at_front = True return self.schedule_job(job, datetime, pipeline=pipeline) def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None): diff --git a/rq/scheduler.py b/rq/scheduler.py index 912f844a..577976c0 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -162,7 +162,7 @@ class RQScheduler: ) for job in jobs: if job is not None: - queue.enqueue_job(job, pipeline=pipeline) + queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index e90c4297..76a26855 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -348,6 +348,33 @@ class TestQueue(RQTestCase): self.assertEqual(len(queue), 1) self.assertEqual(len(registry), 0) + def test_enqueue_at_at_front(self): + """queue.enqueue_at() accepts at_front argument. When true, job will be put at position 0 + of the queue when the time comes for the job to be scheduled""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.acquire_locks() + # Jobs created using enqueue_at is put in the ScheduledJobRegistry + # job_first should be enqueued first + job_first = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) + # job_second will be enqueued second, but "at_front" + job_second = queue.enqueue_at(datetime(2019, 1, 2, tzinfo=timezone.utc), say_hello, at_front=True) + self.assertEqual(len(queue), 0) + self.assertEqual(len(registry), 2) + + # enqueue_at set job status to "scheduled" + self.assertTrue(job_first.get_status() == 'scheduled') + self.assertTrue(job_second.get_status() == 'scheduled') + + # After enqueue_scheduled_jobs() is called, the registry is empty + # and job is enqueued + scheduler.enqueue_scheduled_jobs() + self.assertEqual(len(queue), 2) + self.assertEqual(len(registry), 0) + self.assertEqual(0, queue.get_job_position(job_second.id)) + self.assertEqual(1, queue.get_job_position(job_first.id)) + def test_enqueue_in(self): """queue.enqueue_in() schedules job correctly""" queue = Queue(connection=self.testconn)