# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import logging
import os
import pathlib
import re
import shutil
import tarfile
import tempfile
import urllib.error
import urllib.request
from distutils.version import LooseVersion
from itertools import chain
from os.path import dirname, isfile
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union

from pkg_resources import Requirement, parse_requirements, yield_lines

# requirement files grouped by the package they belong to
REQUIREMENT_FILES = {
    "pytorch": (
        "requirements/pytorch/base.txt",
        "requirements/pytorch/extra.txt",
        "requirements/pytorch/strategies.txt",
        "requirements/pytorch/examples.txt",
    ),
    "app": (
        "requirements/app/app.txt",
        "requirements/app/cloud.txt",
        "requirements/app/ui.txt",
    ),
    "fabric": (
        "requirements/fabric/base.txt",
        "requirements/fabric/strategies.txt",
    ),
    "data": ("requirements/data/data.txt",),
}
# flat list of all requirement files above
REQUIREMENT_FILES_ALL = list(chain(*REQUIREMENT_FILES.values()))

# the parent of the folder which contains this file
_PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))


class _RequirementWithComment(Requirement):
    """A ``pkg_resources.Requirement`` which also remembers its trailing comment and preceding pip argument.

    A requirement is considered *strict* when its comment contains ``# strict`` (case-insensitive);
    strict requirements are never relaxed by :meth:`adjust`.

    """

    strict_string = "# strict"

    def __init__(self, *args: Any, comment: str = "", pip_argument: Optional[str] = None, **kwargs: Any) -> None:
        """Create the requirement.

        Args:
            *args: positional arguments forwarded to ``Requirement``
            comment: the comment which followed the requirement on its line (may be empty)
            pip_argument: the ``--...`` pip argument line which preceded the requirement, if any
            **kwargs: keyword arguments forwarded to ``Requirement``

        """
        super().__init__(*args, **kwargs)
        self.comment = comment
        assert pip_argument is None or pip_argument  # sanity check that it's not an empty str
        self.pip_argument = pip_argument
        self.strict = self.strict_string in comment.lower()

    def adjust(self, unfreeze: str) -> str:
        """Remove version restrictions unless they are strict.

        Args:
            unfreeze: ``"none"`` keeps the requirement as is, ``"major"`` replaces the upper bound with the next
                major version, ``"all"`` drops the upper bound

        Returns:
            The requirement string; strict requirements are returned unchanged with ``# strict`` appended.

        Raises:
            ValueError: if ``unfreeze`` is not one of the supported values (and the requirement is not strict)

        >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# anything").adjust("none")
        'arrow<=1.2.2,>=1.2.0'
        >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# strict").adjust("none")
        'arrow<=1.2.2,>=1.2.0  # strict'
        >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# my name").adjust("all")
        'arrow>=1.2.0'
        >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# strict").adjust("all")
        'arrow<=1.2.2,>=1.2.0  # strict'
        >>> _RequirementWithComment("arrow").adjust("all")
        'arrow'
        >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# cool").adjust("major")
        'arrow<2.0,>=1.2.0'
        >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# strict").adjust("major")
        'arrow<=1.2.2,>=1.2.0  # strict'
        >>> _RequirementWithComment("arrow>=1.2.0").adjust("major")
        'arrow>=1.2.0'
        >>> _RequirementWithComment("arrow").adjust("major")
        'arrow'

        """
        out = str(self)
        if self.strict:
            return f"{out}  {self.strict_string}"
        if unfreeze == "major":
            for operator, version in self.specs:
                if operator in ("<", "<="):
                    major = LooseVersion(version).version[0]
                    # replace upper bound with major version increased by one
                    return out.replace(f"{operator}{version}", f"<{major + 1}.0")
        elif unfreeze == "all":
            for operator, version in self.specs:
                if operator in ("<", "<="):
                    # drop upper bound
                    return out.replace(f"{operator}{version},", "")
        elif unfreeze != "none":
            raise ValueError(f"Unexpected unfreeze: {unfreeze!r} value.")
        return out


