diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 78a4e370e7..ba8cdd796c 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Add support to run Lightning apps on Lightning AI BYOC clusters ([#13894](https://github.com/Lightning-AI/lightning/pull/13894)) - Add support for listing Lightning AI apps ([#13987](https://github.com/Lightning-AI/lightning/pull/13987)) - Adds `LightningTrainingComponent`. `LightningTrainingComponent` orchestrates multi-node training in the cloud ([#13830](https://github.com/Lightning-AI/lightning/pull/13830)) +- Add support for printing application logs using CLI `lightning show logs [components]` ([#13634](https://github.com/Lightning-AI/lightning/pull/13634)) + ### Changed diff --git a/src/lightning_app/cli/lightning_cli.py b/src/lightning_app/cli/lightning_cli.py index fb4c40330d..45c80d4dcc 100644 --- a/src/lightning_app/cli/lightning_cli.py +++ b/src/lightning_app/cli/lightning_cli.py @@ -8,7 +8,9 @@ from uuid import uuid4 import click import requests +import rich from requests.exceptions import ConnectionError +from rich.color import ANSI_COLOR_NAMES from lightning_app import __version__ as ver from lightning_app.cli import cmd_init, cmd_install, cmd_pl_init, cmd_react_ui_init @@ -18,12 +20,15 @@ from lightning_app.cli.lightning_cli_list import get_list from lightning_app.core.constants import get_lightning_cloud_url, LOCAL_LAUNCH_ADMIN_VIEW from lightning_app.runners.runtime import dispatch from lightning_app.runners.runtime_type import RuntimeType +from lightning_app.utilities.app_logs import _app_logs_reader from lightning_app.utilities.cli_helpers import ( _format_input_env_variables, _retrieve_application_url_and_available_commands, ) +from lightning_app.utilities.cloud import _get_project from lightning_app.utilities.install_components import register_all_external_components from lightning_app.utilities.login import Auth +from lightning_app.utilities.network import LightningClient from lightning_app.utilities.state import headers_for logger = logging.getLogger(__name__) @@ -50,9 +55,93 @@ def main(): @click.version_option(ver) def _main(): register_all_external_components() + + +@_main.group() +def show(): + """Show given resource.""" pass +@show.command() +@click.argument("app_name", required=False) +@click.argument("components", nargs=-1, required=False) +@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.") +def logs(app_name: str, components: List[str], follow: bool) -> None: + """Show cloud application logs. By default prints logs for all currently available components. + + Example uses: + + Print all application logs: + + $ lightning show logs my-application + + + Print logs only from the flow (no work): + + $ lightning show logs my-application flow + + + Print logs only from selected works: + + $ lightning show logs my-application root.work_a root.work_b + """ + + client = LightningClient() + project = _get_project(client) + + apps = { + app.name: app + for app in client.lightningapp_instance_service_list_lightningapp_instances(project.project_id).lightningapps + } + + if not apps: + raise click.ClickException( + "You don't have any application in the cloud. Please, run an application first with `--cloud`." + ) + + if not app_name: + raise click.ClickException( + f"You have not specified any Lightning App. Please select one of available: [{', '.join(apps.keys())}]" + ) + + if app_name not in apps: + raise click.ClickException( + f"The Lightning App '{app_name}' does not exist. Please select one of following: [{', '.join(apps.keys())}]" + ) + + # Fetch all lightning works from given application + # 'Flow' component is somewhat implicit, only one for whole app, + # and not listed in lightningwork API - so we add it directly to the list + works = client.lightningwork_service_list_lightningwork( + project_id=project.project_id, app_id=apps[app_name].id + ).lightningworks + app_component_names = ["flow"] + [f.name for f in apps[app_name].spec.flow_servers] + [w.name for w in works] + + if not components: + components = app_component_names + + for component in components: + if component not in app_component_names: + raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.") + + log_reader = _app_logs_reader( + client=client, + project_id=project.project_id, + app_id=apps[app_name].id, + component_names=components, + follow=follow, + ) + + rich_colors = list(ANSI_COLOR_NAMES) + colors = {c: rich_colors[i + 1] for i, c in enumerate(components)} + + for component_name, log_event in log_reader: + date = log_event.timestamp.strftime("%m/%d/%Y %H:%M:%S") + color = colors[component_name] + rich.print(f"[{color}]{component_name}[/{color}] {date} {log_event.message}") + + @_main.command() def login(): """Log in to your Lightning.ai account.""" diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index e1cc2e180d..74d57db38c 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -318,7 +318,7 @@ def run_app_in_cloud(app_folder: str, app_name: str = "app.py", extra_args: [str ) try: - yield admin_page, view_page, fetch_logs + yield admin_page, view_page, fetch_logs, name except KeyboardInterrupt: pass finally: diff --git a/src/lightning_app/utilities/app_logs.py b/src/lightning_app/utilities/app_logs.py new file mode 100644 index 0000000000..4a7af9b5c5 --- /dev/null +++ b/src/lightning_app/utilities/app_logs.py @@ -0,0 +1,125 @@ +import json +import queue +import sys +from dataclasses import dataclass +from datetime import datetime, timedelta +from json import JSONDecodeError +from threading import Thread +from typing import Iterator, List, Optional, Tuple + +import dateutil.parser +from websocket import WebSocketApp + +from lightning_app.utilities.logs_socket_api import _LightningLogsSocketAPI +from lightning_app.utilities.network import LightningClient + + +@dataclass +class _LogEventLabels: + app: str + container: str + filename: str + job: str + namespace: str + node_name: str + pod: str + stream: Optional[str] = None + + +@dataclass +class _LogEvent: + message: str + timestamp: datetime + labels: _LogEventLabels + + +def _push_logevents_to_read_queue_callback(component_name: str, read_queue: queue.PriorityQueue): + """Pushes _LogEvents from websocket to read_queue. + + Returns callback function used with `on_message_callback` of websocket.WebSocketApp. + """ + + def callback(ws_app: WebSocketApp, msg: str): + # We strongly trust that the contract on API will hold atm :D + event_dict = json.loads(msg) + labels = _LogEventLabels(**event_dict["labels"]) + if "message" in event_dict: + event = _LogEvent( + message=event_dict["message"], + timestamp=dateutil.parser.isoparse(event_dict["timestamp"]), + labels=labels, + ) + read_queue.put((event.timestamp, component_name, event)) + + return callback + + +def _error_callback(ws_app: WebSocketApp, error: Exception): + errors = { + KeyError: "Malformed log message, missing key", + JSONDecodeError: "Malformed log message", + TypeError: "Malformed log format", + ValueError: "Malformed date format", + } + print(f"Error while reading logs ({errors.get(type(error), 'Unknown')})", file=sys.stderr) + ws_app.close() + + +def _app_logs_reader( + client: LightningClient, project_id: str, app_id: str, component_names: List[str], follow: bool +) -> Iterator[Tuple[str, _LogEvent]]: + + read_queue = queue.PriorityQueue() + logs_api_client = _LightningLogsSocketAPI(client.api_client) + + # We will use a socket per component + log_sockets = [ + logs_api_client.create_lightning_logs_socket( + project_id=project_id, + app_id=app_id, + component=component_name, + on_message_callback=_push_logevents_to_read_queue_callback(component_name, read_queue), + on_error_callback=_error_callback, + ) + for component_name in component_names + ] + + # And each socket on separate thread pushing log event to print queue + # run_forever() will run until we close() the connection from outside + log_threads = [Thread(target=work.run_forever) for work in log_sockets] + + # Establish connection and begin pushing logs to the print queue + for th in log_threads: + th.start() + + user_log_start = "<<< BEGIN USER_RUN_FLOW SECTION >>>" + start_timestamp = None + + # Print logs from queue when log event is available + try: + while True: + _, component_name, log_event = read_queue.get(timeout=None if follow else 1.0) + log_event: _LogEvent + + if user_log_start in log_event.message: + start_timestamp = log_event.timestamp + timedelta(seconds=0.5) + + if start_timestamp and log_event.timestamp > start_timestamp: + yield component_name, log_event + + except queue.Empty: + # Empty is raised by queue.get if timeout is reached. Follow = False case. + pass + + except KeyboardInterrupt: + # User pressed CTRL+C to exit, we sould respect that + pass + + finally: + # Close connections - it will cause run_forever() to finish -> thread as finishes aswell + for socket in log_sockets: + socket.close() + + # Because all socket were closed, we can just wait for threads to finish. + for th in log_threads: + th.join() diff --git a/src/lightning_app/utilities/logs_socket_api.py b/src/lightning_app/utilities/logs_socket_api.py new file mode 100644 index 0000000000..0ab9a5c24f --- /dev/null +++ b/src/lightning_app/utilities/logs_socket_api.py @@ -0,0 +1,95 @@ +from typing import Callable, Optional +from urllib.parse import urlparse + +from lightning_cloud.openapi import ApiClient, AuthServiceApi, V1LoginRequest +from websocket import WebSocketApp + +from lightning_app.utilities.login import Auth + + +class _LightningLogsSocketAPI: + def __init__(self, api_client: ApiClient): + self.api_client = api_client + self._auth = Auth() + self._auth.authenticate() + self._auth_service = AuthServiceApi(api_client) + + def _get_api_token(self) -> str: + token_resp = self._auth_service.auth_service_login( + body=V1LoginRequest( + username=self._auth.username, + api_key=self._auth.api_key, + ) + ) + return token_resp.token + + @staticmethod + def _socket_url(host: str, project_id: str, app_id: str, token: str, component: str) -> str: + return ( + f"wss://{host}/v1/projects/{project_id}/appinstances/{app_id}/logs?" + f"token={token}&component={component}&follow=true" + ) + + def create_lightning_logs_socket( + self, + project_id: str, + app_id: str, + component: str, + on_message_callback: Callable[[WebSocketApp, str], None], + on_error_callback: Optional[Callable[[Exception, str], None]] = None, + ) -> WebSocketApp: + """Creates and returns WebSocketApp to listen to lightning app logs. + + .. code-block:: python + # Synchronous reading, run_forever() is blocking + + + def print_log_msg(ws_app, msg): + print(msg) + + + flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg) + flow_socket.run_forever() + + .. code-block:: python + # Asynchronous reading (with Threads) + + + def print_log_msg(ws_app, msg): + print(msg) + + + flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg) + work_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "work_1", print_log_msg) + + flow_logs_thread = Thread(target=flow_logs_socket.run_forever) + work_logs_thread = Thread(target=work_logs_socket.run_forever) + + flow_logs_thread.start() + work_logs_thread.start() + # ....... + + flow_logs_socket.close() + work_logs_thread.close() + + Arguments: + project_id: Project ID. + app_id: Application ID. + component: Component name eg flow. + on_message_callback: Callback object which is called when received data. + on_error_callback: Callback object which is called when we get error. + + Returns: + WebSocketApp of the wanted socket + """ + _token = self._get_api_token() + clean_ws_host = urlparse(self.api_client.configuration.host).netloc + socket_url = self._socket_url( + host=clean_ws_host, + project_id=project_id, + app_id=app_id, + token=_token, + component=component, + ) + + return WebSocketApp(socket_url, on_message=on_message_callback, on_error=on_error_callback) diff --git a/tests/tests_app/cli/test_cmd_show_logs.py b/tests/tests_app/cli/test_cmd_show_logs.py new file mode 100644 index 0000000000..0dc0602515 --- /dev/null +++ b/tests/tests_app/cli/test_cmd_show_logs.py @@ -0,0 +1,61 @@ +from unittest import mock + +from click.testing import CliRunner + +from lightning_app.cli.lightning_cli import logs + + +@mock.patch("lightning_app.cli.lightning_cli.LightningClient") +@mock.patch("lightning_app.cli.lightning_cli._get_project") +def test_show_logs_errors(project, client): + """Test that the CLI prints the errors for the show logs command.""" + + runner = CliRunner() + + # Response prep + app = mock.MagicMock() + app.name = "MyFakeApp" + work = mock.MagicMock() + work.name = "MyFakeWork" + flow = mock.MagicMock() + flow.name = "MyFakeFlow" + + # No apps ever run + apps = {} + client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps + + result = runner.invoke(logs, ["NonExistentApp"]) + + assert result.exit_code == 1 + assert "Error: You don't have any application in the cloud" in result.output + + # App not specified + apps = {app} + client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps + + result = runner.invoke(logs) + + assert result.exit_code == 1 + assert "Please select one of available: [MyFakeApp]" in str(result.output) + + # App does not exit + apps = {app} + client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps + + result = runner.invoke(logs, ["ThisAppDoesNotExist"]) + + assert result.exit_code == 1 + assert "The Lightning App 'ThisAppDoesNotExist' does not exist." in str(result.output) + + # Component does not exist + apps = {app} + works = {work} + flows = {flow} + client.return_value.lightningapp_instance_service_list_lightningapp_instances.return_value.lightningapps = apps + client.return_value.lightningwork_service_list_lightningwork.return_value.lightningworks = works + app.spec.flow_servers = flows + + result = runner.invoke(logs, ["MyFakeApp", "NonExistentComponent"]) + + assert result.exit_code == 1 + assert "Component 'NonExistentComponent' does not exist in app MyFakeApp." in result.output diff --git a/tests/tests_app_examples/test_boring_app.py b/tests/tests_app_examples/test_boring_app.py index 1f681260de..f8143b1db1 100644 --- a/tests/tests_app_examples/test_boring_app.py +++ b/tests/tests_app_examples/test_boring_app.py @@ -1,8 +1,10 @@ import os import pytest +from click.testing import CliRunner from tests_app import _PROJECT_ROOT +from lightning_app.cli.lightning_cli import logs from lightning_app.testing.testing import run_app_in_cloud, wait_for @@ -12,6 +14,7 @@ def test_boring_app_example_cloud() -> None: _, view_page, _, + name, ): def check_hello_there(*_, **__): @@ -21,3 +24,15 @@ def test_boring_app_example_cloud() -> None: return True wait_for(view_page, check_hello_there) + + runner = CliRunner() + result = runner.invoke(logs, [name]) + lines = result.output.splitlines() + + assert result.exit_code == 0 + assert result.exception is None + assert len(lines) > 1, result.output + # We know that at some point we need to intstall lightning, so we check for that + assert any( + "Successfully built lightning" in line for line in lines + ), f"Did not find logs with lightning installation: {result.output}" diff --git a/tests/tests_app_examples/test_collect_failures.py b/tests/tests_app_examples/test_collect_failures.py index f263ebb1a9..c149211e10 100644 --- a/tests/tests_app_examples/test_collect_failures.py +++ b/tests/tests_app_examples/test_collect_failures.py @@ -26,6 +26,7 @@ def test_collect_failures_example_cloud() -> None: _, _, fetch_logs, + _, ): last_found_log_index = -1 while len(expected_logs) != 0: diff --git a/tests/tests_app_examples/test_commands.py b/tests/tests_app_examples/test_commands.py index 5116b1b9d5..266f0305c7 100644 --- a/tests/tests_app_examples/test_commands.py +++ b/tests/tests_app_examples/test_commands.py @@ -16,6 +16,7 @@ def test_commands_example_cloud() -> None: admin_page, _, fetch_logs, + _, ): app_id = admin_page.url.split("/")[-1] cmd = f"lightning trigger_with_client_command --name=something --app_id {app_id}" diff --git a/tests/tests_app_examples/test_custom_work_dependencies.py b/tests/tests_app_examples/test_custom_work_dependencies.py index 8390233e2e..d7c9db5ef6 100644 --- a/tests/tests_app_examples/test_custom_work_dependencies.py +++ b/tests/tests_app_examples/test_custom_work_dependencies.py @@ -13,7 +13,7 @@ def test_custom_work_dependencies_example_cloud() -> None: with run_app_in_cloud( os.path.join(_PROJECT_ROOT, "tests/tests_app_examples/custom_work_dependencies/"), app_name="app.py", - ) as (_, _, fetch_logs): + ) as (_, _, fetch_logs, _): has_logs = False while not has_logs: for log in fetch_logs(): diff --git a/tests/tests_app_examples/test_drive.py b/tests/tests_app_examples/test_drive.py index 9cebca9cf1..14efc34587 100644 --- a/tests/tests_app_examples/test_drive.py +++ b/tests/tests_app_examples/test_drive.py @@ -13,6 +13,7 @@ def test_drive_example_cloud() -> None: _, view_page, fetch_logs, + _, ): has_logs = False diff --git a/tests/tests_app_examples/test_idle_timeout.py b/tests/tests_app_examples/test_idle_timeout.py index fb58a83aef..a39ae3f693 100644 --- a/tests/tests_app_examples/test_idle_timeout.py +++ b/tests/tests_app_examples/test_idle_timeout.py @@ -13,6 +13,7 @@ def test_idle_timeout_example_cloud() -> None: _, _, fetch_logs, + _, ): has_logs = False while not has_logs: diff --git a/tests/tests_app_examples/test_payload.py b/tests/tests_app_examples/test_payload.py index 28d2391c18..58fc28a4a8 100644 --- a/tests/tests_app_examples/test_payload.py +++ b/tests/tests_app_examples/test_payload.py @@ -9,7 +9,7 @@ from lightning_app.testing.testing import run_app_in_cloud @pytest.mark.cloud def test_payload_example_cloud() -> None: - with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_payload")) as (_, _, fetch_logs): + with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_payload")) as (_, _, fetch_logs, _): has_logs = False while not has_logs: diff --git a/tests/tests_app_examples/test_quick_start.py b/tests/tests_app_examples/test_quick_start.py index 9db693a5dc..454c1084ca 100644 --- a/tests/tests_app_examples/test_quick_start.py +++ b/tests/tests_app_examples/test_quick_start.py @@ -51,7 +51,7 @@ def test_quick_start_example(caplog, monkeypatch): @pytest.mark.cloud def test_quick_start_example_cloud() -> None: - with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "lightning-quick-start/")) as (_, view_page, _): + with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "lightning-quick-start/")) as (_, view_page, _, _): def click_gradio_demo(*_, **__): button = view_page.locator('button:has-text("Interactive demo")') diff --git a/tests/tests_app_examples/test_template_react_ui.py b/tests/tests_app_examples/test_template_react_ui.py index 2e348035fe..4b4588d239 100644 --- a/tests/tests_app_examples/test_template_react_ui.py +++ b/tests/tests_app_examples/test_template_react_ui.py @@ -14,6 +14,7 @@ def test_template_react_ui_example_cloud() -> None: _, view_page, fetch_logs, + _, ): def click_button(*_, **__): diff --git a/tests/tests_app_examples/test_template_streamlit_ui.py b/tests/tests_app_examples/test_template_streamlit_ui.py index a8ba93794f..e2c3330529 100644 --- a/tests/tests_app_examples/test_template_streamlit_ui.py +++ b/tests/tests_app_examples/test_template_streamlit_ui.py @@ -14,6 +14,7 @@ def test_template_streamlit_ui_example_cloud() -> None: _, view_page, fetch_logs, + _, ): def click_button(*_, **__): diff --git a/tests/tests_app_examples/test_v0_app.py b/tests/tests_app_examples/test_v0_app.py index d34a92d610..acc9e285c4 100644 --- a/tests/tests_app_examples/test_v0_app.py +++ b/tests/tests_app_examples/test_v0_app.py @@ -74,5 +74,6 @@ def test_v0_app_example_cloud() -> None: _, view_page, fetch_logs, + _, ): run_v0_app(fetch_logs, view_page)