lightning/pytorch_lightning/accelerators/accelerator_connector.py

105 lines
4.2 KiB
Python
Raw Normal View History

from pytorch_lightning import accelerators
import os
import torch
from pytorch_lightning.utilities import device_parser
from pytorch_lightning.utilities import rank_zero_warn
class AcceleratorConnector:
def __init__(self, trainer):
self.trainer = trainer
def on_trainer_init(self, num_processes, tpu_cores, distributed_backend, auto_select_gpus, gpus):
self.trainer.tpu_cores = device_parser.parse_tpu_cores(tpu_cores)
self.trainer.on_tpu = self.trainer.tpu_cores is not None
self.trainer.tpu_id = self.trainer.tpu_cores[0] if isinstance(self.trainer.tpu_cores, list) else None
if num_processes != 1 and distributed_backend != "ddp_cpu":
rank_zero_warn("num_processes is only used for distributed_backend=\"ddp_cpu\". Ignoring it.")
self.trainer.num_processes = num_processes
# override with environment flag
gpus = os.environ.get('PL_TRAINER_GPUS', gpus)
# for gpus allow int, string and gpu list
if auto_select_gpus and isinstance(gpus, int):
self.trainer.gpus = self.trainer.tuner.pick_multiple_gpus(gpus)
else:
self.trainer.gpus = gpus
self.trainer.data_parallel_device_ids = device_parser.parse_gpu_ids(self.trainer.gpus)
self.trainer.root_gpu = device_parser.determine_root_gpu_device(self.trainer.data_parallel_device_ids)
self.trainer.root_device = torch.device("cpu")
self.trainer.on_gpu = True if (self.trainer.data_parallel_device_ids and torch.cuda.is_available()) else False
# tpu state flags
self.trainer.use_tpu = False
self.trainer.tpu_local_core_rank = None
self.trainer.tpu_global_core_rank = None
# distributed backend choice
self.trainer.distributed_backend = distributed_backend
self.trainer.set_distributed_mode(distributed_backend)
# override dist backend when using tpus
if self.trainer.on_tpu:
self.trainer.distributed_backend = 'tpu'
self.trainer.init_tpu()
# init flags for SLURM+DDP to work
self.trainer.world_size = 1
self.trainer.interactive_ddp_procs = []
self.trainer.configure_slurm_ddp(self.trainer.num_nodes)
self.trainer.node_rank = self.trainer.determine_ddp_node_rank()
self.trainer.local_rank = self.trainer.determine_local_rank()
self.trainer.global_rank = 0
# NVIDIA setup
self.trainer.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids)
def select_accelerator(self):
# SLURM ddp
use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks
# torchelastic or general non_slurm ddp
te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ)
use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed
use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend in ['ddp_cpu', 'ddp_spawn']
# choose the appropriate accelerator backend
if self.trainer.use_ddp2:
accelerator_backend = accelerators.DDP2Backend(self.trainer)
elif use_slurm_ddp:
accelerator_backend = accelerators.DDPBackend(self.trainer, mode='slurm_ddp')
elif use_torchelastic_ddp:
accelerator_backend = accelerators.DDPBackend(self.trainer, mode='torchelastic_ddp')
elif use_ddp_spawn:
accelerator_backend = accelerators.DDPSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
elif self.trainer.distributed_backend == 'ddp':
accelerator_backend = accelerators.DDPBackend(self.trainer, mode='ddp')
elif self.trainer.use_dp:
accelerator_backend = accelerators.DataParallelBackend(self.trainer)
elif self.trainer.use_horovod:
accelerator_backend = accelerators.HorovodBackend(self.trainer)
elif self.trainer.use_single_gpu:
accelerator_backend = accelerators.GPUBackend(self.trainer)
elif self.trainer.use_tpu:
accelerator_backend = accelerators.TPUBackend(self.trainer)
else:
accelerator_backend = accelerators.CPUBackend(self.trainer)
return accelerator_backend