lightning/tests/models/test_horovod.py

242 lines
8.6 KiB
Python

# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import platform
import shlex
import subprocess
import sys
from unittest.mock import patch
import pytest
import torch
import tests.base.develop_pipelines as tpipes
import tests.base.develop_utils as tutils
from pytorch_lightning import Trainer
from tests.base import EvalModelTemplate
from tests.base.models import BasicGAN
# Horovod is an optional dependency: record whether its NCCL probe could be
# imported so the helpers below can avoid calling it when it is missing.
HOROVOD_AVAILABLE = True
try:
    from horovod.common.util import nccl_built
except ImportError:
    HOROVOD_AVAILABLE = False

# This script will run the actual test model training in parallel
TEST_SCRIPT = os.path.join(
    os.path.dirname(__file__),
    'data', 'horovod', 'train_default_model.py',
)
def _nccl_available():
    """Report whether the importable Horovod build has NCCL support.

    Returns:
        ``False`` when Horovod could not be imported or when ``nccl_built()``
        raises ``AttributeError``; otherwise whatever ``nccl_built()`` returns.
    """
    built = False
    if HOROVOD_AVAILABLE:
        try:
            built = nccl_built()
        except AttributeError:
            # Horovod 0.19.1 nccl_built() does not yet work with Python 3.8:
            # See: https://github.com/horovod/horovod/issues/1891
            built = False
    return built
def _run_horovod(trainer_options, on_gpu=False):
    """Execute the training script across multiple workers in parallel.

    Launches ``TEST_SCRIPT`` through ``horovodrun`` and asserts that the
    command exits with status 0.

    Args:
        trainer_options: dict of Trainer keyword arguments. It is serialized
            to JSON and handed to the script via ``--trainer-options``. Its
            ``gpus`` entry (default 2) is used as the number of worker
            processes. The caller's dict is left unmodified.
        on_gpu: when true, each worker receives ``gpus=1`` and the script is
            passed ``--on-gpu``; otherwise each worker receives ``gpus=None``.

    Raises:
        AssertionError: if ``horovodrun`` returns a non-zero exit code.
    """
    num_processes = trainer_options.get('gpus', 2)

    # for Horovod, we interpret `gpus` to be set per worker
    # (work on a copy so the caller's options are not mutated)
    worker_options = dict(trainer_options, gpus=1 if on_gpu else None)

    tutils.reset_seed()
    cmdline = [
        'horovodrun',
        '-np', str(num_processes),
        sys.executable, TEST_SCRIPT,
        '--trainer-options', json.dumps(worker_options),
    ]
    if on_gpu:
        cmdline.append('--on-gpu')

    # Pass the argument list directly instead of a joined shell string, so
    # that interpreter/script paths containing spaces or shell metacharacters
    # reach `horovodrun` intact without any manual quoting.
    exit_code = subprocess.call(cmdline, env=os.environ.copy())
    assert exit_code == 0, 'horovodrun exited with code {}'.format(exit_code)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
def test_horovod_cpu(tmpdir):
    """Run the Horovod training script on CPU workers with the backend set explicitly."""
    root_dir = str(tmpdir)
    options = {
        'default_root_dir': root_dir,
        'weights_save_path': root_dir,
        'gradient_clip_val': 1.0,
        'progress_bar_refresh_rate': 0,
        'max_epochs': 1,
        'limit_train_batches': 0.4,
        'limit_val_batches': 0.2,
        'distributed_backend': 'horovod',
        'deterministic': True,
    }
    _run_horovod(options)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
def test_horovod_cpu_implicit(tmpdir):
    """Run Horovod on CPU with no backend given, so it is inferred from the env set by `horovodrun`."""
    root_dir = str(tmpdir)
    # Note: no `distributed_backend` entry here on purpose.
    options = {
        'default_root_dir': root_dir,
        'weights_save_path': root_dir,
        'gradient_clip_val': 1.0,
        'progress_bar_refresh_rate': 0,
        'max_epochs': 1,
        'limit_train_batches': 0.4,
        'limit_val_batches': 0.2,
        'deterministic': True,
    }
    _run_horovod(options)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
@pytest.mark.skipif(not _nccl_available(), reason="test requires Horovod with NCCL support")
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
def test_horovod_multi_gpu(tmpdir):
    """Run the Horovod training script on two GPU workers."""
    root_dir = str(tmpdir)
    options = {
        'default_root_dir': root_dir,
        'weights_save_path': root_dir,
        'gradient_clip_val': 1.0,
        'progress_bar_refresh_rate': 0,
        'max_epochs': 1,
        'limit_train_batches': 0.4,
        'limit_val_batches': 0.2,
        'gpus': 2,
        'deterministic': True,
        'distributed_backend': 'horovod',
    }
    _run_horovod(options, on_gpu=True)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
@pytest.mark.skipif(not _nccl_available(), reason="test requires Horovod with NCCL support")
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
def test_horovod_amp(tmpdir):
    """Run the Horovod training script on two GPU workers with 16-bit precision."""
    root_dir = str(tmpdir)
    options = {
        'default_root_dir': root_dir,
        'weights_save_path': root_dir,
        'gradient_clip_val': 1.0,
        'progress_bar_refresh_rate': 0,
        'max_epochs': 1,
        'limit_train_batches': 0.4,
        'limit_val_batches': 0.2,
        'gpus': 2,
        'deterministic': True,
        'distributed_backend': 'horovod',
        'precision': 16,
    }
    _run_horovod(options, on_gpu=True)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
@pytest.mark.skipif(not _nccl_available(), reason="test requires Horovod with NCCL support")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires GPU machine")
def test_horovod_transfer_batch_to_gpu(tmpdir):
    """Check that batches reaching the training and validation steps are not on the CPU."""

    class TestTrainingStepModel(EvalModelTemplate):
        """Template model whose step hooks assert the batch device before delegating."""

        def training_step(self, batch, *args, **kwargs):
            inputs, targets = batch
            assert str(inputs.device) != 'cpu'
            assert str(targets.device) != 'cpu'
            return super().training_step(batch, *args, **kwargs)

        def validation_step(self, batch, *args, **kwargs):
            inputs, targets = batch
            assert str(inputs.device) != 'cpu'
            assert str(targets.device) != 'cpu'
            return super().validation_step(batch, *args, **kwargs)

    model = TestTrainingStepModel(**EvalModelTemplate.get_default_hparams())

    options = {
        'default_root_dir': str(tmpdir),
        'progress_bar_refresh_rate': 0,
        'max_epochs': 1,
        'limit_train_batches': 0.4,
        'limit_val_batches': 0.2,
        'gpus': 1,
        'deterministic': True,
        'distributed_backend': 'horovod',
    }
    tpipes.run_model_test_without_loggers(options, model)
@pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
def test_horovod_multi_optimizer(tmpdir):
    """Fit a two-optimizer GAN with the Horovod backend and check the optimizer wiring.

    Verifies that fitting reports success, that exactly two optimizers exist
    and each exposes ``synchronize``, and that the first/second optimizer hold
    exactly the generator/discriminator parameters respectively.
    """
    model = BasicGAN(**EvalModelTemplate.get_default_hparams())

    # fit model
    trainer = Trainer(
        default_root_dir=str(tmpdir),
        progress_bar_refresh_rate=0,
        max_epochs=1,
        limit_train_batches=0.4,
        limit_val_batches=0.2,
        deterministic=True,
        distributed_backend='horovod',
    )
    result = trainer.fit(model)
    assert result == 1, 'model failed to complete'

    assert len(trainer.optimizers) == 2
    for optimizer in trainer.optimizers:
        assert hasattr(optimizer, 'synchronize'), 'optimizer has not been wrapped into DistributedOptimizer'

    def get_model_params(model):
        """Return the set of parameters owned by ``model``."""
        return set(model.parameters())

    def get_optimizer_params(optimizer):
        """Return the set of parameters across all of ``optimizer``'s param groups."""
        return {p for group in optimizer.param_groups for p in group.get('params', [])}

    # The two sub-networks must not share a parameter set, and each optimizer
    # must manage exactly one sub-network's parameters.
    assert get_model_params(model.generator) != get_model_params(model.discriminator)
    assert get_model_params(model.generator) == get_optimizer_params(trainer.optimizers[0])
    assert get_model_params(model.discriminator) == get_optimizer_params(trainer.optimizers[1])
# @pytest.mark.skipif(platform.system() == "Windows", reason="Horovod is not supported on Windows")
# def test_horovod_multi_optimizer_with_scheduling_stepping(tmpdir):
# hparams = EvalModelTemplate.get_default_hparams()
# model = EvalModelTemplate(**hparams)
# model.configure_optimizers = model.configure_optimizers__multiple_schedulers
#
# num_workers = 8
# init_lr = hparams.get('learning_rate') * num_workers
#
# with patch('pytorch_lightning.accelerators.horovod_backend.hvd.size') as mock_hvd_size:
# mock_hvd_size.return_value = 8
#
# # fit model
# trainer = Trainer(
# default_root_dir=tmpdir,
# max_epochs=1,
# limit_val_batches=0.5,
# limit_train_batches=0.2,
# distributed_backend='horovod'
# )
# results = trainer.fit(model)
# assert results == 1
#
# adjusted_lr1 = [pg['lr'] for pg in trainer.optimizers[0].param_groups][0]
# adjusted_lr2 = [pg['lr'] for pg in trainer.optimizers[1].param_groups][0]
#
# # Called once after end of epoch with gamma=0.1
# assert pytest.approx(init_lr * 0.1) == adjusted_lr1
#
# # Called every 3 steps, meaning for 1 epoch of 11 batches, it is called 3 times with gamma=0.1
# assert pytest.approx(init_lr * 0.1) == adjusted_lr2