Fix the `examples/app_dag` App (#14359)
* Fix app dag example * Add test * Update doc * Update tests/tests_app_examples/test_app_dag.py Co-authored-by: Sherin Thomas <sherin@grid.ai>
This commit is contained in:
parent
cfb27bd75b
commit
2b61c92ceb
|
@ -39,10 +39,9 @@ First, let's define the component we need:
|
|||
:lines: 55-79
|
||||
|
||||
And its run method executes the steps described above.
|
||||
Additionally, ``work.stop`` is used to reduce cost when running in the cloud.
|
||||
|
||||
.. literalinclude:: ../../../examples/app_dag/app.py
|
||||
:lines: 81-108
|
||||
:lines: 80-103
|
||||
|
||||
----
|
||||
|
||||
|
@ -51,4 +50,4 @@ Step 2: Define the scheduling
|
|||
*****************************
|
||||
|
||||
.. literalinclude:: ../../../examples/app_dag/app.py
|
||||
:lines: 109-137
|
||||
:lines: 106-135
|
||||
|
|
|
@ -56,7 +56,7 @@ class DAG(L.LightningFlow):
|
|||
|
||||
"""This component is a DAG."""
|
||||
|
||||
def __init__(self, models_paths):
|
||||
def __init__(self, models_paths: list):
|
||||
super().__init__()
|
||||
# Step 1: Create a work to get the data.
|
||||
self.data_collector = GetDataWork()
|
||||
|
@ -80,12 +80,10 @@ class DAG(L.LightningFlow):
|
|||
def run(self):
|
||||
# Step 1 and 2: Download and process the data.
|
||||
self.data_collector.run()
|
||||
self.data_collector.stop() # Stop the data_collector to reduce cost
|
||||
self.processing.run(
|
||||
df_data=self.data_collector.df_data,
|
||||
df_target=self.data_collector.df_target,
|
||||
)
|
||||
self.processing.stop() # Stop the processing to reduce cost
|
||||
|
||||
# Step 3: Launch n models training in parallel.
|
||||
for model, work in self.dict.items():
|
||||
|
@ -128,7 +126,7 @@ class ScheduledDAG(L.LightningFlow):
|
|||
app = L.LightningApp(
|
||||
ScheduledDAG(
|
||||
DAG,
|
||||
models=[
|
||||
models_paths=[
|
||||
"svm.SVR",
|
||||
"linear_model.LinearRegression",
|
||||
"tree.DecisionTreeRegressor",
|
||||
|
|
|
@ -221,6 +221,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
|
|||
- Resolved a bug where the `install` command was not installing the latest version of an app/component by default ([#14181](https://github.com/Lightning-AI/lightning/pull/14181))
|
||||
|
||||
|
||||
- Fixed the `examples/app_dag` example ([#14359](https://github.com/Lightning-AI/lightning/pull/14359))
|
||||
|
||||
|
||||
## [0.5.5] - 2022-08-9
|
||||
|
||||
### Deprecated
|
||||
|
|
|
@ -219,7 +219,7 @@ def _run_cli(args) -> Generator:
|
|||
def run_app_in_cloud(
|
||||
app_folder: str, app_name: str = "app.py", extra_args: List[str] = [], debug: bool = True
|
||||
) -> Generator:
|
||||
"""This utility is used to automate testing e2e application with lightning_app.ai."""
|
||||
"""This utility is used to automate testing e2e application with lightning.ai."""
|
||||
# 1. Validate the provide app_folder is correct.
|
||||
if not os.path.exists(os.path.join(app_folder, "app.py")):
|
||||
raise Exception("The app folder should contain an app.py file.")
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
import os
|
||||
from time import sleep
|
||||
|
||||
import pytest
|
||||
from tests_app import _PROJECT_ROOT
|
||||
|
||||
from lightning_app.testing.testing import run_app_in_cloud
|
||||
|
||||
|
||||
@pytest.mark.cloud
|
||||
def test_app_dag_example_cloud() -> None:
|
||||
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_dag")) as (_, _, fetch_logs, _):
|
||||
|
||||
launch_log, finish_log = False, False
|
||||
while not (launch_log and finish_log):
|
||||
for log in fetch_logs(["flow"]):
|
||||
if "Launching a new DAG" in log:
|
||||
launch_log = True
|
||||
elif "Finished training and evaluating" in log:
|
||||
finish_log = True
|
||||
sleep(1)
|
Loading…
Reference in New Issue