lightning/tests/test_models.py

684 lines
18 KiB
Python
Raw Normal View History

2019-08-05 07:51:47 +00:00
import os
import shutil
import warnings
2019-07-24 12:31:57 +00:00
import pytest
2019-08-05 07:51:47 +00:00
import numpy as np
import torch
2019-07-24 12:31:57 +00:00
from pytorch_lightning import Trainer
2019-08-05 07:51:47 +00:00
from examples import LightningTemplateModel
from pytorch_lightning.testing.lm_test_module import LightningTestModel
2019-07-24 12:31:57 +00:00
from argparse import Namespace
2019-07-26 16:14:58 +00:00
from test_tube import Experiment, SlurmCluster
2019-07-24 19:48:35 +00:00
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
2019-08-05 07:51:47 +00:00
from pytorch_lightning.utilities.debugging import MisconfigurationException
2019-07-24 21:41:08 +00:00
from pytorch_lightning.root_module import memory
2019-07-25 00:21:57 +00:00
from pytorch_lightning.models.trainer import reduce_distributed_output
2019-07-26 16:29:19 +00:00
from pytorch_lightning.root_module import model_saving
2019-07-24 12:31:57 +00:00
2019-07-24 12:53:00 +00:00
# Seed torch and numpy once at import time with a fixed value.
# The DDP tests below draw MASTER_PORT from np.random, so this seed also
# fixes the sequence of ports they pick.
SEED = 2334
torch.manual_seed(SEED)
np.random.seed(SEED)
2019-07-24 12:31:57 +00:00
2019-07-24 16:04:48 +00:00
# ------------------------------------------------------------------------
2019-07-24 16:03:39 +00:00
# TESTS
2019-07-24 16:04:48 +00:00
# ------------------------------------------------------------------------
2019-07-28 13:21:41 +00:00
def test_amp_gpu_ddp():
    """
    Train LightningTestModel for one epoch with DDP + AMP on GPUs 0 and 1.

    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_amp_gpu_ddp cannot run. Rerun on a GPU node to run this test')
        return
    if torch.cuda.device_count() <= 1:
        warnings.warn('test_amp_gpu_ddp cannot run. Rerun on a node with 2+ GPUs to run this test')
        return

    # random MASTER_PORT in [12000, 19000)
    port = np.random.randint(12000, 19000, 1)[0]
    os.environ['MASTER_PORT'] = str(port)

    hparams = get_hparams()
    model = LightningTestModel(hparams)

    trainer_options = {
        'progress_bar': True,
        'max_nb_epochs': 1,
        'gpus': [0, 1],
        'distributed_backend': 'ddp',
        'use_amp': True,
    }
    run_gpu_model_test(trainer_options, model, hparams)
2019-07-27 02:33:31 +00:00
def test_cpu_slurm_save_load():
    """
    Verify model save/load/checkpoint on CPU through the HPC (SLURM) path.

    Trains once, snapshots with ``trainer.hpc_save``, then fits a fresh model
    with a fresh trainer and checks from an ``on_epoch_start`` hook that the
    global step and the predictions match the pre-save state.
    :return:
    """
    hparams = get_hparams()
    model = LightningTestModel(hparams)

    save_dir = init_save_dir()

    # exp file to get meta
    exp = get_exp(False)
    exp.argparse(hparams)
    exp.save()

    cluster_a = SlurmCluster()
    trainer_options = dict(
        max_nb_epochs=1,
        cluster=cluster_a,
        experiment=exp,
        checkpoint_callback=ModelCheckpoint(save_dir)
    )

    # fit model
    trainer = Trainer(**trainer_options)
    result = trainer.fit(model)
    real_global_step = trainer.global_step

    # training complete
    assert result == 1, 'cpu model failed to complete'

    # predict with trained model before saving
    # make a prediction on the first test batch only
    x, _ = next(iter(model.test_dataloader))
    x = x.view(x.size(0), -1)

    model.eval()
    pred_before_saving = model(x)

    # test registering a save function
    trainer.enable_auto_hpc_walltime_manager()

    # test HPC saving
    # simulate snapshot on slurm
    saved_filepath = trainer.hpc_save(save_dir, exp)
    assert os.path.exists(saved_filepath)

    # wipe-out trainer and model
    # retrain with not much data... this simulates picking training back up after slurm
    # we want to see if the weights come back correctly
    continue_tng_hparams = get_hparams(continue_training=True,
                                       hpc_exp_number=cluster_a.hpc_exp_number)
    trainer_options = dict(
        max_nb_epochs=1,
        cluster=SlurmCluster(continue_tng_hparams),
        experiment=exp,
        checkpoint_callback=ModelCheckpoint(save_dir),
    )
    trainer = Trainer(**trainer_options)
    model = LightningTestModel(hparams)

    # set the epoch start hook so we can predict before the model does the full training
    def assert_pred_same():
        assert trainer.global_step == real_global_step and trainer.global_step > 0

        # predict with loaded model to make sure answers are the same
        trainer.model.eval()
        new_pred = trainer.model(x)
        assert torch.all(torch.eq(pred_before_saving, new_pred)).item() == 1

    model.on_epoch_start = assert_pred_same

    # by calling fit again, we trigger training, loading weights from the cluster
    # and our hook to predict using current model before any more weight updates
    trainer.fit(model)

    clear_save_dir()
2019-07-26 16:29:19 +00:00
def test_loading_meta_tags():
    """
    Save hyperparameters and extra tags through the experiment, then read
    them back with ``load_hparams_from_tags_csv`` and check two of the values.
    :return:
    """
    hparams = get_hparams()

    # save tags
    exp = get_exp(False)
    exp.tag({'some_str': 'a_str', 'an_int': 1, 'a_float': 2.0})
    exp.argparse(hparams)
    exp.save()

    # load tags (path built with os.path.join, like the other tests in this file)
    tags_path = os.path.join(exp.get_data_path(exp.name, exp.version), 'meta_tags.csv')
    tags = model_saving.load_hparams_from_tags_csv(tags_path)

    assert tags.batch_size == 32 and tags.hidden_dim == 1000

    clear_save_dir()
2019-07-26 16:29:19 +00:00
2019-07-27 01:55:01 +00:00
2019-07-25 00:21:57 +00:00
def test_dp_output_reduce():
    """
    Check ``reduce_distributed_output`` on a plain tensor (1 and 2 GPUs) and
    on a nested dict of tensors (3 GPUs).
    :return:
    """
    single = torch.rand(3, 1)

    # with a single gpu the very same object must come back
    assert reduce_distributed_output(single, nb_gpus=1) is single

    # with several gpus the tensor is averaged
    assert reduce_distributed_output(single, nb_gpus=2) == single.mean()

    # dict of values, one of them nested
    nested = {'a': single, 'b': {'c': single}}
    reduced = reduce_distributed_output(nested, nb_gpus=3)
    assert reduced['a'] == nested['a']
    assert reduced['b']['c'] == nested['b']['c']
2019-07-24 23:05:46 +00:00
2019-07-27 01:55:01 +00:00
def test_model_saving_loading():
    """
    Tests use case where trainer saves the model, and user loads it from tags independently.

    Predictions of the trained model and of the model restored through
    ``load_from_metrics`` must be element-wise equal on one test batch.
    :return:
    """
    hparams = get_hparams()
    model = LightningTestModel(hparams)

    save_dir = init_save_dir()

    # exp file to get meta
    exp = get_exp(False)
    exp.argparse(hparams)
    exp.save()

    trainer_options = dict(
        max_nb_epochs=1,
        cluster=SlurmCluster(),
        experiment=exp,
        checkpoint_callback=ModelCheckpoint(save_dir)
    )

    # fit model
    trainer = Trainer(**trainer_options)
    result = trainer.fit(model)

    # training complete
    assert result == 1, 'cpu model failed to complete'

    # make a prediction on the first test batch only
    x, _ = next(iter(model.test_dataloader))
    x = x.view(x.size(0), -1)

    # generate preds before saving model
    model.eval()
    pred_before_saving = model(x)

    # save model
    new_weights_path = os.path.join(save_dir, 'save_test.ckpt')
    trainer.save_checkpoint(new_weights_path)

    # load new model
    tags_path = exp.get_data_path(exp.name, exp.version)
    tags_path = os.path.join(tags_path, 'meta_tags.csv')
    model_2 = LightningTestModel.load_from_metrics(weights_path=new_weights_path,
                                                   tags_csv=tags_path,
                                                   on_gpu=False)
    model_2.eval()

    # make prediction
    # assert that both predictions are the same
    new_pred = model_2(x)
    assert torch.all(torch.eq(pred_before_saving, new_pred)).item() == 1

    clear_save_dir()
2019-07-27 02:24:01 +00:00
def test_model_freeze_unfreeze():
    """
    Smoke test: freeze() followed by unfreeze() on a freshly built LightningTestModel.
    :return:
    """
    model = LightningTestModel(get_hparams())
    model.freeze()
    model.unfreeze()
2019-07-24 22:40:54 +00:00
def test_amp_gpu_ddp_slurm_managed():
    """
    Make sure DDP + AMP work when the trainer is told SLURM manages the tasks.

    Also exercises ``resolve_root_node_address``, loading a checkpoint with a
    ``map_location``, HPC save/load and freeze/unfreeze on the trained model.
    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_amp_gpu_ddp_slurm_managed cannot run. '
                      'Rerun on a GPU node to run this test')
        return
    if not torch.cuda.device_count() > 1:
        warnings.warn('test_amp_gpu_ddp_slurm_managed cannot run. '
                      'Rerun on a node with 2+ GPUs to run this test')
        return

    # simulate setting slurm flags
    os.environ['MASTER_PORT'] = str(np.random.randint(12000, 19000, 1)[0])
    os.environ['SLURM_LOCALID'] = str(0)

    hparams = get_hparams()
    model = LightningTestModel(hparams)

    trainer_options = dict(
        progress_bar=True,
        max_nb_epochs=1,
        gpus=[0],
        distributed_backend='ddp',
        use_amp=True
    )

    save_dir = init_save_dir()

    # exp file to get meta
    exp = get_exp(False)
    exp.argparse(hparams)
    exp.save()

    # exp file to get weights
    checkpoint = ModelCheckpoint(save_dir)

    # add these to the trainer options
    trainer_options['checkpoint_callback'] = checkpoint
    trainer_options['experiment'] = exp

    # fit model
    trainer = Trainer(**trainer_options)
    trainer.is_slurm_managing_tasks = True
    result = trainer.fit(model)

    # correct result and ok accuracy
    assert result == 1, 'amp + ddp model failed to complete'

    # test root model address
    assert trainer.resolve_root_node_address('abc') == 'abc'
    assert trainer.resolve_root_node_address('abc[23]') == 'abc23'
    assert trainer.resolve_root_node_address('abc[23-24]') == 'abc23'
    assert trainer.resolve_root_node_address('abc[23-24, 45-40, 40]') == 'abc23'

    # test model loading with a map_location
    map_location = 'cuda:1'
    pretrained_model = load_model(exp, save_dir, True, map_location)

    # test model preds
    run_prediction(model.test_dataloader, pretrained_model)

    if trainer.use_ddp:
        # on hpc this would work fine... but need to hack it for the purpose of the test
        trainer.model = pretrained_model
        trainer.optimizers, trainer.lr_schedulers = pretrained_model.configure_optimizers()

    # test HPC loading / saving
    trainer.hpc_save(save_dir, exp)
    trainer.hpc_load(save_dir, on_gpu=True)

    # test freeze on gpu
    model.freeze()
    model.unfreeze()

    clear_save_dir()
