Remove the BaguaStrategy (#16746)

* remove bagua

* remove

* remove docker file entry
Adrian Wälchli 2023-02-14 14:58:58 +01:00 committed by GitHub
parent 39020887d2
commit c4074419b5
14 changed files with 0 additions and 656 deletions

View File

@ -112,8 +112,6 @@ jobs:
- bash: |
set -e
CUDA_VERSION_BAGUA=$(python -c "print([ver for ver in [116,113,111,102] if $CUDA_VERSION_MM >= ver][0])")
pip install "bagua-cuda$CUDA_VERSION_BAGUA"
CUDA_VERSION_MM_COLOSSALAI=$(python -c "import torch ; print(''.join(map(str, torch.version.cuda)))")
CUDA_VERSION_COLOSSALAI=$(python -c "print([ver for ver in [11.3, 11.1] if $CUDA_VERSION_MM_COLOSSALAI >= ver][0])")
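For context, the removed lines pick the newest Bagua CUDA wheel that is still compatible with the image's CUDA toolkit (the surviving ColossalAI lines use the same pattern). A minimal standalone sketch of that selection, with a hypothetical cuda_version_mm value standing in for the $CUDA_VERSION_MM shell substitution:

# Sketch of the wheel-selection one-liner above (assumption: a CUDA 11.7 image).
cuda_version_mm = 117  # e.g. derived from "11.7"
supported = [116, 113, 111, 102]  # available wheel flavors, newest first
cuda_version_bagua = [ver for ver in supported if cuda_version_mm >= ver][0]
print(f"pip install bagua-cuda{cuda_version_bagua}")  # -> pip install bagua-cuda116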

View File

@ -98,19 +98,6 @@ RUN \
pip install -r requirements/pytorch/base.txt --no-cache-dir --find-links https://download.pytorch.org/whl/cu${CUDA_VERSION_MM}/torch_stable.html && \
rm assistant.py
RUN \
# install Bagua
if [[ $PYTORCH_VERSION != "1.13" ]]; then \
CUDA_VERSION_MM=$(python -c "print(''.join('$CUDA_VERSION'.split('.')[:2]))") ; \
CUDA_VERSION_BAGUA=$(python -c "print([ver for ver in [116,113,111,102] if $CUDA_VERSION_MM >= ver][0])") ; \
pip install "bagua-cuda$CUDA_VERSION_BAGUA" ; \
if [[ "$CUDA_VERSION_MM" = "$CUDA_VERSION_BAGUA" ]]; then \
python -c "import bagua_core; bagua_core.install_deps()"; \
fi ; \
python -c "import bagua; print(bagua.__version__)"; \
fi
RUN \
# install ColossalAI
# TODO: 1.13 wheels are not released, remove skip once they are

View File

@ -25,7 +25,6 @@ Lightning supports multiple ways of doing distributed training.
- Regular (``strategy='ddp'``)
- Spawn (``strategy='ddp_spawn'``)
- Notebook/Fork (``strategy='ddp_notebook'``)
- Bagua (``strategy='bagua'``) (multiple-gpus across many machines with advanced training algorithms)
.. note::
If you request multiple GPUs or nodes without setting a mode, DDP Spawn will be automatically used.
@ -235,119 +234,6 @@ Comparison of DDP variants and tradeoffs
- Fast
Bagua
^^^^^
`Bagua <https://github.com/BaguaSys/bagua>`_ is a deep learning training acceleration framework that supports
multiple advanced distributed training algorithms, including:

- `Gradient AllReduce <https://tutorials.baguasys.com/algorithms/gradient-allreduce>`_ for centralized synchronous communication, where gradients are averaged among all workers.
- `Decentralized SGD <https://tutorials.baguasys.com/algorithms/decentralized>`_ for decentralized synchronous communication, where each worker exchanges data with one or a few specific workers.
- `ByteGrad <https://tutorials.baguasys.com/algorithms/bytegrad>`_ and `QAdam <https://tutorials.baguasys.com/algorithms/q-adam>`_ for low-precision communication, where data is compressed into low precision before communication.
- `Asynchronous Model Average <https://tutorials.baguasys.com/algorithms/async-model-average>`_ for asynchronous communication, where workers are not required to synchronize in lock-step within the same iteration.

By default, Bagua uses the *Gradient AllReduce* algorithm, which is also the algorithm implemented in DDP,
but Bagua can usually produce higher training throughput due to its backend written in Rust.

.. code-block:: python

    # train on 4 GPUs (using Bagua mode)
    trainer = Trainer(strategy="bagua", accelerator="gpu", devices=4)

By specifying the ``algorithm`` in the ``BaguaStrategy``, you can select more advanced training algorithms offered by Bagua:

.. code-block:: python

    # train on 4 GPUs, using Bagua Gradient AllReduce algorithm
    trainer = Trainer(
        strategy=BaguaStrategy(algorithm="gradient_allreduce"),
        accelerator="gpu",
        devices=4,
    )

    # train on 4 GPUs, using Bagua ByteGrad algorithm
    trainer = Trainer(
        strategy=BaguaStrategy(algorithm="bytegrad"),
        accelerator="gpu",
        devices=4,
    )

    # train on 4 GPUs, using Bagua Decentralized SGD
    trainer = Trainer(
        strategy=BaguaStrategy(algorithm="decentralized"),
        accelerator="gpu",
        devices=4,
    )

    # train on 4 GPUs, using Bagua Low Precision Decentralized SGD
    trainer = Trainer(
        strategy=BaguaStrategy(algorithm="low_precision_decentralized"),
        accelerator="gpu",
        devices=4,
    )

    # train on 4 GPUs, using the Asynchronous Model Average algorithm, with a synchronization interval of 100ms
    trainer = Trainer(
        strategy=BaguaStrategy(algorithm="async", sync_interval_ms=100),
        accelerator="gpu",
        devices=4,
    )

To use *QAdam*, we need to initialize the
`QAdamOptimizer <https://bagua.readthedocs.io/en/latest/autoapi/bagua/torch_api/algorithms/q_adam/index.html#bagua.torch_api.algorithms.q_adam.QAdamOptimizer>`_ first:

.. code-block:: python

    from pytorch_lightning.strategies import BaguaStrategy
    from bagua.torch_api.algorithms.q_adam import QAdamOptimizer


    class MyModel(pl.LightningModule):
        ...

        def configure_optimizers(self):
            # initialize QAdam Optimizer
            return QAdamOptimizer(self.parameters(), lr=0.05, warmup_steps=100)


    model = MyModel()
    trainer = Trainer(
        accelerator="gpu",
        devices=4,
        strategy=BaguaStrategy(algorithm="qadam"),
    )
    trainer.fit(model)

Bagua relies on its own `launcher <https://tutorials.baguasys.com/getting-started/#launch-job>`_ to schedule jobs.
Below are examples using ``bagua.distributed.launch``, which follows the ``torch.distributed.launch`` API:

.. code-block:: bash

    # start training with 8 GPUs on a single node
    python -m bagua.distributed.launch --nproc_per_node=8 train.py

If passwordless SSH is available on each node, you can launch the distributed job from a
single node with ``baguarun``, which has a syntax similar to ``mpirun``. When starting the job, ``baguarun``
automatically spawns new processes on each training node listed in the ``--host_list`` option, where each entry
is an IP address (or hostname) followed by an SSH port.

.. code-block:: bash

    # Run on node1 (or node2) to start training on two nodes (node1 and node2), 8 GPUs per node
    baguarun --host_list hostname1:ssh_port1,hostname2:ssh_port2 --nproc_per_node=8 --master_port=port1 train.py

.. note:: You can also start training in the same way as with Distributed Data Parallel. However, system optimizations like
   `Bagua-Net <https://tutorials.baguasys.com/more-optimizations/bagua-net>`_ and
   `Performance autotuning <https://tutorials.baguasys.com/performance-autotuning/>`_ can only be enabled through the Bagua
   launcher. Note that with ``Bagua-Net``, Distributed Data Parallel can also achieve
   better performance without modifying the training script.

See `Bagua Tutorials <https://tutorials.baguasys.com/>`_ for more details on installation and advanced features.
DP caveats
^^^^^^^^^^
In DP each GPU within a machine sees a portion of a batch.

View File

@ -213,7 +213,6 @@ strategies
:nosignatures:
:template: classtemplate.rst
BaguaStrategy
ColossalAIStrategy
DDPSpawnStrategy
DDPStrategy

View File

@ -69,9 +69,6 @@ The below table lists all relevant strategies available in Lightning with their
* - Name
- Class
- Description
* - bagua
- :class:`~pytorch_lightning.strategies.BaguaStrategy`
- Strategy for training using the Bagua library, with advanced distributed training algorithms and system optimizations. :ref:`Learn more. <accelerators/gpu_intermediate:Bagua>`
* - colossalai
- :class:`~pytorch_lightning.strategies.ColossalAIStrategy`
- Colossal-AI provides a collection of parallel components for you. It aims to support you to write your distributed deep learning models just like how you write your model on your laptop. `Learn more. <https://www.colossalai.org/>`__

View File

@ -1,3 +1,2 @@
if __name__ == "__main__":
    import bagua  # noqa: F401
    import deepspeed  # noqa: F401

View File

@ -21,4 +21,3 @@ from lightning.fabric.plugins.environments import ( # noqa: F401
TorchElasticEnvironment,
XLAEnvironment,
)
from lightning.pytorch.plugins.environments.bagua_environment import BaguaEnvironment # noqa: F401

View File

@ -1,62 +0,0 @@
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from lightning.fabric.plugins import ClusterEnvironment
log = logging.getLogger(__name__)
class BaguaEnvironment(ClusterEnvironment):
    """Environment for distributed training with `Bagua <https://tutorials.baguasys.com/>`_"""

    @property
    def creates_processes_externally(self) -> bool:
        return True

    @property
    def main_address(self) -> str:
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    @property
    def main_port(self) -> int:
        return int(os.environ.get("MASTER_PORT", -1))

    @property
    def service_port(self) -> int:
        return int(os.environ.get("BAGUA_SERVICE_PORT", -1))

    @staticmethod
    def detect() -> bool:
        return "BAGUA_SERVICE_PORT" in os.environ

    def world_size(self) -> int:
        return int(os.environ["WORLD_SIZE"])

    def set_world_size(self, size: int) -> None:
        log.debug("`BaguaEnvironment.set_world_size` was called, but setting world size is not allowed. Ignored.")

    def global_rank(self) -> int:
        return int(os.environ["RANK"])

    def set_global_rank(self, rank: int) -> None:
        log.debug("`BaguaEnvironment.set_global_rank` was called, but setting global rank is not allowed. Ignored.")

    def local_rank(self) -> int:
        return int(os.environ.get("LOCAL_RANK", 0))

    def node_rank(self) -> int:
        return int(os.environ.get("NODE_RANK", 0))
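A minimal usage sketch of the class above; the environment variable values are hypothetical stand-ins for what a Bagua launcher would export for one worker process:

import os

from lightning.pytorch.plugins.environments import BaguaEnvironment  # pre-removal import path

# Hypothetical values; in practice the Bagua launcher exports these per process.
os.environ.update(
    {
        "MASTER_ADDR": "10.0.0.1",
        "MASTER_PORT": "29500",
        "BAGUA_SERVICE_PORT": "29501",
        "WORLD_SIZE": "8",
        "RANK": "3",
        "LOCAL_RANK": "3",
        "NODE_RANK": "0",
    }
)

assert BaguaEnvironment.detect()  # True because BAGUA_SERVICE_PORT is set
env = BaguaEnvironment()
print(env.main_address, env.main_port, env.global_rank(), env.world_size())  # 10.0.0.1 29500 3 8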

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from lightning.fabric.strategies.registry import _StrategyRegistry
from lightning.pytorch.strategies.bagua import BaguaStrategy # noqa: F401
from lightning.pytorch.strategies.colossalai import ColossalAIStrategy # noqa: F401
from lightning.pytorch.strategies.ddp import DDPStrategy # noqa: F401
from lightning.pytorch.strategies.ddp_spawn import DDPSpawnStrategy # noqa: F401

View File

@ -1,295 +0,0 @@
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Any, Dict, List, Optional, Union
import torch
from lightning_utilities.core.imports import module_available
from torch import Tensor
from torch.nn import Module
import lightning.pytorch as pl
from lightning.fabric.plugins import CheckpointIO, ClusterEnvironment
from lightning.fabric.utilities.optimizer import _optimizers_to_device
from lightning.fabric.utilities.seed import reset_seed
from lightning.fabric.utilities.types import ReduceOp
from lightning.pytorch.overrides.base import _LightningModuleWrapperBase, _LightningPrecisionModuleWrapperBase
from lightning.pytorch.plugins.precision import PrecisionPlugin
from lightning.pytorch.strategies.ddp import DDPStrategy
from lightning.pytorch.strategies.strategy import TBroadcast
from lightning.pytorch.trainer.states import TrainerFn
from lightning.pytorch.utilities.exceptions import MisconfigurationException
_BAGUA_AVAILABLE = module_available("bagua.torch_api")
if _BAGUA_AVAILABLE:
    import bagua.torch_api as bagua
    from bagua.torch_api.algorithms import Algorithm
    from bagua.torch_api.algorithms.q_adam import QAdamOptimizer
    from bagua.torch_api.communication import allreduce_inplace, barrier, broadcast_object, is_initialized
    from bagua.torch_api.communication import ReduceOp as BaguaReduceOp
    from bagua.torch_api.data_parallel.distributed import DistributedDataParallel_V1_9_0 as BaguaDistributedDataParallel

    # Convert a reduce op to its equivalent `bagua.torch_api.ReduceOp`
    _bagua_reduce_ops = {
        ReduceOp.SUM: BaguaReduceOp.SUM,
        ReduceOp.PRODUCT: BaguaReduceOp.PRODUCT,
        ReduceOp.MIN: BaguaReduceOp.MIN,
        ReduceOp.MAX: BaguaReduceOp.MAX,
        ReduceOp.BAND: BaguaReduceOp.BAND,
        ReduceOp.BOR: BaguaReduceOp.BOR,
        ReduceOp.BXOR: BaguaReduceOp.BXOR,
        "avg": BaguaReduceOp.AVG,
        "mean": BaguaReduceOp.AVG,
        "sum": BaguaReduceOp.SUM,
    }
else:
    _bagua_reduce_ops = {}
log = logging.getLogger(__name__)
class LightningBaguaModule(_LightningModuleWrapperBase):
    def __init__(
        self,
        forward_module: Union["pl.LightningModule", _LightningPrecisionModuleWrapperBase],
    ) -> None:
        super().__init__(forward_module=forward_module)
        # Bagua uses `bagua_module_name` to distinguish different modules
        self._bagua_module_name = f"{forward_module.__class__.__name__}{id(forward_module)}"

    def forward(self, *inputs: Any, **kwargs: Any) -> Any:
        pl_module = self.lightning_module
        trainer = pl_module._trainer

        if trainer is not None:
            if trainer.training:
                output = self._forward_module.training_step(*inputs, **kwargs)
                # In manual_optimization, we need to prevent DDP reducer as
                # it is done manually in `LightningModule.manual_backward`.
                # `require_backward_grad_sync` will be reset in the
                # ddp_strategy `post_training_step` hook
                if not pl_module.automatic_optimization:
                    # Using the Bagua strategy, the model is redefined in model.inner
                    # and cannot be accessed directly. We need this to make manual
                    # backward work.
                    trainer.model.inner.require_backward_grad_sync = False  # type: ignore[union-attr]
                return output
            else:
                return super().forward(*inputs, **kwargs)
        return self._forward_module(*inputs, **kwargs)
class BaguaStrategy(DDPStrategy):
    strategy_name = "bagua"

    def __init__(
        self,
        algorithm: str = "gradient_allreduce",
        flatten: bool = True,
        accelerator: Optional["pl.accelerators.Accelerator"] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision_plugin: Optional[PrecisionPlugin] = None,
        **bagua_kwargs: Union[Any, Dict[str, Any]],
    ):
        """Strategy for training using the `Bagua <https://github.com/BaguaSys/bagua>`_ library, with advanced
        distributed training algorithms and system optimizations.

        This strategy requires the `bagua` package to be installed. See the
        `installation guide <https://tutorials.baguasys.com/installation>`_ for more information.

        The :class:`BaguaStrategy` is only supported on GPU and on Linux systems.

        Arguments:
            algorithm: Distributed algorithm used to do the actual communication and update. Built-in algorithms
                include "gradient_allreduce", "bytegrad", "decentralized", "low_precision_decentralized", "qadam" and
                "async".
            flatten: Whether to flatten the Bagua communication buckets. The flatten operation resets the data
                pointers of bucket tensors so that they can use faster code paths.
            bagua_kwargs: Additional keyword arguments that will be passed to initialize the Bagua algorithm. More
                details on the keyword arguments accepted by each algorithm can be found in the
                `documentation <https://bagua.readthedocs.io/en/latest/autoapi/bagua/torch_api/algorithms/index.html>`_.
        """
        if not _BAGUA_AVAILABLE:
            raise MisconfigurationException(
                "To use the `BaguaStrategy`, you must have `Bagua` installed. Use `pip install bagua` to install it."
            )

        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision_plugin=precision_plugin,
        )

        self._bagua_algorithm = algorithm
        self._bagua_flatten = flatten
        self._bagua_kwargs = bagua_kwargs

    def setup_distributed(self) -> None:
        reset_seed()

        # determine which process we are and world size
        self.set_world_ranks()

        self._init_bagua_distributed()

    def _init_bagua_distributed(self) -> None:
        self._set_node_environment_variables()
        log.info(
            "Initializing Bagua Distributed: "
            f"GLOBAL_RANK: {self.global_rank}, "
            f"MEMBER: {self.global_rank + 1}/{self.world_size}"
        )

        # need to set the device before initializing the Bagua distributed environment
        # Note: setup_environment calls super().setup_distributed after calling init_distributed()
        torch.cuda.set_device(self.local_rank)

        if not is_initialized():
            bagua.init_process_group()

    def _set_node_environment_variables(self) -> None:
        """Set the environment variables as required by the :func:`bagua.init_process_group` call.

        This enables the use of other cluster environments which don't set these exact variables, e.g., Bagua can be
        launched with ``torch.distributed.run``.
        """
        os.environ["MASTER_ADDR"] = self.cluster_environment.main_address  # type: ignore[union-attr]
        os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)  # type: ignore[union-attr]
        os.environ["RANK"] = str(self.global_rank)
        os.environ["NODE_RANK"] = str(self.node_rank)
        os.environ["WORLD_SIZE"] = str(self.world_size)
        os.environ["LOCAL_RANK"] = str(self.local_rank)

    def setup(self, trainer: "pl.Trainer") -> None:
        assert self.accelerator is not None
        self.accelerator.setup(trainer)

        # move the model to the correct device
        self.model_to_device()

        trainer_fn = trainer.state.fn

        if trainer_fn == TrainerFn.FITTING:
            if self._layer_sync and self.model:
                self.model = self._layer_sync.apply(self.model)

        self.setup_precision_plugin()

        if trainer_fn == TrainerFn.FITTING:
            # set up optimizers after the module has been moved to the device
            # but before the module has been wrapped
            self.setup_optimizers(trainer)
            _optimizers_to_device(self.optimizers, self.root_device)

            # skip wrapping the model if we are not fitting as no gradients need to be exchanged
            self._configure_bagua_model(trainer)

    def _check_qadam_optimizer(self) -> None:
        has_qadam_optimizer = any([isinstance(opt, QAdamOptimizer) for opt in self.optimizers])

        if not has_qadam_optimizer or len(self.optimizers) > 1 or len(self.lr_scheduler_configs) > 1:
            raise MisconfigurationException("Bagua QAdam can only accept one QAdamOptimizer and one LR Scheduler.")

        self._bagua_kwargs["q_adam_optimizer"] = self.optimizers[0]

    def _configure_bagua_model(self, trainer: "pl.Trainer") -> None:
        model = LightningBaguaModule(self.model)  # type: ignore[arg-type]
        self.model = self._setup_model(model)

        # start the background communication for the async algorithm
        if trainer.training and self._bagua_algorithm == "async":
            self.model.bagua_algorithm.resume(self.model)  # type: ignore

    def _setup_model(self, model: Module) -> "BaguaDistributedDataParallel":
        """Wraps the model into a Bagua distributed module."""
        if self._bagua_algorithm == "qadam":
            self._check_qadam_optimizer()

        algorithm = Algorithm.init(self._bagua_algorithm, **self._bagua_kwargs)
        return BaguaDistributedDataParallel(
            module=model,
            optimizers=self.optimizers,
            algorithm=algorithm,
            gradient_as_bucket_view=self._bagua_flatten,
        )

    @classmethod
    def register_strategies(cls, strategy_registry: Dict) -> None:
        strategy_registry.register(
            cls.strategy_name,
            cls,
            description=f"{cls.__class__.__name__}",
        )

    def teardown(self) -> None:
        # abort the background communication for the async algorithm
        assert self.lightning_module is not None
        if self.lightning_module.trainer.training and self._bagua_algorithm == "async":
            self.model.bagua_algorithm.abort(self.model)  # type: ignore

        if isinstance(self.model, BaguaDistributedDataParallel):
            self.model = self.lightning_module

        super().teardown()

    def barrier(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
        if is_initialized():
            barrier()

    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        return broadcast_object(obj, src)

    def post_training_step(self) -> None:
        assert self.lightning_module is not None
        # Using the Bagua strategy, the model is redefined in model.inner
        # and cannot be accessed directly. We need to redefine the
        # post_training_step function to make manual backward work.
        if not self.lightning_module.automatic_optimization:
            self.model.inner.require_backward_grad_sync = True  # type: ignore[union-attr]

    def reduce(
        self, tensor: Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"
    ) -> Tensor:
        """Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: The tensor to sync and reduce.
            group: The process group to gather results from. Defaults to all processes (world).
            reduce_op: The reduction operation.
                Can also be a string such as ``'sum'``, or a ``ReduceOp``.

        Return:
            The reduced value, except when the input was not a tensor, in which case the output remains unchanged.
        """
        if not isinstance(tensor, Tensor):
            return tensor
        if group is not None:
            raise ValueError("`Bagua` does not support allreduce using a subcommunicator at this time. Unset `group`.")

        if reduce_op is None:
            op = BaguaReduceOp.AVG
        else:
            op = _bagua_reduce_ops.get(reduce_op, None)
            if op is None:
                raise ValueError(f"Unrecognized `reduce_op` for `BaguaStrategy`: {reduce_op}")

        allreduce_inplace(tensor, op=op)
        return tensor
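For reference, a rough sketch (not part of this diff) of how the constructor's algorithm and **bagua_kwargs flowed into _setup_model above; it assumes a pre-removal Lightning install with a working bagua package:

from lightning.pytorch.strategies import BaguaStrategy  # pre-removal import path

# The strategy only stores its configuration; the Bagua algorithm object is built
# lazily when the model is wrapped for fitting.
strategy = BaguaStrategy(algorithm="async", sync_interval_ms=100)
# Later, during fitting (simplified):
#     model = LightningBaguaModule(lightning_module)              # _configure_bagua_model
#     algorithm = Algorithm.init("async", sync_interval_ms=100)   # _setup_model
#     BaguaDistributedDataParallel(
#         module=model,
#         optimizers=strategy.optimizers,
#         algorithm=algorithm,
#         gradient_as_bucket_view=strategy._bagua_flatten,
#     )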

View File

@ -51,7 +51,6 @@ from lightning.pytorch.plugins import (
TPUBf16PrecisionPlugin,
TPUPrecisionPlugin,
)
from lightning.pytorch.plugins.environments import BaguaEnvironment
from lightning.pytorch.plugins.layer_sync import LayerSync, TorchSyncBatchNorm
from lightning.pytorch.plugins.precision.fsdp import FSDPMixedPrecisionPlugin
from lightning.pytorch.strategies import (
@ -422,7 +421,6 @@ class AcceleratorConnector:
return self._cluster_environment_flag
for env_type in (
SLURMEnvironment,
BaguaEnvironment,
TorchElasticEnvironment,
KubeflowEnvironment,
LSFEnvironment,
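Removing BaguaEnvironment from this tuple matters because the connector walks the candidates and instantiates the first one whose detect() recognizes the current launcher. A simplified sketch of that selection pattern (not the exact Lightning code):

from lightning.fabric.plugins.environments import SLURMEnvironment, TorchElasticEnvironment

def _select_cluster_environment(candidates):
    # Pick the first environment type that detects its launcher; before this commit,
    # BaguaEnvironment.detect() checked for the BAGUA_SERVICE_PORT variable here.
    for env_type in candidates:
        if env_type.detect():
            return env_type()
    return None  # caller falls back to a default single-node environment

env = _select_cluster_environment((SLURMEnvironment, TorchElasticEnvironment))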

View File

@ -27,7 +27,6 @@ from lightning.pytorch.accelerators.ipu import _IPU_AVAILABLE
from lightning.pytorch.accelerators.mps import MPSAccelerator
from lightning.pytorch.accelerators.tpu import TPUAccelerator
from lightning.pytorch.callbacks.progress.rich_progress import _RICH_AVAILABLE
from lightning.pytorch.strategies.bagua import _BAGUA_AVAILABLE
from lightning.pytorch.strategies.colossalai import _COLOSSALAI_AVAILABLE
from lightning.pytorch.strategies.deepspeed import _DEEPSPEED_AVAILABLE
from lightning.pytorch.utilities.imports import _OMEGACONF_AVAILABLE, _PSUTIL_AVAILABLE, _TORCH_QUANTIZE_AVAILABLE
@ -61,7 +60,6 @@ class RunIf:
deepspeed: bool = False,
rich: bool = False,
omegaconf: bool = False,
bagua: bool = False,
colossalai: bool = False,
psutil: bool = False,
sklearn: bool = False,
@ -87,7 +85,6 @@ class RunIf:
deepspeed: Require that microsoft/DeepSpeed is installed.
rich: Require that willmcgugan/rich is installed.
omegaconf: Require that omry/omegaconf is installed.
bagua: Require that BaguaSys/bagua is installed.
psutil: Require that psutil is installed.
sklearn: Require that scikit-learn is installed.
**kwargs: Any :class:`pytest.mark.skipif` keyword arguments.
@ -182,10 +179,6 @@ class RunIf:
conditions.append(not _OMEGACONF_AVAILABLE)
reasons.append("omegaconf")
if bagua:
conditions.append(not _BAGUA_AVAILABLE or sys.platform in ("win32", "darwin"))
reasons.append("Bagua")
if colossalai:
conditions.append(not _COLOSSALAI_AVAILABLE)
reasons.append("ColossalAI")

View File

@ -1,146 +0,0 @@
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
import pytest
import torch
from lightning.pytorch import Trainer
from lightning.pytorch.demos.boring_classes import BoringModel, ManualOptimBoringModel
from lightning.pytorch.strategies import BaguaStrategy
from lightning.pytorch.trainer.states import TrainerFn
from lightning.pytorch.utilities.exceptions import MisconfigurationException
from tests_pytorch.helpers.runif import RunIf
class BoringModel4QAdam(BoringModel):
    def configure_optimizers(self):
        from bagua.torch_api.algorithms.q_adam import QAdamOptimizer

        optimizer = QAdamOptimizer(self.layer.parameters(), lr=0.05, warmup_steps=20)
        lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)
        return [optimizer], [lr_scheduler]


@RunIf(min_cuda_gpus=1, bagua=True)
def test_bagua_default(tmpdir):
    trainer = Trainer(
        default_root_dir=tmpdir,
        fast_dev_run=1,
        strategy="bagua",
        accelerator="gpu",
        devices=1,
    )
    assert isinstance(trainer.strategy, BaguaStrategy)


@pytest.mark.xfail(raises=AssertionError, reason="Internal error in Bagua")  # Unexpected rsp=<Response [500]'
@RunIf(min_cuda_gpus=1, bagua=True)
def test_manual_optimization(tmpdir):
    model = ManualOptimBoringModel()
    trainer = Trainer(
        default_root_dir=tmpdir,
        limit_train_batches=1,
        limit_val_batches=0,
        max_epochs=1,
        strategy="bagua",
        accelerator="gpu",
        devices=1,
        logger=False,
        enable_checkpointing=False,
        enable_model_summary=False,
        enable_progress_bar=False,
    )
    trainer.fit(model)


@pytest.mark.skipif(
    torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8,
    reason="Async does not support this CUDA architecture",
)
@RunIf(min_cuda_gpus=2, standalone=True, bagua=True)
def test_async_algorithm(tmpdir):
    model = BoringModel()
    bagua_strategy = BaguaStrategy(algorithm="async")
    trainer = Trainer(
        default_root_dir=tmpdir,
        fast_dev_run=1,
        strategy=bagua_strategy,
        accelerator="gpu",
        devices=2,
        enable_progress_bar=False,
        enable_model_summary=False,
    )
    trainer.fit(model)

    for param in model.parameters():
        assert torch.norm(param) < 3


@RunIf(min_cuda_gpus=1, bagua=True)
@pytest.mark.parametrize(
    "algorithm", ["gradient_allreduce", "bytegrad", "qadam", "decentralized", "low_precision_decentralized"]
)
def test_configuration(algorithm, tmpdir):
    model = BoringModel()
    bagua_strategy = BaguaStrategy(algorithm=algorithm)
    trainer = Trainer(
        default_root_dir=tmpdir,
        fast_dev_run=1,
        strategy=bagua_strategy,
        accelerator="gpu",
        devices=1,
    )
    trainer.state.fn = TrainerFn.FITTING
    trainer.strategy.connect(model)
    trainer.lightning_module.trainer = trainer

    with mock.patch(
        "bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel.__init__", return_value=None
    ), mock.patch("bagua.torch_api.communication.is_initialized", return_value=True):
        if algorithm == "qadam":
            with pytest.raises(MisconfigurationException, match="Bagua QAdam can only accept one QAdamOptimizer"):
                trainer.strategy._configure_bagua_model(trainer)
        else:
            trainer.strategy._configure_bagua_model(trainer)


@RunIf(min_cuda_gpus=1, bagua=True)
def test_qadam_configuration(tmpdir):
    model = BoringModel4QAdam()
    bagua_strategy = BaguaStrategy(algorithm="qadam")
    trainer = Trainer(
        default_root_dir=tmpdir,
        fast_dev_run=1,
        strategy=bagua_strategy,
        accelerator="gpu",
        devices=1,
    )
    trainer.state.fn = TrainerFn.FITTING
    trainer.strategy.connect(model)
    trainer.lightning_module.trainer = trainer
    trainer.strategy.setup_optimizers(trainer)

    with mock.patch(
        "bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel.__init__", return_value=None
    ), mock.patch("bagua.torch_api.communication.is_initialized", return_value=True):
        trainer.strategy._configure_bagua_model(trainer)


def test_bagua_not_available(cuda_count_1, monkeypatch):
    import lightning.pytorch.strategies.bagua as imports

    monkeypatch.setattr(imports, "_BAGUA_AVAILABLE", False)
    with pytest.raises(MisconfigurationException, match="you must have `Bagua` installed"):
        Trainer(strategy="bagua", accelerator="gpu", devices=1)

View File

@ -24,19 +24,11 @@ from lightning_utilities.core.imports import compare_version, RequirementCache
from torch.distributed import is_available
from lightning.pytorch.accelerators.ipu import _POPTORCH_AVAILABLE
from lightning.pytorch.strategies.bagua import _BAGUA_AVAILABLE
from lightning.pytorch.utilities import _OMEGACONF_AVAILABLE
from tests_pytorch.helpers.runif import RunIf
def test_imports():
try:
import bagua # noqa
except ModuleNotFoundError:
assert not _BAGUA_AVAILABLE
else:
assert _BAGUA_AVAILABLE
try:
import omegaconf # noqa
except ModuleNotFoundError: