[App] Scale out/in interval for autoscaler (#16093)

* Adding arguments for scale out/in interval

* Tests
This commit is contained in:
Sherin Thomas 2022-12-19 13:49:00 +00:00 committed by GitHub
parent aba5f12352
commit 0fd3d54205
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 27 deletions

View File

@ -75,7 +75,8 @@ app = L.LightningApp(
# autoscaler specific args
min_replicas=1,
max_replicas=4,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
endpoint="predict",
input_type=L.app.components.Image,
output_type=L.app.components.Number,

View File

@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063))
- Enabled users to have more control over scaling out/in interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093))
- Added more datatypes to serving component ([#16018](https://github.com/Lightning-AI/lightning/pull/16018))
- Added `work.delete` method to delete the work ([#16103](https://github.com/Lightning-AI/lightning/pull/16103))

View File

@ -409,7 +409,8 @@ class AutoScaler(LightningFlow):
Args:
min_replicas: The number of works to start when app initializes.
max_replicas: The max number of works to spawn to handle the incoming requests.
autoscale_interval: The number of seconds to wait before checking whether to upscale or downscale the works.
scale_out_interval: The number of seconds to wait before checking whether to increase the number of servers.
scale_in_interval: The number of seconds to wait before checking whether to decrease the number of servers.
endpoint: Provide the REST API path.
max_batch_size: (auto-batching) The number of requests to process at once.
timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process.
@ -426,7 +427,8 @@ class AutoScaler(LightningFlow):
MyPythonServer,
min_replicas=1,
max_replicas=8,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
)
)
@ -455,7 +457,8 @@ class AutoScaler(LightningFlow):
MyPythonServer,
min_replicas=1,
max_replicas=8,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
max_batch_size=8, # for auto batching
timeout_batching=1, # for auto batching
)
@ -467,7 +470,8 @@ class AutoScaler(LightningFlow):
work_cls: Type[LightningWork],
min_replicas: int = 1,
max_replicas: int = 4,
autoscale_interval: int = 10,
scale_out_interval: int = 10,
scale_in_interval: int = 10,
max_batch_size: int = 8,
timeout_batching: float = 1,
endpoint: str = "api/predict",
@ -486,7 +490,8 @@ class AutoScaler(LightningFlow):
self._input_type = input_type
self._output_type = output_type
self.autoscale_interval = autoscale_interval
self.scale_out_interval = scale_out_interval
self.scale_in_interval = scale_in_interval
self.max_batch_size = max_batch_size
if max_replicas < min_replicas:
@ -612,11 +617,6 @@ class AutoScaler(LightningFlow):
def autoscale(self) -> None:
"""Adjust the number of works based on the target number returned by ``self.scale``."""
if time.time() - self._last_autoscale < self.autoscale_interval:
return
self.load_balancer.update_servers(self.workers)
metrics = {
"pending_requests": self.num_pending_requests,
"pending_works": self.num_pending_works,
@ -628,23 +628,29 @@ class AutoScaler(LightningFlow):
min(self.max_replicas, self.scale(self.num_replicas, metrics)),
)
# upscale
num_workers_to_add = num_target_workers - self.num_replicas
for _ in range(num_workers_to_add):
logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}")
work = self.create_work()
new_work_id = self.add_work(work)
logger.info(f"Work created: '{new_work_id}'")
# scale-out
if time.time() - self._last_autoscale > self.scale_out_interval:
num_workers_to_add = num_target_workers - self.num_replicas
for _ in range(num_workers_to_add):
logger.info(f"Scaling out from {self.num_replicas} to {self.num_replicas + 1}")
work = self.create_work()
# TODO: move works into structures
new_work_id = self.add_work(work)
logger.info(f"Work created: '{new_work_id}'")
if num_workers_to_add > 0:
self._last_autoscale = time.time()
# downscale
num_workers_to_remove = self.num_replicas - num_target_workers
for _ in range(num_workers_to_remove):
logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}")
removed_work_id = self.remove_work(self.num_replicas - 1)
logger.info(f"Work removed: '{removed_work_id}'")
# scale-in
if time.time() - self._last_autoscale > self.scale_in_interval:
num_workers_to_remove = self.num_replicas - num_target_workers
for _ in range(num_workers_to_remove):
logger.info(f"Scaling in from {self.num_replicas} to {self.num_replicas - 1}")
removed_work_id = self.remove_work(self.num_replicas - 1)
logger.info(f"Work removed: '{removed_work_id}'")
if num_workers_to_remove > 0:
self._last_autoscale = time.time()
self.load_balancer.update_servers(self.workers)
self._last_autoscale = time.time()
def configure_layout(self):
tabs = [

View File

@ -42,7 +42,8 @@ def test_num_replicas_not_above_max_replicas(*_):
EmptyWork,
min_replicas=1,
max_replicas=max_replicas,
autoscale_interval=0.001,
scale_out_interval=0.001,
scale_in_interval=0.001,
)
for _ in range(max_replicas + 1):
@ -62,7 +63,8 @@ def test_num_replicas_not_belo_min_replicas(*_):
EmptyWork,
min_replicas=min_replicas,
max_replicas=4,
autoscale_interval=0.001,
scale_out_interval=0.001,
scale_in_interval=0.001,
)
for _ in range(3):
@ -131,3 +133,43 @@ def test_API_ACCESS_ENDPOINT_creation():
auto_scaler.load_balancer.run()
fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static")
def test_autoscaler_scale_up(monkeypatch):
monkeypatch.setattr(AutoScaler, "num_pending_works", 0)
monkeypatch.setattr(AutoScaler, "num_pending_requests", 100)
monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=1))
monkeypatch.setattr(AutoScaler, "create_work", mock.MagicMock())
monkeypatch.setattr(AutoScaler, "add_work", mock.MagicMock())
auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_out_interval=0.001)
# Mocking the attributes
auto_scaler._last_autoscale = time.time() - 100000
auto_scaler.num_replicas = 0
# triggering scale up
auto_scaler.autoscale()
auto_scaler.scale.assert_called_once()
auto_scaler.create_work.assert_called_once()
auto_scaler.add_work.assert_called_once()
def test_autoscaler_scale_down(monkeypatch):
monkeypatch.setattr(AutoScaler, "num_pending_works", 0)
monkeypatch.setattr(AutoScaler, "num_pending_requests", 0)
monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=0))
monkeypatch.setattr(AutoScaler, "remove_work", mock.MagicMock())
monkeypatch.setattr(AutoScaler, "workers", mock.MagicMock())
auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_in_interval=0.001)
# Mocking the attributes
auto_scaler._last_autoscale = time.time() - 100000
auto_scaler.num_replicas = 1
auto_scaler.__dict__["load_balancer"] = mock.MagicMock()
# triggering scale up
auto_scaler.autoscale()
auto_scaler.scale.assert_called_once()
auto_scaler.remove_work.assert_called_once()