diff --git a/.azure/gpu-tests-pytorch.yml b/.azure/gpu-tests-pytorch.yml
index 78c0850b76..df085fd8a3 100644
--- a/.azure/gpu-tests-pytorch.yml
+++ b/.azure/gpu-tests-pytorch.yml
@@ -112,8 +112,6 @@ jobs:
   - bash: |
       set -e
-      CUDA_VERSION_BAGUA=$(python -c "print([ver for ver in [116,113,111,102] if $CUDA_VERSION_MM >= ver][0])")
-      pip install "bagua-cuda$CUDA_VERSION_BAGUA"
       CUDA_VERSION_MM_COLOSSALAI=$(python -c "import torch ; print(''.join(map(str, torch.version.cuda)))")
       CUDA_VERSION_COLOSSALAI=$(python -c "print([ver for ver in [11.3, 11.1] if $CUDA_VERSION_MM_COLOSSALAI >= ver][0])")
diff --git a/dockers/base-cuda/Dockerfile b/dockers/base-cuda/Dockerfile
index f4a272954c..1273776d0a 100644
--- a/dockers/base-cuda/Dockerfile
+++ b/dockers/base-cuda/Dockerfile
@@ -98,19 +98,6 @@ RUN \
     pip install -r requirements/pytorch/base.txt --no-cache-dir --find-links https://download.pytorch.org/whl/cu${CUDA_VERSION_MM}/torch_stable.html && \
     rm assistant.py
 
-
-RUN \
-    # install Bagua
-    if [[ $PYTORCH_VERSION != "1.13" ]]; then \
-        CUDA_VERSION_MM=$(python -c "print(''.join('$CUDA_VERSION'.split('.')[:2]))") ; \
-        CUDA_VERSION_BAGUA=$(python -c "print([ver for ver in [116,113,111,102] if $CUDA_VERSION_MM >= ver][0])") ; \
-        pip install "bagua-cuda$CUDA_VERSION_BAGUA" ; \
-        if [[ "$CUDA_VERSION_MM" = "$CUDA_VERSION_BAGUA" ]]; then \
-            python -c "import bagua_core; bagua_core.install_deps()"; \
-        fi ; \
-        python -c "import bagua; print(bagua.__version__)"; \
-    fi
-
 RUN \
     # install ColossalAI
     # TODO: 1.13 wheels are not released, remove skip once they are
diff --git a/docs/source-pytorch/accelerators/gpu_intermediate.rst b/docs/source-pytorch/accelerators/gpu_intermediate.rst
index b8b5822c0a..6deca43653 100644
--- a/docs/source-pytorch/accelerators/gpu_intermediate.rst
+++ b/docs/source-pytorch/accelerators/gpu_intermediate.rst
@@ -25,7 +25,6 @@ Lightning supports multiple ways of doing distributed training.
 - Regular (``strategy='ddp'``)
 - Spawn (``strategy='ddp_spawn'``)
 - Notebook/Fork (``strategy='ddp_notebook'``)
-- Bagua (``strategy='bagua'``) (multiple-gpus across many machines with advanced training algorithms)
 
 .. note:: If you request multiple GPUs or nodes without setting a mode, DDP Spawn will be automatically used.
 
@@ -235,119 +234,6 @@ Comparison of DDP variants and tradeoffs
     - Fast
 
 
-Bagua
-^^^^^
-`Bagua `_ is a deep learning training acceleration framework which supports
-multiple advanced distributed training algorithms including:
-
-- `Gradient AllReduce `_ for centralized synchronous communication, where gradients are averaged among all workers.
-- `Decentralized SGD `_ for decentralized synchronous communication, where each worker exchanges data with one or a few specific workers.
-- `ByteGrad `_ and `QAdam `_ for low precision communication, where data is compressed into low precision before communication.
-- `Asynchronous Model Average `_ for asynchronous communication, where workers are not required to be synchronized in the same iteration in a lock-step style.
-
-By default, Bagua uses *Gradient AllReduce* algorithm, which is also the algorithm implemented in DDP,
-but Bagua can usually produce a higher training throughput due to its backend written in Rust.
-
-.. code-block:: python
-
-    # train on 4 GPUs (using Bagua mode)
-    trainer = Trainer(strategy="bagua", accelerator="gpu", devices=4)
-
-
-By specifying the ``algorithm`` in the ``BaguaStrategy``, you can select more advanced training algorithms featured by Bagua:
-
-
-.. code-block:: python
-
-    # train on 4 GPUs, using Bagua Gradient AllReduce algorithm
-    trainer = Trainer(
-        strategy=BaguaStrategy(algorithm="gradient_allreduce"),
-        accelerator="gpu",
-        devices=4,
-    )
-
-    # train on 4 GPUs, using Bagua ByteGrad algorithm
-    trainer = Trainer(
-        strategy=BaguaStrategy(algorithm="bytegrad"),
-        accelerator="gpu",
-        devices=4,
-    )
-
-    # train on 4 GPUs, using Bagua Decentralized SGD
-    trainer = Trainer(
-        strategy=BaguaStrategy(algorithm="decentralized"),
-        accelerator="gpu",
-        devices=4,
-    )
-
-    # train on 4 GPUs, using Bagua Low Precision Decentralized SGD
-    trainer = Trainer(
-        strategy=BaguaStrategy(algorithm="low_precision_decentralized"),
-        accelerator="gpu",
-        devices=4,
-    )
-
-    # train on 4 GPUs, using Asynchronous Model Average algorithm, with a synchronization interval of 100ms
-    trainer = Trainer(
-        strategy=BaguaStrategy(algorithm="async", sync_interval_ms=100),
-        accelerator="gpu",
-        devices=4,
-    )
-
-To use *QAdam*, we need to initialize
-`QAdamOptimizer `_ first:
-
-.. code-block:: python
-
-    from pytorch_lightning.strategies import BaguaStrategy
-    from bagua.torch_api.algorithms.q_adam import QAdamOptimizer
-
-
-    class MyModel(pl.LightningModule):
-        ...
-
-        def configure_optimizers(self):
-            # initialize QAdam Optimizer
-            return QAdamOptimizer(self.parameters(), lr=0.05, warmup_steps=100)
-
-
-    model = MyModel()
-    trainer = Trainer(
-        accelerator="gpu",
-        devices=4,
-        strategy=BaguaStrategy(algorithm="qadam"),
-    )
-    trainer.fit(model)
-
-Bagua relies on its own `launcher `_ to schedule jobs.
-Below, find examples using ``bagua.distributed.launch`` which follows ``torch.distributed.launch`` API:
-
-.. code-block:: bash
-
-    # start training with 8 GPUs on a single node
-    python -m bagua.distributed.launch --nproc_per_node=8 train.py
-
-If the ssh service is available with passwordless login on each node, you can launch the distributed job on a
-single node with ``baguarun`` which has a similar syntax as ``mpirun``. When staring the job, ``baguarun`` will
-automatically spawn new processes on each of your training node provided by ``--host_list`` option and each node in it
-is described as an ip address followed by a ssh port.
-
-.. code-block:: bash
-
-    # Run on node1 (or node2) to start training on two nodes (node1 and node2), 8 GPUs per node
-    baguarun --host_list hostname1:ssh_port1,hostname2:ssh_port2 --nproc_per_node=8 --master_port=port1 train.py
-
-
-.. note:: You can also start training in the same way as Distributed Data Parallel. However, system optimizations like
-    `Bagua-Net `_ and
-    `Performance autotuning `_ can only be enabled through bagua
-    launcher. It is worth noting that with ``Bagua-Net``, Distributed Data Parallel can also achieve
-    better performance without modifying the training script.
-
-
-See `Bagua Tutorials `_ for more details on installation and advanced features.
-
-
 DP caveats
 ^^^^^^^^^^
 In DP each GPU within a machine sees a portion of a batch.
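The removed section above notes that Bagua's default *Gradient AllReduce* algorithm is the same algorithm implemented by DDP. A minimal sketch of the closest remaining built-in configuration (plain DDP), assuming ``Trainer`` is imported from ``pytorch_lightning`` as in the surrounding examples:

.. code-block:: python

    from pytorch_lightning import Trainer

    # Nearest built-in replacement for the removed ``strategy="bagua"`` default
    # (gradient allreduce): regular DDP on 4 GPUs.
    trainer = Trainer(strategy="ddp", accelerator="gpu", devices=4)
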
diff --git a/docs/source-pytorch/api_references.rst b/docs/source-pytorch/api_references.rst
index c30f4c2019..cd9007e581 100644
--- a/docs/source-pytorch/api_references.rst
+++ b/docs/source-pytorch/api_references.rst
@@ -213,7 +213,6 @@ strategies
     :nosignatures:
     :template: classtemplate.rst
 
-    BaguaStrategy
     ColossalAIStrategy
     DDPSpawnStrategy
     DDPStrategy
diff --git a/docs/source-pytorch/extensions/strategy.rst b/docs/source-pytorch/extensions/strategy.rst
index 7ca360b02f..c92de855c4 100644
--- a/docs/source-pytorch/extensions/strategy.rst
+++ b/docs/source-pytorch/extensions/strategy.rst
@@ -69,9 +69,6 @@ The below table lists all relevant strategies available in Lightning with their
    * - Name
      - Class
      - Description
-   * - bagua
-     - :class:`~pytorch_lightning.strategies.BaguaStrategy`
-     - Strategy for training using the Bagua library, with advanced distributed training algorithms and system optimizations. :ref:`Learn more. `
    * - colossalai
      - :class:`~pytorch_lightning.strategies.ColossalAIStrategy`
      - Colossal-AI provides a collection of parallel components for you. It aims to support you to write your distributed deep learning models just like how you write your model on your laptop. `Learn more. `__
diff --git a/requirements/pytorch/check-avail-strategies.py b/requirements/pytorch/check-avail-strategies.py
index 94bb9b924b..af7fee95cc 100644
--- a/requirements/pytorch/check-avail-strategies.py
+++ b/requirements/pytorch/check-avail-strategies.py
@@ -1,3 +1,2 @@
 if __name__ == "__main__":
-    import bagua  # noqa: F401
     import deepspeed  # noqa: F401
diff --git a/src/lightning/pytorch/plugins/environments/__init__.py b/src/lightning/pytorch/plugins/environments/__init__.py
index 41141f5b25..7b0fc50c1d 100644
--- a/src/lightning/pytorch/plugins/environments/__init__.py
+++ b/src/lightning/pytorch/plugins/environments/__init__.py
@@ -21,4 +21,3 @@ from lightning.fabric.plugins.environments import (  # noqa: F401
     TorchElasticEnvironment,
     XLAEnvironment,
 )
-from lightning.pytorch.plugins.environments.bagua_environment import BaguaEnvironment  # noqa: F401
diff --git a/src/lightning/pytorch/plugins/environments/bagua_environment.py b/src/lightning/pytorch/plugins/environments/bagua_environment.py
deleted file mode 100644
index d32522be8b..0000000000
--- a/src/lightning/pytorch/plugins/environments/bagua_environment.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright The Lightning AI team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-
-from lightning.fabric.plugins import ClusterEnvironment
-
-log = logging.getLogger(__name__)
-
-
-class BaguaEnvironment(ClusterEnvironment):
-    """Environment for distributed training with `Bagua `_"""
-
-    @property
-    def creates_processes_externally(self) -> bool:
-        return True
-
-    @property
-    def main_address(self) -> str:
-        return os.environ.get("MASTER_ADDR", "127.0.0.1")
-
-    @property
-    def main_port(self) -> int:
-        return int(os.environ.get("MASTER_PORT", -1))
-
-    @property
-    def service_port(self) -> int:
-        return int(os.environ.get("BAGUA_SERVICE_PORT", -1))
-
-    @staticmethod
-    def detect() -> bool:
-        return "BAGUA_SERVICE_PORT" in os.environ
-
-    def world_size(self) -> int:
-        return int(os.environ["WORLD_SIZE"])
-
-    def set_world_size(self, size: int) -> None:
-        log.debug("`BaguaEnvironment.set_world_size` was called, but setting world size is not allowed. Ignored.")
-
-    def global_rank(self) -> int:
-        return int(os.environ["RANK"])
-
-    def set_global_rank(self, rank: int) -> None:
-        log.debug("`BaguaEnvironment.set_global_rank` was called, but setting global rank is not allowed. Ignored.")
-
-    def local_rank(self) -> int:
-        return int(os.environ.get("LOCAL_RANK", 0))
-
-    def node_rank(self) -> int:
-        return int(os.environ.get("NODE_RANK", 0))
diff --git a/src/lightning/pytorch/strategies/__init__.py b/src/lightning/pytorch/strategies/__init__.py
index eb77a7302d..f4c21aa076 100644
--- a/src/lightning/pytorch/strategies/__init__.py
+++ b/src/lightning/pytorch/strategies/__init__.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from lightning.fabric.strategies.registry import _StrategyRegistry
-from lightning.pytorch.strategies.bagua import BaguaStrategy  # noqa: F401
 from lightning.pytorch.strategies.colossalai import ColossalAIStrategy  # noqa: F401
 from lightning.pytorch.strategies.ddp import DDPStrategy  # noqa: F401
 from lightning.pytorch.strategies.ddp_spawn import DDPSpawnStrategy  # noqa: F401
diff --git a/src/lightning/pytorch/strategies/bagua.py b/src/lightning/pytorch/strategies/bagua.py
deleted file mode 100644
index 0ca3cdd72f..0000000000
--- a/src/lightning/pytorch/strategies/bagua.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# Copyright The Lightning AI team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import os
-from typing import Any, Dict, List, Optional, Union
-
-import torch
-from lightning_utilities.core.imports import module_available
-from torch import Tensor
-from torch.nn import Module
-
-import lightning.pytorch as pl
-from lightning.fabric.plugins import CheckpointIO, ClusterEnvironment
-from lightning.fabric.utilities.optimizer import _optimizers_to_device
-from lightning.fabric.utilities.seed import reset_seed
-from lightning.fabric.utilities.types import ReduceOp
-from lightning.pytorch.overrides.base import _LightningModuleWrapperBase, _LightningPrecisionModuleWrapperBase
-from lightning.pytorch.plugins.precision import PrecisionPlugin
-from lightning.pytorch.strategies.ddp import DDPStrategy
-from lightning.pytorch.strategies.strategy import TBroadcast
-from lightning.pytorch.trainer.states import TrainerFn
-from lightning.pytorch.utilities.exceptions import MisconfigurationException
-
-_BAGUA_AVAILABLE = module_available("bagua.torch_api")
-
-if _BAGUA_AVAILABLE:
-    import bagua.torch_api as bagua
-    from bagua.torch_api.algorithms import Algorithm
-    from bagua.torch_api.algorithms.q_adam import QAdamOptimizer
-    from bagua.torch_api.communication import allreduce_inplace, barrier, broadcast_object, is_initialized
-    from bagua.torch_api.communication import ReduceOp as BaguaReduceOp
-    from bagua.torch_api.data_parallel.distributed import DistributedDataParallel_V1_9_0 as BaguaDistributedDataParallel
-
-    # Convert a reduce op to its equivalent `bagua.torch_api.ReduceOp`
-    _bagua_reduce_ops = {
-        ReduceOp.SUM: BaguaReduceOp.SUM,
-        ReduceOp.PRODUCT: BaguaReduceOp.PRODUCT,
-        ReduceOp.MIN: BaguaReduceOp.MIN,
-        ReduceOp.MAX: BaguaReduceOp.MAX,
-        ReduceOp.BAND: BaguaReduceOp.BAND,
-        ReduceOp.BOR: BaguaReduceOp.BOR,
-        ReduceOp.BXOR: BaguaReduceOp.BXOR,
-        "avg": BaguaReduceOp.AVG,
-        "mean": BaguaReduceOp.AVG,
-        "sum": BaguaReduceOp.SUM,
-    }
-else:
-    _bagua_reduce_ops = {}
-
-log = logging.getLogger(__name__)
-
-
-class LightningBaguaModule(_LightningModuleWrapperBase):
-    def __init__(
-        self,
-        forward_module: Union["pl.LightningModule", _LightningPrecisionModuleWrapperBase],
-    ) -> None:
-        super().__init__(forward_module=forward_module)
-        # Bagua use `bagua_module_name` to distinguish different modules
-        self._bagua_module_name = f"{forward_module.__class__.__name__}{id(forward_module)}"
-
-    def forward(self, *inputs: Any, **kwargs: Any) -> Any:
-        pl_module = self.lightning_module
-        trainer = pl_module._trainer
-
-        if trainer is not None:
-            if trainer.training:
-                output = self._forward_module.training_step(*inputs, **kwargs)
-                # In manual_optimization, we need to prevent DDP reducer as
-                # it is done manually in `LightningModule.manual_backward`
-                # `require_backward_grad_sync` will be reset in the
-                # ddp_strategy `post_training_step` hook
-                if not pl_module.automatic_optimization:
-                    # Using bagua strategy, the model is redefined in model.inner
-                    # and cannot be accessed directly. We need this to make manual
-                    # backward work.
-                    trainer.model.inner.require_backward_grad_sync = False  # type: ignore[union-attr]
-                return output
-            else:
-                return super().forward(*inputs, **kwargs)
-        return self._forward_module(*inputs, **kwargs)
-
-
-class BaguaStrategy(DDPStrategy):
-    strategy_name = "bagua"
-
-    def __init__(
-        self,
-        algorithm: str = "gradient_allreduce",
-        flatten: bool = True,
-        accelerator: Optional["pl.accelerators.Accelerator"] = None,
-        parallel_devices: Optional[List[torch.device]] = None,
-        cluster_environment: Optional[ClusterEnvironment] = None,
-        checkpoint_io: Optional[CheckpointIO] = None,
-        precision_plugin: Optional[PrecisionPlugin] = None,
-        **bagua_kwargs: Union[Any, Dict[str, Any]],
-    ):
-        """Strategy for training using the `Bagua `_ library, with advanced
-        distributed training algorithms and system optimizations.
-
-        This strategy requires the `bagua` package to be installed. See
-        `installation guide `_ for more information.
-
-        The :class:`BaguaStrategy` is only supported on GPU and on Linux systems.
-
-        Arguments:
-            algorithm: Distributed algorithm used to do the actual communication and update. Built-in algorithms
-                include "gradient_allreduce", "bytegrad", "decentralized", "low_precision_decentralized", "qadam" and
-                "async".
-            flatten: Whether to flatten the Bagua communication buckets. The flatten operation will reset data
-                pointer of bucket tensors so that they can use faster code paths.
-            bagua_kwargs: Additional keyword arguments that will be passed to initialize the Bagua algorithm. More
-                details on keyword arguments accepted for each algorithm can be found in the
-                `documentation `_.
-        """
-        if not _BAGUA_AVAILABLE:
-            raise MisconfigurationException(
-                "To use the `BaguaStrategy`, you must have `Bagua` installed. Use `pip install bagua` to install it."
-            )
-
-        super().__init__(
-            accelerator=accelerator,
-            parallel_devices=parallel_devices,
-            cluster_environment=cluster_environment,
-            checkpoint_io=checkpoint_io,
-            precision_plugin=precision_plugin,
-        )
-
-        self._bagua_algorithm = algorithm
-        self._bagua_flatten = flatten
-        self._bagua_kwargs = bagua_kwargs
-
-    def setup_distributed(self) -> None:
-        reset_seed()
-
-        # determine which process we are and world size
-        self.set_world_ranks()
-
-        self._init_bagua_distributed()
-
-    def _init_bagua_distributed(self) -> None:
-        self._set_node_environment_variables()
-        log.info(
-            "Initializing Bagua Distributed: "
-            f"GLOBAL_RANK: {self.global_rank}, "
-            f"MEMBER: {self.global_rank + 1}/{self.world_size}"
-        )
-
-        # need to set device first before initialize Bagua distributed environment
-        # Note: setup_environment calls super().setup_distributed after calling init_distributed()
-        torch.cuda.set_device(self.local_rank)
-
-        if not is_initialized():
-            bagua.init_process_group()
-
-    def _set_node_environment_variables(self) -> None:
-        """Set the environment variables as required by the :func:`bagua.init_process_group` call.
-
-        This enables the use of other cluster environments which don't set these exact variables, e.g., Bagua can be
-        launched with ``torch.distributed.run``.
-        """
-        os.environ["MASTER_ADDR"] = self.cluster_environment.main_address  # type: ignore[union-attr]
-        os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)  # type: ignore[union-attr]
-        os.environ["RANK"] = str(self.global_rank)
-        os.environ["NODE_RANK"] = str(self.node_rank)
-        os.environ["WORLD_SIZE"] = str(self.world_size)
-        os.environ["LOCAL_RANK"] = str(self.local_rank)
-
-    def setup(self, trainer: "pl.Trainer") -> None:
-        assert self.accelerator is not None
-        self.accelerator.setup(trainer)
-
-        # move the model to the correct device
-        self.model_to_device()
-
-        trainer_fn = trainer.state.fn
-
-        if trainer_fn == TrainerFn.FITTING:
-            if self._layer_sync and self.model:
-                self.model = self._layer_sync.apply(self.model)
-
-        self.setup_precision_plugin()
-
-        if trainer_fn == TrainerFn.FITTING:
-            # set up optimizers after the module has been moved to the device
-            # but before the module has been wrapped
-            self.setup_optimizers(trainer)
-            _optimizers_to_device(self.optimizers, self.root_device)
-
-            # skip wrapping the model if we are not fitting as no gradients need to be exchanged
-            self._configure_bagua_model(trainer)
-
-    def _check_qadam_optimizer(self) -> None:
-        has_qadam_optimizer = any([isinstance(opt, QAdamOptimizer) for opt in self.optimizers])
-
-        if not has_qadam_optimizer or len(self.optimizers) > 1 or len(self.lr_scheduler_configs) > 1:
-            raise MisconfigurationException("Bagua QAdam can only accept one QAdamOptimizer and one LR Scheduler.")
-
-        self._bagua_kwargs["q_adam_optimizer"] = self.optimizers[0]
-
-    def _configure_bagua_model(self, trainer: "pl.Trainer") -> None:
-        model = LightningBaguaModule(self.model)  # type: ignore[arg-type]
-        self.model = self._setup_model(model)
-
-        # start the background communication for async algorithm
-        if trainer.training and self._bagua_algorithm == "async":
-            self.model.bagua_algorithm.resume(self.model)  # type: ignore
-
-    def _setup_model(self, model: Module) -> "BaguaDistributedDataParallel":
-        """Wraps the model into a Bagua distributed module."""
-
-        if self._bagua_algorithm == "qadam":
-            self._check_qadam_optimizer()
-
-        algorithm = Algorithm.init(self._bagua_algorithm, **self._bagua_kwargs)
-        return BaguaDistributedDataParallel(
-            module=model,
-            optimizers=self.optimizers,
-            algorithm=algorithm,
-            gradient_as_bucket_view=self._bagua_flatten,
-        )
-
-    @classmethod
-    def register_strategies(cls, strategy_registry: Dict) -> None:
-        strategy_registry.register(
-            cls.strategy_name,
-            cls,
-            description=f"{cls.__class__.__name__}",
-        )
-
-    def teardown(self) -> None:
-        # abort the background communication for async algorithm
-        assert self.lightning_module is not None
-        if self.lightning_module.trainer.training and self._bagua_algorithm == "async":
-            self.model.bagua_algorithm.abort(self.model)  # type: ignore
-
-        if isinstance(self.model, BaguaDistributedDataParallel):
-            self.model = self.lightning_module
-
-        super().teardown()
-
-    def barrier(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
-        if is_initialized():
-            barrier()
-
-    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
-        return broadcast_object(obj, src)
-
-    def post_training_step(self) -> None:
-        assert self.lightning_module is not None
-        # Using bagua strategy, the model is redefined in model.inner
-        # and cannot be accessed directly. We need to redefine the
-        # post_training_step function to make manual backward work.
-        if not self.lightning_module.automatic_optimization:
-            self.model.inner.require_backward_grad_sync = True  # type: ignore[union-attr]
-
-    def reduce(
-        self, tensor: Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"
-    ) -> Tensor:
-        """Reduces a tensor from several distributed processes to one aggregated tensor.
-
-        Args:
-            tensor: The tensor to sync and reduce.
-            group: The process group to gather results from. Defaults to all processes (world).
-            reduce_op: The reduction operation.
-                Can also be a string 'sum' or ReduceOp.
-
-        Return:
-            The reduced value, except when the input was not a tensor the output remains is unchanged.
-        """
-        if not isinstance(tensor, Tensor):
-            return tensor
-        if group is not None:
-            raise ValueError("`Bagua` does not support allreduce using a subcommunicator at this time. Unset `group`.")
-
-        if reduce_op is None:
-            op = BaguaReduceOp.AVG
-        else:
-            op = _bagua_reduce_ops.get(reduce_op, None)
-            if op is None:
-                raise ValueError(f"Unrecognized `reduce_op` for `BaguaStrategy`: {reduce_op}")
-
-        allreduce_inplace(tensor, op=op)
-        return tensor
diff --git a/src/lightning/pytorch/trainer/connectors/accelerator_connector.py b/src/lightning/pytorch/trainer/connectors/accelerator_connector.py
index 98417cd8f5..b8c42d7468 100644
--- a/src/lightning/pytorch/trainer/connectors/accelerator_connector.py
+++ b/src/lightning/pytorch/trainer/connectors/accelerator_connector.py
@@ -51,7 +51,6 @@ from lightning.pytorch.plugins import (
     TPUBf16PrecisionPlugin,
     TPUPrecisionPlugin,
 )
-from lightning.pytorch.plugins.environments import BaguaEnvironment
 from lightning.pytorch.plugins.layer_sync import LayerSync, TorchSyncBatchNorm
 from lightning.pytorch.plugins.precision.fsdp import FSDPMixedPrecisionPlugin
 from lightning.pytorch.strategies import (
@@ -422,7 +421,6 @@ class AcceleratorConnector:
             return self._cluster_environment_flag
         for env_type in (
             SLURMEnvironment,
-            BaguaEnvironment,
             TorchElasticEnvironment,
             KubeflowEnvironment,
             LSFEnvironment,
diff --git a/tests/tests_pytorch/helpers/runif.py b/tests/tests_pytorch/helpers/runif.py
index e666dd2b4b..87cd6c97e6 100644
--- a/tests/tests_pytorch/helpers/runif.py
+++ b/tests/tests_pytorch/helpers/runif.py
@@ -27,7 +27,6 @@ from lightning.pytorch.accelerators.ipu import _IPU_AVAILABLE
 from lightning.pytorch.accelerators.mps import MPSAccelerator
 from lightning.pytorch.accelerators.tpu import TPUAccelerator
 from lightning.pytorch.callbacks.progress.rich_progress import _RICH_AVAILABLE
-from lightning.pytorch.strategies.bagua import _BAGUA_AVAILABLE
 from lightning.pytorch.strategies.colossalai import _COLOSSALAI_AVAILABLE
 from lightning.pytorch.strategies.deepspeed import _DEEPSPEED_AVAILABLE
 from lightning.pytorch.utilities.imports import _OMEGACONF_AVAILABLE, _PSUTIL_AVAILABLE, _TORCH_QUANTIZE_AVAILABLE
@@ -61,7 +60,6 @@ class RunIf:
         deepspeed: bool = False,
         rich: bool = False,
         omegaconf: bool = False,
-        bagua: bool = False,
         colossalai: bool = False,
         psutil: bool = False,
         sklearn: bool = False,
@@ -87,7 +85,6 @@ class RunIf:
             deepspeed: Require that microsoft/DeepSpeed is installed.
             rich: Require that willmcgugan/rich is installed.
             omegaconf: Require that omry/omegaconf is installed.
-            bagua: Require that BaguaSys/bagua is installed.
             psutil: Require that psutil is installed.
             sklearn: Require that scikit-learn is installed.
             **kwargs: Any :class:`pytest.mark.skipif` keyword arguments.
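The ``bagua`` flag removed from ``RunIf`` expands to the availability condition shown in the following hunk. A standalone sketch of that check as a plain ``pytest.mark.skipif`` (the test name is hypothetical; ``module_available`` is the same helper the deleted ``bagua.py`` uses for ``_BAGUA_AVAILABLE``):

.. code-block:: python

    import sys

    import pytest
    from lightning_utilities.core.imports import module_available

    # Skip when the bagua package is missing or the platform is unsupported,
    # mirroring the condition RunIf(bagua=True) appended to its skip list.
    _BAGUA_AVAILABLE = module_available("bagua.torch_api")


    @pytest.mark.skipif(not _BAGUA_AVAILABLE or sys.platform in ("win32", "darwin"), reason="Bagua")
    def test_requires_bagua():
        ...
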
@@ -182,10 +179,6 @@ class RunIf:
             conditions.append(not _OMEGACONF_AVAILABLE)
             reasons.append("omegaconf")
 
-        if bagua:
-            conditions.append(not _BAGUA_AVAILABLE or sys.platform in ("win32", "darwin"))
-            reasons.append("Bagua")
-
         if colossalai:
             conditions.append(not _COLOSSALAI_AVAILABLE)
             reasons.append("ColossalAI")
diff --git a/tests/tests_pytorch/strategies/test_bagua_strategy.py b/tests/tests_pytorch/strategies/test_bagua_strategy.py
deleted file mode 100644
index 7d346b10c1..0000000000
--- a/tests/tests_pytorch/strategies/test_bagua_strategy.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright The Lightning AI team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from unittest import mock
-
-import pytest
-import torch
-
-from lightning.pytorch import Trainer
-from lightning.pytorch.demos.boring_classes import BoringModel, ManualOptimBoringModel
-from lightning.pytorch.strategies import BaguaStrategy
-from lightning.pytorch.trainer.states import TrainerFn
-from lightning.pytorch.utilities.exceptions import MisconfigurationException
-from tests_pytorch.helpers.runif import RunIf
-
-
-class BoringModel4QAdam(BoringModel):
-    def configure_optimizers(self):
-        from bagua.torch_api.algorithms.q_adam import QAdamOptimizer
-
-        optimizer = QAdamOptimizer(self.layer.parameters(), lr=0.05, warmup_steps=20)
-        lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)
-        return [optimizer], [lr_scheduler]
-
-
-@RunIf(min_cuda_gpus=1, bagua=True)
-def test_bagua_default(tmpdir):
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=1,
-        strategy="bagua",
-        accelerator="gpu",
-        devices=1,
-    )
-    assert isinstance(trainer.strategy, BaguaStrategy)
-
-
-@pytest.mark.xfail(raises=AssertionError, reason="Internal error in Bagua")  # Unexpected rsp=
-@RunIf(min_cuda_gpus=2, standalone=True, bagua=True)
-def test_manual_optimization(tmpdir):
-    model = ManualOptimBoringModel()
-    bagua_strategy = BaguaStrategy()
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=1,
-        strategy=bagua_strategy,
-        accelerator="gpu",
-        devices=2,
-        enable_progress_bar=False,
-        enable_model_summary=False,
-    )
-    trainer.fit(model)
-
-
-@pytest.mark.skipif(
-    torch.cuda.get_device_capability()[0] >= 8,
-    reason="Async does not support this CUDA architecture",
-)
-@RunIf(min_cuda_gpus=2, standalone=True, bagua=True)
-def test_async_algorithm(tmpdir):
-    model = BoringModel()
-    bagua_strategy = BaguaStrategy(algorithm="async")
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=1,
-        strategy=bagua_strategy,
-        accelerator="gpu",
-        devices=2,
-        enable_progress_bar=False,
-        enable_model_summary=False,
-    )
-    trainer.fit(model)
-
-    for param in model.parameters():
-        assert torch.norm(param) < 3
-
-
-@RunIf(min_cuda_gpus=1, bagua=True)
-@pytest.mark.parametrize(
-    "algorithm", ["gradient_allreduce", "bytegrad", "qadam", "decentralized", "low_precision_decentralized"]
-)
-def test_configuration(algorithm, tmpdir):
-    model = BoringModel()
-    bagua_strategy = BaguaStrategy(algorithm=algorithm)
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=1,
-        strategy=bagua_strategy,
-        accelerator="gpu",
-        devices=1,
-    )
-    trainer.state.fn = TrainerFn.FITTING
-    trainer.strategy.connect(model)
-    trainer.lightning_module.trainer = trainer
-
-    with mock.patch(
-        "bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel.__init__", return_value=None
-    ), mock.patch("bagua.torch_api.communication.is_initialized", return_value=True):
-        if algorithm == "qadam":
-            with pytest.raises(MisconfigurationException, match="Bagua QAdam can only accept one QAdamOptimizer"):
-                trainer.strategy._configure_bagua_model(trainer)
-        else:
-            trainer.strategy._configure_bagua_model(trainer)
-
-
-@RunIf(min_cuda_gpus=1, bagua=True)
-def test_qadam_configuration(tmpdir):
-    model = BoringModel4QAdam()
-    bagua_strategy = BaguaStrategy(algorithm="qadam")
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        fast_dev_run=1,
-        strategy=bagua_strategy,
-        accelerator="gpu",
-        devices=1,
-    )
-    trainer.state.fn = TrainerFn.FITTING
-    trainer.strategy.connect(model)
-    trainer.lightning_module.trainer = trainer
-    trainer.strategy.setup_optimizers(trainer)
-
-    with mock.patch(
-        "bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel.__init__", return_value=None
-    ), mock.patch("bagua.torch_api.communication.is_initialized", return_value=True):
-        trainer.strategy._configure_bagua_model(trainer)
-
-
-def test_bagua_not_available(cuda_count_1, monkeypatch):
-    import lightning.pytorch.strategies.bagua as imports
-
-    monkeypatch.setattr(imports, "_BAGUA_AVAILABLE", False)
-    with pytest.raises(MisconfigurationException, match="you must have `Bagua` installed"):
-        Trainer(strategy="bagua", accelerator="gpu", devices=1)
diff --git a/tests/tests_pytorch/utilities/test_imports.py b/tests/tests_pytorch/utilities/test_imports.py
index a559cb5f88..69610e5702 100644
--- a/tests/tests_pytorch/utilities/test_imports.py
+++ b/tests/tests_pytorch/utilities/test_imports.py
@@ -24,19 +24,11 @@ from lightning_utilities.core.imports import compare_version, RequirementCache
 from torch.distributed import is_available
 
 from lightning.pytorch.accelerators.ipu import _POPTORCH_AVAILABLE
-from lightning.pytorch.strategies.bagua import _BAGUA_AVAILABLE
 from lightning.pytorch.utilities import _OMEGACONF_AVAILABLE
 from tests_pytorch.helpers.runif import RunIf
 
 
 def test_imports():
-    try:
-        import bagua  # noqa
-    except ModuleNotFoundError:
-        assert not _BAGUA_AVAILABLE
-    else:
-        assert _BAGUA_AVAILABLE
-
     try:
         import omegaconf  # noqa
     except ModuleNotFoundError: