[App] Application logs in CLI (#13634)

This commit is contained in:
Adam Bobowski 2022-08-10 11:48:06 +02:00 committed by GitHub
parent dc8ff5ed26
commit ddb476d334
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 398 additions and 4 deletions

View File

@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Add support to run Lightning apps on Lightning AI BYOC clusters ([#13894](https://github.com/Lightning-AI/lightning/pull/13894))
- Add support for listing Lightning AI apps ([#13987](https://github.com/Lightning-AI/lightning/pull/13987))
- Adds `LightningTrainingComponent`. `LightningTrainingComponent` orchestrates multi-node training in the cloud ([#13830](https://github.com/Lightning-AI/lightning/pull/13830))
- Add support for printing application logs using CLI `lightning show logs <app_name> [components]` ([#13634](https://github.com/Lightning-AI/lightning/pull/13634))
### Changed

View File

@ -8,7 +8,9 @@ from uuid import uuid4
import click
import requests
import rich
from requests.exceptions import ConnectionError
from rich.color import ANSI_COLOR_NAMES
from lightning_app import __version__ as ver
from lightning_app.cli import cmd_init, cmd_install, cmd_pl_init, cmd_react_ui_init
@ -18,12 +20,15 @@ from lightning_app.cli.lightning_cli_list import get_list
from lightning_app.core.constants import get_lightning_cloud_url, LOCAL_LAUNCH_ADMIN_VIEW
from lightning_app.runners.runtime import dispatch
from lightning_app.runners.runtime_type import RuntimeType
from lightning_app.utilities.app_logs import _app_logs_reader
from lightning_app.utilities.cli_helpers import (
_format_input_env_variables,
_retrieve_application_url_and_available_commands,
)
from lightning_app.utilities.cloud import _get_project
from lightning_app.utilities.install_components import register_all_external_components
from lightning_app.utilities.login import Auth
from lightning_app.utilities.network import LightningClient
from lightning_app.utilities.state import headers_for
logger = logging.getLogger(__name__)
@ -50,9 +55,93 @@ def main():
@click.version_option(ver)
def _main():
register_all_external_components()
@_main.group()
def show():
"""Show given resource."""
pass
@show.command()
@click.argument("app_name", required=False)
@click.argument("components", nargs=-1, required=False)
@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.")
def logs(app_name: str, components: List[str], follow: bool) -> None:
"""Show cloud application logs. By default prints logs for all currently available components.
Example uses:
Print all application logs:
$ lightning show logs my-application
Print logs only from the flow (no work):
$ lightning show logs my-application flow
Print logs only from selected works:
$ lightning show logs my-application root.work_a root.work_b
"""
client = LightningClient()
project = _get_project(client)
apps = {
app.name: app
for app in client.lightningapp_instance_service_list_lightningapp_instances(project.project_id).lightningapps
}
if not apps:
raise click.ClickException(
"You don't have any application in the cloud. Please, run an application first with `--cloud`."
)
if not app_name:
raise click.ClickException(
f"You have not specified any Lightning App. Please select one of available: [{', '.join(apps.keys())}]"
)
if app_name not in apps:
raise click.ClickException(
f"The Lightning App '{app_name}' does not exist. Please select one of following: [{', '.join(apps.keys())}]"
)
# Fetch all lightning works from given application
# 'Flow' component is somewhat implicit, only one for whole app,
# and not listed in lightningwork API - so we add it directly to the list
works = client.lightningwork_service_list_lightningwork(
project_id=project.project_id, app_id=apps[app_name].id
).lightningworks
app_component_names = ["flow"] + [f.name for f in apps[app_name].spec.flow_servers] + [w.name for w in works]
if not components:
components = app_component_names
for component in components:
if component not in app_component_names:
raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.")
log_reader = _app_logs_reader(
client=client,
project_id=project.project_id,
app_id=apps[app_name].id,
component_names=components,
follow=follow,
)
rich_colors = list(ANSI_COLOR_NAMES)
colors = {c: rich_colors[i + 1] for i, c in enumerate(components)}
for component_name, log_event in log_reader:
date = log_event.timestamp.strftime("%m/%d/%Y %H:%M:%S")
color = colors[component_name]
rich.print(f"[{color}]{component_name}[/{color}] {date} {log_event.message}")
@_main.command()
def login():
"""Log in to your Lightning.ai account."""

View File

@ -318,7 +318,7 @@ def run_app_in_cloud(app_folder: str, app_name: str = "app.py", extra_args: [str
)
try:
yield admin_page, view_page, fetch_logs
yield admin_page, view_page, fetch_logs, name
except KeyboardInterrupt:
pass
finally:

View File

@ -0,0 +1,125 @@
import json
import queue
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from json import JSONDecodeError
from threading import Thread
from typing import Iterator, List, Optional, Tuple
import dateutil.parser
from websocket import WebSocketApp
from lightning_app.utilities.logs_socket_api import _LightningLogsSocketAPI
from lightning_app.utilities.network import LightningClient
@dataclass
class _LogEventLabels:
app: str
container: str
filename: str
job: str
namespace: str
node_name: str
pod: str
stream: Optional[str] = None
@dataclass
class _LogEvent:
message: str
timestamp: datetime
labels: _LogEventLabels
def _push_logevents_to_read_queue_callback(component_name: str, read_queue: queue.PriorityQueue):
"""Pushes _LogEvents from websocket to read_queue.
Returns callback function used with `on_message_callback` of websocket.WebSocketApp.
"""
def callback(ws_app: WebSocketApp, msg: str):
# We strongly trust that the contract on API will hold atm :D
event_dict = json.loads(msg)
labels = _LogEventLabels(**event_dict["labels"])
if "message" in event_dict:
event = _LogEvent(
message=event_dict["message"],
timestamp=dateutil.parser.isoparse(event_dict["timestamp"]),
labels=labels,
)
read_queue.put((event.timestamp, component_name, event))
return callback
def _error_callback(ws_app: WebSocketApp, error: Exception):
errors = {
KeyError: "Malformed log message, missing key",
JSONDecodeError: "Malformed log message",
TypeError: "Malformed log format",
ValueError: "Malformed date format",
}
print(f"Error while reading logs ({errors.get(type(error), 'Unknown')})", file=sys.stderr)
ws_app.close()
def _app_logs_reader(
client: LightningClient, project_id: str, app_id: str, component_names: List[str], follow: bool
) -> Iterator[Tuple[str, _LogEvent]]:
read_queue = queue.PriorityQueue()
logs_api_client = _LightningLogsSocketAPI(client.api_client)
# We will use a socket per component
log_sockets = [
logs_api_client.create_lightning_logs_socket(
project_id=project_id,
app_id=app_id,
component=component_name,
on_message_callback=_push_logevents_to_read_queue_callback(component_name, read_queue),
on_error_callback=_error_callback,
)
for component_name in component_names
]
# And each socket on separate thread pushing log event to print queue
# run_forever() will run until we close() the connection from outside
log_threads = [Thread(target=work.run_forever) for work in log_sockets]
# Establish connection and begin pushing logs to the print queue
for th in log_threads:
th.start()
user_log_start = "<<< BEGIN USER_RUN_FLOW SECTION >>>"
start_timestamp = None
# Print logs from queue when log event is available
try:
while True:
_, component_name, log_event = read_queue.get(timeout=None if follow else 1.0)
log_event: _LogEvent
if user_log_start in log_event.message:
start_timestamp = log_event.timestamp + timedelta(seconds=0.5)
if start_timestamp and log_event.timestamp > start_timestamp:
yield component_name, log_event
except queue.Empty:
# Empty is raised by queue.get if timeout is reached. Follow = False case.
pass
except KeyboardInterrupt:
# User pressed CTRL+C to exit, we sould respect that
pass
finally:
# Close connections - it will cause run_forever() to finish -> thread as finishes aswell
for socket in log_sockets:
socket.close()
# Because all socket were closed, we can just wait for threads to finish.
for th in log_threads:
th.join()

View File

@ -0,0 +1,95 @@
from typing import Callable, Optional
from urllib.parse import urlparse
from lightning_cloud.openapi import ApiClient, AuthServiceApi, V1LoginRequest
from websocket import WebSocketApp
from lightning_app.utilities.login import Auth
class _LightningLogsSocketAPI:
def __init__(self, api_client: ApiClient):
self.api_client = api_client
self._auth = Auth()
self._auth.authenticate()
self._auth_service = AuthServiceApi(api_client)
def _get_api_token(self) -> str:
token_resp = self._auth_service.auth_service_login(
body=V1LoginRequest(
username=self._auth.username,
api_key=self._auth.api_key,
)
)
return token_resp.token
@staticmethod
def _socket_url(host: str, project_id: str, app_id: str, token: str, component: str) -> str:
return (
f"wss://{host}/v1/projects/{project_id}/appinstances/{app_id}/logs?"
f"token={token}&component={component}&follow=true"
)
def create_lightning_logs_socket(
self,
project_id: str,
app_id: str,
component: str,
on_message_callback: Callable[[WebSocketApp, str], None],
on_error_callback: Optional[Callable[[Exception, str], None]] = None,
) -> WebSocketApp:
"""Creates and returns WebSocketApp to listen to lightning app logs.
.. code-block:: python
# Synchronous reading, run_forever() is blocking
def print_log_msg(ws_app, msg):
print(msg)
flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
flow_socket.run_forever()
.. code-block:: python
# Asynchronous reading (with Threads)
def print_log_msg(ws_app, msg):
print(msg)
flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
work_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "work_1", print_log_msg)
flow_logs_thread = Thread(target=flow_logs_socket.run_forever)
work_logs_thread = Thread(target=work_logs_socket.run_forever)
flow_logs_thread.start()
work_logs_thread.start()
# .......
flow_logs_socket.close()
work_logs_thread.close()
Arguments:
project_id: Project ID.
app_id: Application ID.
component: Component name eg flow.
on_message_callback: Callback object which is called when received data.
on_error_callback: Callback object which is called when we get error.
Returns:
WebSocketApp of the wanted socket
"""
_token = self._get_api_token()
clean_ws_host = urlparse(self.api_client.configuration.host).netloc
socket_url = self._socket_url(
host=clean_ws_host,
project_id=project_id,
app_id=app_id,
token=_token,
component=component,
)
return WebSocketApp(socket_url, on_message=on_message_callback, on_error=on_error_callback)

View File

@ -0,0 +1,61 @@
from unittest import mock
from click.testing import CliRunner
from lightning_app.cli.lightning_cli import logs
@mock.patch("lightning_app.cli.lightning_cli.LightningClient")
@mock.patch("lightning_app.cli.lightning_cli._get_project")
def test_show_logs_errors(project, client):
"""Test that the CLI prints the errors for the show logs command."""
runner = CliRunner()
# Response prep
app = mock.MagicMock()
app.name = "MyFakeApp"
work = mock.MagicMock()
work.name = "MyFakeWork"
flow = mock.MagicMock()
flow.name = "MyFakeFlow"
# No apps ever run
apps = {}
client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps
result = runner.invoke(logs, ["NonExistentApp"])
assert result.exit_code == 1
assert "Error: You don't have any application in the cloud" in result.output
# App not specified
apps = {app}
client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps
result = runner.invoke(logs)
assert result.exit_code == 1
assert "Please select one of available: [MyFakeApp]" in str(result.output)
# App does not exit
apps = {app}
client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps
result = runner.invoke(logs, ["ThisAppDoesNotExist"])
assert result.exit_code == 1
assert "The Lightning App 'ThisAppDoesNotExist' does not exist." in str(result.output)
# Component does not exist
apps = {app}
works = {work}
flows = {flow}
client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps
client.return_value.lightningwork_service_list_lightningwork.return_value.lightningworks = works
app.spec.flow_servers = flows
result = runner.invoke(logs, ["MyFakeApp", "NonExistentComponent"])
assert result.exit_code == 1
assert "Component 'NonExistentComponent' does not exist in app MyFakeApp." in result.output

View File

@ -1,8 +1,10 @@
import os
import pytest
from click.testing import CliRunner
from tests_app import _PROJECT_ROOT
from lightning_app.cli.lightning_cli import logs
from lightning_app.testing.testing import run_app_in_cloud, wait_for
@ -12,6 +14,7 @@ def test_boring_app_example_cloud() -> None:
_,
view_page,
_,
name,
):
def check_hello_there(*_, **__):
@ -21,3 +24,15 @@ def test_boring_app_example_cloud() -> None:
return True
wait_for(view_page, check_hello_there)
runner = CliRunner()
result = runner.invoke(logs, [name])
lines = result.output.splitlines()
assert result.exit_code == 0
assert result.exception is None
assert len(lines) > 1, result.output
# We know that at some point we need to intstall lightning, so we check for that
assert any(
"Successfully built lightning" in line for line in lines
), f"Did not find logs with lightning installation: {result.output}"

View File

@ -26,6 +26,7 @@ def test_collect_failures_example_cloud() -> None:
_,
_,
fetch_logs,
_,
):
last_found_log_index = -1
while len(expected_logs) != 0:

View File

@ -16,6 +16,7 @@ def test_commands_example_cloud() -> None:
admin_page,
_,
fetch_logs,
_,
):
app_id = admin_page.url.split("/")[-1]
cmd = f"lightning trigger_with_client_command --name=something --app_id {app_id}"

View File

@ -13,7 +13,7 @@ def test_custom_work_dependencies_example_cloud() -> None:
with run_app_in_cloud(
os.path.join(_PROJECT_ROOT, "tests/tests_app_examples/custom_work_dependencies/"),
app_name="app.py",
) as (_, _, fetch_logs):
) as (_, _, fetch_logs, _):
has_logs = False
while not has_logs:
for log in fetch_logs():

View File

@ -13,6 +13,7 @@ def test_drive_example_cloud() -> None:
_,
view_page,
fetch_logs,
_,
):
has_logs = False

View File

@ -13,6 +13,7 @@ def test_idle_timeout_example_cloud() -> None:
_,
_,
fetch_logs,
_,
):
has_logs = False
while not has_logs:

View File

@ -9,7 +9,7 @@ from lightning_app.testing.testing import run_app_in_cloud
@pytest.mark.cloud
def test_payload_example_cloud() -> None:
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_payload")) as (_, _, fetch_logs):
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_payload")) as (_, _, fetch_logs, _):
has_logs = False
while not has_logs:

View File

@ -51,7 +51,7 @@ def test_quick_start_example(caplog, monkeypatch):
@pytest.mark.cloud
def test_quick_start_example_cloud() -> None:
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "lightning-quick-start/")) as (_, view_page, _):
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "lightning-quick-start/")) as (_, view_page, _, _):
def click_gradio_demo(*_, **__):
button = view_page.locator('button:has-text("Interactive demo")')

View File

@ -14,6 +14,7 @@ def test_template_react_ui_example_cloud() -> None:
_,
view_page,
fetch_logs,
_,
):
def click_button(*_, **__):

View File

@ -14,6 +14,7 @@ def test_template_streamlit_ui_example_cloud() -> None:
_,
view_page,
fetch_logs,
_,
):
def click_button(*_, **__):

View File

@ -74,5 +74,6 @@ def test_v0_app_example_cloud() -> None:
_,
view_page,
fetch_logs,
_,
):
run_v0_app(fetch_logs, view_page)