spaCy/spacy/cli/project/dvc.py

221 lines
8.4 KiB
Python

"""This module contains helpers and subcommands for integrating spaCy projects
with Data Version Control (DVC). https://dvc.org"""
import subprocess
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from wasabi import msg
from ...util import (
SimpleFrozenList,
join_command,
run_command,
split_command,
working_dir,
)
from .._util import (
COMMAND,
NAME,
PROJECT_FILE,
Arg,
Opt,
get_hash,
load_project_config,
project_cli,
)
# File name of the DVC pipeline config; update_dvc_config reads and rewrites
# this file inside the resolved project directory.
DVC_CONFIG = "dvc.yaml"
# Name of the directory whose presence marks a project as DVC-initialized
# (ensure_dvc checks for a directory with this name).
DVC_DIR = ".dvc"
# Name under which the command in this module is registered on project_cli;
# also interpolated into the regeneration hint below.
UPDATE_COMMAND = "dvc"
# Comment block written into the generated DVC config directly after the
# hash line, telling users how to regenerate the file.
DVC_CONFIG_COMMENT = f"""# This file is auto-generated by spaCy based on your {PROJECT_FILE}. If you've
# edited your {PROJECT_FILE}, you can regenerate this file by running:
# {COMMAND} project {UPDATE_COMMAND}"""
@project_cli.command(UPDATE_COMMAND)
def project_update_dvc_cli(
    project_dir: Path = Arg(
        Path.cwd(),
        help="Location of project directory. Defaults to current working directory.",
        exists=True,
        file_okay=False,
    ),
    workflow: Optional[str] = Arg(
        None,
        help=f"Name of workflow defined in {PROJECT_FILE}. Defaults to first workflow if not set.",
    ),
    verbose: bool = Opt(False, "--verbose", "-V", help="Print more info"),
    quiet: bool = Opt(False, "--quiet", "-q", help="Print less info"),
    force: bool = Opt(False, "--force", "-F", help="Force update DVC config"),
):
    """Auto-generate Data Version Control (DVC) config. A DVC
    project can only define one pipeline, so you need to specify one workflow
    defined in the project.yml. If no workflow is specified, the first defined
    workflow is used. The DVC config will only be updated if the project.yml
    changed.
    DOCS: https://spacy.io/api/cli#project-dvc
    """
    # NOTE(review): the docstring above is presumably surfaced as the command's
    # CLI help text by project_cli, so its wording is user-facing -- confirm
    # before editing it.
    # Thin CLI entry point: all real work happens in project_update_dvc.
    project_update_dvc(
        project_dir=project_dir,
        workflow=workflow,
        verbose=verbose,
        quiet=quiet,
        force=force,
    )
def project_update_dvc(
    project_dir: Path,
    workflow: Optional[str] = None,
    *,
    verbose: bool = False,
    quiet: bool = False,
    force: bool = False,
) -> None:
    """Load the project config, regenerate the DVC config file from it if
    needed, and print whether anything was updated. DVC only supports a single
    pipeline per project, so exactly one workflow from the project.yml is
    converted.

    project_dir (Path): The project directory.
    workflow (Optional[str]): Optional name of workflow defined in project.yml.
        If not set, the first workflow will be used.
    verbose (bool): Print more info.
    quiet (bool): Print less info.
    force (bool): Force update DVC config.
    """
    project_config = load_project_config(project_dir)
    was_updated = update_dvc_config(
        project_dir,
        project_config,
        workflow,
        verbose=verbose,
        quiet=quiet,
        force=force,
    )
    # The same follow-up hint is shown regardless of the outcome.
    repro_hint = "To execute the workflow with DVC, run: dvc repro"
    if not was_updated:
        msg.info(f"No changes found in {PROJECT_FILE}, no update needed", repro_hint)
        return
    msg.good(f"Updated DVC config from {PROJECT_FILE}", repro_hint)
