support launching Lightning ddp with traditional command (#7480)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com>
Co-authored-by: Jirka Borovec <Borda@users.noreply.github.com>
Adrian Wälchli 2021-07-14 13:25:36 +02:00 committed by GitHub
parent b2ba2e6333
commit b42efa7d86
6 changed files with 39 additions and 12 deletions


@@ -166,6 +166,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added support for `accelerator='cpu'|'gpu'|'tpu'|'ipu'|'auto'` ([#7808](https://github.com/PyTorchLightning/pytorch-lightning/pull/7808))
- Enabled traditional/manual launching of DDP processes through `LOCAL_RANK` and `NODE_RANK` environment variable assignments ([#7480](https://github.com/PyTorchLightning/pytorch-lightning/pull/7480))
### Changed


@@ -23,12 +23,16 @@ class LightningEnvironment(ClusterEnvironment):
"""
The default environment used by Lightning for a single node or free cluster (not managed).
The master process must be launched by the user and Lightning will spawn new
worker processes for distributed training, either in a single node or across multiple nodes.
There are two modes the Lightning environment can operate with:
1. The user only launches the main process by :code:`python train.py ...` with no additional environment variables
set. Lightning will spawn new worker processes for distributed training in the current node.
2. The user launches all processes manually or with utilities like :code:`torch.distributed.launch`.
The appropriate environment variables need to be set, and at minimum :code:`LOCAL_RANK`.
If the master address and port are not provided, the default environment will choose them
automatically. It is recommended to use this default environment for single-node distributed
training as it provides the most convenient way to launch the training script.
training as it provides a convenient way to launch the training script.
"""
def __init__(self):
@@ -38,7 +42,12 @@ class LightningEnvironment(ClusterEnvironment):
self._world_size: int = 1
def creates_children(self) -> bool:
return False
"""
Returns whether the cluster creates the processes or not.
If at least :code:`LOCAL_RANK` is available as an environment variable, Lightning assumes the user acts as the
process launcher/job scheduler and Lightning will not launch new processes.
"""
return "LOCAL_RANK" in os.environ
def master_address(self) -> str:
return os.environ.get("MASTER_ADDR", "127.0.0.1")


@@ -156,7 +156,7 @@ class DDPPlugin(ParallelPlugin):
def setup_environment(self) -> None:
# start the other scripts
if not self.cluster_environment.creates_children() and os.environ.get("PL_IN_DDP_SUBPROCESS", "0") != "1":
if not self.cluster_environment.creates_children():
self._call_children_scripts()
# set the task idx
@@ -208,8 +208,6 @@ class DDPPlugin(ParallelPlugin):
if self.parallel_devices is None:
raise MisconfigurationException("you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)")
os.environ["PL_IN_DDP_SUBPROCESS"] = "1"
os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"
self.interactive_ddp_procs = []
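The decision this hunk simplifies can be summarized in a short sketch (an illustration, not the plugin's actual code): since the child scripts spawned by _call_children_scripts get LOCAL_RANK in their environment, creates_children() is already true for them, so the separate PL_IN_DDP_SUBPROCESS flag becomes redundant and the cluster environment alone decides whether to launch children.

# hypothetical helper, assuming a ClusterEnvironment-like object with creates_children()
def should_launch_children(cluster_environment) -> bool:
    # spawn child scripts only when no external launcher (the user,
    # torch.distributed.launch, or a scheduler) created the per-rank processes
    return not cluster_environment.creates_children()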


@@ -519,11 +519,6 @@ class AcceleratorConnector(object):
use_ddp_sharded_spawn = self._distrib_type == DistributedType.DDP_SHARDED_SPAWN
use_ddp_fully_sharded = self._distrib_type == DistributedType.DDP_FULLY_SHARDED
# TODO: decouple from TE
# ddp script mode uses the same flags as TE
if os.environ.get("PL_IN_DDP_SUBPROCESS", False):
use_torchelastic_ddp = False
if use_tpu_spawn:
ddp_plugin_cls = TPUSpawnPlugin
elif use_ddp_sharded:


@@ -14,6 +14,8 @@
import os
from unittest import mock
import pytest
from pytorch_lightning.plugins.environments import LightningEnvironment
@@ -50,6 +52,20 @@ def test_attributes_from_environment_variables():
assert env.world_size() == 100
@pytest.mark.parametrize(
"environ, creates_children", [
({}, False),
(dict(LOCAL_RANK="2"), True),
(dict(NODE_RANK="1"), False),
]
)
def test_manual_user_launch(environ, creates_children):
""" Test that the environment switches to manual user mode when LOCAL_RANK env variable detected. """
with mock.patch.dict(os.environ, environ):
env = LightningEnvironment()
assert env.creates_children() == creates_children
@mock.patch.dict(os.environ, {
"GROUP_RANK": "1",
})


@@ -78,6 +78,12 @@ if [ $? -eq 0 ]; then
report+="Ran\ttests/utilities/test_warnings.py\n"
fi
# test that a user can manually launch individual processes
args="--trainer.gpus 2 --trainer.accelerator ddp --trainer.fast_dev_run 1"
MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=1 python pl_examples/basic_examples/simple_image_classifier.py ${args} &
MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=0 python pl_examples/basic_examples/simple_image_classifier.py ${args}
report+="Ran\tmanual ddp launch test\n"
# echo test report
printf '=%.s' {1..80}
printf "\n$report"