2019-07-24 22:16:22 +00:00
2019-07-25 00:08:17 +00:00
def test_early_stopping_cpu_model():
    """
    Run the CPU train/load/predict helper with an EarlyStopping callback and
    several other trainer options switched on, then freeze/unfreeze the model.
    :return:
    """
    stopping = EarlyStopping()

    trainer_options = {
        'early_stop_callback': stopping,
        'gradient_clip': 1.0,
        'overfit_pct': 0.20,
        'track_grad_norm': 2,
        'print_nan_grads': True,
        'progress_bar': False,
        'experiment': get_exp(),
        'train_percent_check': 0.1,
        'val_percent_check': 0.1,
    }

    model, hparams = get_model()
    run_gpu_model_test(trainer_options, model, hparams, on_gpu=False)

    # freeze / unfreeze must also work on cpu
    model.freeze()
    model.unfreeze()
def test_cpu_model_with_amp():
    """
    Requesting AMP for a CPU run must raise MisconfigurationException or
    ModuleNotFoundError.
    :return:
    """
    trainer_options = {
        'progress_bar': False,
        'experiment': get_exp(),
        'max_nb_epochs': 1,
        'train_percent_check': 0.4,
        'val_percent_check': 0.4,
        'use_amp': True,
    }

    model, hparams = get_model()

    expected_errors = (MisconfigurationException, ModuleNotFoundError)
    with pytest.raises(expected_errors):
        run_gpu_model_test(trainer_options, model, hparams, on_gpu=False)
