diff --git a/rq/registry.py b/rq/registry.py index 579d0c5c..5a5c078b 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -286,11 +286,11 @@ class ScheduledJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() return connection.zremrangebyscore(self.key, 0, score) - def get_jobs_to_schedule(self, timestamp=None): + def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000): """Remove jobs whose timestamp is in the past from registry.""" score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, score)] + self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)] def get_scheduled_time(self, job_or_id): """Returns datetime (UTC) at which job is scheduled to be enqueued""" diff --git a/rq/scheduler.py b/rq/scheduler.py index b55f728a..4d84b48a 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -28,7 +28,6 @@ setup_loghandlers( class RQScheduler(object): - # STARTED: scheduler has been started but sleeping # WORKING: scheduler is in the midst of scheduling jobs # STOPPED: scheduler is in stopped condition @@ -137,11 +136,11 @@ class RQScheduler(object): queue = Queue(registry.name, connection=self.connection) with self.connection.pipeline() as pipeline: - # This should be done in bulk - for job_id in job_ids: - job = Job.fetch(job_id, connection=self.connection) - queue.enqueue_job(job, pipeline=pipeline) - registry.remove_jobs(timestamp) + jobs = Job.fetch_many(job_ids, connection=self.connection) + for job in jobs: + if job is not None: + queue.enqueue_job(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 480e993e..13c68e27 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -35,6 +35,21 @@ class TestScheduledJobRegistry(RQTestCase): self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20), ['foo', 'bar']) + def test_get_jobs_to_schedule_with_chunk_size(self): + """Max amount of jobs returns by get_jobs_to_schedule() equal to chunk_size""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + timestamp = current_timestamp() + chunk_size = 5 + + for index in range(0, chunk_size * 2): + self.testconn.zadd(registry.key, {'foo_{}'.format(index): 1}) + + self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size)), + chunk_size) + self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size * 2)), + chunk_size * 2) + def test_get_scheduled_time(self): """get_scheduled_time() returns job's scheduled datetime""" queue = Queue(connection=self.testconn) @@ -87,7 +102,7 @@ class TestScheduledJobRegistry(RQTestCase): with mock_tz, mock_day, mock_atz: registry.schedule(job, datetime(2019, 1, 1)) self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp + 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp # second, time.daylight != 0 (in DST) # mock the sitatuoin for American/New_York not in DST (UTC - 4) @@ -100,8 +115,7 @@ class TestScheduledJobRegistry(RQTestCase): with mock_tz, mock_day, mock_atz: registry.schedule(job, datetime(2019, 1, 1)) self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp - + 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp # Score is always stored in UTC even if datetime is in a different tz tz = timezone(timedelta(hours=7))