[App] Add CloudMultiProcessBackend to run an children App within the Flow in the cloud (#15800)

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* updte

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* Update src/lightning_app/CHANGELOG.md

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>
This commit is contained in:
thomas chaton 2022-11-24 15:36:37 +00:00 committed by GitHub
parent 0a1273187a
commit 8ca6dfe646
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 312 additions and 10 deletions

View File

@ -138,6 +138,7 @@ module = [
"lightning_app.utilities.packaging.cloud_compute",
"lightning_app.utilities.packaging.docker",
"lightning_app.utilities.packaging.lightning_utils",
"lightning_app.utilities.port",
"lightning_app.utilities.proxies",
"lightning_app.utilities.scheduler",
"lightning_app.utilities.state",

View File

@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))
- Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))
- Added a CloudMultiProcessBackend which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800))
### Changed
- `lightning add ssh-key` CLI command has been transitioned to `lightning create ssh-key` with the same calling signature ([#15761](https://github.com/Lightning-AI/lightning/pull/15761))

View File

@ -54,7 +54,7 @@ def get_app_url(runtime_type: RuntimeType, *args: Any, need_credits: bool = Fals
action = "?action=add_credits" if need_credits else ""
return f"{get_lightning_cloud_url()}/me/apps/{lit_app.id}{action}"
else:
return "http://127.0.0.1:7501/view"
return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view")
def main() -> None:

View File

@ -138,6 +138,7 @@ class LightningApp:
self._schedules: Dict[str, Dict] = {}
self.threads: List[threading.Thread] = []
self.exception = None
self.collect_changes: bool = True
# NOTE: Checkpointing is disabled by default for the time being. We
# will enable it when resuming from full checkpoint is supported. Also,
@ -362,11 +363,14 @@ class LightningApp:
delta.raise_errors = False
return deltas
def maybe_apply_changes(self) -> bool:
def maybe_apply_changes(self) -> None:
"""Get the deltas from both the flow queue and the work queue, merge the two deltas and update the
state."""
self._send_flow_to_work_deltas(self.state)
if not self.collect_changes:
return None
deltas = self._collect_deltas_from_ui_and_work_queues()
if not deltas:

View File

@ -3,6 +3,8 @@ from pathlib import Path
import lightning_cloud.env
from lightning_app.utilities.port import _find_lit_app_port
def get_lightning_cloud_url() -> str:
# DO NOT CHANGE!
@ -19,7 +21,8 @@ FLOW_DURATION_THRESHOLD = 1.0
FLOW_DURATION_SAMPLES = 5
APP_SERVER_HOST = os.getenv("LIGHTNING_APP_STATE_URL", "http://127.0.0.1")
APP_SERVER_PORT = 7501
APP_SERVER_IN_CLOUD = "http://lightningapp" in APP_SERVER_HOST
APP_SERVER_PORT = _find_lit_app_port(7501)
APP_STATE_MAX_SIZE_BYTES = 1024 * 1024 # 1 MB
CLOUD_QUEUE_TYPE = os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None)
@ -52,9 +55,6 @@ LIGHTNING_APPS_PUBLIC_REGISTRY = "https://lightning.ai/v1/apps"
# EXPERIMENTAL: ENV VARIABLES TO ENABLE MULTIPLE WORKS IN THE SAME MACHINE
DEFAULT_NUMBER_OF_EXPOSED_PORTS = int(os.getenv("DEFAULT_NUMBER_OF_EXPOSED_PORTS", "50"))
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))
) # Note: This is disabled for the time being.
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER", "0"))
) # This isn't used in the cloud yet.
@ -71,3 +71,7 @@ ENABLE_PUSHING_STATE_ENDPOINT = ENABLE_PULLING_STATE_ENDPOINT and bool(
)
ENABLE_STATE_WEBSOCKET = bool(int(os.getenv("ENABLE_STATE_WEBSOCKET", "0")))
ENABLE_UPLOAD_ENDPOINT = bool(int(os.getenv("ENABLE_UPLOAD_ENDPOINT", "1")))
def enable_multiple_works_in_default_container() -> bool:
return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))

View File

@ -1,9 +1,10 @@
from enum import Enum
from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends.backend import Backend
from lightning_app.runners.backends.cloud import CloudBackend
from lightning_app.runners.backends.docker import DockerBackend
from lightning_app.runners.backends.mp_process import MultiProcessingBackend
from lightning_app.runners.backends.mp_process import CloudMultiProcessingBackend, MultiProcessingBackend
class BackendType(Enum):
@ -13,6 +14,8 @@ class BackendType(Enum):
def get_backend(self, entrypoint_file: str) -> "Backend":
if self == BackendType.MULTIPROCESSING:
if APP_SERVER_IN_CLOUD:
return CloudMultiProcessingBackend(entrypoint_file)
return MultiProcessingBackend(entrypoint_file)
elif self == BackendType.DOCKER:
return DockerBackend(entrypoint_file)

View File

@ -6,6 +6,7 @@ from lightning_app.core.queues import QueuingSystem
from lightning_app.runners.backends.backend import Backend, WorkManager
from lightning_app.utilities.enum import WorkStageStatus
from lightning_app.utilities.network import _check_service_url_is_ready
from lightning_app.utilities.port import disable_port, enable_port
from lightning_app.utilities.proxies import ProxyWorkRun, WorkRunner
@ -83,3 +84,24 @@ class MultiProcessingBackend(Backend):
def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
work_manager: MultiProcessWorkManager = app.processes[work.name]
work_manager.kill()
class CloudMultiProcessingBackend(MultiProcessingBackend):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Note: Track the open ports to close them on termination.
self.ports = []
def create_work(self, app, work) -> None:
work._host = "0.0.0.0"
nc = enable_port()
self.ports.append(nc.port)
work._port = nc.port
work._future_url = f"https://{nc.host}"
return super().create_work(app, work)
def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
disable_port(work._port)
self.ports = [port for port in self.ports if port != work._port]
return super().stop_work(app, work)

View File

@ -51,7 +51,7 @@ from lightning_app.core.constants import (
DISABLE_DEPENDENCY_CACHE,
DOT_IGNORE_FILENAME,
ENABLE_APP_COMMENT_COMMAND_EXECUTION,
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER,
enable_multiple_works_in_default_container,
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER,
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
@ -243,7 +243,7 @@ class CloudRuntime(Runtime):
if self.run_app_comment_commands or ENABLE_APP_COMMENT_COMMAND_EXECUTION:
v1_env_vars.append(V1EnvVar(name="ENABLE_APP_COMMENT_COMMAND_EXECUTION", value="1"))
if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
v1_env_vars.append(V1EnvVar(name="ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", value="1"))
if ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER:
@ -303,7 +303,7 @@ class CloudRuntime(Runtime):
)
network_configs: Optional[List[V1NetworkConfig]] = None
if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
network_configs = []
initial_port = 8080 + 1 + len(frontend_specs)
for _ in range(DEFAULT_NUMBER_OF_EXPOSED_PORTS):

View File

@ -5,6 +5,7 @@ from typing import Any, Callable, Optional, Union
from lightning_app.api.http_methods import _add_tags_to_api, _validate_api
from lightning_app.core.api import start_server
from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends import Backend
from lightning_app.runners.runtime import Runtime
from lightning_app.storage.orchestrator import StorageOrchestrator
@ -13,6 +14,7 @@ from lightning_app.utilities.commands.base import _commands_to_api, _prepare_com
from lightning_app.utilities.component import _set_flow_context, _set_frontend_context
from lightning_app.utilities.load_app import extract_metadata_from_app
from lightning_app.utilities.network import find_free_network_port
from lightning_app.utilities.port import disable_port
@dataclass
@ -31,6 +33,9 @@ class MultiProcessRuntime(Runtime):
try:
_set_flow_context()
# Note: In case the runtime is used in the cloud.
self.host = "0.0.0.0" if APP_SERVER_IN_CLOUD else self.host
self.app.backend = self.backend
self.backend._prepare_queues(self.app)
self.backend.resolve_url(self.app, "http://127.0.0.1")
@ -109,3 +114,11 @@ class MultiProcessRuntime(Runtime):
raise
finally:
self.terminate()
def terminate(self):
if APP_SERVER_IN_CLOUD:
# Close all the ports open for the App within the App.
ports = [self.port] + getattr(self.backend, "ports", [])
for port in ports:
disable_port(port)
super().terminate()

View File

@ -0,0 +1,143 @@
import os
from typing import Optional
from lightning_cloud.openapi import AppinstancesIdBody, Externalv1LightningappInstance, V1NetworkConfig
from lightning_app.utilities.network import LightningClient
def _find_lit_app_port(default_port: int) -> int:
"""Make a request to the cloud controlplane to find a disabled port of the flow, enable it and return it."""
app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)
enable_multiple_works_in_default_container = bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))
if not app_id or not project_id or not enable_multiple_works_in_default_container:
return default_port
client = LightningClient()
list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
lit_app: Optional[Externalv1LightningappInstance] = None
for lightningapp in list_apps_resp.lightningapps:
if lightningapp.id == app_id:
lit_app = lightningapp
if not lit_app:
raise RuntimeError(
"App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
)
found_nc = None
for nc in lit_app.spec.network_config:
if not nc.enable:
found_nc = nc
nc.enable = True
break
client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project_id,
id=lit_app.id,
body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
)
if not found_nc:
raise RuntimeError(
"No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
)
# Note: This is required for the framework to know we need to use the CloudMultiProcessRuntime.
os.environ["APP_SERVER_HOST"] = f"https://{found_nc.host}"
return found_nc.port
def enable_port() -> V1NetworkConfig:
"""Make a request to the cloud controlplane to open a port of the flow."""
app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)
if not app_id or not project_id:
raise Exception("The app_id and project_id should be defined.")
client = LightningClient()
list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
lit_app: Optional[Externalv1LightningappInstance] = None
for lightningapp in list_apps_resp.lightningapps:
if lightningapp.id == app_id:
lit_app = lightningapp
if not lit_app:
raise RuntimeError(
"App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
)
found_nc = None
for nc in lit_app.spec.network_config:
if not nc.enable:
found_nc = nc
nc.enable = True
break
client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project_id,
id=lit_app.id,
body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
)
if not found_nc:
raise RuntimeError(
"No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
)
return found_nc
def disable_port(port: int, ignore_disabled: bool = True) -> None:
"""Make a request to the cloud controlplane to close a port of the flow."""
app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)
if not app_id or not project_id:
raise Exception("The app_id and project_id should be defined.")
client = LightningClient()
list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
lit_app: Optional[Externalv1LightningappInstance] = None
for lightningapp in list_apps_resp.lightningapps:
if lightningapp.id == app_id:
lit_app = lightningapp
if not lit_app:
raise RuntimeError(
"App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
)
found_nc = None
for nc in lit_app.spec.network_config:
if nc.port == port:
if not nc.enable and not ignore_disabled:
raise RuntimeError(f"The port {port} was already disabled.")
nc.enable = False
found_nc = nc
break
client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project_id,
id=lit_app.id,
body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
)
if not found_nc:
ports = [nc.port for nc in lit_app.spec.network_config]
raise ValueError(f"The provided port doesn't exists. Available ports are {ports}.")
assert found_nc

View File

@ -0,0 +1,109 @@
from unittest.mock import MagicMock
import pytest
from lightning_cloud.openapi import V1NetworkConfig
from lightning_app.utilities import port
from lightning_app.utilities.port import _find_lit_app_port, disable_port, enable_port
def test_find_lit_app_port(monkeypatch):
client = MagicMock()
monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client))
assert 5701 == _find_lit_app_port(5701)
resp = MagicMock()
lit_app = MagicMock()
lit_app.id = "a"
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=False),
]
resp.lightningapps = [lit_app]
client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp
monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a")
monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a")
monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1")
assert _find_lit_app_port(5701) == 1
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=True),
]
with pytest.raises(RuntimeError, match="No available port was found. Please"):
_find_lit_app_port(5701)
def test_enable_port(monkeypatch):
client = MagicMock()
monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client))
assert 5701 == _find_lit_app_port(5701)
resp = MagicMock()
lit_app = MagicMock()
lit_app.id = "a"
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=False),
]
resp.lightningapps = [lit_app]
client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp
monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a")
monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a")
monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1")
assert enable_port()
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=True),
]
with pytest.raises(RuntimeError, match="No available port was found. Please"):
assert enable_port()
def test_disable_port(monkeypatch):
client = MagicMock()
monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client))
assert 5701 == _find_lit_app_port(5701)
resp = MagicMock()
lit_app = MagicMock()
lit_app.id = "a"
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=False),
]
resp.lightningapps = [lit_app]
client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp
monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a")
monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a")
monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1")
disable_port(0)
assert not lit_app.spec.network_config[0].enable
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=False),
]
with pytest.raises(RuntimeError, match="The port 1 was already disabled."):
disable_port(1, ignore_disabled=False)
lit_app.spec.network_config = [
V1NetworkConfig(host="a", port=0, enable=True),
V1NetworkConfig(host="a", port=1, enable=False),
]
with pytest.raises(ValueError, match="[0, 1]"):
assert disable_port(10)