scheduler: now operates with chunks of jobs (#1355)

* scheduler: now operates with chunks of jobs

* scheduler: set default chunk_size for ScheduledJobRegistry.get_jobs_to_schedule

* scheduler: fixed missing indent

* scheduler: added test for get_jobs_to_schedule() with chunk_size parameter

* scheduler: fixed test to pass on Python 3.5 (no f-strings)

* scheduler: adjusted chunk_size in test to make it lighter to run
Nikita Romaniuk 2020-10-20 02:57:03 +03:00 committed by GitHub
parent 9adcd7e50c
commit 2da957a68d
3 changed files with 24 additions and 11 deletions


@@ -286,11 +286,11 @@ class ScheduledJobRegistry(BaseRegistry):
         score = timestamp if timestamp is not None else current_timestamp()
         return connection.zremrangebyscore(self.key, 0, score)
 
-    def get_jobs_to_schedule(self, timestamp=None):
+    def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000):
         """Remove jobs whose timestamp is in the past from registry."""
         score = timestamp if timestamp is not None else current_timestamp()
         return [as_text(job_id) for job_id in
-                self.connection.zrangebyscore(self.key, 0, score)]
+                self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)]
 
     def get_scheduled_time(self, job_or_id):
         """Returns datetime (UTC) at which job is scheduled to be enqueued"""


@@ -28,7 +28,6 @@ setup_loghandlers(
 class RQScheduler(object):
 
     # STARTED: scheduler has been started but sleeping
     # WORKING: scheduler is in the midst of scheduling jobs
     # STOPPED: scheduler is in stopped condition
@@ -137,11 +136,11 @@ class RQScheduler(object):
             queue = Queue(registry.name, connection=self.connection)
 
             with self.connection.pipeline() as pipeline:
-                # This should be done in bulk
-                for job_id in job_ids:
-                    job = Job.fetch(job_id, connection=self.connection)
-                    queue.enqueue_job(job, pipeline=pipeline)
-                registry.remove_jobs(timestamp)
+                jobs = Job.fetch_many(job_ids, connection=self.connection)
+                for job in jobs:
+                    if job is not None:
+                        queue.enqueue_job(job, pipeline=pipeline)
+                        registry.remove(job, pipeline=pipeline)
                 pipeline.execute()
 
         self._status = self.Status.STARTED
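
Pulled out of RQScheduler for illustration, the bulk pattern above looks roughly like this; the connection, queue and registry objects are assumed to already exist:

# Sketch of the chunked enqueue step, mirroring the hunk above (assumptions noted).
from rq.job import Job
from rq.utils import current_timestamp

timestamp = current_timestamp()
job_ids = registry.get_jobs_to_schedule(timestamp, chunk_size=1000)

with connection.pipeline() as pipeline:
    # One bulk read via Job.fetch_many() instead of one Job.fetch() per job ID.
    jobs = Job.fetch_many(job_ids, connection=connection)
    for job in jobs:
        # fetch_many() yields None for IDs whose job data has expired or been deleted.
        if job is not None:
            queue.enqueue_job(job, pipeline=pipeline)
            registry.remove(job, pipeline=pipeline)
    pipeline.execute()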


@@ -35,6 +35,21 @@ class TestScheduledJobRegistry(RQTestCase):
         self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20),
                          ['foo', 'bar'])
 
+    def test_get_jobs_to_schedule_with_chunk_size(self):
+        """get_jobs_to_schedule() returns at most chunk_size job IDs"""
+        queue = Queue(connection=self.testconn)
+        registry = ScheduledJobRegistry(queue=queue)
+        timestamp = current_timestamp()
+        chunk_size = 5
+
+        for index in range(0, chunk_size * 2):
+            self.testconn.zadd(registry.key, {'foo_{}'.format(index): 1})
+
+        self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size)),
+                         chunk_size)
+        self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size * 2)),
+                         chunk_size * 2)
+
     def test_get_scheduled_time(self):
         """get_scheduled_time() returns job's scheduled datetime"""
         queue = Queue(connection=self.testconn)
@@ -87,7 +102,7 @@ class TestScheduledJobRegistry(RQTestCase):
         with mock_tz, mock_day, mock_atz:
             registry.schedule(job, datetime(2019, 1, 1))
             self.assertEqual(self.testconn.zscore(registry.key, job.id),
-                            1546300800 + 18000)  # 2019-01-01 UTC in Unix timestamp
+                             1546300800 + 18000)  # 2019-01-01 UTC in Unix timestamp
 
         # second, time.daylight != 0 (in DST)
         # mock the situation for American/New_York not in DST (UTC - 4)
@@ -100,8 +115,7 @@ class TestScheduledJobRegistry(RQTestCase):
         with mock_tz, mock_day, mock_atz:
             registry.schedule(job, datetime(2019, 1, 1))
             self.assertEqual(self.testconn.zscore(registry.key, job.id),
-                            1546300800 + 14400)  # 2019-01-01 UTC in Unix timestamp
+                             1546300800 + 14400)  # 2019-01-01 UTC in Unix timestamp
 
         # Score is always stored in UTC even if datetime is in a different tz
         tz = timezone(timedelta(hours=7))
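
For reference, the scores asserted above can be reproduced with standard-library arithmetic; this is an illustrative check, not part of the change:

# 2019-01-01 00:00 naive local time in America/New_York, expressed as a UTC Unix timestamp.
from calendar import timegm
from datetime import datetime

utc_midnight = timegm(datetime(2019, 1, 1).timetuple())  # 1546300800 == 2019-01-01 00:00 UTC
standard_time_score = utc_midnight + 5 * 3600            # UTC-5 (EST) -> 1546300800 + 18000
daylight_time_score = utc_midnight + 4 * 3600            # UTC-4 (EDT) -> 1546300800 + 14400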