def _parse_requirements(strs: Union[str, Iterable[str]]) -> Iterator[_RequirementWithComment]:
    """Adapted from `pkg_resources.parse_requirements` to include comments.

    Lines starting with ``--`` are remembered as the pip argument of the next requirement and lines starting
    with ``-r `` (linked requirement files) are skipped.

    >>> txt = ['# ignored', '', 'this # is an', '--piparg', 'example', 'foo # strict', 'thing', '-r different/file.txt']
    >>> [r.adjust('none') for r in _parse_requirements(txt)]
    ['this', 'example', 'foo  # strict', 'thing']
    >>> txt = '\\n'.join(txt)
    >>> [r.adjust('none') for r in _parse_requirements(txt)]
    ['this', 'example', 'foo  # strict', 'thing']

    """
    lines = yield_lines(strs)
    pip_argument = None
    for line in lines:
        # Drop comments -- a hash without a space may be in a URL.
        if " #" in line:
            comment_pos = line.find(" #")
            line, comment = line[:comment_pos], line[comment_pos:]
        else:
            comment = ""
        # If there is a line continuation, drop it, and append the next line.
        if line.endswith("\\"):
            # drop only the backslash itself; surrounding whitespace is removed by `strip`
            line = line[:-1].strip()
            try:
                line += next(lines)
            except StopIteration:
                return
        # If there's a pip argument, save it
        if line.startswith("--"):
            pip_argument = line
            continue
        if line.startswith("-r "):
            # linked requirement files are unsupported
            continue
        yield _RequirementWithComment(line, comment=comment, pip_argument=pip_argument)
        pip_argument = None


def load_requirements(path_dir: str, file_name: str = "base.txt", unfreeze: str = "all") -> List[str]:
    """Loading requirements from a file.

    Args:
        path_dir: folder which contains the requirement file
        file_name: name of the requirement file inside ``path_dir``
        unfreeze: one of ``"none"``, ``"major"`` or ``"all"``, see ``_RequirementWithComment.adjust``

    Returns:
        The adjusted requirement strings, or an empty list (with a logged warning) if the file does not exist.

    >>> path_req = os.path.join(_PROJECT_ROOT, "requirements")
    >>> load_requirements(path_req, "docs.txt", unfreeze="major")  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    ['sphinx<...]

    """
    assert unfreeze in {"none", "major", "all"}
    path = Path(path_dir) / file_name
    if not path.exists():
        logging.warning(f"Folder {path_dir} does not have any base requirements.")
        return []
    text = path.read_text()
    return [req.adjust(unfreeze) for req in _parse_requirements(text)]


def load_readme_description(path_dir: str, homepage: str, version: str) -> str:
    """Load readme as description.

    Args:
        path_dir: folder which contains the ``README.md``
        homepage: base URL of the repository, used to build absolute links to static files
        version: release version/tag which is substituted to the links and badges

    Returns:
        The readme text with adjusted links and badges and without the section marked to be skipped.

    >>> load_readme_description(_PROJECT_ROOT, "", "")  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    '...PyTorch Lightning is just organized PyTorch...'

    """
    path_readme = os.path.join(path_dir, "README.md")
    with open(path_readme, encoding="utf-8") as fo:
        text = fo.read()

    # drop images from readme
    text = text.replace(
        "![PT to PL](docs/source-pytorch/_static/images/general/pl_quick_start_full_compressed.gif)", ""
    )

    # https://github.com/Lightning-AI/lightning/raw/master/docs/source/_static/images/lightning_module/pt_to_pl.png
    github_source_url = os.path.join(homepage, "raw", version)
    # replace relative repository path to absolute link to the release
    #  do not replace all "docs" as in the readme we refer some other sources with particular path to docs
    # NOTE(review): the replacement points to `docs/source-app` while the replaced path is `docs/source-pytorch`
    #  - confirm this is intended
    text = text.replace(
        "docs/source-pytorch/_static/", f"{os.path.join(github_source_url, 'docs/source-app/_static/')}"
    )

    # readthedocs badge
    text = text.replace("badge/?version=stable", f"badge/?version={version}")
    text = text.replace("pytorch-lightning.readthedocs.io/en/stable/", f"pytorch-lightning.readthedocs.io/en/{version}")
    # codecov badge
    text = text.replace("/branch/master/graph/badge.svg", f"/release/{version}/graph/badge.svg")
    # github actions badge
    text = text.replace("badge.svg?branch=master&event=push", f"badge.svg?tag={version}")
    # azure pipelines badge
    text = text.replace("?branchName=master", f"?branchName=refs%2Ftags%2F{version}")

    # The section between these two HTML comments is dropped. The markers must not be empty, otherwise
    # the pattern below degrades to `.+?` and every character of the readme is removed.
    # NOTE(review): confirm the markers match the ones used in README.md
    skip_begin = r"<!-- following section will be skipped from PyPI description -->"
    skip_end = r"<!-- end skipping PyPI description -->"
    # todo: wrap content as commented description
    return re.sub(rf"{skip_begin}.+?{skip_end}", "", text, flags=re.IGNORECASE + re.DOTALL)

    # # https://github.com/Borda/pytorch-lightning/releases/download/1.1.0a6/codecov_badge.png
    # github_release_url = os.path.join(homepage, "releases", "download", version)
    # # download badge and replace url with local file
    # text = _parse_for_badge(text, github_release_url)


