diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ea6435b11..d569b031e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -166,6 +166,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Added support for `accelerator='cpu'|'gpu'|'tpu'|'ipu'|'auto'` ([#7808](https://github.com/PyTorchLightning/pytorch-lightning/pull/7808))
 
+- Enabled traditional/manual launching of DDP processes through `LOCAL_RANK` and `NODE_RANK` environment variable assignments ([#7480](https://github.com/PyTorchLightning/pytorch-lightning/pull/7480))
+
+
 ### Changed
 
 
diff --git a/pytorch_lightning/plugins/environments/lightning_environment.py b/pytorch_lightning/plugins/environments/lightning_environment.py
index 25da0cfb69..077ebf995e 100644
--- a/pytorch_lightning/plugins/environments/lightning_environment.py
+++ b/pytorch_lightning/plugins/environments/lightning_environment.py
@@ -23,12 +23,16 @@ class LightningEnvironment(ClusterEnvironment):
     """
     The default environment used by Lightning for a single node or free cluster (not managed).
 
-    The master process must be launched by the user and Lightning will spawn new
-    worker processes for distributed training, either in a single node or across multiple nodes.
+    There are two modes the Lightning environment can operate with:
+
+    1. The user only launches the main process by :code:`python train.py ...` with no additional environment variables
+       set. Lightning will spawn new worker processes for distributed training in the current node.
+    2. The user launches all processes manually or with utilities like :code:`torch.distributed.launch`.
+       The appropriate environment variables need to be set, and at minimum :code:`LOCAL_RANK`.
 
     If the master address and port are not provided, the default environment will choose them
     automatically. It is recommended to use this default environment for single-node distributed
-    training as it provides the most convenient way to launch the training script.
+    training as it provides a convenient way to launch the training script.
     """
 
     def __init__(self):
@@ -38,7 +42,12 @@ class LightningEnvironment(ClusterEnvironment):
         self._world_size: int = 1
 
     def creates_children(self) -> bool:
-        return False
+        """
+        Returns whether the cluster creates the processes or not.
+        If at least :code:`LOCAL_RANK` is available as environment variable, Lightning assumes the user acts as the
+        process launcher/job scheduler and Lightning will not launch new processes.
+        """
+        return "LOCAL_RANK" in os.environ
 
     def master_address(self) -> str:
         return os.environ.get("MASTER_ADDR", "127.0.0.1")
diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py
index 9e0fbad33d..c3c05f49b4 100644
--- a/pytorch_lightning/plugins/training_type/ddp.py
+++ b/pytorch_lightning/plugins/training_type/ddp.py
@@ -156,7 +156,7 @@ class DDPPlugin(ParallelPlugin):
 
     def setup_environment(self) -> None:
         # start the other scripts
-        if not self.cluster_environment.creates_children() and os.environ.get("PL_IN_DDP_SUBPROCESS", "0") != "1":
+        if not self.cluster_environment.creates_children():
             self._call_children_scripts()
 
         # set the task idx
@@ -208,8 +208,6 @@ class DDPPlugin(ParallelPlugin):
         if self.parallel_devices is None:
             raise MisconfigurationException("you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)")
 
-        os.environ["PL_IN_DDP_SUBPROCESS"] = "1"
-
         os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"
 
         self.interactive_ddp_procs = []
diff --git a/pytorch_lightning/trainer/connectors/accelerator_connector.py b/pytorch_lightning/trainer/connectors/accelerator_connector.py
index a9355741a2..30e7ec97d6 100644
--- a/pytorch_lightning/trainer/connectors/accelerator_connector.py
+++ b/pytorch_lightning/trainer/connectors/accelerator_connector.py
@@ -519,11 +519,6 @@ class AcceleratorConnector(object):
         use_ddp_sharded_spawn = self._distrib_type == DistributedType.DDP_SHARDED_SPAWN
         use_ddp_fully_sharded = self._distrib_type == DistributedType.DDP_FULLY_SHARDED
 
-        # TODO: decouple from TE
-        # ddp script mode uses the same flags as TE
-        if os.environ.get("PL_IN_DDP_SUBPROCESS", False):
-            use_torchelastic_ddp = False
-
         if use_tpu_spawn:
             ddp_plugin_cls = TPUSpawnPlugin
         elif use_ddp_sharded:
diff --git a/tests/plugins/environments/test_lightning_environment.py b/tests/plugins/environments/test_lightning_environment.py
index 29917877b2..70fe1b88e3 100644
--- a/tests/plugins/environments/test_lightning_environment.py
+++ b/tests/plugins/environments/test_lightning_environment.py
@@ -14,6 +14,8 @@
 import os
 from unittest import mock
 
+import pytest
+
 from pytorch_lightning.plugins.environments import LightningEnvironment
 
 
@@ -50,6 +52,20 @@ def test_attributes_from_environment_variables():
     assert env.world_size() == 100
 
 
+@pytest.mark.parametrize(
+    "environ, creates_children", [
+        ({}, False),
+        (dict(LOCAL_RANK="2"), True),
+        (dict(NODE_RANK="1"), False),
+    ]
+)
+def test_manual_user_launch(environ, creates_children):
+    """ Test that the environment switches to manual user mode when LOCAL_RANK env variable detected. """
+    with mock.patch.dict(os.environ, environ):
+        env = LightningEnvironment()
+        assert env.creates_children() == creates_children
+
+
 @mock.patch.dict(os.environ, {
     "GROUP_RANK": "1",
 })
diff --git a/tests/special_tests.sh b/tests/special_tests.sh
index 95311fb2df..96d1e3ba4a 100755
--- a/tests/special_tests.sh
+++ b/tests/special_tests.sh
@@ -78,6 +78,12 @@ if [ $? -eq 0 ]; then
   report+="Ran\ttests/utilities/test_warnings.py\n"
 fi
 
+# test that a user can manually launch individual processes
+args="--trainer.gpus 2 --trainer.accelerator ddp --trainer.fast_dev_run 1"
+MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=1 python pl_examples/basic_examples/simple_image_classifier.py ${args} &
+MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=0 python pl_examples/basic_examples/simple_image_classifier.py ${args}
+report+="Ran\tmanual ddp launch test\n"
+
 # echo test report
 printf '=%.s' {1..80}
 printf "\n$report"
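
A minimal usage sketch of the manual launch mode enabled by this diff, mirroring the special_tests.sh snippet above: one process is started by hand per device, each with its own LOCAL_RANK, and all processes share the same MASTER_ADDR/MASTER_PORT rendezvous. The script name train.py and the port value are placeholders; the trainer flags are taken from the test above.

    # launch one process per GPU manually; rank 1 runs in the background, rank 0 in the foreground
    export MASTER_ADDR="localhost"
    export MASTER_PORT=1234
    LOCAL_RANK=1 python train.py --trainer.gpus 2 --trainer.accelerator ddp &
    LOCAL_RANK=0 python train.py --trainer.gpus 2 --trainer.accelerator ddp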