2019-07-24 22:22:49 +00:00
def test_cpu_model():
    """
    Make sure the template model trains on CPU for one epoch and passes the
    shared train/load/predict helper.
    :return:
    """
    trainer_options = {
        'progress_bar': False,
        'experiment': get_exp(),
        'max_nb_epochs': 1,
        'train_percent_check': 0.4,
        'val_percent_check': 0.4,
    }

    model, hparams = get_model()
    run_gpu_model_test(trainer_options, model, hparams, on_gpu=False)
def test_all_features_cpu_model():
    """
    CPU run with gradient clipping, overfit_pct, grad-norm tracking and
    nan-grad printing all enabled at once.
    :return:
    """
    trainer_options = {
        'gradient_clip': 1.0,
        'overfit_pct': 0.20,
        'track_grad_norm': 2,
        'print_nan_grads': True,
        'progress_bar': False,
        'experiment': get_exp(),
        'max_nb_epochs': 1,
        'train_percent_check': 0.4,
        'val_percent_check': 0.4,
    }

    model, hparams = get_model()
    run_gpu_model_test(trainer_options, model, hparams, on_gpu=False)
def test_single_gpu_model():
    """
    Make sure training works on a single GPU (``gpus=[0]``).

    Returns early (after emitting a warning) when CUDA is unavailable.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_single_gpu_model cannot run. Rerun on a GPU node to run this test')
        return

    model, hparams = get_model()

    trainer_options = {
        'progress_bar': False,
        'max_nb_epochs': 1,
        'train_percent_check': 0.1,
        'val_percent_check': 0.1,
        'gpus': [0],
    }
    run_gpu_model_test(trainer_options, model, hparams)
def test_multi_gpu_model_dp():
    """
    Make sure DP works with ``gpus='-1'``, then call the GPU memory-map helper.

    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_multi_gpu_model_dp cannot run. Rerun on a GPU node to run this test')
        return
    if torch.cuda.device_count() <= 1:
        warnings.warn('test_multi_gpu_model_dp cannot run. Rerun on a node with 2+ GPUs to run this test')
        return

    model, hparams = get_model()

    trainer_options = {
        'progress_bar': False,
        'max_nb_epochs': 1,
        'train_percent_check': 0.1,
        'val_percent_check': 0.1,
        'gpus': '-1',
    }
    run_gpu_model_test(trainer_options, model, hparams)

    # test memory helper functions
    memory.get_gpu_memory_map()