def distribute_version(src_folder: str, ver_file: str = "version.info") -> None:
    """Copy the global version to all packages.

    Args:
        src_folder: folder which contains ``ver_file`` and the package folders
        ver_file: name of the version file; it is copied next to each ``<src_folder>/*/__version__.py``

    """
    ls_ver = glob.glob(os.path.join(src_folder, "*", "__version__.py"))
    ver_template = os.path.join(src_folder, ver_file)
    for fpath in ls_ver:
        fpath = os.path.join(os.path.dirname(fpath), ver_file)
        print("Distributing the version to", fpath)
        if os.path.isfile(fpath):
            os.remove(fpath)
        shutil.copy2(ver_template, fpath)


def _download_frontend(pkg_path: str, version: str = "v0.0.0") -> None:
    """Downloads an archive file for a specific release of the Lightning frontend and extracts it to the correct
    directory.

    This is a best-effort operation: any failure is only reported by a printed message.

    Args:
        pkg_path: package folder; the frontend is placed to its ``ui`` sub-folder (replacing an existing one)
        version: release of the frontend to be downloaded

    """
    download_dir = None
    try:
        frontend_dir = pathlib.Path(pkg_path, "ui")
        download_dir = tempfile.mkdtemp()

        shutil.rmtree(frontend_dir, ignore_errors=True)
        # TODO: remove this once lightning-ui package is ready as a dependency
        frontend_release_url = f"https://lightning-packages.s3.amazonaws.com/ui/{version}.tar.gz"
        # close the HTTP response and the archive stream even if the extraction fails
        with urllib.request.urlopen(frontend_release_url) as response, tarfile.open(
            fileobj=response, mode="r|gz"
        ) as file:
            file.extractall(path=download_dir)  # noqa: S202

        shutil.move(download_dir, frontend_dir)
        print("The Lightning UI has successfully been downloaded!")

    # If installing from source without internet connection, we don't want to break the installation
    except Exception:
        if download_dir is not None:
            # do not leave the temporary folder behind
            shutil.rmtree(download_dir, ignore_errors=True)
        print("The Lightning UI downloading has failed!")


def _load_aggregate_requirements(req_dir: str = "requirements", freeze_requirements: bool = False) -> None:
    """Load all base requirements from all particular packages and prune duplicates.

    The sorted unique requirements are written to ``<req_dir>/base.txt``; nothing is written if no sub-folder
    qualifies.

    Args:
        req_dir: folder whose sub-folders hold the ``base.txt`` files of the particular packages
        freeze_requirements: keep the requirements as they are instead of relaxing them to the next major version

    >>> _load_aggregate_requirements(os.path.join(_PROJECT_ROOT, "requirements"))

    """
    requires = [
        load_requirements(d, unfreeze="none" if freeze_requirements else "major")
        for d in glob.glob(os.path.join(req_dir, "*"))
        # skip empty folder (git artifacts), and resolving Will's special issue
        if os.path.isdir(d) and len(glob.glob(os.path.join(d, "*"))) > 0 and not os.path.basename(d).startswith("_")
    ]
    if not requires:
        return
    # TODO: add some smarter version aggregation per each package
    requires = sorted(set(chain(*requires)))
    with open(os.path.join(req_dir, "base.txt"), "w") as fp:
        # the file is opened in text mode, so "\n" is already translated to the platform line ending
        fp.writelines([ln + "\n" for ln in requires] + ["\n"])


