[App] Add status endpoint, enable `ready` (#16075)

Co-authored-by: thomas chaton <thomas@grid.ai>
This commit is contained in:
Ethan Harris 2022-12-19 14:10:58 +00:00 committed by GitHub
parent f3157f306a
commit 2a85d9b257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 132 additions and 58 deletions

View File

@ -43,6 +43,10 @@ class BoringApp(L.LightningFlow):
raise_exception=True,
)
@property
def ready(self) -> bool:
return self.dest_work.is_running
def run(self):
self.source_work.run()
if self.source_work.has_succeeded:

View File

@ -20,6 +20,8 @@ class ServeStreamlit(LightningWork, abc.ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ready = False
self._process = None
@property
@ -58,6 +60,7 @@ class ServeStreamlit(LightningWork, abc.ABC):
],
env=env,
)
self.ready = True
self._process.wait()
def on_exit(self) -> None:

View File

@ -34,6 +34,7 @@ from lightning_app.core.constants import (
from lightning_app.core.queues import QueuingSystem
from lightning_app.storage import Drive
from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.cloud import is_running_in_cloud
from lightning_app.utilities.component import _context
from lightning_app.utilities.enum import ComponentContext, OpenAPITags
@ -66,18 +67,24 @@ global_app_state_store.add(TEST_SESSION_UUID)
lock = Lock()
app_spec: Optional[List] = None
app_status: Optional[AppStatus] = None
# In the future, this would be abstracted to support horizontal scaling.
responses_store = {}
logger = Logger(__name__)
# This can be replaced with a consumer that publishes states in a kv-store
# in a serverless architecture
class UIRefresher(Thread):
def __init__(self, api_publish_state_queue, api_response_queue, refresh_interval: float = 0.1) -> None:
def __init__(
self,
api_publish_state_queue,
api_response_queue,
refresh_interval: float = 0.1,
) -> None:
super().__init__(daemon=True)
self.api_publish_state_queue = api_publish_state_queue
self.api_response_queue = api_response_queue
@ -98,7 +105,8 @@ class UIRefresher(Thread):
def run_once(self):
try:
state = self.api_publish_state_queue.get(timeout=0)
global app_status
state, app_status = self.api_publish_state_queue.get(timeout=0)
with lock:
global_app_state_store.set_app_state(TEST_SESSION_UUID, state)
except queue.Empty:
@ -326,6 +334,17 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi
return f"Successfully uploaded '{filename}' to the Drive"
@fastapi_service.get("/api/v1/status", response_model=AppStatus)
async def get_status() -> AppStatus:
"""Get the current status of the app and works."""
global app_status
if app_status is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="App status hasn't been reported yet."
)
return app_status
@fastapi_service.get("/healthz", status_code=200)
async def healthz(response: Response):
"""Health check endpoint used in the cloud FastAPI servers to check the status periodically."""

View File

@ -35,6 +35,7 @@ from lightning_app.utilities.app_helpers import (
_should_dispatch_app,
Logger,
)
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.commands.base import _process_requests
from lightning_app.utilities.component import _convert_paths_after_init, _validate_root_flow
from lightning_app.utilities.enum import AppStage, CacheCallsKeys
@ -140,6 +141,7 @@ class LightningApp:
self.exception = None
self.collect_changes: bool = True
self.status: Optional[AppStatus] = None
# TODO: Enable ready locally for opening the UI.
self.ready = False
@ -150,6 +152,7 @@ class LightningApp:
self.checkpointing: bool = False
self._update_layout()
self._update_status()
self.is_headless: Optional[bool] = None
@ -418,6 +421,7 @@ class LightningApp:
self._update_layout()
self._update_is_headless()
self._update_status()
self.maybe_apply_changes()
if self.checkpointing and self._should_snapshot():
@ -485,19 +489,12 @@ class LightningApp:
self._original_state = deepcopy(self.state)
done = False
# TODO: Re-enable the `ready` property once issues are resolved
if not self.root.ready:
warnings.warn(
"One of your Flows returned `.ready` as `False`. "
"This feature is not yet enabled so this will be ignored.",
UserWarning,
)
self.ready = True
self.ready = self.root.ready
self._start_with_flow_works()
if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))
self._reset_run_time_monitor()
@ -506,8 +503,8 @@ class LightningApp:
self._update_run_time_monitor()
if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))
self._has_updated = False
@ -532,6 +529,23 @@ class LightningApp:
# This ensures support for apps which dynamically add a UI at runtime.
_handle_is_headless(self)
def _update_status(self) -> None:
old_status = self.status
work_statuses = {}
for work in breadth_first(self.root, types=(lightning_app.LightningWork,)):
work_statuses[work.name] = work.status
self.status = AppStatus(
is_ui_ready=self.ready,
work_statuses=work_statuses,
)
# If the work statuses changed, the state delta will trigger an update.
# If ready has changed, we trigger an update manually.
if self.status != old_status:
self._has_updated = True
def _apply_restarting(self) -> bool:
self._reset_original_state()
# apply stage after restoring the original state.

