spaCy/spacy/cli/project/assets.py

155 lines
5.5 KiB
Python
Raw Normal View History

from typing import Optional
from pathlib import Path
from wasabi import msg
import requests
import tqdm
import re
import shutil
from ...util import ensure_path, get_checksum, working_dir
from .._app import project_cli, Arg
from .util import PROJECT_FILE, load_project_config
# TODO: find a solution for caches
# CACHES = [
# Path.home() / ".torch",
# Path.home() / ".caches" / "torch",
# os.environ.get("TORCH_HOME"),
# Path.home() / ".keras",
# ]
@project_cli.command("assets")
def project_assets_cli(
    project_dir: Path = Arg(
        Path.cwd(),
        help="Path to cloned project. Defaults to current working directory.",
        exists=True,
        file_okay=False,
    ),
):
    """Fetch project assets like datasets and pretrained weights. Assets are
    defined in the "assets" section of the project.yml. If a checksum is
    provided in the project.yml, the file is only downloaded if no local file
    with the same checksum exists.
    """
    # NOTE(review): the docstring above presumably doubles as the command's
    # help text, so its wording is kept stable. This function is only the CLI
    # entry point; the actual work is delegated to project_assets.
    project_assets(project_dir)
def project_assets(project_dir: Path) -> None:
    """Fetch the assets listed in the "assets" section of a project's config.
    Assets that define a URL are passed on to fetch_asset. Assets without a
    URL are files the user has to provide themselves, so they are only
    validated via check_private_asset. Both "dest" and "url" may contain
    placeholders that are filled in from the config's "variables" section.

    project_dir (Path): Path to project directory.
    """
    project_path = ensure_path(project_dir)
    config = load_project_config(project_path)
    assets = config.get("assets", {})
    if not assets:
        # NOTE(review): with exits=0 the printer is expected to terminate the
        # process after the warning, so nothing below runs for empty assets.
        msg.warn(f"No assets specified in {PROJECT_FILE}", exits=0)
    msg.info(f"Fetching {len(assets)} asset(s)")
    variables = config.get("variables", {})
    for asset in assets:
        dest = asset["dest"].format(**variables)
        url = asset.get("url")
        checksum = asset.get("checksum")
        if not url:
            # project.yml defines asset without URL that the user has to place.
            # Destinations are relative to the project directory (the same
            # way fetch_asset resolves them), not to the current working
            # directory, so anchor the path before checking it.
            check_private_asset(project_path / dest, checksum)
            continue
        fetch_asset(project_path, url.format(**variables), dest, checksum)
def check_private_asset(dest: Path, checksum: Optional[str] = None) -> None:
    """Check and validate assets without a URL (private assets that the user
    has to provide themselves) and give feedback about the checksum. Only
    prints messages, nothing is downloaded or modified.

    dest (Path): Destination path of the asset.
    checksum (Optional[str]): Optional checksum of the expected file.
    """
    if not Path(dest).exists():
        err = f"No URL provided for asset. You need to add this file yourself: {dest}"
        msg.warn(err)
    elif not checksum:
        # Without an expected checksum there's nothing to compare against, so
        # an existing file is fine and shouldn't be reported as a mismatch.
        msg.good(f"Asset already exists: {dest}")
    elif checksum == get_checksum(dest):
        msg.good(f"Asset exists with matching checksum: {dest}")
    else:
        msg.fail(f"Asset available but with incorrect checksum: {dest}")
def fetch_asset(
    project_path: Path, url: str, dest: Path, checksum: Optional[str] = None
) -> Optional[Path]:
    """Fetch an asset from a given URL or path. If a checksum is provided and a
    local file exists, it's only re-downloaded if the checksum doesn't match.

    project_path (Path): Path to project directory.
    url (str): URL or path to asset.
    dest (Path): Destination of the asset, relative to the project directory.
    checksum (Optional[str]): Optional expected checksum of local file.
    RETURNS (Optional[Path]): The path to the fetched asset or None if fetching
        the asset failed (download error or checksum mismatch).
    """
    # TODO: add support for caches
    dest_path = (project_path / dest).resolve()
    if checksum and dest_path.exists() and checksum == get_checksum(dest_path):
        # There's already a file with the expected checksum, nothing to do
        msg.good(f"Skipping download with matching checksum: {dest}")
        return dest_path
    # Create missing parent directories so writing the file doesn't fail just
    # because the asset directory hasn't been created yet
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # Local asset paths are interpreted relative to the project directory
    with working_dir(project_path):
        url = convert_asset_url(url)
        try:
            download_file(url, dest_path)
            msg.good(f"Downloaded asset {dest}")
        except requests.exceptions.RequestException as e:
            # The request also fails if "url" is really a local path, so try
            # that before reporting an error
            if Path(url).is_file():
                # If it's a local file, copy to destination
                shutil.copy(url, str(dest_path))
                msg.good(f"Copied local asset {dest}")
            else:
                msg.fail(f"Download failed: {dest}", e)
                return None
    if checksum and checksum != get_checksum(dest_path):
        msg.fail(f"Checksum doesn't match value defined in {PROJECT_FILE}: {dest}")
        return None
    return dest_path
def convert_asset_url(url: str) -> str:
    """Check and convert the asset URL if needed. Regular GitHub page URLs
    (e.g. ".../blob/master/file.txt") are rewritten to their raw counterpart.
    All other URLs and local paths are returned unchanged.

    url (str): The asset URL.
    RETURNS (str): The converted URL.
    """
    # Only treat the URL as a GitHub URL if the host is exactly github.com
    # (escaped dot, trailing slash), not e.g. "github.community"
    if not re.match(r"https?://github\.com/", url):
        return url
    # Release downloads and /raw/ links already point to the file contents and
    # would break if they were rewritten
    if "releases/download" in url or "/raw/" in url:
        return url
    # If the asset URL is a regular GitHub URL it's likely a mistake. Only
    # rewrite the host and the first /tree/ or /blob/ segment, so that files
    # or directories with the same name further down the path stay intact.
    converted = url.replace("github.com", "raw.githubusercontent.com", 1)
    converted = re.sub(r"/(tree|blob)/", "/", converted, count=1)
    msg.warn(
        "Downloading from a regular GitHub URL. This will only download "
        "the source of the page, not the actual file. Converting the URL "
        "to a raw URL.",
        converted,
    )
    return converted
def download_file(url: str, dest: Path, chunk_size: int = 1024) -> None:
    """Download a file using requests and show a progress bar while the
    response body is streamed to disk. Raises a requests exception if the
    request fails or the server responds with an error status.

    url (str): The URL of the file.
    dest (Path): The destination path.
    chunk_size (int): The size of chunks to read/write.
    """
    response = requests.get(url, stream=True)
    try:
        response.raise_for_status()
        # Fall back to 0 if the server doesn't report the size
        total = int(response.headers.get("content-length", 0))
        progress_settings = {
            "total": total,
            "unit": "iB",
            "unit_scale": True,
            # "iB" are binary units, so the scale is always 1024, independent
            # of the chunk size used for reading
            "unit_divisor": 1024,
            "leave": False,
        }
        with dest.open("wb") as f, tqdm.tqdm(**progress_settings) as bar:
            for data in response.iter_content(chunk_size=chunk_size):
                size = f.write(data)
                bar.update(size)
    finally:
        # A streamed response holds on to its connection until it's closed,
        # so release it even if the status check or writing fails
        response.close()