def _retrieve_files(directory: str, *ext: str) -> List[str]:
    """Collect all files under a directory, recursively.

    Args:
        directory: root folder to be walked
        *ext: optional lower-case file name endings; if given, only files whose lower-cased name ends with any
            of them are returned

    Returns:
        Paths of the found files, each prefixed with ``directory``.

    """
    return [
        os.path.join(root, fname)
        for root, _, files in os.walk(directory)
        for fname in files
        if not ext or fname.lower().endswith(ext)
    ]


def _replace_imports(lines: List[str], mapping: List[Tuple[str, str]], lightning_by: str = "") -> List[str]:
    """Replace imports of standalone package to lightning.

    Args:
        lines: lines to be processed; the input list is not modified
        mapping: pairs of (source import, target import)
        lightning_by: if set, ``from lightning import`` and ``import lightning `` are in addition
            replaced with this package name

    Returns:
        A new list with the adjusted lines.

    >>> lns = [
    ...     '"lightning_app"',
    ...     "lightning_app",
    ...     "lightning_app/",
    ...     "delete_cloud_lightning_apps",
    ...     "from lightning_app import",
    ...     "lightning_apps = []",
    ...     "lightning_app and pytorch_lightning are ours",
    ...     "def _lightning_app():",
    ...     ":class:`~lightning_app.core.flow.LightningFlow`",
    ...     "http://pytorch_lightning.ai",
    ...     "from lightning import __version__",
    ...     "@lightning.ai"
    ... ]
    >>> mapping = [("lightning_app", "lightning.app"), ("pytorch_lightning", "lightning.pytorch")]
    >>> _replace_imports(lns, mapping, lightning_by="lightning_fabric")  # doctest: +NORMALIZE_WHITESPACE
    ['"lightning.app"', \
     'lightning.app', \
     'lightning_app/', \
     'delete_cloud_lightning_apps', \
     'from lightning.app import', \
     'lightning_apps = []', \
     'lightning.app and lightning.pytorch are ours', \
     'def _lightning_app():', \
     ':class:`~lightning.app.core.flow.LightningFlow`', \
     'http://pytorch_lightning.ai', \
     'from lightning_fabric import __version__', \
     '@lightning.ai']

    """
    out = lines[:]
    for source_import, target_import in mapping:
        # the import is matched literally (a dot in a package name must not match any character)
        # and only when it is not a part of a longer name, path or e-mail/URL-like token
        pattern = re.compile(rf"([^_/@]|^){re.escape(source_import)}([^_\w/]|$)")
        replacement = rf"\1{target_import}\2"
        for i, ln in enumerate(out):
            out[i] = pattern.sub(replacement, ln)
            if lightning_by:  # in addition, replace base package
                out[i] = out[i].replace("from lightning import ", f"from {lightning_by} import ")
                out[i] = out[i].replace("import lightning ", f"import {lightning_by} ")
    return out


def copy_replace_imports(
    source_dir: str,
    source_imports: Sequence[str],
    target_imports: Sequence[str],
    target_dir: Optional[str] = None,
    lightning_by: str = "",
) -> None:
    """Copy package content with import adjustments.

    Images (``.png``, ``.jpg``, ``.ico``) are copied only if they do not exist in the target yet, ``.pyc``
    files and files which can not be decoded as UTF-8 are skipped, all other files are rewritten with
    replaced imports.

    Args:
        source_dir: folder to be walked
        source_imports: imports to be replaced
        target_imports: replacements, one for each of ``source_imports``
        target_dir: where to write the result; the files are adjusted in place if not set
        lightning_by: replacement for the base ``lightning`` package, see ``_replace_imports``

    """
    print(f"Replacing imports: {locals()}")
    assert len(source_imports) == len(target_imports), (
        "source and target imports must have the same length, "
        f"source: {len(source_imports)}, target: {len(target_imports)}"
    )
    if target_dir is None:
        target_dir = source_dir

    mapping = list(zip(source_imports, target_imports))
    for fp in _retrieve_files(source_dir):
        # swap only the leading folder; a plain string replace would also rewrite
        # any later occurrence of `source_dir` inside the path
        fp_new = os.path.join(target_dir, os.path.relpath(fp, source_dir))
        _, ext = os.path.splitext(fp)
        if ext in (".png", ".jpg", ".ico"):
            os.makedirs(dirname(fp_new), exist_ok=True)
            if not isfile(fp_new):
                shutil.copy(fp, fp_new)
            continue
        if ext in (".pyc",):
            continue
        # Try to parse everything else
        with open(fp, encoding="utf-8") as fo:
            try:
                lines = fo.readlines()
            except UnicodeDecodeError:
                # a binary file, skip
                print(f"Skipped replacing imports for {fp}")
                continue
        lines = _replace_imports(lines, mapping, lightning_by=lightning_by)
        os.makedirs(os.path.dirname(fp_new), exist_ok=True)
        with open(fp_new, "w", encoding="utf-8") as fo:
            fo.writelines(lines)


