import os import pytest import torch import tests.models.utils as tutils from pytorch_lightning import Trainer from pytorch_lightning.callbacks import ( ModelCheckpoint, ) from pytorch_lightning.core import memory from tests.models import ( LightningTestModel, ) from pytorch_lightning.trainer.distrib_parts import ( parse_gpu_ids, determine_root_gpu_device, ) from pytorch_lightning.utilities.debugging import MisconfigurationException PRETEND_N_OF_GPUS = 16 def test_multi_gpu_model_ddp2(tmpdir): """Make sure DDP2 works.""" if not tutils.can_run_gpu_test(): return tutils.reset_seed() tutils.set_random_master_port() model, hparams = tutils.get_model() trainer_options = dict( default_save_path=tmpdir, show_progress_bar=True, max_epochs=1, train_percent_check=0.4, val_percent_check=0.2, gpus=2, weights_summary=None, distributed_backend='ddp2' ) tutils.run_model_test(trainer_options, model) def test_multi_gpu_model_ddp(tmpdir): """Make sure DDP works.""" if not tutils.can_run_gpu_test(): return tutils.reset_seed() tutils.set_random_master_port() model, hparams = tutils.get_model() trainer_options = dict( default_save_path=tmpdir, show_progress_bar=False, max_epochs=1, train_percent_check=0.4, val_percent_check=0.2, gpus=[0, 1], distributed_backend='ddp' ) tutils.run_model_test(trainer_options, model) def test_ddp_all_dataloaders_passed_to_fit(tmpdir): """Make sure DDP works with dataloaders passed to fit()""" if not tutils.can_run_gpu_test(): return tutils.reset_seed() tutils.set_random_master_port() model, hparams = tutils.get_model() trainer_options = dict(default_save_path=tmpdir, show_progress_bar=False, max_epochs=1, train_percent_check=0.4, val_percent_check=0.2, gpus=[0, 1], distributed_backend='ddp') fit_options = dict(train_dataloader=model.train_dataloader(), val_dataloaders=model.val_dataloader()) trainer = Trainer(**trainer_options) result = trainer.fit(model, **fit_options) assert result == 1, "DDP doesn't work with dataloaders passed to fit()." def test_optimizer_return_options(): tutils.reset_seed() trainer = Trainer() model, hparams = tutils.get_model() # single optimizer opt_a = torch.optim.Adam(model.parameters(), lr=0.002) opt_b = torch.optim.SGD(model.parameters(), lr=0.002) optim, lr_sched = trainer.init_optimizers(opt_a) assert len(optim) == 1 and len(lr_sched) == 0 # opt tuple opts = (opt_a, opt_b) optim, lr_sched = trainer.init_optimizers(opts) assert len(optim) == 2 and optim[0] == opts[0] and optim[1] == opts[1] assert len(lr_sched) == 0 # opt list opts = [opt_a, opt_b] optim, lr_sched = trainer.init_optimizers(opts) assert len(optim) == 2 and optim[0] == opts[0] and optim[1] == opts[1] assert len(lr_sched) == 0 # opt tuple of lists scheduler = torch.optim.lr_scheduler.StepLR(opt_a, 10) opts = ([opt_a], [scheduler]) optim, lr_sched = trainer.init_optimizers(opts) assert len(optim) == 1 and len(lr_sched) == 1 assert optim[0] == opts[0][0] and \ lr_sched[0] == dict(scheduler=scheduler, interval='epoch', frequency=1, reduce_on_plateau=False, monitor='val_loss') def test_cpu_slurm_save_load(tmpdir): """Verify model save/load/checkpoint on CPU.""" tutils.reset_seed() hparams = tutils.get_hparams() model = LightningTestModel(hparams) # logger file to get meta logger = tutils.get_test_tube_logger(tmpdir, False) version = logger.version trainer_options = dict( max_epochs=1, logger=logger, checkpoint_callback=ModelCheckpoint(tmpdir) ) # fit model trainer = Trainer(**trainer_options) result = trainer.fit(model) real_global_step = trainer.global_step # traning complete assert result == 1, 'amp + ddp model failed to complete' # predict with trained model before saving # make a prediction dataloaders = model.test_dataloader() if not isinstance(dataloaders, list): dataloaders = [dataloaders] for dataloader in dataloaders: for batch in dataloader: break x, y = batch x = x.view(x.size(0), -1) model.eval() pred_before_saving = model(x) # test HPC saving # simulate snapshot on slurm saved_filepath = trainer.hpc_save(tmpdir, logger) assert os.path.exists(saved_filepath) # new logger file to get meta logger = tutils.get_test_tube_logger(tmpdir, False, version=version) trainer_options = dict( max_epochs=1, logger=logger, checkpoint_callback=ModelCheckpoint(tmpdir), ) trainer = Trainer(**trainer_options) model = LightningTestModel(hparams) # set the epoch start hook so we can predict before the model does the full training def assert_pred_same(): assert trainer.global_step == real_global_step and trainer.global_step > 0 # predict with loaded model to make sure answers are the same trainer.model.eval() new_pred = trainer.model(x) assert torch.all(torch.eq(pred_before_saving, new_pred)).item() == 1 model.on_epoch_start = assert_pred_same # by calling fit again, we trigger training, loading weights from the cluster # and our hook to predict using current model before any more weight updates trainer.fit(model) def test_multi_gpu_none_backend(tmpdir): """Make sure when using multiple GPUs the user can't use `distributed_backend = None`.""" tutils.reset_seed() if not tutils.can_run_gpu_test(): return model, hparams = tutils.get_model() trainer_options = dict( default_save_path=tmpdir, show_progress_bar=False, max_epochs=1, train_percent_check=0.1, val_percent_check=0.1, gpus='-1' ) with pytest.warns(UserWarning): tutils.run_model_test(trainer_options, model) def test_multi_gpu_model_dp(tmpdir): """Make sure DP works.""" tutils.reset_seed() if not tutils.can_run_gpu_test(): return model, hparams = tutils.get_model() trainer_options = dict( default_save_path=tmpdir, show_progress_bar=False, distributed_backend='dp', max_epochs=1, train_percent_check=0.1, val_percent_check=0.1, gpus='-1' ) tutils.run_model_test(trainer_options, model) # test memory helper functions memory.get_memory_profile('min_max') @pytest.fixture def mocked_device_count(monkeypatch): def device_count(): return PRETEND_N_OF_GPUS monkeypatch.setattr(torch.cuda, 'device_count', device_count) @pytest.fixture def mocked_device_count_0(monkeypatch): def device_count(): return 0 monkeypatch.setattr(torch.cuda, 'device_count', device_count) @pytest.mark.gpus_param_tests @pytest.mark.parametrize(["gpus", "expected_num_gpus", "distributed_backend"], [ pytest.param(None, 0, None, id="None - expect 0 gpu to use."), pytest.param(0, 0, None, id="Oth gpu, expect 1 gpu to use."), pytest.param(1, 1, None, id="1st gpu, expect 1 gpu to use."), pytest.param(-1, PRETEND_N_OF_GPUS, "ddp", id="-1 - use all gpus"), pytest.param('-1', PRETEND_N_OF_GPUS, "ddp", id="'-1' - use all gpus"), pytest.param(3, 3, "ddp", id="3rd gpu - 1 gpu to use (backend:ddp)") ]) def test_trainer_gpu_parse(mocked_device_count, gpus, expected_num_gpus, distributed_backend): assert Trainer(gpus=gpus, distributed_backend=distributed_backend).num_gpus == expected_num_gpus @pytest.mark.gpus_param_tests @pytest.mark.parametrize(["gpus", "expected_num_gpus", "distributed_backend"], [ pytest.param(None, 0, None, id="None - expect 0 gpu to use."), pytest.param(None, 0, "ddp", id="None - expect 0 gpu to use."), ]) def test_trainer_num_gpu_0(mocked_device_count_0, gpus, expected_num_gpus, distributed_backend): assert Trainer(gpus=gpus, distributed_backend=distributed_backend).num_gpus == expected_num_gpus @pytest.mark.gpus_param_tests @pytest.mark.parametrize(['gpus', 'expected_root_gpu', "distributed_backend"], [ pytest.param(None, None, "ddp", id="None is None"), pytest.param(0, None, "ddp", id="O gpus, expect gpu root device to be None."), pytest.param(1, 0, "ddp", id="1 gpu, expect gpu root device to be 0."), pytest.param(-1, 0, "ddp", id="-1 - use all gpus, expect gpu root device to be 0."), pytest.param('-1', 0, "ddp", id="'-1' - use all gpus, expect gpu root device to be 0."), pytest.param(3, 0, "ddp", id="3 gpus, expect gpu root device to be 0.(backend:ddp)") ]) def test_root_gpu_property(mocked_device_count, gpus, expected_root_gpu, distributed_backend): assert Trainer(gpus=gpus, distributed_backend=distributed_backend).root_gpu == expected_root_gpu @pytest.mark.gpus_param_tests @pytest.mark.parametrize([ 'gpus', 'expected_root_gpu', "distributed_backend"], [ pytest.param(None, None, None, id="None is None"), pytest.param(None, None, "ddp", id="None is None"), pytest.param(0, None, "ddp", id="None is None"), ]) def test_root_gpu_property_0_passing( mocked_device_count_0, gpus, expected_root_gpu, distributed_backend): assert Trainer(gpus=gpus, distributed_backend=distributed_backend).root_gpu == expected_root_gpu # Asking for a gpu when non are available will result in a MisconfigurationException @pytest.mark.gpus_param_tests @pytest.mark.parametrize([ 'gpus', 'expected_root_gpu', "distributed_backend"], [ pytest.param(1, None, "ddp"), pytest.param(3, None, "ddp"), pytest.param(3, None, "ddp"), pytest.param([1, 2], None, "ddp"), pytest.param([0, 1], None, "ddp"), pytest.param(-1, None, "ddp"), pytest.param('-1', None, "ddp") ]) def test_root_gpu_property_0_raising( mocked_device_count_0, gpus, expected_root_gpu, distributed_backend): with pytest.raises(MisconfigurationException): Trainer(gpus=gpus, distributed_backend=distributed_backend).root_gpu @pytest.mark.gpus_param_tests @pytest.mark.parametrize(['gpus', 'expected_root_gpu'], [ pytest.param(None, None, id="No gpus, expect gpu root device to be None"), pytest.param([0], 0, id="Oth gpu, expect gpu root device to be 0."), pytest.param([1], 1, id="1st gpu, expect gpu root device to be 1."), pytest.param([3], 3, id="3rd gpu, expect gpu root device to be 3."), pytest.param([1, 2], 1, id="[1, 2] gpus, expect gpu root device to be 1."), ]) def test_determine_root_gpu_device(gpus, expected_root_gpu): assert determine_root_gpu_device(gpus) == expected_root_gpu @pytest.mark.gpus_param_tests @pytest.mark.parametrize(['gpus', 'expected_gpu_ids'], [ pytest.param(None, None), pytest.param(0, None), pytest.param(1, [0]), pytest.param(3, [0, 1, 2]), pytest.param(-1, list(range(PRETEND_N_OF_GPUS)), id="-1 - use all gpus"), pytest.param([0], [0]), pytest.param([1, 3], [1, 3]), pytest.param('0', [0]), pytest.param('3', [3]), pytest.param('1, 3', [1, 3]), pytest.param('-1', list(range(PRETEND_N_OF_GPUS)), id="'-1' - use all gpus"), ]) def test_parse_gpu_ids(mocked_device_count, gpus, expected_gpu_ids): assert parse_gpu_ids(gpus) == expected_gpu_ids @pytest.mark.gpus_param_tests @pytest.mark.parametrize(['gpus'], [ pytest.param(0.1), pytest.param(-2), pytest.param(False), pytest.param([]), pytest.param([-1]), pytest.param([None]), pytest.param(['0']), pytest.param((0, 1)), ]) def test_parse_gpu_fail_on_unsupported_inputs(mocked_device_count, gpus): with pytest.raises(MisconfigurationException): parse_gpu_ids(gpus) @pytest.mark.gpus_param_tests @pytest.mark.parametrize("gpus", ['']) def test_parse_gpu_fail_on_empty_string(mocked_device_count, gpus): # This currently results in a ValueError instead of MisconfigurationException with pytest.raises(ValueError): parse_gpu_ids(gpus) @pytest.mark.gpus_param_tests @pytest.mark.parametrize("gpus", [[1, 2, 19], -1, '-1']) def test_parse_gpu_fail_on_non_existant_id(mocked_device_count_0, gpus): with pytest.raises(MisconfigurationException): parse_gpu_ids(gpus) @pytest.mark.gpus_param_tests def test_parse_gpu_fail_on_non_existant_id_2(mocked_device_count): with pytest.raises(MisconfigurationException): parse_gpu_ids([1, 2, 19]) @pytest.mark.gpus_param_tests @pytest.mark.parametrize("gpus", [-1, '-1']) def test_parse_gpu_returns_None_when_no_devices_are_available(mocked_device_count_0, gpus): with pytest.raises(MisconfigurationException): parse_gpu_ids(gpus) # if __name__ == '__main__': # pytest.main([__file__])