lightning/tests/test_profiler.py

# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import platform
import time
from copy import deepcopy
from distutils.version import LooseVersion
from pathlib import Path
import numpy as np
import pytest
import torch
from pytorch_lightning import Callback, Trainer
from pytorch_lightning.profiler import AdvancedProfiler, PyTorchProfiler, SimpleProfiler
from pytorch_lightning.profiler.pytorch import RegisterRecordFunction
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _TORCH_GREATER_EQUAL_1_8
from tests.helpers import BoringModel
from tests.helpers.runif import RunIf
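# Maximum overhead, in seconds, that the overhead tests below allow per profiled no-op call.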
PROFILER_OVERHEAD_MAX_TOLERANCE = 0.0005
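# Total duration captured by a cProfile run: the sum of the time spent inside each
# profiled function, excluding sub-calls.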
def _get_python_cprofile_total_duration(profile):
return sum([x.inlinetime for x in profile.getstats()])
def _sleep_generator(durations):
"""
the profile_iterable method needs an iterable in which we can ensure that we're
properly timing how long it takes to call __next__
"""
for duration in durations:
time.sleep(duration)
yield duration
@pytest.fixture
def simple_profiler():
return SimpleProfiler()
@pytest.mark.parametrize(["action", "expected"], [
pytest.param("a", [3, 1]),
pytest.param("b", [2]),
pytest.param("c", [1]),
])
def test_simple_profiler_durations(simple_profiler, action: str, expected: list):
"""Ensure the reported durations are reasonably accurate."""
for duration in expected:
with simple_profiler.profile(action):
time.sleep(duration)
# different environments have different precision when it comes to time.sleep()
# see: https://github.com/PyTorchLightning/pytorch-lightning/issues/796
np.testing.assert_allclose(simple_profiler.recorded_durations[action], expected, rtol=0.2)
@pytest.mark.parametrize(["action", "expected"], [
pytest.param("a", [3, 1]),
pytest.param("b", [2]),
pytest.param("c", [1]),
])
def test_simple_profiler_iterable_durations(simple_profiler, action: str, expected: list):
"""Ensure the reported durations are reasonably accurate."""
iterable = _sleep_generator(expected)
for _ in simple_profiler.profile_iterable(iterable, action):
pass
# we exclude the last item in the recorded durations since that's when StopIteration is raised
np.testing.assert_allclose(simple_profiler.recorded_durations[action][:-1], expected, rtol=0.2)
def test_simple_profiler_overhead(simple_profiler, n_iter=5):
"""Ensure that the profiler doesn't introduce too much overhead during training."""
for _ in range(n_iter):
with simple_profiler.profile("no-op"):
pass
durations = np.array(simple_profiler.recorded_durations["no-op"])
assert all(durations < PROFILER_OVERHEAD_MAX_TOLERANCE)
def test_simple_profiler_value_errors(simple_profiler):
"""Ensure errors are raised where expected."""
action = "test"
with pytest.raises(ValueError):
simple_profiler.stop(action)
simple_profiler.start(action)
with pytest.raises(ValueError):
simple_profiler.start(action)
simple_profiler.stop(action)
def test_simple_profiler_deepcopy(tmpdir):
simple_profiler = SimpleProfiler(dirpath=tmpdir, filename="test")
simple_profiler.describe()
assert deepcopy(simple_profiler)
def test_simple_profiler_log_dir(tmpdir):
"""Ensure the profiler dirpath defaults to `trainer.log_dir` when not present"""
profiler = SimpleProfiler(filename="profiler")
assert profiler._log_dir is None
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
profiler=profiler,
)
trainer.fit(model)
expected = tmpdir / "lightning_logs" / "version_0"
assert trainer.log_dir == expected
assert profiler._log_dir == trainer.log_dir
assert expected.join("fit-profiler.txt").exists()
@RunIf(skip_windows=True)
def test_simple_profiler_distributed_files(tmpdir):
"""Ensure the proper files are saved in distributed"""
profiler = SimpleProfiler(dirpath=tmpdir, filename='profiler')
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=2,
accelerator="ddp_cpu",
num_processes=2,
profiler=profiler,
logger=False,
)
trainer.fit(model)
trainer.validate(model)
trainer.test(model)
actual = set(os.listdir(profiler.dirpath))
expected = {f"{stage}-profiler-{rank}.txt" for stage in ("fit", "validate", "test") for rank in (0, 1)}
assert actual == expected
for f in profiler.dirpath.listdir():
assert f.read_text('utf-8')
def test_simple_profiler_logs(tmpdir, caplog, simple_profiler):
"""Ensure that the number of printed logs is correct"""
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=2,
profiler=simple_profiler,
logger=False,
)
with caplog.at_level(logging.INFO, logger="pytorch_lightning.profiler.profilers"):
trainer.fit(model)
trainer.test(model)
assert caplog.text.count("Profiler Report") == 2
@pytest.fixture
def advanced_profiler(tmpdir):
return AdvancedProfiler(dirpath=tmpdir, filename="profiler")
@pytest.mark.parametrize(["action", "expected"], [
pytest.param("a", [3, 1]),
pytest.param("b", [2]),
pytest.param("c", [1]),
])
def test_advanced_profiler_durations(advanced_profiler, action: str, expected: list):
for duration in expected:
with advanced_profiler.profile(action):
time.sleep(duration)
# different environments have different precision when it comes to time.sleep()
# see: https://github.com/PyTorchLightning/pytorch-lightning/issues/796
recorded_total_duration = _get_python_cprofile_total_duration(advanced_profiler.profiled_actions[action])
expected_total_duration = np.sum(expected)
np.testing.assert_allclose(recorded_total_duration, expected_total_duration, rtol=0.2)
@pytest.mark.parametrize(["action", "expected"], [
pytest.param("a", [3, 1]),
pytest.param("b", [2]),
pytest.param("c", [1]),
])
def test_advanced_profiler_iterable_durations(advanced_profiler, action: str, expected: list):
"""Ensure the reported durations are reasonably accurate."""
iterable = _sleep_generator(expected)
for _ in advanced_profiler.profile_iterable(iterable, action):
pass
recorded_total_duration = _get_python_cprofile_total_duration(advanced_profiler.profiled_actions[action])
expected_total_duration = np.sum(expected)
np.testing.assert_allclose(recorded_total_duration, expected_total_duration, rtol=0.2)
def test_advanced_profiler_overhead(advanced_profiler, n_iter=5):
"""
ensure that the profiler doesn't introduce too much overhead during training
"""
for _ in range(n_iter):
with advanced_profiler.profile("no-op"):
pass
action_profile = advanced_profiler.profiled_actions["no-op"]
total_duration = _get_python_cprofile_total_duration(action_profile)
average_duration = total_duration / n_iter
assert average_duration < PROFILER_OVERHEAD_MAX_TOLERANCE
def test_advanced_profiler_describe(tmpdir, advanced_profiler):
"""
ensure the profiler won't fail when reporting the summary
"""
# record at least one event
with advanced_profiler.profile("test"):
pass
# log to stdout and print to file
advanced_profiler.describe()
path = advanced_profiler.dirpath / f"{advanced_profiler.filename}.txt"
data = path.read_text("utf-8")
assert len(data) > 0
def test_advanced_profiler_value_errors(advanced_profiler):
"""Ensure errors are raised where expected."""
action = "test"
with pytest.raises(ValueError):
advanced_profiler.stop(action)
advanced_profiler.start(action)
advanced_profiler.stop(action)
def test_advanced_profiler_deepcopy(advanced_profiler):
advanced_profiler.describe()
assert deepcopy(advanced_profiler)
@pytest.fixture
def pytorch_profiler(tmpdir):
return PyTorchProfiler(dirpath=tmpdir, filename="profiler")
def test_pytorch_profiler_describe(pytorch_profiler):
"""Ensure the profiler won't fail when reporting the summary."""
with pytorch_profiler.profile("on_test_start"):
torch.tensor(0)
# log to stdout and print to file
pytorch_profiler.describe()
path = pytorch_profiler.dirpath / f"{pytorch_profiler.filename}.txt"
data = path.read_text("utf-8")
assert len(data) > 0
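# Passing both the `profiled_functions` and `record_functions` arguments at once is
# ambiguous and should be rejected with a MisconfigurationException.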
def test_pytorch_profiler_raises(pytorch_profiler):
"""Ensure errors are raised where expected."""
with pytest.raises(MisconfigurationException, match="profiled_functions` and `PyTorchProfiler.record"):
PyTorchProfiler(profiled_functions=["a"], record_functions=["b"])
@RunIf(min_torch="1.6.0")
def test_advanced_profiler_cprofile_deepcopy(tmpdir):
"""Checks for pickle issue reported in #6522"""
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
profiler="advanced",
stochastic_weight_avg=True,
)
trainer.fit(model)
@RunIf(min_gpus=2, special=True)
def test_pytorch_profiler_trainer_ddp(tmpdir, pytorch_profiler):
"""Ensure that the profiler can be given to the training and default step are properly recorded. """
model = BoringModel()
trainer = Trainer(
max_epochs=1,
default_root_dir=tmpdir,
limit_train_batches=2,
limit_val_batches=2,
profiler=pytorch_profiler,
accelerator="ddp",
gpus=2,
)
trainer.fit(model)
expected = ('validation_step', 'training_step_and_backward', 'training_step', 'backward')
for name in expected:
assert sum(e.name == name for e in pytorch_profiler.function_events)
files = set(os.listdir(pytorch_profiler.dirpath))
expected = f"fit-profiler-{trainer.local_rank}.txt"
assert expected in files
path = os.path.join(pytorch_profiler.dirpath, expected)
assert Path(path).read_text()
def test_pytorch_profiler_trainer_test(tmpdir, pytorch_profiler):
"""Ensure that the profiler can be given to the trainer and test step are properly recorded. """
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
limit_test_batches=2,
profiler=pytorch_profiler,
)
trainer.test(model)
assert sum(e.name == 'test_step' for e in pytorch_profiler.function_events)
path = pytorch_profiler.dirpath / f"test-{pytorch_profiler.filename}.txt"
assert path.read_text("utf-8")
def test_pytorch_profiler_trainer_predict(tmpdir, pytorch_profiler):
"""Ensure that the profiler can be given to the trainer and predict function are properly recorded. """
model = BoringModel()
model.predict_dataloader = model.train_dataloader
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
limit_predict_batches=2,
profiler=pytorch_profiler,
)
trainer.predict(model)
assert sum(e.name == 'predict_step' for e in pytorch_profiler.function_events)
path = pytorch_profiler.dirpath / f"predict-{pytorch_profiler.filename}.txt"
assert path.read_text("utf-8")
def test_pytorch_profiler_trainer_validate(tmpdir, pytorch_profiler):
"""Ensure that the profiler can be given to the trainer and validate function are properly recorded. """
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
limit_val_batches=2,
profiler=pytorch_profiler,
)
trainer.validate(model)
assert sum(e.name == 'validation_step' for e in pytorch_profiler.function_events)
path = pytorch_profiler.dirpath / f"validate-{pytorch_profiler.filename}.txt"
assert path.read_text("utf-8")
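# The event names recorded by the autograd profiler vary with the installed PyTorch
# version (for example, operator names gain the `aten::` prefix from 1.7 onward), so the
# nested-profiling test below selects its expected event set per version and platform.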
def test_pytorch_profiler_nested(tmpdir):
"""Ensure that the profiler handles nested context"""
pytorch_profiler = PyTorchProfiler(
profiled_functions=["a", "b", "c"], use_cuda=False, dirpath=tmpdir, filename="profiler"
)
with pytorch_profiler.profile("a"):
a = torch.ones(42)
with pytorch_profiler.profile("b"):
b = torch.zeros(42)
with pytorch_profiler.profile("c"):
_ = a + b
pytorch_profiler.describe()
events_name = {e.name for e in pytorch_profiler.function_events}
if platform.system() == "Windows":
expected = {'a', 'add', 'b', 'c', 'profiler::_record_function_enter', 'profiler::_record_function_exit'}
else:
expected = {
'signed char', 'add', 'profiler::_record_function_exit', 'bool', 'char', 'profiler::_record_function_enter'
}
if LooseVersion(torch.__version__) >= LooseVersion("1.6.0"):
expected = {'add', 'zeros', 'ones', 'zero_', 'b', 'fill_', 'c', 'a', 'empty'}
if LooseVersion(torch.__version__) >= LooseVersion("1.7.0"):
expected = {
'aten::zeros', 'aten::add', 'aten::zero_', 'c', 'b', 'a', 'aten::fill_', 'aten::empty', 'aten::ones'
}
if LooseVersion(torch.__version__) >= LooseVersion("1.8.0"):
expected = {
'aten::ones', 'a', 'aten::add', 'aten::empty', 'aten::zero_', 'b', 'c', 'aten::zeros', 'aten::fill_'
}
assert events_name == expected, (events_name, torch.__version__, platform.system())
@RunIf(min_gpus=1, special=True)
def test_pytorch_profiler_nested_emit_nvtx(tmpdir):
"""
This test check emit_nvtx is correctly supported
"""
profiler = PyTorchProfiler(use_cuda=True, emit_nvtx=True)
model = BoringModel()
trainer = Trainer(
fast_dev_run=True,
profiler=profiler,
gpus=1,
)
trainer.fit(model)
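# `RegisterRecordFunction` records each submodule's forward pass as a named profiler
# event (e.g. `torch.nn.modules.linear.Linear: layer.0`), which the assertions below check.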
@RunIf(min_torch="1.5.0")
def test_register_record_function(tmpdir):
use_cuda = torch.cuda.is_available()
pytorch_profiler = PyTorchProfiler(
export_to_chrome=False,
record_functions=["a"],
use_cuda=use_cuda,
dirpath=tmpdir,
filename="profiler",
)
class TestModel(BoringModel):
def __init__(self):
super().__init__()
self.layer = torch.nn.Sequential(torch.nn.Linear(8, 8), torch.nn.ReLU(), torch.nn.Linear(8, 2))
model = TestModel()
input = torch.rand((1, 8))
if use_cuda:
model = model.cuda()
input = input.cuda()
with pytorch_profiler.profile("a"):
with RegisterRecordFunction(model):
model(input)
pytorch_profiler.describe()
event_names = [e.name for e in pytorch_profiler.function_events]
assert 'torch.nn.modules.container.Sequential: layer' in event_names
assert 'torch.nn.modules.linear.Linear: layer.0' in event_names
assert 'torch.nn.modules.activation.ReLU: layer.1' in event_names
assert 'torch.nn.modules.linear.Linear: layer.2' in event_names
@pytest.mark.parametrize("cls", (SimpleProfiler, AdvancedProfiler, PyTorchProfiler))
def test_profiler_teardown(tmpdir, cls):
"""
This test checks if profiler teardown method is called when trainer is exiting.
"""
class TestCallback(Callback):
def on_fit_end(self, trainer, *args, **kwargs) -> None:
# describe sets it to None
assert trainer.profiler._output_file is None
profiler = cls(dirpath=tmpdir, filename="profiler")
model = BoringModel()
trainer = Trainer(default_root_dir=tmpdir, fast_dev_run=True, profiler=profiler, callbacks=[TestCallback()])
trainer.fit(model)
assert profiler._output_file is None
@pytest.mark.skipif(_TORCH_GREATER_EQUAL_1_8, reason="currently not supported for PyTorch 1.8")
def test_pytorch_profiler_deepcopy(pytorch_profiler):
pytorch_profiler.start("on_train_start")
torch.tensor(1)
pytorch_profiler.describe()
assert deepcopy(pytorch_profiler)