def update_dvc_config(
    path: Path,
    config: Dict[str, Any],
    workflow: Optional[str] = None,
    verbose: bool = False,
    quiet: bool = False,
    force: bool = False,
) -> bool:
    """Regenerate the dvc.yaml file in the project directory by invoking
    "dvc run ... --no-exec" once per usable workflow command. The first line
    of the generated file stores the hash of the config dict, which is how a
    later call detects that the config is unchanged and skips the work.

    path (Path): The path to the project directory.
    config (Dict[str, Any]): The loaded project.yml.
    workflow (Optional[str]): Name of the workflow to convert. If not set,
        the first workflow in the config is used.
    verbose (bool): Whether to print additional info (via DVC).
    quiet (bool): Don't output anything (via DVC).
    force (bool): Force update, even if hashes match.
    RETURNS (bool): Whether the DVC config file was updated.
    """
    ensure_dvc(path)
    workflows = config.get("workflows", {})
    available = list(workflows.keys())
    check_workflows(available, workflow)
    if not workflow:
        workflow = available[0]
    config_hash = get_hash(config)
    path = path.resolve()
    dvc_config_path = path / DVC_CONFIG
    if dvc_config_path.exists():
        # The hash of the config the file was generated from lives on line 1.
        with dvc_config_path.open("r", encoding="utf8") as f:
            first_line = f.readline()
        stored_hash = first_line.strip().replace("# ", "")
        if not force and stored_hash == config_hash:
            return False  # project.yml is unchanged, keep the existing file
        # Stale (or forced): remove the file before the dvc commands below run.
        dvc_config_path.unlink()
    dvc_commands = []
    config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
    # Flags that are passed to every generated dvc invocation
    shared_flags = [
        flag
        for enabled, flag in ((verbose, "--verbose"), (quiet, "--quiet"))
        if enabled
    ]
    for step in workflows[workflow]:
        spec = config_commands[step]
        deps = spec.get("deps", [])
        outputs = spec.get("outputs", [])
        outputs_no_cache = spec.get("outputs_no_cache", [])
        # Commands without any deps or outputs are left out of the DVC config.
        if not (deps or outputs or outputs_no_cache):
            continue
        # The stage's working dir is the resolved project path, since dvc.yaml
        # is auto-generated and shouldn't contain arbitrary paths.
        stage_args = ["run", *shared_flags, "-n", step, "-w", str(path), "--no-exec"]
        if spec.get("no_skip"):
            stage_args.append("--always-changed")
        # Argument order: dependencies, cached outputs, non-cached outputs.
        for option, tracked_paths in (
            ("-d", deps),
            ("-o", outputs),
            ("-O", outputs_no_cache),
        ):
            for tracked in tracked_paths:
                stage_args.extend((option, tracked))
        # The command the stage executes comes last.
        stage_args.extend(["python", "-m", NAME, "project", "run", step])
        dvc_commands.append(join_command(stage_args))
    if not dvc_commands:
        # If we don't check for this, then there will be an error when reading the
        # config, since DVC wouldn't create it.
        msg.fail(
            "No usable commands for DVC found. This can happen if none of your "
            "commands have dependencies or outputs.",
            exits=1,
        )
    with working_dir(path):
        for stage_command in dvc_commands:
            run_command(f"dvc {stage_command}")
    # Prepend the hash line and the "auto-generated" notice to the file that
    # the dvc commands produced.
    with dvc_config_path.open("r+", encoding="utf8") as f:
        generated = f.read()
        f.seek(0, 0)
        f.write(f"# {config_hash}\n{DVC_CONFIG_COMMENT}\n{generated}")
    return True
def check_workflows(workflows: List[str], workflow: Optional[str] = None) -> None:
    """Check that the project defines workflows and that the requested one
    (if any) is among them. Problems are reported via msg.fail with exits=1;
    if no workflow was requested, a warning names the first workflow as the
    one that will be used.

    workflows (List[str]): Names of the available workflows.
    workflow (Optional[str]): The name of the workflow to convert.
    """
    if not workflows:
        msg.fail(
            f"No workflows defined in {PROJECT_FILE}. To generate a DVC config, "
            f"define at least one list of commands.",
            exits=1,
        )
    requested_is_unknown = workflow is not None and workflow not in workflows
    if requested_is_unknown:
        available = ", ".join(workflows)
        msg.fail(
            f"Workflow '{workflow}' not defined in {PROJECT_FILE}. "
            f"Available workflows: {available}",
            exits=1,
        )
    if not workflow:
        fallback = workflows[0]
        msg.warn(
            f"No workflow specified for DVC pipeline. Using the first workflow "
            f"defined in {PROJECT_FILE}: '{fallback}'"
        )
def ensure_dvc(project_dir: Path) -> None:
    """Ensure that the "dvc" command is available and that the given project
    directory is an initialized DVC project. Both problems are reported via
    msg.fail with exits=1.

    project_dir (Path): The project directory expected to contain the DVC
        directory (DVC_DIR).
    """
    try:
        # Only whether the executable can be launched matters here; its
        # output is discarded and its exit code is not inspected.
        subprocess.run(["dvc", "--version"], stdout=subprocess.DEVNULL)
    except OSError:
        # subprocess.run raises OSError (e.g. FileNotFoundError) when the
        # executable can't be started, i.e. when "dvc" is not available.
        msg.fail(
            "To use spaCy projects with DVC (Data Version Control), DVC needs "
            "to be installed and the 'dvc' command needs to be available",
            "You can install the Python package from pip (pip install dvc) or "
            "conda (conda install -c conda-forge dvc). For more details, see the "
            "documentation: https://dvc.org/doc/install",
            exits=1,
        )
    # Use the module-level constant rather than repeating the directory name.
    if not (project_dir / DVC_DIR).exists():
        msg.fail(
            "Project not initialized as a DVC project",
            "To initialize a DVC project, you can run 'dvc init' in the project "
            "directory. For more details, see the documentation: "
            "https://dvc.org/doc/command-reference/init",
            exits=1,
        )