View File

@ -249,10 +249,7 @@ class LightningFlow:
@property
def ready(self) -> bool:
"""Not currently enabled.
Override to customize when your App should be ready.
"""
"""Override to customize when your App should be ready."""
flows = self.flows
return all(flow.ready for flow in flows.values()) if flows else True

View File

@ -12,13 +12,13 @@ from lightning_app.storage import Path
from lightning_app.storage.drive import _maybe_create_drive, Drive
from lightning_app.storage.payload import Payload
from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden
from lightning_app.utilities.app_status import WorkStatus
from lightning_app.utilities.component import _is_flow_context, _sanitize_state
from lightning_app.utilities.enum import (
CacheCallsKeys,
make_status,
WorkFailureReasons,
WorkStageStatus,
WorkStatus,
WorkStopReasons,
)
from lightning_app.utilities.exceptions import LightningWorkException

View File

@ -121,7 +121,7 @@ class Runtime:
self._add_stopped_status_to_work(work)
# Publish the updated state and wait for the frontend to update.
self.app.api_publish_state_queue.put(self.app.state)
self.app.api_publish_state_queue.put((self.app.state, self.app.status))
for thread in self.threads + self.app.threads:
thread.join(timeout=0)

View File

@ -0,0 +1,29 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel
class WorkStatus(BaseModel):
"""The ``WorkStatus`` captures the status of a work according to the app."""
stage: str
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)
class AppStatus(BaseModel):
"""The ``AppStatus`` captures the current status of the app and its components."""
# ``True`` when the app UI is ready to be viewed
is_ui_ready: bool
# The statuses of ``LightningWork`` objects currently associated with this app
work_statuses: Dict[str, WorkStatus]

View File

