"""This module contains helpers and subcommands for integrating spaCy projects with Data Version Controk (DVC). https://dvc.org""" from typing import Dict, Any, List, Optional import subprocess from pathlib import Path from wasabi import msg from .util import PROJECT_FILE, load_project_config from .._app import project_cli, Arg, Opt, NAME, COMMAND from ...util import get_hash, working_dir, split_command, join_command, run_command DVC_CONFIG = "dvc.yaml" DVC_DIR = ".dvc" UPDATE_COMMAND = "dvc" DVC_CONFIG_COMMENT = f"""# This file is auto-generated by spaCy based on your {PROJECT_FILE}. If you've # edited your {PROJECT_FILE}, you can regenerate this file by running: # {COMMAND} project {UPDATE_COMMAND}""" @project_cli.command(UPDATE_COMMAND) def project_update_dvc_cli( # fmt: off project_dir: Path = Arg(Path.cwd(), help="Location of project directory. Defaults to current working directory.", exists=True, file_okay=False), workflow: Optional[str] = Arg(None, help=f"Name of workflow defined in {PROJECT_FILE}. Defaults to first workflow if not set."), verbose: bool = Opt(False, "--verbose", "-V", help="Print more info"), force: bool = Opt(False, "--force", "-F", help="Force update DVC config"), # fmt: on ): """Auto-generate Data Version Control (DVC) config. A DVC project can only define one pipeline, so you need to specify one workflow defined in the project.yml. If no workflow is specified, the first defined workflow is used. The DVC config will only be updated if """ project_update_dvc(project_dir, workflow, verbose=verbose, force=force) def project_update_dvc( project_dir: Path, workflow: Optional[str] = None, *, verbose: bool = False, force: bool = False, ) -> None: """Update the auto-generated Data Version Control (DVC) config file. A DVC project can only define one pipeline, so you need to specify one workflow defined in the project.yml. Will only update the file if the checksum changed. project_dir (Path): The project directory. workflow (Optional[str]): Optional name of workflow defined in project.yml. If not set, the first workflow will be used. verbose (bool): Print more info. force (bool): Force update DVC config. """ config = load_project_config(project_dir) updated = update_dvc_config( project_dir, config, workflow, verbose=verbose, force=force ) help_msg = "To execute the workflow with DVC, run: dvc repro" if updated: msg.good(f"Updated DVC config from {PROJECT_FILE}", help_msg) else: msg.info(f"No changes found in {PROJECT_FILE}, no update needed", help_msg) def update_dvc_config( path: Path, config: Dict[str, Any], workflow: Optional[str] = None, verbose: bool = False, silent: bool = False, force: bool = False, ) -> bool: """Re-run the DVC commands in dry mode and update dvc.yaml file in the project directory. The file is auto-generated based on the config. The first line of the auto-generated file specifies the hash of the config dict, so if any of the config values change, the DVC config is regenerated. path (Path): The path to the project directory. config (Dict[str, Any]): The loaded project.yml. verbose (bool): Whether to print additional info (via DVC). silent (bool): Don't output anything (via DVC). force (bool): Force update, even if hashes match. RETURNS (bool): Whether the DVC config file was updated. """ ensure_dvc(path) workflows = config.get("workflows", {}) workflow_names = list(workflows.keys()) check_workflows(workflow_names, workflow) if not workflow: workflow = workflow_names[0] config_hash = get_hash(config) path = path.resolve() dvc_config_path = path / DVC_CONFIG if dvc_config_path.exists(): # Check if the file was generated using the current config, if not, redo with dvc_config_path.open("r", encoding="utf8") as f: ref_hash = f.readline().strip().replace("# ", "") if ref_hash == config_hash and not force: return False # Nothing has changed in project.yml, don't need to update dvc_config_path.unlink() variables = config.get("variables", {}) dvc_commands = [] config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} for name in workflows[workflow]: command = config_commands[name] deps = command.get("deps", []) outputs = command.get("outputs", []) outputs_no_cache = command.get("outputs_no_cache", []) if not deps and not outputs and not outputs_no_cache: continue # Default to the working dir as the project path since dvc.yaml is auto-generated # and we don't want arbitrary paths in there project_cmd = ["python", "-m", NAME, "project", "run", name] deps_cmd = [c for cl in [["-d", p] for p in deps] for c in cl] outputs_cmd = [c for cl in [["-o", p] for p in outputs] for c in cl] outputs_nc_cmd = [c for cl in [["-O", p] for p in outputs_no_cache] for c in cl] dvc_cmd = ["run", "-n", name, "-w", str(path), "--no-exec"] full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd] dvc_commands.append(join_command(full_cmd)) with working_dir(path): dvc_flags = {"--verbose": verbose, "--quiet": silent} run_dvc_commands(dvc_commands, variables, flags=dvc_flags) with dvc_config_path.open("r+", encoding="utf8") as f: content = f.read() f.seek(0, 0) f.write(f"# {config_hash}\n{DVC_CONFIG_COMMENT}\n{content}") return True def run_dvc_commands( commands: List[str] = tuple(), variables: Dict[str, str] = {}, flags: Dict[str, bool] = {}, ) -> None: """Run a sequence of DVC commands in a subprocess, in order. commands (List[str]): The string commands without the leading "dvc". variables (Dict[str, str]): Dictionary of variable names, mapped to their values. Will be used to substitute format string variables in the commands. flags (Dict[str, bool]): Conditional flags to be added to command. Makes it easier to pass flags like --quiet that depend on a variable or command-line setting while avoiding lots of nested conditionals. """ for command in commands: # Substitute variables, e.g. "./{NAME}.json" command = command.format(**variables) command = split_command(command) dvc_command = ["dvc", *command] # Add the flags if they are set to True for flag, is_active in flags.items(): if is_active: dvc_command.append(flag) run_command(dvc_command) def check_workflows(workflows: List[str], workflow: Optional[str] = None) -> None: """Validate workflows provided in project.yml and check that a given workflow can be used to generate a DVC config. workflows (List[str]): Names of the available workflows. workflow (Optional[str]): The name of the workflow to convert. """ if not workflows: msg.fail( f"No workflows defined in {PROJECT_FILE}. To generate a DVC config, " f"define at least one list of commands.", exits=1, ) if workflow is not None and workflow not in workflows: msg.fail( f"Workflow '{workflow}' not defined in {PROJECT_FILE}. " f"Available workflows: {', '.join(workflows)}", exits=1, ) if not workflow: msg.warn( f"No workflow specified for DVC pipeline. Using the first workflow " f"defined in {PROJECT_FILE}: '{workflows[0]}'" ) def ensure_dvc(project_dir: Path) -> None: """Ensure that the "dvc" command is available and that the current project directory is an initialized DVC project. """ try: subprocess.run(["dvc", "--version"], stdout=subprocess.DEVNULL) except Exception: msg.fail( "To use spaCy projects with DVC (Data Version Control), DVC needs " "to be installed and the 'dvc' command needs to be available", "You can install the Python package from pip (pip install dvc) or " "conda (conda install -c conda-forge dvc). For more details, see the " "documentation: https://dvc.org/doc/install", exits=1, ) if not (project_dir / ".dvc").exists(): msg.fail( "Project not initialized as a DVC project", "To initialize a DVC project, you can run 'dvc init' in the project " "directory. For more details, see the documentation: " "https://dvc.org/doc/command-reference/init", exits=1, )