def test_amp_gpu_dp():
    """
    Combining DP with AMP is expected to raise MisconfigurationException.

    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_amp_gpu_dp cannot run. Rerun on a GPU node to run this test')
        return
    if torch.cuda.device_count() <= 1:
        warnings.warn('test_amp_gpu_dp cannot run. Rerun on a node with 2+ GPUs to run this test')
        return

    model, hparams = get_model()

    trainer_options = {
        'max_nb_epochs': 1,
        'gpus': '0, 1',  # test init with gpu string
        'distributed_backend': 'dp',
        'use_amp': True,
    }

    with pytest.raises(MisconfigurationException):
        run_gpu_model_test(trainer_options, model, hparams)
def test_multi_gpu_model_ddp():
    """
    Make sure DDP works on GPUs 0 and 1.

    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_multi_gpu_model_ddp cannot run. Rerun on a GPU node to run this test')
        return
    if torch.cuda.device_count() <= 1:
        warnings.warn('test_multi_gpu_model_ddp cannot run. Rerun on a node with 2+ GPUs to run this test')
        return

    # random MASTER_PORT in [12000, 19000)
    port = np.random.randint(12000, 19000, 1)[0]
    os.environ['MASTER_PORT'] = str(port)

    model, hparams = get_model()

    trainer_options = {
        'progress_bar': False,
        'max_nb_epochs': 1,
        'train_percent_check': 0.4,
        'val_percent_check': 0.2,
        'gpus': [0, 1],
        'distributed_backend': 'ddp',
    }
    run_gpu_model_test(trainer_options, model, hparams)
def test_ddp_sampler_error():
    """
    With DDP, a model built with ``force_remove_distributed_sampler=True``
    must make ``trainer.get_dataloaders`` raise MisconfigurationException.

    Returns early (after emitting a warning) when CUDA is unavailable or
    when fewer than two GPUs are visible.
    :return:
    """
    if not torch.cuda.is_available():
        warnings.warn('test_ddp_sampler_error cannot run. Rerun on a GPU node to run this test')
        return
    if not torch.cuda.device_count() > 1:
        warnings.warn('test_ddp_sampler_error cannot run. '
                      'Rerun on a node with 2+ GPUs to run this test')
        return

    os.environ['MASTER_PORT'] = str(np.random.randint(12000, 19000, 1)[0])

    hparams = get_hparams()
    model = LightningTestModel(hparams, force_remove_distributed_sampler=True)

    exp = get_exp(True)
    exp.save()

    trainer = Trainer(
        experiment=exp,
        progress_bar=False,
        max_nb_epochs=1,
        gpus=[0, 1],
        distributed_backend='ddp',
        use_amp=True
    )

    with pytest.raises(MisconfigurationException):
        trainer.get_dataloaders(model)

    clear_save_dir()
2019-07-24 21:11:25 +00:00
2019-07-24 21:41:08 +00:00
2019-07-24 16:13:28 +00:00
# ------------------------------------------------------------------------
# UTILS
# ------------------------------------------------------------------------
2019-07-24 20:57:21 +00:00
def run_gpu_model_test(trainer_options, model, hparams, on_gpu=True):
    """
    Shared train / load / predict / HPC save-load scenario used by the tests.

    :param trainer_options: keyword arguments for ``Trainer``; the
        ``checkpoint_callback`` and ``experiment`` entries are overridden here.
        The caller's dict is not modified.
    :param model: model to fit
    :param hparams: hyperparameters recorded in the experiment's meta tags
    :param on_gpu: forwarded to ``load_model`` and ``trainer.hpc_load``
    :return:
    """
    save_dir = init_save_dir()

    # exp file to get meta
    exp = get_exp(False)
    exp.argparse(hparams)
    exp.save()

    # exp file to get weights
    checkpoint = ModelCheckpoint(save_dir)

    # add these to a copy of the trainer options so the caller's dict stays untouched
    trainer_options = dict(trainer_options)
    trainer_options['checkpoint_callback'] = checkpoint
    trainer_options['experiment'] = exp

    # fit model
    trainer = Trainer(**trainer_options)
    result = trainer.fit(model)

    # correct result and ok accuracy
    assert result == 1, 'model failed to complete'

    # test model loading
    pretrained_model = load_model(exp, save_dir, on_gpu)

    # test model preds
    run_prediction(model.test_dataloader, pretrained_model)

    if trainer.use_ddp:
        # on hpc this would work fine... but need to hack it for the purpose of the test
        trainer.model = pretrained_model
        trainer.optimizers, trainer.lr_schedulers = pretrained_model.configure_optimizers()

    # test HPC loading / saving
    trainer.hpc_save(save_dir, exp)
    trainer.hpc_load(save_dir, on_gpu=on_gpu)

    clear_save_dir()
2019-07-24 13:17:10 +00:00
2019-07-24 14:24:15 +00:00
2019-07-27 03:04:41 +00:00
def get_hparams(continue_training=False, hpc_exp_number=0):
    """
    Build the hyperparameter Namespace shared by the tests.

    :param continue_training: when true, also set
        ``test_tube_do_checkpoint_load`` and ``hpc_exp_number``
    :param hpc_exp_number: value stored as ``hpc_exp_number`` when continuing
    :return: argparse.Namespace with the hyperparameters
    """
    tests_dir = os.path.dirname(os.path.realpath(__file__))

    hparams = Namespace(
        drop_prob=0.2,
        batch_size=32,
        in_features=28 * 28,
        learning_rate=0.001 * 8,
        optimizer_name='adam',
        data_root=os.path.join(tests_dir, 'mnist'),
        out_features=10,
        hidden_dim=1000,
    )

    if continue_training:
        hparams.test_tube_do_checkpoint_load = True
        hparams.hpc_exp_number = hpc_exp_number

    return hparams
