ref: enable custom clusters (1/n) (#4048)

* enable cluster plugins

* enable cluster plugins + test backend choices

William Falcon, 2020-10-10 08:09:29 -04:00 (committed by GitHub)
parent efec8c7c88
commit 2b255a3df4
5 changed files with 114 additions and 58 deletions
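
Taken together, the change lets a user hand the Trainer a custom cluster description through the plugins list, and that description wins over SLURM/TorchElastic auto-detection. A minimal sketch of the intended usage, modeled on the new test at the bottom of this diff (`MyCluster` and the returned values are illustrative, not part of the diff; the three overridden methods are the ones this PR actually reads):

from pytorch_lightning import Trainer
from pytorch_lightning.cluster_environments import ClusterEnvironment

class MyCluster(ClusterEnvironment):
    # hypothetical values for illustration only
    def master_address(self):
        return "10.0.0.1"   # address of the rank-0 node on your cluster

    def master_port(self):
        return 29500        # port used for torch.distributed env:// init

    def world_size(self):
        return 8            # num_nodes * num_gpus

# the environment is picked up via the plugins list, ahead of any
# SLURM or TorchElastic variables that may also be set
trainer = Trainer(plugins=[MyCluster()], distributed_backend="ddp", gpus=2)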


@@ -127,14 +127,14 @@ class AcceleratorConnector:
         self.trainer.replace_sampler_ddp = replace_sampler_ddp

     def _select_environment(self):
         env = None
         if self.trainer.plugin_connector.cloud_environment:
-            return self.trainer.plugin_connector.cloud_environment
-        elif self._is_using_torchelastic():
-            env = TorchElasticEnvironment()
+            env = self.trainer.plugin_connector.cloud_environment
         elif self.trainer.is_slurm_managing_tasks:
             env = SLURMEnvironment()
+        elif self._is_using_torchelastic():
+            env = TorchElasticEnvironment()
         else:
             env = TorchElasticEnvironment()
         return env

     def _is_using_torchelastic(self):
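
For reference, the object this method returns only has to satisfy a small contract. The method names below are exactly the ones the init_ddp_connection hunk further down calls; the class body is my gloss of the assumed base-class shape, not copied from the PR:

class ClusterEnvironment:
    """Assumed shape of the base class, inferred from the calls in this diff."""

    def master_address(self):
        # address of the rank-0 node, exported as MASTER_ADDR
        raise NotImplementedError

    def master_port(self):
        # port for the env:// init method, exported as MASTER_PORT
        raise NotImplementedError

    def world_size(self):
        # total process count, exported as WORLD_SIZE
        raise NotImplementedError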
@@ -163,45 +163,55 @@ class AcceleratorConnector:
         if os.environ.get('PL_DDP_PID', False):
             use_torchelastic_ddp = False

+        cluster_env = self._select_environment()
+
         # choose the appropriate accelerator backend
         if self.trainer.use_ddp2:
-            accelerator_backend = accelerators.DDP2Backend(self.trainer)
+            accelerator_backend = accelerators.DDP2Backend(self.trainer, cluster_env)
         elif use_ddp_cpu_slurm:
-            accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer)
+            accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer, cluster_env)
         elif use_slurm_ddp:
-            accelerator_backend = accelerators.DDPSLURMBackend(self.trainer)
+            accelerator_backend = accelerators.DDPSLURMBackend(self.trainer, cluster_env)
         elif use_ddp_cpu_torch_elastic:
-            accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer)
+            accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer, cluster_env)
         elif use_torchelastic_ddp:
-            accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer)
+            accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer, cluster_env)
         elif use_ddp_spawn:
-            accelerator_backend = accelerators.DDPSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
+            accelerator_backend = accelerators.DDPSpawnBackend(
+                self.trainer,
+                nprocs=self.trainer.num_processes,
+                cluster_environment=cluster_env
+            )
         elif use_ddp_cpu_spawn:
-            accelerator_backend = accelerators.DDPCPUSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
+            accelerator_backend = accelerators.DDPCPUSpawnBackend(
+                self.trainer,
+                nprocs=self.trainer.num_processes,
+                cluster_environment=cluster_env
+            )
         elif self.trainer.distributed_backend == "ddp":
-            accelerator_backend = accelerators.DDPBackend(self.trainer)
+            accelerator_backend = accelerators.DDPBackend(self.trainer, cluster_env)
         elif self.trainer.use_dp:
-            accelerator_backend = accelerators.DataParallelBackend(self.trainer)
+            accelerator_backend = accelerators.DataParallelBackend(self.trainer, cluster_env)
         elif self.trainer.use_horovod:
-            accelerator_backend = accelerators.HorovodBackend(self.trainer)
+            accelerator_backend = accelerators.HorovodBackend(self.trainer, cluster_env)
         elif self.trainer.use_single_gpu:
-            accelerator_backend = accelerators.GPUBackend(self.trainer)
+            accelerator_backend = accelerators.GPUBackend(self.trainer, cluster_env)
         elif self.trainer.use_tpu:
-            accelerator_backend = accelerators.TPUBackend(self.trainer)
+            accelerator_backend = accelerators.TPUBackend(self.trainer, cluster_env)
         elif self.trainer.distributed_backend is None:
-            accelerator_backend = accelerators.CPUBackend(self.trainer)
+            accelerator_backend = accelerators.CPUBackend(self.trainer, cluster_env)
         else:
             raise MisconfigurationException(
                 f'Trainer(distributed_backend={self.trainer.distributed_backend} is not a supported backend'
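
Every branch now threads cluster_env into the backend constructor, and the new tests below assert it is reachable as trainer.accelerator_backend.cluster_environment. Presumably each backend simply stores it; a sketch under that assumption (not the shipped constructors, whose signatures differ per backend, as the spawn variants above show):

class DDPBackend(Accelerator):
    def __init__(self, trainer, cluster_environment=None):
        super().__init__(trainer)
        # exposed so init_ddp_connection() and the new tests can reach it
        self.cluster_environment = cluster_environment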


@@ -195,45 +195,9 @@ class Accelerator(object):
     def init_ddp_connection(
         self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True
     ) -> None:
-        if is_slurm_managing_tasks:
-            self.trainer.slurm_connector.connect_ddp(global_rank, world_size)
-        else:
-            self.connect_torchelastic(global_rank, world_size)
-
-    def connect_torchelastic(
-        self, global_rank: int, world_size: int
-    ) -> None:
-        """
-        Override to define your custom way of setting up a distributed environment.
-        Lightning's implementation uses env:// init by default and sets the first node as root
-        for SLURM managed cluster.
-
-        Args:
-            global_rank: The global process idx.
-            world_size: Number of GPUs being use across all nodes. (num_nodes * num_gpus).
-        """
-        if "MASTER_ADDR" not in os.environ:
-            rank_zero_warn(
-                "MASTER_ADDR environment variable is not defined. Set as localhost"
-            )
-            os.environ["MASTER_ADDR"] = "127.0.0.1"
-        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
-
-        if "MASTER_PORT" not in os.environ:
-            rank_zero_warn(
-                "MASTER_PORT environment variable is not defined. Set as 12910"
-            )
-            os.environ["MASTER_PORT"] = "12910"
-        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
-
-        if "WORLD_SIZE" in os.environ and int(os.environ["WORLD_SIZE"]) != world_size:
-            rank_zero_warn(
-                f"WORLD_SIZE environment variable ({os.environ['WORLD_SIZE']}) "
-                f"is not equal to the computed world size ({world_size}). Ignored."
-            )
+        os.environ["MASTER_ADDR"] = str(self.cluster_environment.master_address())
+        os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
+        os.environ["WORLD_SIZE"] = str(self.cluster_environment.world_size())
         torch_backend = "nccl" if self.trainer.on_gpu else "gloo"

         if not torch.distributed.is_initialized():
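
The removed MASTER_ADDR/MASTER_PORT/WORLD_SIZE guessing does not disappear; it moves behind the environment object. Under that assumption, a TorchElastic-style environment would recover the old defaults (localhost, port 12910) roughly like so (a sketch, not the shipped class):

import os

class TorchElasticEnvironment(ClusterEnvironment):
    def master_address(self):
        # torchelastic exports MASTER_ADDR; the removed code fell back to localhost
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def master_port(self):
        # 12910 was the hard-coded fallback in the code removed above
        return os.environ.get("MASTER_PORT", "12910")

    def world_size(self):
        return os.environ.get("WORLD_SIZE")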


@@ -22,7 +22,7 @@ class SLURMConnector:
         # extract SLURM flag vars
         # whenever we have the correct number of tasks, we let slurm manage processes
         # otherwise we launch the required number of processes
-        if self.trainer.use_ddp:
+        if self.trainer.use_ddp or self.trainer.use_ddp2:
             self.trainer.num_requested_gpus = self.trainer.num_gpus * num_gpu_nodes
             self.trainer.num_slurm_tasks = 0
             try:


@@ -43,6 +43,7 @@ omit =
     pytorch_lightning/accelerators/ddp2_*.py
     pytorch_lightning/accelerators/dp_*.py
     pytorch_lightning/accelerators/tpu_*.py
+    pytorch_lightning/cluster_environments/*.py

 [flake8]
 # TODO: this should be 88 or 100 according PEP8


@@ -17,6 +17,7 @@ import os
 from tests.base.boring_model import BoringModel
 from pytorch_lightning.callbacks import Callback
 from pytorch_lightning import accelerators, Trainer
+from pytorch_lightning.cluster_environments import SLURMEnvironment, TorchElasticEnvironment, ClusterEnvironment
 from unittest import mock
@@ -24,6 +25,7 @@ def test_accelerator_choice_cpu(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert isinstance(trainer.accelerator_backend, accelerators.CPUBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)

     model = BoringModel()
     trainer = Trainer(
@@ -36,7 +38,9 @@ def test_accelerator_choice_cpu(tmpdir):
 def test_accelerator_choice_ddp_cpu(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSpawnBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -55,7 +59,9 @@ def test_accelerator_choice_ddp_cpu(tmpdir):
 def test_accelerator_choice_ddp(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -75,7 +81,9 @@ def test_accelerator_choice_ddp(tmpdir):
 def test_accelerator_choice_ddp_spawn(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPSpawnBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -101,7 +109,9 @@ def test_accelerator_choice_ddp_spawn(tmpdir):
 def test_accelerator_choice_ddp_slurm(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPSLURMBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -128,7 +138,9 @@ def test_accelerator_choice_ddp_slurm(tmpdir):
 def test_accelerator_choice_ddp2_slurm(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp2
             assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -153,7 +165,9 @@ def test_accelerator_choice_ddp2_slurm(tmpdir):
 def test_accelerator_choice_ddp_te(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPTorchElasticBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -168,6 +182,33 @@ def test_accelerator_choice_ddp_te(tmpdir):
         trainer.fit(model)


+@mock.patch.dict(os.environ, {
+    "CUDA_VISIBLE_DEVICES": "0,1",
+    "WORLD_SIZE": "2",
+    "LOCAL_RANK": "0",
+    "NODE_RANK": "0"
+})
+@mock.patch('torch.cuda.device_count', return_value=2)
+def test_accelerator_choice_ddp2_te(tmpdir):
+    class CB(Callback):
+        def on_fit_start(self, trainer, pl_module):
+            assert trainer.use_ddp2
+            assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
+            raise SystemExit()
+
+    model = BoringModel()
+    trainer = Trainer(
+        fast_dev_run=True,
+        distributed_backend='ddp2',
+        gpus=2,
+        callbacks=[CB()]
+    )
+
+    with pytest.raises(SystemExit):
+        trainer.fit(model)
+
+
 @mock.patch.dict(os.environ, {
     "WORLD_SIZE": "1",
     "LOCAL_RANK": "0",
@@ -177,7 +218,9 @@ def test_accelerator_choice_ddp_te(tmpdir):
 def test_accelerator_choice_ddp_cpu_te(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUTorchElasticBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -203,7 +246,9 @@ def test_accelerator_choice_ddp_cpu_te(tmpdir):
 def test_accelerator_choice_ddp_cpu_slurm(tmpdir):
     class CB(Callback):
         def on_fit_start(self, trainer, pl_module):
             assert trainer.use_ddp
             assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
             raise SystemExit()

     model = BoringModel()
@@ -216,3 +261,39 @@ def test_accelerator_choice_ddp_cpu_slurm(tmpdir):
     with pytest.raises(SystemExit):
         trainer.fit(model)
+
+
+@mock.patch.dict(os.environ, {
+    "SLURM_NTASKS": "1",
+    "SLURM_JOB_NAME": "SOME_NAME",
+    "SLURM_NODEID": "0",
+    "LOCAL_RANK": "0",
+    "SLURM_LOCALID": "0"
+})
+@mock.patch('torch.cuda.device_count', return_value=0)
+def test_accelerator_choice_ddp_cpu_custom_cluster(tmpdir):
+    """
+    Test that we choose the custom cluster even when SLURM or TE flags are around
+    """
+    class CustomCluster(ClusterEnvironment):
+        def master_address(self):
+            return 'asdf'
+
+    class CB(Callback):
+        def on_fit_start(self, trainer, pl_module):
+            assert trainer.use_ddp
+            assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend)
+            assert isinstance(trainer.accelerator_backend.cluster_environment, CustomCluster)
+            raise SystemExit()
+
+    model = BoringModel()
+    trainer = Trainer(
+        plugins=[CustomCluster()],
+        fast_dev_run=True,
+        distributed_backend='ddp_cpu',
+        num_processes=1,
+        callbacks=[CB()]
+    )
+
+    with pytest.raises(SystemExit):
+        trainer.fit(model)