2021-02-17 20:23:42 +00:00
|
|
|
import json
|
|
|
|
import os
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
import torch
|
|
|
|
from torch import Tensor
|
|
|
|
from torch.optim import Optimizer
|
|
|
|
|
|
|
|
from pytorch_lightning import Trainer
|
|
|
|
from pytorch_lightning.plugins import DeepSpeedPlugin, DeepSpeedPrecisionPlugin
|
2021-02-21 00:24:44 +00:00
|
|
|
from pytorch_lightning.plugins.training_type.deepspeed import LightningDeepSpeedModule
|
2021-02-17 20:23:42 +00:00
|
|
|
from pytorch_lightning.utilities import _APEX_AVAILABLE, _DEEPSPEED_AVAILABLE, _NATIVE_AMP_AVAILABLE
|
|
|
|
from pytorch_lightning.utilities.exceptions import MisconfigurationException
|
|
|
|
from tests.helpers.boring_model import BoringModel
|
|
|
|
|
|
|
|
|
2021-02-21 00:24:44 +00:00
|
|
|
def test_deepspeed_lightning_module(tmpdir):
    """
    Check that casting a `LightningDeepSpeedModule` also casts the model it wraps.
    """
    wrapped = BoringModel()
    wrapper = LightningDeepSpeedModule(wrapped, precision=16)

    # Casts are only ever applied to the wrapper; the wrapped model has to follow.
    wrapper.half()
    for component in (wrapper, wrapped):
        assert component.dtype == torch.half

    wrapper.to(torch.double)
    for component in (wrapper, wrapped):
        assert component.dtype == torch.double
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
def test_deepspeed_lightning_module_precision(tmpdir):
    """
    Check that a `LightningDeepSpeedModule` built with precision 16 produces half-precision outputs
    from a float32 input, and that dtype casts reach the wrapped model.
    """
    wrapped = BoringModel()
    wrapper = LightningDeepSpeedModule(wrapped, precision=16)

    wrapper.cuda().half()
    assert wrapper.dtype == torch.half
    assert wrapped.dtype == torch.half

    # The batch is deliberately float32: the forward pass is expected to hand back float16.
    batch = torch.randn((1, 32), dtype=torch.float).cuda()
    result = wrapper(batch)
    assert result.dtype == torch.half

    wrapper.to(torch.double)
    assert wrapper.dtype == torch.double
    assert wrapped.dtype == torch.double
|
|
|
|
|
|
|
|
|
2021-02-17 20:23:42 +00:00
|
|
|
@pytest.fixture
def deepspeed_config():
    """
    Provide a DeepSpeed config dict containing an SGD ``optimizer`` section
    and a ``WarmupLR`` ``scheduler`` section.
    """
    optimizer_section = {"type": "SGD", "params": {"lr": 3e-5}}
    scheduler_section = {
        "type": "WarmupLR",
        "params": {
            "last_batch_iteration": -1,
            "warmup_min_lr": 0,
            "warmup_max_lr": 3e-5,
            "warmup_num_steps": 100,
        },
    }
    return {"optimizer": optimizer_section, "scheduler": scheduler_section}
|
|
|
|
|
|
|
|
|
2021-02-21 00:24:44 +00:00
|
|
|
@pytest.fixture
def deepspeed_zero_config(deepspeed_config):
    """
    Extend ``deepspeed_config`` with ``zero_allow_untested_optimizer`` and a stage 2 ``zero_optimization`` section.
    """
    # Shallow copy: the nested optimizer/scheduler dicts are shared with ``deepspeed_config``.
    zero_config = dict(deepspeed_config)
    zero_config['zero_allow_untested_optimizer'] = True
    zero_config['zero_optimization'] = {'stage': 2}
    return zero_config
|
|
|
|
|
|
|
|
|
2021-02-17 20:23:42 +00:00
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_plugin_string(tmpdir):
    """
    Check that selecting the plugin through the string 'deepspeed' yields a `DeepSpeedPlugin`
    whose parallel devices are the single CPU device.
    """
    trainer_kwargs = dict(fast_dev_run=True, default_root_dir=tmpdir, plugins='deepspeed')
    trainer = Trainer(**trainer_kwargs)

    assert isinstance(trainer.accelerator.training_type_plugin, DeepSpeedPlugin)
    expected_devices = [torch.device('cpu')]
    assert trainer.accelerator.training_type_plugin.parallel_devices == expected_devices