def get_model():
    """
    Create a LightningTemplateModel from the default test hyperparameters.

    :return: tuple ``(model, hparams)``
    """
    hparams = get_hparams()
    return LightningTemplateModel(hparams), hparams
def get_exp(debug=True):
    """
    Create a test_tube Experiment named 'tests_tt_dir' rooted next to this file.

    :param debug: forwarded to ``Experiment``
        (NOTE(review): presumably debug=True keeps logs from being written - confirm)
    :return: the Experiment
    """
    tests_dir = os.path.dirname(os.path.realpath(__file__))
    return Experiment(debug=debug, save_dir=tests_dir, name='tests_tt_dir')
def init_save_dir():
    """
    (Re)create an empty ``save_dir`` directory next to this file.

    :return: path of the directory
    """
    target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'save_dir')

    # drop whatever a previous run left behind, then start fresh
    if os.path.exists(target):
        shutil.rmtree(target)
    os.makedirs(target, exist_ok=True)

    return target
def clear_save_dir():
    """
    Remove the ``save_dir`` directory next to this file if it exists.
    :return:
    """
    target = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'save_dir')
    if not os.path.exists(target):
        return
    shutil.rmtree(target)
2019-07-25 00:08:17 +00:00
def load_model(exp, save_dir, on_gpu, map_location=None):
    """
    Load a trained LightningTemplateModel from a checkpoint found in ``save_dir``.

    :param exp: experiment object; ``meta_tags.csv`` is read from the directory
        returned by ``exp.get_data_path(exp.name, exp.version)``
    :param save_dir: directory searched for files whose name contains ``.ckpt``
    :param on_gpu: forwarded to ``load_from_metrics``
    :param map_location: forwarded to ``load_from_metrics``
    :return: the loaded model
    """
    # locate the checkpoint first so a missing file fails with a clear message
    # instead of an IndexError
    checkpoints = [x for x in os.listdir(save_dir) if '.ckpt' in x]
    assert checkpoints, f'no .ckpt file found in {save_dir}'
    weights_path = os.path.join(save_dir, checkpoints[0])

    # load trained model
    tags_path = exp.get_data_path(exp.name, exp.version)
    tags_path = os.path.join(tags_path, 'meta_tags.csv')

    trained_model = LightningTemplateModel.load_from_metrics(weights_path=weights_path,
                                                             tags_csv=tags_path,
                                                             on_gpu=on_gpu,
                                                             map_location=map_location)

    assert trained_model is not None, 'loading model failed'
    return trained_model
def run_prediction(dataloader, trained_model):
    """
    Run ``trained_model`` on the first batch of ``dataloader`` and assert that
    the accuracy on that batch is above 0.50.

    :param dataloader: iterable yielding ``(x, y)`` batches
    :param trained_model: callable applied to ``x`` flattened to ``(batch, -1)``;
        the argmax over dim 1 of its output is compared with ``y``
    :return:
    """
    # run prediction on 1 batch
    batch = next(iter(dataloader), None)
    assert batch is not None, 'dataloader yielded no batches'
    x, y = batch
    x = x.view(x.size(0), -1)

    y_hat = trained_model(x)

    # acc (already a plain python float, no tensor round trip needed)
    labels_hat = torch.argmax(y_hat, dim=1)
    val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)
    print(val_acc)

    assert val_acc > 0.50, f'this model is expected to get > 0.50 in test set (it got {val_acc})'
2019-07-24 16:03:39 +00:00
def assert_ok_acc(trainer):
    """
    Assert that the validation accuracy reported by the trainer is above 0.50.

    :param trainer: object whose ``tng_tqdm_dic['val_acc']`` holds the accuracy
    :return:
    """
    observed = trainer.tng_tqdm_dic['val_acc']
    assert observed > 0.50, f'model failed to get expected 0.50 validation accuracy. Got: {observed}'
2019-07-24 16:03:39 +00:00
2019-07-24 12:31:57 +00:00
# Allow running this file directly as a script: hand this file to pytest.
if __name__ == '__main__':
    pytest.main([__file__])