diff --git a/pyproject.toml b/pyproject.toml index 9a67020bf4..4eee3ca1d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ module = [ "lightning.app.source_code.tar", "lightning.app.source_code.uploader", "lightning.app.storage.copier", + "lightning.app.storage.filesystem", "lightning.app.storage.drive", "lightning.app.storage.orchestrator", "lightning.app.storage.path", diff --git a/src/lightning/app/CHANGELOG.md b/src/lightning/app/CHANGELOG.md index b6b4420a8d..c69d06e7c0 100644 --- a/src/lightning/app/CHANGELOG.md +++ b/src/lightning/app/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added experimental support for interruptible GPU in the cloud ([#16399](https://github.com/Lightning-AI/lightning/pull/16399)) +- Added FileSystem abstraction to simply manipulation of files ([#16581](https://github.com/Lightning-AI/lightning/pull/16581)) ### Changed diff --git a/src/lightning/app/storage/__init__.py b/src/lightning/app/storage/__init__.py index 561969dec0..8ed541dd2d 100644 --- a/src/lightning/app/storage/__init__.py +++ b/src/lightning/app/storage/__init__.py @@ -1,4 +1,5 @@ from lightning.app.storage.drive import Drive # noqa: F401 +from lightning.app.storage.filesystem import FileSystem # noqa: F401 from lightning.app.storage.mount import Mount # noqa: F401 from lightning.app.storage.orchestrator import StorageOrchestrator # noqa: F401 from lightning.app.storage.path import Path # noqa: F401 diff --git a/src/lightning/app/storage/copier.py b/src/lightning/app/storage/copier.py index 1db043bc29..0e88df819e 100644 --- a/src/lightning/app/storage/copier.py +++ b/src/lightning/app/storage/copier.py @@ -19,6 +19,7 @@ from threading import Thread from time import time from typing import Optional, TYPE_CHECKING, Union +from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem from lightning.app.core.queues import BaseQueue @@ -103,7 +104,11 @@ def _find_matching_path(work, request: _GetRequest) -> Optional["lightning.app.s return candidate -def _copy_files(source_path: pathlib.Path, destination_path: pathlib.Path) -> None: +def _copy_files( + source_path: pathlib.Path, + destination_path: pathlib.Path, + fs: Optional[AbstractFileSystem] = None, +) -> None: """Copy files from one path to another. The source path must either be an existing file or folder. If the source is a folder, the destination path is @@ -111,7 +116,8 @@ def _copy_files(source_path: pathlib.Path, destination_path: pathlib.Path) -> No Files in a folder are copied recursively and efficiently using multiple threads. """ - fs = _filesystem() + if fs is None: + fs = _filesystem() def _copy(from_path: pathlib.Path, to_path: pathlib.Path) -> Optional[Exception]: _logger.debug(f"Copying {str(from_path)} -> {str(to_path)}") diff --git a/src/lightning/app/storage/filesystem.py b/src/lightning/app/storage/filesystem.py new file mode 100644 index 0000000000..eba0612f14 --- /dev/null +++ b/src/lightning/app/storage/filesystem.py @@ -0,0 +1,158 @@ +import os +import shutil +from pathlib import Path +from typing import Callable, List + +from fsspec.implementations.local import LocalFileSystem + +from lightning.app.storage.copier import _copy_files +from lightning.app.storage.path import _filesystem, _shared_storage_path + + +def _get_files(fs, src: Path, dst: Path, overwrite: bool = True): + dst = dst.resolve() + if fs.isdir(src): + if isinstance(fs, LocalFileSystem): + dst = dst.resolve() + if fs.exists(dst): + if overwrite: + fs.rm(str(dst), recursive=True) + else: + raise FileExistsError(f"The file {dst} was found. Add get(..., overwrite=True) to replace it.") + + shutil.copytree(src, dst) + else: + glob = f"{str(src)}/**" + fs.get(glob, str(dst), recursive=False) + else: + fs.get(str(src), str(dst), recursive=False) + + +class FileSystem: + + """This filesystem enables to easily move files from and to the shared storage.""" + + def __init__(self) -> None: + self._fs = _filesystem() + self._root = str(_shared_storage_path()) + + def put(self, src_path: str, dst_path: str, put_fn: Callable = _copy_files) -> None: + """This method enables to put a file to the shared storage in a blocking fashion. + + Arguments: + src_path: The path to your files locally + dst_path: The path to your files transfered in the shared storage. + put_fn: The method to use to put files in the shared storage. + """ + if not os.path.exists(Path(src_path).resolve()): + raise FileExistsError(f"The provided path {src_path} doesn't exist") + + if not dst_path.startswith("/"): + raise Exception(f"The provided destination {dst_path} needs to start with `/`.") + + src = Path(src_path).resolve() + dst = Path(os.path.join(self._root, dst_path[1:])).resolve() + + return put_fn(src, dst, fs=self._fs) + + def get(self, src_path: str, dst_path: str, overwrite: bool = True, get_fn: Callable = _get_files) -> None: + """This method enables to get files from the shared storage in a blocking fashion. + + Arguments: + src_path: The path to your files in the shared storage + dst_path: The path to your files transfered locally + get_fn: The method to use to put files in the shared storage. + """ + if not src_path.startswith("/"): + raise Exception(f"The provided destination {src_path} needs to start with `/`.") + + src = Path(os.path.join(self._root, src_path[1:])).resolve() + dst = Path(dst_path).resolve() + + return get_fn(fs=self._fs, src=src, dst=dst, overwrite=overwrite) + + def listdir(self, path: str) -> List[str]: + """This method enables to list files from the shared storage in a blocking fashion. + + Arguments: + path: The path to files to list. + """ + if not path.startswith("/"): + raise Exception(f"The provided destination {path} needs to start with `/`.") + + shared_path = Path(os.path.join(self._root, path[1:])).resolve() + + if not self._fs.exists(shared_path): + raise RuntimeError(f"The provided path {shared_path} doesn't exist.") + + # Invalidate cache before running ls in case new directories have been added + # TODO: Re-evaluate this - may lead to performance issues + self._fs.invalidate_cache() + + paths = self._fs.ls(shared_path) + if not paths: + return paths + + return sorted([path.replace(self._root + os.sep, "") for path in paths if not path.endswith("info.txt")]) + + def walk(self, path: str) -> List[str]: + """This method enables to list files from the shared storage in a blocking fashion. + + Arguments: + path: The path to files to list. + """ + if not path.startswith("/"): + raise Exception(f"The provided destination {path} needs to start with `/`.") + + shared_path = Path(os.path.join(self._root, path[1:])).resolve() + + if not self._fs.exists(shared_path): + raise RuntimeError(f"The provided path {shared_path} doesn't exist.") + + # Invalidate cache before running ls in case new directories have been added + # TODO: Re-evaluate this - may lead to performance issues + self._fs.invalidate_cache() + + paths = self._fs.ls(shared_path) + if not paths: + return paths + + out = [] + + for shared_path in paths: + path = str(shared_path).replace(self._root, "") + if self._fs.isdir(shared_path): + out.extend(self.walk(path)) + else: + if path.endswith("info.txt"): + continue + out.append(path[1:]) + return sorted(out) + + def rm(self, path) -> None: + if not path.startswith("/"): + raise Exception(f"The provided destination {path} needs to start with `/`.") + + delete_path = Path(os.path.join(self._root, path[1:])).resolve() + + if self._fs.exists(str(delete_path)): + if self._fs.isdir(str(delete_path)): + self._fs.rmdir(str(delete_path)) + else: + self._fs.rm(str(delete_path)) + else: + raise Exception(f"The file path {path} doesn't exist.") + + def isfile(self, path: str) -> bool: + if not path.startswith("/"): + raise Exception(f"The provided destination {path} needs to start with `/`.") + + path = Path(os.path.join(self._root, path[1:])).resolve() + return self._fs.isfile(path) + + def isdir(self, path: str) -> bool: + if not path.startswith("/"): + raise Exception(f"The provided destination {path} needs to start with `/`.") + + path = Path(os.path.join(self._root, path[1:])).resolve() + return self._fs.isdir(path) diff --git a/tests/tests_app/storage/test_filesystem.py b/tests/tests_app/storage/test_filesystem.py new file mode 100644 index 0000000000..90f74fca77 --- /dev/null +++ b/tests/tests_app/storage/test_filesystem.py @@ -0,0 +1,59 @@ +import os +import sys + +import pytest + +from lightning.app.storage import FileSystem + + +@pytest.mark.skipif(sys.platform == "win32", reason="TODO: Add support for windows") +def test_filesystem(tmpdir): + fs = FileSystem() + + with open(f"{tmpdir}/a.txt", "w") as f: + f.write("example") + + os.makedirs(f"{tmpdir}/checkpoints", exist_ok=True) + with open(f"{tmpdir}/checkpoints/a.txt", "w") as f: + f.write("example") + + with open(f"{tmpdir}/info.txt", "w") as f: + f.write("example") + + assert fs.listdir("/") == [] + fs.put(f"{tmpdir}/a.txt", "/a.txt") + fs.put(f"{tmpdir}/info.txt", "/info.txt") + assert fs.listdir("/") == ["a.txt"] + + assert fs.isfile("/a.txt") + + fs.put(f"{tmpdir}/checkpoints", "/checkpoints") + assert not fs.isfile("/checkpoints") + assert fs.isdir("/checkpoints") + assert fs.isfile("/checkpoints/a.txt") + + assert fs.listdir("/") == ["a.txt", "checkpoints"] + assert fs.walk("/") == ["a.txt", "checkpoints/a.txt"] + + os.remove(f"{tmpdir}/a.txt") + + assert not os.path.exists(f"{tmpdir}/a.txt") + + fs.get("/a.txt", f"{tmpdir}/a.txt") + + assert os.path.exists(f"{tmpdir}/a.txt") + + fs.rm("/a.txt") + + assert fs.listdir("/") == ["checkpoints"] + fs.rm("/checkpoints/a.txt") + assert fs.listdir("/") == ["checkpoints"] + assert fs.walk("/checkpoints") == [] + fs.rm("/checkpoints/") + assert fs.listdir("/") == [] + + with pytest.raises(FileExistsError, match="HERE"): + fs.put("HERE", "/HERE") + + with pytest.raises(RuntimeError, match="The provided path"): + fs.listdir("/space")