spaCy/spacy/cli/project.py

451 lines
17 KiB
Python

from typing import List, Dict, Any, Optional
import typer
import srsly
from pathlib import Path
from wasabi import msg
import subprocess
import shlex
import os
import re
import shutil
import sys
import murmurhash
import hashlib
from ._app import app, Arg, Opt, COMMAND, NAME
from .. import about
from ..schemas import ProjectConfigSchema, validate
from ..util import ensure_path, run_command, make_tempdir, working_dir
CONFIG_FILE = "project.yml"
DVC_CONFIG = "dvc.yaml"
DIRS = [
"assets",
"metas",
"configs",
"packages",
"metrics",
"scripts",
"notebooks",
"training",
"corpus",
]
CACHES = [
Path.home() / ".torch",
Path.home() / ".caches" / "torch",
os.environ.get("TORCH_HOME"),
Path.home() / ".keras",
]
DVC_CONFIG_COMMENT = """# This file is auto-generated by spaCy based on your project.yml. Do not edit
# it directly and edit the project.yml instead and re-run the project."""
project_cli = typer.Typer(help="Command-line interface for spaCy projects")
@project_cli.callback(invoke_without_command=True)
def callback(ctx: typer.Context):
"""This runs before every project command and ensures DVC is installed and
everything is up to date.
"""
try:
subprocess.run(["dvc", "--version"], stdout=subprocess.DEVNULL)
except Exception:
msg.fail(
"spaCy projects require DVC (Data Version Control) and the 'dvc' command",
"You can install the Python package from pip (pip install dvc) or "
"conda (conda install -c conda-forge dvc). For more details, see the "
"documentation: https://dvc.org/doc/install",
exits=1,
)
@project_cli.command("clone")
def project_clone_cli(
# fmt: off
name: str = Arg(..., help="The name of the template to fetch"),
dest: Path = Arg(Path.cwd(), help="Where to download and work. Defaults to current working directory.", exists=False),
repo: str = Opt(about.__projects__, "--repo", "-r", help="The repository to look in."),
git: bool = Opt(False, "--git", "-G", help="Initialize project as a Git repo"),
no_init: bool = Opt(False, "--no-init", "-NI", help="Don't initialize the project with DVC"),
verbose: bool = Opt(False, "--verbose", "-V", help="Show detailed information")
# fmt: on
):
"""Clone a project template from a repository."""
project_clone(
name, dest, repo=repo, git=git, no_init=no_init, verbose=verbose, silent=True
)
def project_clone(
name: str,
dest: Path,
*,
repo: str = about.__projects__,
git: bool = False,
no_init: bool = False,
silent: bool = False,
verbose: bool = False,
) -> None:
dest = ensure_path(dest)
check_clone_dest(dest)
# When cloning a subdirectory with DVC, it will create a folder of that name
# within the destination dir, so we use a tempdir and then copy it into the
# parent directory to create the cloned directory
dest = dest.resolve()
with make_tempdir() as tmp_dir:
cmd = ["dvc", "get", repo, name, "-o", str(tmp_dir)]
if verbose:
cmd.append("--verbose")
if silent:
cmd.append("--quiet")
print(" ".join(cmd))
run_command(cmd)
shutil.move(str(tmp_dir / Path(name).name), str(dest))
msg.good(f"Cloned project '{name}' from {repo}")
for sub_dir in DIRS:
dir_path = dest / sub_dir
if not dir_path.exists():
dir_path.mkdir(parents=True)
if not no_init:
project_init(dest, git=git, silent=silent)
msg.good(f"Your project is now ready!", dest)
print(f"To fetch the assets, run:\npython -m {NAME} project assets {dest}")
@project_cli.command("init")
def project_init_cli(
path: Path = Arg(..., help="Path to cloned project", exists=True, file_okay=False),
git: bool = Opt(False, "--git", "-G", help="Initialize project as a Git repo"),
):
"""Initialize a project directory with DVC and Git (optional). This should
typically be taken care of automatically when you run the "project clone"
command.
"""
project_init(path, git=git, silent=True)
def project_init(
dest: Path, *, git: bool = False, silent: bool = False, analytics: bool = False
):
with working_dir(dest):
# TODO: check that .dvc exists in other commands?
init_cmd = ["dvc", "init"]
if silent:
init_cmd.append("--quiet")
if not git:
init_cmd.append("--no-scm")
if git:
run_command(["git", "init"])
run_command(init_cmd)
if not analytics:
# TODO: find a better solution for this?
run_command(["dvc", "config", "core.analytics", "false"])
config = load_project_config(dest)
with msg.loading("Updating DVC config..."):
updated = update_dvc_config(dest, config, silent=True)
if updated:
msg.good(f"Updated DVC config from {CONFIG_FILE}")
@project_cli.command("assets")
def project_assets_cli(
# fmt: off
path: Path = Arg(..., help="Path to cloned project", exists=True, file_okay=False),
# fmt: on
):
"""Use Data Version Control to get the assets for the project."""
project_assets(path)
def project_assets(project_path: Path) -> None:
project_path = ensure_path(project_path)
config = load_project_config(project_path)
with msg.loading("Updating DVC config..."):
updated = update_dvc_config(project_path, config, silent=True)
if updated:
msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
assets = config.get("assets", {})
if not assets:
msg.warn(f"No assets specified in {CONFIG_FILE}", exits=0)
msg.info(f"Fetching {len(assets)} asset(s)")
variables = config.get("variables", {})
for asset in assets:
url = asset["url"].format(**variables)
dest = asset["dest"].format(**variables)
fetch_asset(project_path, url, dest, asset.get("checksum"))
def fetch_asset(
project_path: Path, url: str, dest: Path, checksum: Optional[str] = None
) -> None:
check_asset(url)
dest_path = project_path / dest
if dest_path.exists() and checksum:
# If there's already a file, check for checksum
# TODO: add support for chaches
if checksum == get_checksum(dest_path):
msg.good(f"Skipping download with matching checksum: {dest}")
return
with working_dir(project_path):
try:
dvc_cmd = ["dvc", "get-url", url, str(dest_path)]
# If this fails, we don't want to output an error or info message
out = subprocess.check_output(dvc_cmd, stderr=subprocess.DEVNULL)
print(out)
except subprocess.CalledProcessError:
# TODO: Can we read out Weak ETags error?
# TODO: replace curl
run_command(["curl", url, "--output", str(dest_path)])
run_command(["dvc", "add", str(dest_path)])
msg.good(f"Fetched asset {dest}")
@project_cli.command(
"run-all",
context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def project_run_all_cli(
# fmt: off
ctx: typer.Context,
project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
show_help: bool = Opt(False, "--help", help="Show help message and available subcommands")
# fmt: on
):
"""Run all commands. Additional arguments are passed to dvc repro."""
if show_help:
print_run_help(project_dir)
else:
project_run_all(project_dir, *ctx.args)
def project_run_all(project_dir: Path, *dvc_args) -> None:
config = load_project_config(project_dir)
with msg.loading("Updating DVC config..."):
updated = update_dvc_config(project_dir, config, silent=True)
if updated:
msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
dvc_cmd = ["dvc", "repro", *dvc_args]
with working_dir(project_dir):
run_command(dvc_cmd)
@project_cli.command(
"run", context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def project_run_cli(
# fmt: off
ctx: typer.Context,
project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
subcommand: str = Arg(None, help="Name of command defined in project config"),
show_help: bool = Opt(False, "--help", help="Show help message and available subcommands")
# fmt: on
):
"""Run scripts defined in the project."""
if show_help or not subcommand:
print_run_help(project_dir, subcommand)
else:
project_run(project_dir, subcommand, *ctx.args)
def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
"""Simulate a CLI help prompt using the info available in the project config."""
config = load_project_config(project_dir)
config_commands = config.get("commands", [])
commands = {cmd["name"]: cmd for cmd in config_commands}
if subcommand:
if subcommand not in commands:
msg.fail(f"Can't find command '{subcommand}' in project config", exits=1)
print(f"Usage: {COMMAND} project run {project_dir} {subcommand}")
help_text = commands[subcommand].get("help")
if help_text:
msg.text(f"\n{help_text}\n")
else:
print(f"\nAvailable commands in {CONFIG_FILE}")
print(f"Usage: {COMMAND} project run {project_dir} [COMMAND]")
msg.table([(cmd["name"], cmd.get("help", "")) for cmd in config_commands])
def project_run(project_dir: Path, subcommand: str, *dvc_args) -> None:
config = load_project_config(project_dir)
with msg.loading("Updating DVC config..."):
updated = update_dvc_config(project_dir, config, silent=True)
if updated:
msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
config_commands = config.get("commands", [])
variables = config.get("variables", {})
commands = {cmd["name"]: cmd for cmd in config_commands}
if subcommand not in commands:
msg.fail(f"Can't find command '{subcommand}' in project config", exits=1)
if subcommand in config.get("run", []):
# This is one of the pipeline commands tracked in DVC
dvc_cmd = ["dvc", "repro", subcommand, *dvc_args]
run_command(dvc_cmd)
else:
cmd = commands[subcommand]
# Deps in non-DVC commands aren't tracked, but if they're defined,
# make sure they exist before running the command
for dep in cmd.get("deps", []):
if not (project_dir / dep).exists():
err = f"Missing dependency specified by command '{subcommand}': {dep}"
msg.fail(err, exits=1)
with working_dir(project_dir):
run_commands(cmd["script"], variables)
@project_cli.command("exec")
def project_exec_cli(
# fmt: off
project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
subcommand: str = Arg(..., help="Name of command defined in project config"),
# fmt: on
):
"""Internals"""
project_exec(project_dir, subcommand)
def project_exec(project_dir: Path, subcommand: str):
config = load_project_config(project_dir)
config_commands = config.get("commands", [])
variables = config.get("variables", {})
commands = {cmd["name"]: cmd for cmd in config_commands}
with working_dir(project_dir):
run_commands(commands[subcommand]["script"], variables)
@project_cli.command("update-dvc")
def project_update_dvc_cli(
# fmt: off
project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
verbose: bool = Opt(False, "--verbose", "-V", help="Print more info"),
force: bool = Opt(False, "--force", "-F", help="Force update DVC config"),
# fmt: on
):
config = load_project_config(project_dir)
updated = update_dvc_config(project_dir, config, verbose=verbose, force=force)
if updated:
msg.good(f"Updated DVC config from {CONFIG_FILE}")
else:
msg.info(f"No changes found in {CONFIG_FILE}, no update needed")
app.add_typer(project_cli, name="project")
def load_project_config(path: Path) -> Dict[str, Any]:
config_path = path / CONFIG_FILE
if not config_path.exists():
msg.fail("Can't find project config", config_path, exits=1)
config = srsly.read_yaml(config_path)
errors = validate(ProjectConfigSchema, config)
if errors:
msg.fail(f"Invalid project config in {CONFIG_FILE}", "\n".join(errors), exits=1)
return config
def update_dvc_config(
path: Path,
config: Dict[str, Any],
verbose: bool = False,
silent: bool = False,
force: bool = False,
) -> bool:
"""Re-run the DVC commands in dry mode and update dvc.yml file in the
project directory. The file is auto-generated based on the config.
"""
config_hash = get_hash(config)
dvc_config_path = path / DVC_CONFIG
if dvc_config_path.exists():
# Cneck if the file was generated using the current config, if not, redo
with dvc_config_path.open("r", encoding="utf8") as f:
ref_hash = f.readline().strip().replace("# ", "")
if ref_hash == config_hash and not force:
return False # Nothing has changed in project config, don't need to update
dvc_config_path.unlink()
variables = config.get("variables", {})
commands = []
# We only want to include commands that are part of the main list of "run"
# commands in project.yml and should be run in sequence
config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
for name in config.get("run", []):
if name not in config_commands:
msg.fail(f"Can't find command '{name}' in project config", exits=1)
command = config_commands[name]
deps = command.get("deps", [])
outputs = command.get("outputs", [])
outputs_no_cache = command.get("outputs_no_cache", [])
if not deps and not outputs and not outputs_no_cache:
continue
# Default to "." as the project path since dvc.yaml is auto-generated
# and we don't want arbitrary paths in there
project_cmd = ["python", "-m", NAME, "project", "exec", ".", name]
deps_cmd = [c for cl in [["-d", p] for p in deps] for c in cl]
outputs_cmd = [c for cl in [["-o", p] for p in outputs] for c in cl]
outputs_nc_cmd = [c for cl in [["-O", p] for p in outputs_no_cache] for c in cl]
dvc_cmd = ["dvc", "run", "-n", name, "-w", str(path), "--no-exec"]
if verbose:
dvc_cmd.append("--verbose")
if silent:
dvc_cmd.append("--quiet")
full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
commands.append(" ".join(full_cmd))
with working_dir(path):
run_commands(commands, variables, silent=True)
with dvc_config_path.open("r+", encoding="utf8") as f:
content = f.read()
f.seek(0, 0)
f.write(f"# {config_hash}\n{DVC_CONFIG_COMMENT}\n{content}")
return True
def run_commands(
commands: List[str] = tuple(), variables: Dict[str, str] = {}, silent: bool = False
) -> None:
for command in commands:
# Substitute variables, e.g. "./{NAME}.json"
command = command.format(**variables)
command = shlex.split(command)
# TODO: is this needed / a good idea?
if len(command) and command[0] == "python":
command[0] = sys.executable
if not silent:
print(" ".join(command))
run_command(command)
def check_asset(url: str) -> None:
# If the asset URL is a regular GitHub URL it's likely a mistake
# TODO: support loading from GitHub URLs? Automatically convert to raw?
if re.match("(http(s?)):\/\/github.com", url):
msg.warn(
"Downloading from a regular GitHub URL. This will only download "
"the source of the page, not the actual file. If you want to "
"download the raw file, click on 'Download' on the GitHub page "
"and copy the raw.githubusercontent.com URL instead."
)
# url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/").replace("/tree/", "/")
def check_clone_dest(dest: Path) -> None:
"""Check and validate that the destination path can be used to clone."""
if not dest:
msg.fail(f"Not a valid directory to clone project: {dest}", exits=1)
if dest.exists():
# Directory already exists (not allowed, clone needs to create it)
msg.fail(f"Can't clone project, directory already exists: {dest}", exits=1)
if not dest.parent.exists():
# We're not creating parents, parent dir should exist
msg.fail(
f"Can't clone project, parent directory doesn't exist: {dest.parent}",
exits=1,
)
def get_hash(data) -> str:
return str(murmurhash.hash(srsly.json_dumps(data, sort_keys=True)))
def get_checksum(path: Path) -> str:
return hashlib.md5(path.read_bytes()).hexdigest()