|
2021-02-17 20:23:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_plugin(tmpdir):
    """
    Check that passing a `DeepSpeedPlugin` instance directly is honoured by the Trainer
    and that its parallel devices are the single CPU device.
    """
    trainer_kwargs = dict(fast_dev_run=True, default_root_dir=tmpdir, plugins=[DeepSpeedPlugin()])
    trainer = Trainer(**trainer_kwargs)

    assert isinstance(trainer.accelerator.training_type_plugin, DeepSpeedPlugin)
    expected_devices = [torch.device('cpu')]
    assert trainer.accelerator.training_type_plugin.parallel_devices == expected_devices
|
2021-02-17 20:23:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_plugin_env(tmpdir, monkeypatch, deepspeed_config):
    """
    Check that the string-selected plugin picks up its config from the file named by
    the ``PL_DEEPSPEED_CONFIG_PATH`` environment variable.
    """
    serialized_config = json.dumps(deepspeed_config)
    config_path = os.path.join(tmpdir, 'temp.json')
    with open(config_path, 'w') as config_file:
        config_file.write(serialized_config)
    monkeypatch.setenv("PL_DEEPSPEED_CONFIG_PATH", config_path)

    trainer_kwargs = dict(fast_dev_run=True, default_root_dir=tmpdir, plugins='deepspeed')
    trainer = Trainer(**trainer_kwargs)

    plugin = trainer.accelerator.training_type_plugin
    assert isinstance(plugin, DeepSpeedPlugin)
    assert plugin.parallel_devices == [torch.device('cpu')]
    # The config loaded from disk must round-trip to the fixture contents.
    assert plugin.config == deepspeed_config
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "amp_backend", [
        pytest.param("native", marks=pytest.mark.skipif(not _NATIVE_AMP_AVAILABLE, reason="Requires native AMP")),
        pytest.param("apex", marks=pytest.mark.skipif(not _APEX_AVAILABLE, reason="Requires Apex")),
    ]
)
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_precision_choice(amp_backend, tmpdir):
    """
    Test to ensure precision plugin is also correctly chosen.
    DeepSpeed handles precision via Custom DeepSpeedPrecisionPlugin

    Each AMP backend carries its own availability skip on its ``pytest.param``. There is deliberately
    no function-level native-AMP skip: it would also skip the "apex" case on machines that have
    Apex but lack native AMP.
    """
    trainer = Trainer(
        fast_dev_run=True,
        default_root_dir=tmpdir,
        plugins='deepspeed',
        amp_backend=amp_backend,
        precision=16,
    )

    assert isinstance(trainer.accelerator.training_type_plugin, DeepSpeedPlugin)

    precision_plugin = trainer.accelerator.precision_plugin
    assert isinstance(precision_plugin, DeepSpeedPrecisionPlugin)
    assert precision_plugin.precision == 16
|
2021-02-17 20:23:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_with_invalid_config_path(tmpdir):
    """
    Check that constructing the plugin with a config path that does not exist raises
    a `MisconfigurationException`.
    """
    expected_message = "You passed in a path to a DeepSpeed config but the path does not exist"
    with pytest.raises(MisconfigurationException, match=expected_message):
        DeepSpeedPlugin(config='invalid_path.json')
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_with_env_path(tmpdir, monkeypatch, deepspeed_config):
    """
    Check that a plugin built without arguments loads its config from the file named by
    the ``PL_DEEPSPEED_CONFIG_PATH`` environment variable.
    """
    serialized_config = json.dumps(deepspeed_config)
    config_path = os.path.join(tmpdir, 'temp.json')
    with open(config_path, 'w') as config_file:
        config_file.write(serialized_config)
    monkeypatch.setenv("PL_DEEPSPEED_CONFIG_PATH", config_path)

    loaded_config = DeepSpeedPlugin().config
    assert loaded_config == deepspeed_config
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_defaults(tmpdir):
    """
    Check that a plugin built without arguments still exposes a config, and that the config
    carries a ``zero_optimization`` dict.
    """
    default_config = DeepSpeedPlugin().config
    assert default_config is not None
    assert isinstance(default_config["zero_optimization"], dict)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_invalid_deepspeed_defaults_no_precision(tmpdir):
    """
    Check that fitting with the default plugin settings but without ``precision=16`` raises
    a `MisconfigurationException`.
    """
    expected_message = 'To use DeepSpeed ZeRO Optimization, you must set precision=16.'

    model = BoringModel()
    # No ``precision`` argument on purpose: that omission is what the test exercises.
    trainer = Trainer(fast_dev_run=True, default_root_dir=tmpdir, plugins='deepspeed')

    with pytest.raises(MisconfigurationException, match=expected_message):
        trainer.fit(model)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_warn_deepspeed_override_backward(tmpdir):
    """
    Check that a `UserWarning` is emitted when the LightningModule overrides the ``backward`` hook.
    """

    class TestModel(BoringModel):

        def backward(self, loss: Tensor, optimizer: Optimizer, optimizer_idx: int, *args, **kwargs) -> None:
            # A custom backward hook; the test expects a warning that it will be ignored.
            return loss.backward()

    expected_warning = 'Overridden backward hook in the LightningModule will be ignored'

    model = TestModel()
    trainer = Trainer(
        fast_dev_run=True,
        default_root_dir=tmpdir,
        plugins=DeepSpeedPlugin(),
        gpus=1,
        precision=16,
    )
    with pytest.warns(UserWarning, match=expected_warning):
        trainer.fit(model)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_run_configure_optimizers(tmpdir):
    """
    Run DeepSpeed end to end with the plugin defaults, with the optimizer and scheduler coming
    from ``configure_optimizers``, then check the saved checkpoint matches the trained model.
    """

    class TestModel(BoringModel):

        def on_train_start(self) -> None:
            from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer

            # The trainer should see the ZeRO stage 2 wrapper, with SGD as the inner optimizer.
            wrapped_optimizer = self.trainer.optimizers[0]
            assert isinstance(wrapped_optimizer, FP16_DeepSpeedZeroOptimizer)
            assert isinstance(wrapped_optimizer.optimizer, torch.optim.SGD)
            assert self.trainer.lr_schedulers == []  # DeepSpeed manages LR scheduler internally
            # The engine itself should hold a StepLR scheduler.
            engine_scheduler = self.trainer.model.lr_scheduler
            assert isinstance(engine_scheduler, torch.optim.lr_scheduler.StepLR)

    model = TestModel()
    trainer = Trainer(
        plugins=DeepSpeedPlugin(),  # plugin defaults; the hook above expects the ZeRO optimizer wrapper
        default_root_dir=tmpdir,
        gpus=1,
        fast_dev_run=True,
        precision=16,
    )

    trainer.fit(model)

    _assert_save_model_is_equal(model, tmpdir, trainer)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
def test_deepspeed_config(tmpdir, deepspeed_zero_config):
    """
    Run DeepSpeed end to end from a config dict that declares the optimizer and scheduler,
    then check the saved checkpoint matches the trained model.
    """

    class TestModel(BoringModel):

        def on_train_start(self) -> None:
            from deepspeed.runtime.lr_schedules import WarmupLR
            from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer

            # The trainer should see the ZeRO stage 2 wrapper, with SGD as the inner optimizer.
            wrapped_optimizer = self.trainer.optimizers[0]
            assert isinstance(wrapped_optimizer, FP16_DeepSpeedZeroOptimizer)
            assert isinstance(wrapped_optimizer.optimizer, torch.optim.SGD)
            assert self.trainer.lr_schedulers == []  # DeepSpeed manages LR scheduler internally
            # The engine should hold the scheduler type named in the config.
            engine_scheduler = self.trainer.model.lr_scheduler
            assert isinstance(engine_scheduler, WarmupLR)

    model = TestModel()
    configured_plugin = DeepSpeedPlugin(config=deepspeed_zero_config)
    trainer = Trainer(
        plugins=[configured_plugin],
        default_root_dir=tmpdir,
        gpus=1,
        fast_dev_run=True,
        precision=16,
    )

    trainer.fit(model)
    trainer.test(model)

    _assert_save_model_is_equal(model, tmpdir, trainer)
|
|
|
|
|
|
|
|
|
2021-02-21 20:43:11 +00:00
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
def test_deepspeed_custom_precision_params(tmpdir):
    """
    Ensure if we modify the FP16 parameters via the DeepSpeedPlugin, the deepspeed config contains these changes.

    The model's ``on_train_start`` hook checks the ``fp16`` section of the plugin config and then
    raises ``SystemExit`` to stop the run; the test passes when that ``SystemExit`` surfaces from ``fit``.
    """

    class TestModel(BoringModel):

        def on_train_start(self) -> None:
            fp16_config = self.trainer.training_type_plugin.config['fp16']
            assert fp16_config['loss_scale'] == 10
            assert fp16_config['initial_scale_power'] == 10
            assert fp16_config['loss_scale_window'] == 10
            assert fp16_config['hysteresis'] == 10
            assert fp16_config['min_loss_scale'] == 10
            # Abort the run here: the config checks are all this test needs.
            raise SystemExit()

    model = TestModel()
    trainer = Trainer(
        plugins=[
            DeepSpeedPlugin(
                loss_scale=10, initial_scale_power=10, loss_scale_window=10, hysteresis=10, min_loss_scale=10
            )
        ],
        # Keep run artifacts inside the per-test temp dir, like the other tests in this file.
        default_root_dir=tmpdir,
        precision=16,
        gpus=1,
    )
    with pytest.raises(SystemExit):
        trainer.fit(model)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
def test_deepspeed_assert_config_zero_offload_disabled(tmpdir, deepspeed_zero_config):
    """
    Ensure if we use a config and turn off cpu_offload, that this is set to False within the config.

    The model's ``on_train_start`` hook checks the plugin config and then raises ``SystemExit``
    to stop the run; the test passes when that ``SystemExit`` surfaces from ``fit``.
    """
    deepspeed_zero_config['zero_optimization']['cpu_offload'] = False

    class TestModel(BoringModel):

        def on_train_start(self) -> None:
            zero_config = self.trainer.training_type_plugin.config['zero_optimization']
            assert zero_config['cpu_offload'] is False
            # Abort the run here: the config check is all this test needs.
            raise SystemExit()

    model = TestModel()
    trainer = Trainer(
        plugins=[DeepSpeedPlugin(config=deepspeed_zero_config)],
        # Keep run artifacts inside the per-test temp dir, like the other tests in this file.
        default_root_dir=tmpdir,
        precision=16,
        gpus=1,
    )
    with pytest.raises(SystemExit):
        trainer.fit(model)
|
|
|
|
|
|
|
|
|
2021-02-17 20:23:42 +00:00
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires GPU machine")
@pytest.mark.skipif(not _DEEPSPEED_AVAILABLE, reason="DeepSpeed not available.")
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
@pytest.mark.skipif(
    os.getenv("PL_RUNNING_SPECIAL_TESTS", '0') != '1', reason="test should be run outside of pytest"
)
def test_deepspeed_multigpu(tmpdir, deepspeed_config):
    """
    Fit and test a `BoringModel` on two GPUs with a default `DeepSpeedPlugin`, then check
    the saved checkpoint matches the trained model.
    """
    trainer_kwargs = dict(
        plugins=[DeepSpeedPlugin()],
        default_root_dir=tmpdir,
        gpus=2,
        fast_dev_run=True,
        precision=16,
    )
    model = BoringModel()
    trainer = Trainer(**trainer_kwargs)
    trainer.fit(model)
    trainer.test(model)

    _assert_save_model_is_equal(model, tmpdir, trainer)
|
|
|
|
|
|
|
|
|
|
|
|
def _assert_save_model_is_equal(model, tmpdir, trainer):
    """
    Save a checkpoint through ``trainer`` and, on global rank 0, assert that a `BoringModel`
    reloaded from it has parameters equal to ``model``'s.
    """
    checkpoint_path = os.path.join(tmpdir, 'model.pt')
    trainer.save_checkpoint(checkpoint_path)

    # Every rank saves, but only rank 0 performs the comparison.
    if trainer.global_rank != 0:
        return

    reloaded = BoringModel.load_from_checkpoint(checkpoint_path)
    if model.dtype == torch.half:
        # The checkpoint loads as float32 by default, so cast it to match the trained model.
        reloaded = reloaded.half()
    model = model.cpu()

    # Pairwise comparison of the trained parameters against the reloaded ones.
    parameter_pairs = zip(model.parameters(), reloaded.parameters())
    for trained_param, reloaded_param in parameter_pairs:
        assert torch.equal(trained_param, reloaded_param)
|