@ -1,5 +1,4 @@
import enum
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
@ -47,18 +46,6 @@ class WorkStageStatus:
FAILED = "failed"
@dataclass
class WorkStatus:
stage: WorkStageStatus
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1
def __post_init__(self):
assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)
def make_status(stage: str, message: Optional[str] = None, reason: Optional[str] = None):
status = {
"stage": stage,

View File

@ -31,6 +31,7 @@ from lightning_app.core.constants import APP_SERVER_PORT
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
from lightning_app.utilities.enum import AppStage
from lightning_app.utilities.load_app import extract_metadata_from_app
@ -195,7 +196,7 @@ def test_update_publish_state_and_maybe_refresh_ui():
publish_state_queue = _MockQueue("publish_state_queue")
api_response_queue = _MockQueue("api_response_queue")
publish_state_queue.put(app.state_with_changes)
publish_state_queue.put((app.state_with_changes, None))
thread = UIRefresher(publish_state_queue, api_response_queue)
thread.run_once()
@ -226,7 +227,7 @@ async def test_start_server(x_lightning_type, monkeypatch):
has_started_queue = _MockQueue("has_started_queue")
api_response_queue = _MockQueue("api_response_queue")
state = app.state_with_changes
publish_state_queue.put(state)
publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses={})))
spec = extract_metadata_from_app(app)
ui_refresher = start_server(
publish_state_queue,
@ -284,6 +285,9 @@ async def test_start_server(x_lightning_type, monkeypatch):
{"name": "main_4", "content": "https://te"},
]
response = await client.get("/api/v1/status")
assert response.json() == {"is_ui_ready": True, "work_statuses": {}}
response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers)
assert change_state_queue._queue[1].to_dict() == {
"values_changed": {"root['vars']['counter']": {"new_value": 1}}

View File

@ -5,7 +5,7 @@ from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from time import time
from unittest.mock import ANY, MagicMock
from unittest.mock import ANY
import pytest
from deepdiff import DeepDiff, Delta
@ -19,7 +19,7 @@ from lightning_app.storage import Path
from lightning_app.storage.path import _storage_root_dir
from lightning_app.structures import Dict as LDict
from lightning_app.structures import List as LList
from lightning_app.testing.helpers import EmptyFlow, EmptyWork
from lightning_app.testing.helpers import _MockQueue, EmptyFlow, EmptyWork
from lightning_app.utilities.app_helpers import (
_delta_to_app_state_delta,
_LightningAppRef,
@ -891,21 +891,37 @@ class FlowReady(LightningFlow):
def test_flow_ready():
"""This test validates the api publish state queue is populated only once ready is True."""
"""This test validates that the app status queue is populated correctly."""
mock_queue = _MockQueue("api_publish_state_queue")
def run_patch(method):
app.api_publish_state_queue = MagicMock()
app.should_publish_changes_to_api = False
app.should_publish_changes_to_api = True
app.api_publish_state_queue = mock_queue
method()
state = {"done": False}
def lagged_run_once(method):
"""Ensure that the full loop is run after the app exits."""
new_done = method()
if state["done"]:
return True
state["done"] = new_done
return False
app = LightningApp(FlowReady())
app._run = partial(run_patch, method=app._run)
app.run_once = partial(lagged_run_once, method=app.run_once)
MultiProcessRuntime(app, start_server=False).dispatch()
# Validates the state has been added only when ready was true.
state = app.api_publish_state_queue.put._mock_call_args[0][0]
call_hash = state["works"]["w"]["calls"]["latest_call_hash"]
assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"
_, first_status = mock_queue.get()
assert not first_status.is_ui_ready
_, last_status = mock_queue.get()
while len(mock_queue) > 0:
_, last_status = mock_queue.get()
assert last_status.is_ui_ready
def test_structures_register_work_cloudcompute():

View File

@ -39,7 +39,7 @@ def test_orchestrator():
# orchestrator is now waiting for a response for copier in Work A
assert "work_b" in orchestrator.waiting_for_response
assert not request_queues["work_a"]
assert len(request_queues["work_a"]) == 0
assert request in copy_request_queues["work_a"]
assert request.destination == "work_b"
@ -54,7 +54,7 @@ def test_orchestrator():
# orchestrator processes confirmation and confirms to the pending request from Work B
orchestrator.run_once("work_a")
assert not copy_response_queues["work_a"]
assert len(copy_response_queues["work_a"]) == 0
assert response in response_queues["work_b"]
assert not orchestrator.waiting_for_response
orchestrator.run_once("work_b")
@ -71,7 +71,7 @@ def test_orchestrator():
assert response.exception is None
# all queues should be empty
assert all(not queue for queue in request_queues.values())
assert all(not queue for queue in response_queues.values())
assert all(not queue for queue in copy_request_queues.values())
assert all(not queue for queue in copy_response_queues.values())
assert all(len(queue) == 0 for queue in request_queues.values())
assert all(len(queue) == 0 for queue in response_queues.values())
assert all(len(queue) == 0 for queue in copy_request_queues.values())
assert all(len(queue) == 0 for queue in copy_response_queues.values())

View File

@ -606,7 +606,7 @@ def test_path_response_not_matching_reqeuest(tmpdir):
path.get()
# simulate a response that has a different hash than the request had
assert not response_queue
assert len(response_queue) == 0
response.path = str(path)
response.hash = "other_hash"
response_queue.put(response)

View File

@ -250,6 +250,7 @@ class WorkRunnerPatch(WorkRunner):
state = deepcopy(self.work.state)
self.work._calls[call_hash]["statuses"].append(
{
"name": self.work.name,
"stage": WorkStageStatus.FAILED,
"reason": WorkFailureReasons.TIMEOUT,
"timestamp": time.time(),
@ -547,7 +548,7 @@ def test_work_state_observer():
# 1. Simulate no state changes
##############################
work.run(use_setattr=False, use_containers=False)
assert not delta_queue
assert len(delta_queue) == 0
############################
# 2. Simulate a setattr call
@ -563,16 +564,16 @@ def test_work_state_observer():
assert len(observer._delta_memory) == 1
# The observer should not trigger any deltas being sent and only consume the delta memory
assert not delta_queue
assert len(delta_queue) == 0
observer.run_once()
assert not delta_queue
assert len(delta_queue) == 0
assert not observer._delta_memory
################################
# 3. Simulate a container update
################################
work.run(use_setattr=False, use_containers=True)
assert not delta_queue
assert len(delta_queue) == 0
assert not observer._delta_memory
observer.run_once()
observer.run_once() # multiple runs should not affect how many deltas are sent unless there are changes
@ -591,7 +592,7 @@ def test_work_state_observer():
delta = delta_queue.get().delta.to_dict()
assert delta == {"values_changed": {"root['vars']['var']": {"new_value": 3}}}
assert not delta_queue
assert len(delta_queue) == 0
assert len(observer._delta_memory) == 1
observer.run_once()
@ -599,7 +600,7 @@ def test_work_state_observer():
assert delta["values_changed"] == {"root['vars']['dict']['counter']": {"new_value": 2}}
assert delta["iterable_item_added"] == {"root['vars']['list'][1]": 1}
assert not delta_queue
assert len(delta_queue) == 0
assert not observer._delta_memory