def create_mirror_package(source_dir: str, package_mapping: Dict[str, str]) -> None:
    """Copy the sub-packages of ``lightning`` to their standalone packages and adjust the imports.

    Args:
        source_dir: folder which contains the ``lightning`` package folder
        package_mapping: maps a sub-package name to its standalone package name; each key ``<sub>`` is looked up
            as ``lightning.<sub>`` and the ``lightning`` key is ignored

    """
    # replace imports and copy the code
    mapping = package_mapping.copy()
    mapping.pop("lightning", None)  # pop this key to avoid replacing `lightning` to `lightning.lightning`

    mapping = {f"lightning.{sp}": sl for sp, sl in mapping.items()}
    for pkg_from, pkg_to in mapping.items():
        copy_replace_imports(
            source_dir=os.path.join(source_dir, pkg_from.replace(".", os.sep)),
            # pytorch_lightning uses lightning_fabric, so we need to replace all imports for all directories
            source_imports=mapping.keys(),
            target_imports=mapping.values(),
            target_dir=os.path.join(source_dir, pkg_to.replace(".", os.sep)),
            lightning_by=pkg_from,
        )


class AssistantCLI:
    """Collection of the commands exposed on the command line via ``jsonargparse.CLI``."""

    @staticmethod
    def requirements_prune_pkgs(packages: Sequence[str], req_files: Sequence[str] = REQUIREMENT_FILES_ALL) -> None:
        """Remove some packages from given requirement files.

        Args:
            packages: names of the packages to be removed
            req_files: requirement files to be pruned; a single string is treated as one file

        """
        if isinstance(req_files, str):
            req_files = [req_files]
        for req in req_files:
            AssistantCLI._prune_packages(req, packages)

    @staticmethod
    def _prune_packages(req_file: str, packages: Sequence[str]) -> None:
        """Remove some packages from a given requirement file.

        Empty lines and comment lines are kept; the file is rewritten in place.

        Args:
            req_file: path to the requirement file, it has to exist
            packages: names of the packages to be removed

        """
        path = Path(req_file)
        assert path.exists()
        text = path.read_text()
        lines = text.splitlines()
        final = []
        for line in lines:
            ln_ = line.strip()
            if not ln_ or ln_.startswith("#"):
                final.append(line)
                continue
            req = list(parse_requirements(ln_))[0]
            if req.name not in packages:
                final.append(line)
        print(final)
        path.write_text("\n".join(final) + "\n")

    @staticmethod
    def _replace_min(fname: str) -> None:
        """Replace each ``>=`` with ``==`` in the given file, in place."""
        with open(fname, encoding="utf-8") as fo:
            req = fo.read().replace(">=", "==")
        with open(fname, "w", encoding="utf-8") as fw:
            fw.write(req)

    @staticmethod
    def replace_oldest_ver(requirement_fnames: Sequence[str] = REQUIREMENT_FILES_ALL) -> None:
        """Replace the min package version by fixed one.

        Args:
            requirement_fnames: requirement files to be adjusted in place

        """
        for fname in requirement_fnames:
            print(fname)
            AssistantCLI._replace_min(fname)

    @staticmethod
    def copy_replace_imports(
        source_dir: str,
        source_import: str,
        target_import: str,
        target_dir: Optional[str] = None,
        lightning_by: str = "",
    ) -> None:
        """Copy package content with import adjustments.

        Args:
            source_dir: folder to be walked
            source_import: comma separated imports to be replaced
            target_import: comma separated replacements, one for each source import
            target_dir: where to write the result; the files are adjusted in place if not set
            lightning_by: replacement for the base ``lightning`` package

        """
        source_imports = source_import.strip().split(",")
        target_imports = target_import.strip().split(",")
        copy_replace_imports(
            source_dir, source_imports, target_imports, target_dir=target_dir, lightning_by=lightning_by
        )

    @staticmethod
    def pull_docs_files(
        gh_user_repo: str,
        target_dir: str = "docs/source-pytorch/XXX",
        checkout: str = "refs/tags/1.0.0",
        source_dir: str = "docs/source",
        single_page: Optional[str] = None,
        as_orphan: bool = False,
    ) -> None:
        """Pull docs pages from external source and append to local docs.

        Args:
            gh_user_repo: standard GitHub user/repo string
            target_dir: relative location inside the docs folder
            checkout: specific tag or branch to checkout
            source_dir: relative location inside the remote / external repo
            single_page: copy only single page from the remote repo and name it as the repo name
            as_orphan: append orphan statement to the page

        Raises:
            RuntimeError: if the archive of the repository can not be downloaded

        """
        import zipfile

        zip_url = f"https://github.com/{gh_user_repo}/archive/{checkout}.zip"

        with tempfile.TemporaryDirectory() as tmp:
            zip_file = os.path.join(tmp, "repo.zip")
            try:
                urllib.request.urlretrieve(zip_url, zip_file)
            except urllib.error.HTTPError as err:
                raise RuntimeError(
                    f"Requesting file '{zip_url}' does not exist or it is just unavailable."
                ) from err

            with zipfile.ZipFile(zip_file, "r") as zip_ref:
                zip_ref.extractall(tmp)  # noqa: S202

            zip_dirs = [d for d in glob.glob(os.path.join(tmp, "*")) if os.path.isdir(d)]
            # check that the extracted archive has only repo folder
            assert len(zip_dirs) == 1
            repo_dir = zip_dirs[0]

            if single_page:  # special case for copying single page
                single_page = os.path.join(repo_dir, source_dir, single_page)
                assert os.path.isfile(single_page), f"File '{single_page}' does not exist."
                name = re.sub(r"lightning[-_]?", "", gh_user_repo.split("/")[-1])
                new_rst = os.path.join(_PROJECT_ROOT, target_dir, f"{name}.rst")
                AssistantCLI._copy_rst(single_page, new_rst, as_orphan=as_orphan)
                return
            # continue with copying all pages
            ls_pages = glob.glob(os.path.join(repo_dir, source_dir, "*.rst"))
            # NOTE(review): without `recursive=True` the `**` matches just a single folder level
            #  - confirm deeper nested pages are skipped on purpose
            ls_pages += glob.glob(os.path.join(repo_dir, source_dir, "**", "*.rst"))
            for rst in ls_pages:
                rel_rst = rst.replace(os.path.join(repo_dir, source_dir) + os.path.sep, "")
                rel_dir = os.path.dirname(rel_rst)
                os.makedirs(os.path.join(_PROJECT_ROOT, target_dir, rel_dir), exist_ok=True)
                new_rst = os.path.join(_PROJECT_ROOT, target_dir, rel_rst)
                if os.path.isfile(new_rst):
                    logging.warning(f"Page {new_rst} already exists in the local tree so it will be skipped.")
                    continue
                AssistantCLI._copy_rst(rst, new_rst, as_orphan=as_orphan)

    @staticmethod
    def _copy_rst(rst_in: str, rst_out: str, as_orphan: bool = False) -> None:
        """Copy RST page with optional inserting orphan statement.

        Args:
            rst_in: path of the page to be read
            rst_out: path where the page is written
            as_orphan: prepend ``:orphan:`` to the page unless the page already contains it

        """
        with open(rst_in, encoding="utf-8") as fopen:
            page = fopen.read()
        if as_orphan and ":orphan:" not in page:
            page = ":orphan:\n\n" + page
        with open(rst_out, "w", encoding="utf-8") as fopen:
            fopen.write(page)


if __name__ == "__main__":
    import jsonargparse

    jsonargparse.CLI(AssistantCLI, as_positional=False)