lightning/docs/source-pytorch/accelerators/gpu_intermediate.rst

572 lines
21 KiB
ReStructuredText

:orphan:
.. _gpu_intermediate:
GPU training (Intermediate)
===========================
**Audience:** Users looking to train across machines or experiment with different scaling techniques.
----
Distributed Training strategies
-------------------------------
Lightning supports multiple ways of doing distributed training.
.. raw:: html
<video width="50%" max-width="400px" controls
poster="https://pl-bolts-doc-images.s3.us-east-2.amazonaws.com/pl_docs/trainer_flags/yt_thumbs/thumb_multi_gpus.png"
src="https://pl-bolts-doc-images.s3.us-east-2.amazonaws.com/pl_docs/yt/Trainer+flags+4-+multi+node+training_3.mp4"></video>
|
- Data Parallel (``strategy='dp'``) (multiple-gpus, 1 machine)
- DistributedDataParallel (multiple-gpus across many machines)
- Regular (``strategy='ddp'``)
- Spawn (``strategy='ddp_spawn'``)
- Fork (``strategy='ddp_fork'``)
- Horovod (``strategy='horovod'``) (multi-machine, multi-gpu, configured at runtime)
- Bagua (``strategy='bagua'``) (multiple-gpus across many machines with advanced training algorithms)
.. note::
If you request multiple GPUs or nodes without setting a mode, DDP Spawn will be automatically used.
For a deeper understanding of what Lightning is doing, feel free to read this
`guide <https://medium.com/@_willfalcon/9-tips-for-training-lightning-fast-neural-networks-in-pytorch-8e63a502f565>`_.
Data Parallel
^^^^^^^^^^^^^
:class:`~torch.nn.DataParallel` (DP) splits a batch across k GPUs.
That is, if you have a batch of 32 and use DP with 2 GPUs, each GPU will process 16 samples,
after which the root node will aggregate the results.
.. warning:: DP use is discouraged by PyTorch and Lightning. State is not maintained on the replicas created by the
:class:`~torch.nn.DataParallel` wrapper and you may see errors or misbehavior if you assign state to the module
in the ``forward()`` or ``*_step()`` methods. For the same reason we cannot fully support
:doc:`Manual Optimization <../model/manual_optimization>` with DP. Use DDP which is more stable and at least 3x faster.
.. warning:: DP only supports scattering and gathering primitive collections of tensors like lists, dicts, etc.
Therefore the :meth:`~pytorch_lightning.core.hooks.ModelHooks.transfer_batch_to_device` hook does not apply in
this mode and if you have overridden it, it will not be called.
.. testcode::
:skipif: torch.cuda.device_count() < 2
# train on 2 GPUs (using DP mode)
trainer = Trainer(accelerator="gpu", devices=2, strategy="dp")
Distributed Data Parallel
^^^^^^^^^^^^^^^^^^^^^^^^^
:class:`~torch.nn.parallel.DistributedDataParallel` (DDP) works as follows:
1. Each GPU across each node gets its own process.
2. Each GPU gets visibility into a subset of the overall dataset. It will only ever see that subset.
3. Each process inits the model.
4. Each process performs a full forward and backward pass in parallel.
5. The gradients are synced and averaged across all processes.
6. Each process updates its optimizer.
.. code-block:: python
# train on 8 GPUs (same machine (ie: node))
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp")
# train on 32 GPUs (4 nodes)
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp", num_nodes=4)
This Lightning implementation of DDP calls your script under the hood multiple times with the correct environment
variables:
.. code-block:: bash
# example for 3 GPUs DDP
MASTER_ADDR=localhost MASTER_PORT=random() WORLD_SIZE=3 NODE_RANK=0 LOCAL_RANK=0 python my_file.py --accelerator 'gpu' --devices 3 --etc
MASTER_ADDR=localhost MASTER_PORT=random() WORLD_SIZE=3 NODE_RANK=1 LOCAL_RANK=0 python my_file.py --accelerator 'gpu' --devices 3 --etc
MASTER_ADDR=localhost MASTER_PORT=random() WORLD_SIZE=3 NODE_RANK=2 LOCAL_RANK=0 python my_file.py --accelerator 'gpu' --devices 3 --etc
We use DDP this way because `ddp_spawn` has a few limitations (due to Python and PyTorch):
1. Since `.spawn()` trains the model in subprocesses, the model on the main process does not get updated.
2. Dataloader(num_workers=N), where N is large, bottlenecks training with DDP... ie: it will be VERY slow or won't work at all. This is a PyTorch limitation.
3. Forces everything to be picklable.
There are cases in which it is NOT possible to use DDP. Examples are:
- Jupyter Notebook, Google COLAB, Kaggle, etc.
- You have a nested script without a root package
In these situations you should use `dp` or `ddp_spawn` instead.
Distributed Data Parallel 2
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. warning::
The DDP2 strategy is no longer supported. For single-node use, we recommend ``strategy='ddp'`` or
``strategy='dp'`` as a replacement. If you need DDP2, you will need ``torch < 1.9``,
``pytorch-lightning < 1.5``, and set it as ``accelerator='ddp2'``.
In certain cases, it's advantageous to use all batches on the same machine instead of a subset.
For instance, you might want to compute a NCE loss where it pays to have more negative samples.
In this case, we can use DDP2 which behaves like DP in a machine and DDP across nodes. DDP2 does the following:
1. Copies a subset of the data to each node.
2. Inits a model on each node.
3. Runs a forward and backward pass using DP.
4. Syncs gradients across nodes.
5. Applies the optimizer updates.
.. code-block:: python
# train on 32 GPUs (4 nodes)
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp2", num_nodes=4)
Distributed Data Parallel Spawn
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
`ddp_spawn` is exactly like `ddp` except that it uses .spawn to start the training processes.
.. warning:: It is STRONGLY recommended to use `DDP` for speed and performance.
.. code-block:: python
mp.spawn(self.ddp_train, nprocs=self.num_processes, args=(model,))
If your script does not support being called from the command line (ie: it is nested without a root
project module) you can use the following method:
.. code-block:: python
# train on 8 GPUs (same machine (ie: node))
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp_spawn")
We STRONGLY discourage this use because it has limitations (due to Python and PyTorch):
1. The model you pass in will not update. Please save a checkpoint and restore from there.
2. Set Dataloader(num_workers=0) or it will bottleneck training.
`ddp` is MUCH faster than `ddp_spawn`. We recommend you
1. Install a top-level module for your project using setup.py
.. code-block:: python
# setup.py
#!/usr/bin/env python
from setuptools import setup, find_packages
setup(
name="src",
version="0.0.1",
description="Describe Your Cool Project",
author="",
author_email="",
url="https://github.com/YourSeed", # REPLACE WITH YOUR OWN GITHUB PROJECT LINK
install_requires=["pytorch-lightning"],
packages=find_packages(),
)
2. Setup your project like so:
.. code-block:: bash
/project
/src
some_file.py
/or_a_folder
setup.py
3. Install as a root-level package
.. code-block:: bash
cd /project
pip install -e .
You can then call your scripts anywhere
.. code-block:: bash
cd /project/src
python some_file.py --accelerator 'gpu' --devices 8 --strategy 'ddp'
Distributed Data Parallel Fork
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
DDP Fork is an alternative to Spawn that can be used in interactive Python and Jupyter notebooks, Google Colab, Kaggle notebooks, and so on:
.. code-block:: python
# train on 8 GPUs in a Jupyter notebook
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp_fork")
Data Parallel (``strategy="dp"``) is the only other strategy supported in interactive environments but is slower, is discouraged by PyTorch and has other limitations.
Among the native distributed strategies, regular DDP (``strategy="ddp"``) is still recommended as the go-to strategy over Spawn and Fork for its speed and stability but it can only be used with scripts.
Comparison of DDP variants and tradeoffs
****************************************
.. list-table:: DDP variants and their tradeoffs
:widths: 40 20 20 20
:header-rows: 1
* -
- DDP
- DDP Spawn
- DDP Fork
* - Works in Jupyter notebooks / IPython environments
- No
- No
- Yes
* - Supports multi-node
- Yes
- Yes
- Yes
* - Supported platforms
- Linux, Mac, Win
- Linux, Mac, Win
- Linux, Mac
* - Requires all objects to be picklable
- No
- Yes
- No
* - Is the guard ``if __name__=="__main__"`` required?
- Yes
- Yes
- No
* - Limitations in the main process
- None
- None
- GPU operations such as moving tensors to the GPU or calling ``torch.cuda`` functions before invoking ``Trainer.fit`` is not allowed.
* - Process creation time
- Slow
- Slow
- Fast
Horovod
^^^^^^^
`Horovod <http://horovod.ai>`_ allows the same training script to be used for single-GPU,
multi-GPU, and multi-node training.
Like Distributed Data Parallel, every process in Horovod operates on a single GPU with a fixed
subset of the data. Gradients are averaged across all GPUs in parallel during the backward pass,
then synchronously applied before beginning the next step.
The number of worker processes is configured by a driver application (`horovodrun` or `mpirun`). In
the training script, Horovod will detect the number of workers from the environment, and automatically
scale the learning rate to compensate for the increased total batch size.
Horovod can be configured in the training script to run with any number of GPUs / processes as follows:
.. code-block:: python
# train Horovod on GPU (number of GPUs / machines provided on command-line)
trainer = Trainer(strategy="horovod", accelerator="gpu", devices=1)
# train Horovod on CPU (number of processes / machines provided on command-line)
trainer = Trainer(strategy="horovod")
When starting the training job, the driver application will then be used to specify the total
number of worker processes:
.. code-block:: bash
# run training with 4 GPUs on a single machine
horovodrun -np 4 python train.py
# run training with 8 GPUs on two machines (4 GPUs each)
horovodrun -np 8 -H hostname1:4,hostname2:4 python train.py
See the official `Horovod documentation <https://horovod.readthedocs.io/en/stable>`_ for details
on installation and performance tuning.
Bagua
^^^^^
`Bagua <https://github.com/BaguaSys/bagua>`_ is a deep learning training acceleration framework which supports
multiple advanced distributed training algorithms including:
- `Gradient AllReduce <https://tutorials.baguasys.com/algorithms/gradient-allreduce>`_ for centralized synchronous communication, where gradients are averaged among all workers.
- `Decentralized SGD <https://tutorials.baguasys.com/algorithms/decentralized>`_ for decentralized synchronous communication, where each worker exchanges data with one or a few specific workers.
- `ByteGrad <https://tutorials.baguasys.com/algorithms/bytegrad>`_ and `QAdam <https://tutorials.baguasys.com/algorithms/q-adam>`_ for low precision communication, where data is compressed into low precision before communication.
- `Asynchronous Model Average <https://tutorials.baguasys.com/algorithms/async-model-average>`_ for asynchronous communication, where workers are not required to be synchronized in the same iteration in a lock-step style.
By default, Bagua uses *Gradient AllReduce* algorithm, which is also the algorithm implemented in Distributed Data Parallel and Horovod,
but Bagua can usually produce a higher training throughput due to its backend written in Rust.
.. code-block:: python
# train on 4 GPUs (using Bagua mode)
trainer = Trainer(strategy="bagua", accelerator="gpu", devices=4)
By specifying the ``algorithm`` in the ``BaguaStrategy``, you can select more advanced training algorithms featured by Bagua:
.. code-block:: python
# train on 4 GPUs, using Bagua Gradient AllReduce algorithm
trainer = Trainer(
strategy=BaguaStrategy(algorithm="gradient_allreduce"),
accelerator="gpu",
devices=4,
)
# train on 4 GPUs, using Bagua ByteGrad algorithm
trainer = Trainer(
strategy=BaguaStrategy(algorithm="bytegrad"),
accelerator="gpu",
devices=4,
)
# train on 4 GPUs, using Bagua Decentralized SGD
trainer = Trainer(
strategy=BaguaStrategy(algorithm="decentralized"),
accelerator="gpu",
devices=4,
)
# train on 4 GPUs, using Bagua Low Precision Decentralized SGD
trainer = Trainer(
strategy=BaguaStrategy(algorithm="low_precision_decentralized"),
accelerator="gpu",
devices=4,
)
# train on 4 GPUs, using Asynchronous Model Average algorithm, with a synchronization interval of 100ms
trainer = Trainer(
strategy=BaguaStrategy(algorithm="async", sync_interval_ms=100),
accelerator="gpu",
devices=4,
)
To use *QAdam*, we need to initialize
`QAdamOptimizer <https://bagua.readthedocs.io/en/latest/autoapi/bagua/torch_api/algorithms/q_adam/index.html#bagua.torch_api.algorithms.q_adam.QAdamOptimizer>`_ first:
.. code-block:: python
from pytorch_lightning.strategies import BaguaStrategy
from bagua.torch_api.algorithms.q_adam import QAdamOptimizer
class MyModel(pl.LightningModule):
...
def configure_optimizers(self):
# initialize QAdam Optimizer
return QAdamOptimizer(self.parameters(), lr=0.05, warmup_steps=100)
model = MyModel()
trainer = Trainer(
accelerator="gpu",
devices=4,
strategy=BaguaStrategy(algorithm="qadam"),
)
trainer.fit(model)
Bagua relies on its own `launcher <https://tutorials.baguasys.com/getting-started/#launch-job>`_ to schedule jobs.
Below, find examples using ``bagua.distributed.launch`` which follows ``torch.distributed.launch`` API:
.. code-block:: bash
# start training with 8 GPUs on a single node
python -m bagua.distributed.launch --nproc_per_node=8 train.py
If the ssh service is available with passwordless login on each node, you can launch the distributed job on a
single node with ``baguarun`` which has a similar syntax as ``mpirun``. When staring the job, ``baguarun`` will
automatically spawn new processes on each of your training node provided by ``--host_list`` option and each node in it
is described as an ip address followed by a ssh port.
.. code-block:: bash
# Run on node1 (or node2) to start training on two nodes (node1 and node2), 8 GPUs per node
baguarun --host_list hostname1:ssh_port1,hostname2:ssh_port2 --nproc_per_node=8 --master_port=port1 train.py
.. note:: You can also start training in the same way as Distributed Data Parallel. However, system optimizations like
`Bagua-Net <https://tutorials.baguasys.com/more-optimizations/bagua-net>`_ and
`Performance autotuning <https://tutorials.baguasys.com/performance-autotuning/>`_ can only be enabled through bagua
launcher. It is worth noting that with ``Bagua-Net``, Distributed Data Parallel can also achieve
better performance without modifying the training script.
See `Bagua Tutorials <https://tutorials.baguasys.com/>`_ for more details on installation and advanced features.
DP caveats
^^^^^^^^^^
In DP each GPU within a machine sees a portion of a batch.
It does roughly the following:
.. testcode::
def distributed_forward(batch, model):
batch = torch.Tensor(32, 8)
gpu_0_batch = batch[:8]
gpu_1_batch = batch[8:16]
gpu_2_batch = batch[16:24]
gpu_3_batch = batch[24:]
y_0 = model_copy_gpu_0(gpu_0_batch)
y_1 = model_copy_gpu_1(gpu_1_batch)
y_2 = model_copy_gpu_2(gpu_2_batch)
y_3 = model_copy_gpu_3(gpu_3_batch)
return [y_0, y_1, y_2, y_3]
So, when Lightning calls any of the `training_step`, `validation_step`, `test_step`
you will only be operating on one of those pieces.
.. testcode::
# the batch here is a portion of the FULL batch
def training_step(self, batch, batch_idx):
y_0 = batch
For most metrics, this doesn't really matter. However, if you want to add something to your computational graph using
all batch parts you can use the `training_step_end` step.
.. testcode::
def training_step_end(self, outputs):
# only use when on dp
outputs = torch.cat(outputs, dim=1)
softmax = softmax(outputs, dim=1)
out = softmax.mean()
return out
In pseudocode, the full sequence is:
.. code-block:: python
# get data
batch = next(dataloader)
# copy model and data to each gpu
batch_splits = split_batch(batch, num_gpus)
models = copy_model_to_gpus(model)
# in parallel, operate on each batch chunk
all_results = []
for gpu_num in gpus:
batch_split = batch_splits[gpu_num]
gpu_model = models[gpu_num]
out = gpu_model(batch_split)
all_results.append(out)
# use the full batch for something like softmax
full_out = model.training_step_end(all_results)
If `training_step_end` is defined it will be called regardless of TPU, DP, DDP, etc... which means
it will behave the same regardless of the backend.
Validation and test step have the same option when using DP.
.. testcode::
def validation_step_end(self, step_output):
...
def test_step_end(self, step_output):
...
Distributed and 16-bit precision
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Due to an issue with Apex and DataParallel (PyTorch and NVIDIA issue), Lightning does
not allow 16-bit and DP training. We tried to get this to work, but it's an issue on their end.
Below are the possible configurations we support.
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
| 1 GPU | 1+ GPUs | DP | DDP | 16-bit | command |
+=======+=========+=====+=====+========+=======================================================================+
| Y | | | | | `Trainer(accelerator="gpu", devices=1)` |
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
| Y | | | | Y | `Trainer(accelerator="gpu", devices=1, precision=16)` |
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
| | Y | Y | | | `Trainer(accelerator="gpu", devices=k, strategy='dp')` |
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
| | Y | | Y | | `Trainer(accelerator="gpu", devices=k, strategy='ddp')` |
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
| | Y | | Y | Y | `Trainer(accelerator="gpu", devices=k, strategy='ddp', precision=16)` |
+-------+---------+-----+-----+--------+-----------------------------------------------------------------------+
Implement Your Own Distributed (DDP) training
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you need your own way to init PyTorch DDP you can override :meth:`pytorch_lightning.strategies.ddp.DDPStrategy.init_dist_connection`.
If you also need to use your own DDP implementation, override :meth:`pytorch_lightning.strategies.ddp.DDPStrategy.configure_ddp`.
----------
Torch Distributed Elastic
-------------------------
Lightning supports the use of Torch Distributed Elastic to enable fault-tolerant and elastic distributed job scheduling. To use it, specify the 'ddp' backend and the number of GPUs you want to use in the trainer.
.. code-block:: python
Trainer(accelerator="gpu", devices=8, strategy="ddp")
To launch a fault-tolerant job, run the following on all nodes.
.. code-block:: bash
python -m torch.distributed.run
--nnodes=NUM_NODES
--nproc_per_node=TRAINERS_PER_NODE
--rdzv_id=JOB_ID
--rdzv_backend=c10d
--rdzv_endpoint=HOST_NODE_ADDR
YOUR_LIGHTNING_TRAINING_SCRIPT.py (--arg1 ... train script args...)
To launch an elastic job, run the following on at least ``MIN_SIZE`` nodes and at most ``MAX_SIZE`` nodes.
.. code-block:: bash
python -m torch.distributed.run
--nnodes=MIN_SIZE:MAX_SIZE
--nproc_per_node=TRAINERS_PER_NODE
--rdzv_id=JOB_ID
--rdzv_backend=c10d
--rdzv_endpoint=HOST_NODE_ADDR
YOUR_LIGHTNING_TRAINING_SCRIPT.py (--arg1 ... train script args...)
See the official `Torch Distributed Elastic documentation <https://pytorch.org/docs/stable/distributed.elastic.html>`_ for details
on installation and more use cases.
Optimize multi-machine communication
------------------------------------
By default, Lightning will select the ``nccl`` backend over ``gloo`` when running on GPUs.
Find more information about PyTorch's supported backends `here <https://pytorch.org/docs/stable/distributed.html>`__.
Lightning allows explicitly specifying the backend via the `process_group_backend` constructor argument on the relevant Strategy classes. By default, Lightning will select the appropriate process group backend based on the hardware used.
.. code-block:: python
from pytorch_lightning.strategies import DDPStrategy
# Explicitly specify the process group backend if you choose to
ddp = DDPStrategy(process_group_backend="nccl")
# Configure the strategy on the Trainer
trainer = Trainer(strategy=ddp, accelerator="gpu", devices=8)