Lightning App Fixes from Training Studio App dev (#14532)
* update * update * update * update * update * update * update * update * update * update * update * update * update * update * update * update * update
This commit is contained in:
parent
b24f5f164a
commit
c81a71c908
|
@ -0,0 +1,17 @@
|
|||
_notebooks
|
||||
.azure
|
||||
.circleci
|
||||
.github
|
||||
.ipynb_checkpoints
|
||||
.pytest_cache
|
||||
.shared
|
||||
.storage
|
||||
.venv
|
||||
.vscode
|
||||
.git
|
||||
artifacts
|
||||
Datasets
|
||||
dist
|
||||
docs
|
||||
examples
|
||||
tests
|
|
@ -17,9 +17,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
|
|||
- Application storage prefix moved from `app_id` to `project_id/app_id` ([#14583](https://github.com/Lightning-AI/lightning/pull/14583))
|
||||
|
||||
|
||||
### Deprecated
|
||||
- Improve Lightning App connect logic by disconnecting automatically ([#14532](https://github.com/Lightning-AI/lightning/pull/14532))
|
||||
|
||||
-
|
||||
|
||||
### Fixed
|
||||
|
||||
|
@ -27,11 +26,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
|
|||
|
||||
- Resolved a bug where the wrong client was passed to collect cloud logs ([#14684](https://github.com/Lightning-AI/lightning/pull/14684))
|
||||
|
||||
### Removed
|
||||
|
||||
-
|
||||
- Unification of app template: moved `app.py` to root dir for `lightning init app <app_name>` template ([#13853](https://github.com/Lightning-AI/lightning/pull/13853))
|
||||
|
||||
|
||||
|
||||
- Fixed a bug where the uploaded command file wasn't properly parsed ([#14532](https://github.com/Lightning-AI/lightning/pull/14532))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Resolved `LightningApp(..., debug=True)` ([#14464](https://github.com/Lightning-AI/lightning/pull/14464))
|
||||
|
||||
|
|
|
@ -33,7 +33,8 @@ def connect(app_name_or_id: str, yes: bool = False):
|
|||
else:
|
||||
click.echo(f"You are already connected to the cloud Lightning App: {app_name_or_id}.")
|
||||
else:
|
||||
click.echo("You are already connected to a Lightning App. Please, use `lightning disconnect`.")
|
||||
disconnect()
|
||||
connect(app_name_or_id, yes)
|
||||
|
||||
elif app_name_or_id.startswith("localhost"):
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ def main():
|
|||
if app_name:
|
||||
# 3: Handle development use case.
|
||||
is_local_app = app_name == "localhost"
|
||||
if is_local_app and sys.argv[1:3] == ["run", "app"]:
|
||||
if sys.argv[1:3] == ["run", "app"] or sys.argv[1:3] == ["show", "logs"]:
|
||||
_main()
|
||||
else:
|
||||
if is_local_app:
|
||||
|
@ -147,9 +147,20 @@ def logs(app_name: str, components: List[str], follow: bool) -> None:
|
|||
if not components:
|
||||
components = app_component_names
|
||||
|
||||
for component in components:
|
||||
if component not in app_component_names:
|
||||
raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.")
|
||||
else:
|
||||
|
||||
def add_prefix(c: str):
|
||||
if c == "flow":
|
||||
return c
|
||||
if not c.startswith("root."):
|
||||
return "root." + c
|
||||
return c
|
||||
|
||||
components = [add_prefix(c) for c in components]
|
||||
|
||||
for component in components:
|
||||
if component not in app_component_names:
|
||||
raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.")
|
||||
|
||||
log_reader = _app_logs_reader(
|
||||
logs_api_client=_LightningLogsSocketAPI(client.api_client),
|
||||
|
|
|
@ -21,7 +21,7 @@ from websockets.exceptions import ConnectionClosed
|
|||
|
||||
from lightning_app.api.http_methods import HttpMethod
|
||||
from lightning_app.api.request_types import DeltaRequest
|
||||
from lightning_app.core.constants import FRONTEND_DIR
|
||||
from lightning_app.core.constants import ENABLE_STATE_WEBSOCKET, FRONTEND_DIR
|
||||
from lightning_app.core.queues import RedisQueue
|
||||
from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore
|
||||
from lightning_app.utilities.enum import OpenAPITags
|
||||
|
@ -261,6 +261,9 @@ async def healthz(response: Response):
|
|||
@fastapi_service.websocket("/api/v1/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
if not ENABLE_STATE_WEBSOCKET:
|
||||
await websocket.close()
|
||||
return
|
||||
try:
|
||||
counter = global_app_state_store.counter
|
||||
while True:
|
||||
|
|
|
@ -293,9 +293,16 @@ class LightningApp:
|
|||
component_output: t.Optional[ComponentDelta] = self.get_state_changed_from_queue(self.delta_queue)
|
||||
if component_output:
|
||||
logger.debug(f"Received from {component_output.id} : {component_output.delta.to_dict()}")
|
||||
work = self.get_component_by_name(component_output.id)
|
||||
new_work_delta = _delta_to_app_state_delta(self.root, work, deepcopy(component_output.delta))
|
||||
deltas.append(new_work_delta)
|
||||
|
||||
work = None
|
||||
try:
|
||||
work = self.get_component_by_name(component_output.id)
|
||||
except (KeyError, AttributeError) as e:
|
||||
logger.error(f"The component {component_output.id} couldn't be accessed. Exception: {e}")
|
||||
|
||||
if work:
|
||||
new_work_delta = _delta_to_app_state_delta(self.root, work, deepcopy(component_output.delta))
|
||||
deltas.append(new_work_delta)
|
||||
else:
|
||||
should_get_component_output = False
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ DOT_IGNORE_FILENAME = ".lightningignore"
|
|||
DEBUG_ENABLED = bool(int(os.getenv("LIGHTNING_DEBUG", "0")))
|
||||
LIGHTNING_COMPONENT_PUBLIC_REGISTRY = "https://lightning.ai/v1/components"
|
||||
LIGHTNING_APPS_PUBLIC_REGISTRY = "https://lightning.ai/v1/apps"
|
||||
ENABLE_STATE_WEBSOCKET = bool(int(os.getenv("ENABLE_STATE_WEBSOCKET", "0")))
|
||||
|
||||
|
||||
def get_lightning_cloud_url() -> str:
|
||||
|
|
|
@ -92,11 +92,6 @@ class Runtime:
|
|||
has_messaged = False
|
||||
while not self.done:
|
||||
try:
|
||||
for work in self.app.works:
|
||||
if not hasattr(work, "_has_called_on_exit"):
|
||||
work.on_exit()
|
||||
work._has_called_on_exit = True
|
||||
|
||||
if self.app.backend is not None:
|
||||
self.app.backend.stop_all_works(self.app.works)
|
||||
|
||||
|
|
|
@ -232,6 +232,16 @@ def run_app_in_cloud(
|
|||
|
||||
os.environ["LIGHTNING_APP_NAME"] = name
|
||||
|
||||
url = Config.url
|
||||
if url.endswith("/"):
|
||||
url = url[:-1]
|
||||
payload = {"apiKey": Config.api_key, "username": Config.username}
|
||||
res = requests.post(url + "/v1/auth/login", data=json.dumps(payload))
|
||||
if "token" not in res.json():
|
||||
raise Exception("You haven't properly setup your environment variables.")
|
||||
|
||||
token = res.json()["token"]
|
||||
|
||||
# 3. Disconnect from the App if any.
|
||||
Popen("lightning disconnect", shell=True).wait()
|
||||
|
||||
|
@ -273,7 +283,6 @@ def run_app_in_cloud(
|
|||
# 6. Create chromium browser, auth to lightning_app.ai and yield the admin and view pages.
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=bool(int(os.getenv("HEADLESS", "0"))))
|
||||
payload = {"apiKey": Config.api_key, "username": Config.username, "duration": "120000"}
|
||||
context = browser.new_context(
|
||||
# Eventually this will need to be deleted
|
||||
http_credentials=HttpCredentials(
|
||||
|
@ -283,11 +292,6 @@ def run_app_in_cloud(
|
|||
record_har_path=Config.har_location,
|
||||
)
|
||||
admin_page = context.new_page()
|
||||
url = Config.url
|
||||
if url.endswith("/"):
|
||||
url = url[:-1]
|
||||
res = requests.post(url + "/v1/auth/login", data=json.dumps(payload))
|
||||
token = res.json()["token"]
|
||||
print(f"The Lightning App Token is: {token}")
|
||||
print(f"The Lightning App user key is: {Config.key}")
|
||||
print(f"The Lightning App user id is: {Config.id}")
|
||||
|
|
|
@ -116,7 +116,9 @@ def _retrieve_application_url_and_available_commands(app_id_or_name_or_url: Opti
|
|||
raise Exception("The application is starting. Try in a few moments.")
|
||||
resp = requests.get(lightningapp.status.url + "/openapi.json")
|
||||
if resp.status_code != 200:
|
||||
raise Exception(f"The server didn't process the request properly. Found {resp.json()}")
|
||||
raise Exception(
|
||||
"The server didn't process the request properly. " "Try once your application is ready."
|
||||
)
|
||||
return lightningapp.status.url, _extract_command_from_openapi(resp.json()), lightningapp.id
|
||||
return None, None, None
|
||||
|
||||
|
|
|
@ -60,9 +60,9 @@ class ClientCommand:
|
|||
def run(self, **cli_kwargs) -> None:
|
||||
"""Overrides with the logic to execute on the client side."""
|
||||
|
||||
def invoke_handler(self, config: BaseModel) -> Dict[str, Any]:
|
||||
def invoke_handler(self, config: Optional[BaseModel] = None) -> Dict[str, Any]:
|
||||
command = self.command_name.replace(" ", "_")
|
||||
resp = requests.post(self.app_url + f"/command/{command}", data=config.json())
|
||||
resp = requests.post(self.app_url + f"/command/{command}", data=config.json() if config else None)
|
||||
assert resp.status_code == 200, resp.json()
|
||||
return resp.json()
|
||||
|
||||
|
@ -155,6 +155,7 @@ def _validate_client_command(command: ClientCommand):
|
|||
def _upload_command(command_name: str, command: ClientCommand) -> Optional[str]:
|
||||
from lightning_app.storage.path import _is_s3fs_available, filesystem, shared_storage_path
|
||||
|
||||
command_name = command_name.replace(" ", "_")
|
||||
filepath = f"commands/{command_name}.py"
|
||||
remote_url = str(shared_storage_path() / "artifacts" / filepath)
|
||||
fs = filesystem()
|
||||
|
@ -164,6 +165,7 @@ def _upload_command(command_name: str, command: ClientCommand) -> Optional[str]:
|
|||
|
||||
if not isinstance(fs, S3FileSystem):
|
||||
return
|
||||
|
||||
source_file = str(inspect.getfile(command.__class__))
|
||||
remote_url = str(shared_storage_path() / "artifacts" / filepath)
|
||||
fs.put(source_file, remote_url)
|
||||
|
|
|
@ -405,8 +405,29 @@ class WorkRunner:
|
|||
except BaseException as e:
|
||||
# 10.2 Send failed delta to the flow.
|
||||
reference_state = deepcopy(self.work.state)
|
||||
exp, val, tb = sys.exc_info()
|
||||
listing = traceback.format_exception(exp, val, tb)
|
||||
user_exception = False
|
||||
used_runpy = False
|
||||
trace = []
|
||||
for p in listing:
|
||||
if "runpy.py" in p:
|
||||
trace = []
|
||||
used_runpy = True
|
||||
if user_exception:
|
||||
trace.append(p)
|
||||
if "ret = work_run(*args, **kwargs)" in p:
|
||||
user_exception = True
|
||||
|
||||
if used_runpy:
|
||||
trace = trace[1:]
|
||||
|
||||
self.work._calls[call_hash]["statuses"].append(
|
||||
make_status(WorkStageStatus.FAILED, message=str(e), reason=WorkFailureReasons.USER_EXCEPTION)
|
||||
make_status(
|
||||
WorkStageStatus.FAILED,
|
||||
message=str("\n".join(trace)),
|
||||
reason=WorkFailureReasons.USER_EXCEPTION,
|
||||
)
|
||||
)
|
||||
self.delta_queue.put(
|
||||
ComponentDelta(
|
||||
|
@ -452,6 +473,7 @@ class WorkRunner:
|
|||
logger.info(f"Received SIGTERM signal. Gracefully terminating {self.work.name.replace('root.', '')}...")
|
||||
persist_artifacts(work=self.work)
|
||||
with _state_observer_lock:
|
||||
self.work.on_exit()
|
||||
self.work._calls[call_hash]["statuses"] = []
|
||||
state = deepcopy(self.work.state)
|
||||
self.work._calls[call_hash]["statuses"].append(
|
||||
|
|
|
@ -7,7 +7,7 @@ from lightning_app.cli.lightning_cli import logs
|
|||
|
||||
@mock.patch("lightning_app.cli.lightning_cli.LightningClient")
|
||||
@mock.patch("lightning_app.cli.lightning_cli._get_project")
|
||||
def test_show_logs_errors(project, client):
|
||||
def test_show_logs_errors(_, client):
|
||||
"""Test that the CLI prints the errors for the show logs command."""
|
||||
|
||||
runner = CliRunner()
|
||||
|
@ -58,4 +58,4 @@ def test_show_logs_errors(project, client):
|
|||
result = runner.invoke(logs, ["MyFakeApp", "NonExistentComponent"])
|
||||
|
||||
assert result.exit_code == 1
|
||||
assert "Component 'NonExistentComponent' does not exist in app MyFakeApp." in result.output
|
||||
assert "Component 'root.NonExistentComponent' does not exist in app MyFakeApp." in result.output
|
||||
|
|
|
@ -55,7 +55,7 @@ def test_popen_python_script_failure():
|
|||
)
|
||||
run_work_isolated(python_script)
|
||||
assert python_script.has_failed
|
||||
assert python_script.status.message == "1"
|
||||
assert "Exception(self.exit_code)" in python_script.status.message
|
||||
|
||||
|
||||
def test_tracer_python_script_with_kwargs():
|
||||
|
@ -96,7 +96,7 @@ def test_tracer_component_with_code():
|
|||
|
||||
python_script = TracerPythonScript("file.py", script_args=["--b=1"], raise_exception=False, code=code)
|
||||
run_work_isolated(python_script, params={"a": "1"}, restart_count=0)
|
||||
assert python_script.status.message == "An error"
|
||||
assert "An error" in python_script.status.message
|
||||
|
||||
with open("file.py", "w") as f:
|
||||
f.write("import sys\n")
|
||||
|
|
Loading…
Reference in New Issue