Track CPU stats with DeviceStatsMonitor (#11795)

Co-authored-by: ananthsub <ananth.subramaniam@gmail.com>
Co-authored-by: Jirka Borovec <Borda@users.noreply.github.com>
Co-authored-by: Rohit Gupta <rohitgr1998@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Kaushik B <kaushikbokka@gmail.com>
Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com>
Eric Wiener 2022-05-10 03:57:38 -07:00 committed by GitHub
parent fb40cbce2e
commit 3f78c4ca7a
9 changed files with 158 additions and 34 deletions


@@ -590,6 +590,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Fixed an issue with resuming from a checkpoint trained with QAT ([#11346](https://github.com/PyTorchLightning/pytorch-lightning/pull/11346))
- Added CPU metric tracking to `DeviceStatsMonitor` ([#11795](https://github.com/PyTorchLightning/pytorch-lightning/pull/11795))
## [1.5.10] - 2022-02-08
### Fixed


@@ -32,12 +32,12 @@ local tputests = base.BaseTest {
pip install -e .
echo $KUBE_GOOGLE_CLOUD_TPU_ENDPOINTS
export XRT_TPU_CONFIG="tpu_worker;0;${KUBE_GOOGLE_CLOUD_TPU_ENDPOINTS:7}"
# TODO (@kaushikb11): Add device stats tests here
coverage run --source=pytorch_lightning -m pytest -v --capture=no \
tests/strategies/test_tpu_spawn.py \
tests/profiler/test_xla_profiler.py \
pytorch_lightning/utilities/xla_device.py \
tests/accelerators/test_tpu.py \
tests/callbacks/test_device_stats_monitor.py \
tests/models/test_tpu.py
test_exit_code=$?
echo "\n||| END PYTEST LOGS |||\n"


@@ -119,3 +119,6 @@ This can be measured with the :class:`~pytorch_lightning.callbacks.device_stats_
from pytorch_lightning.callbacks import DeviceStatsMonitor
trainer = Trainer(callbacks=[DeviceStatsMonitor()])
CPU metrics are tracked by default on the CPU accelerator. To enable them for other accelerators, set ``DeviceStatsMonitor(cpu_stats=True)``. To disable logging
CPU metrics, pass ``DeviceStatsMonitor(cpu_stats=False)``.
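A minimal sketch of both settings (the ``accelerator``/``devices`` arguments here are only illustrative):

from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import DeviceStatsMonitor

# Also log CPU stats while training on another accelerator (requires `psutil`)
trainer = Trainer(accelerator="gpu", devices=1, callbacks=[DeviceStatsMonitor(cpu_stats=True)])

# Train on CPU but skip logging CPU metrics
trainer = Trainer(accelerator="cpu", callbacks=[DeviceStatsMonitor(cpu_stats=False)])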


@@ -18,6 +18,7 @@ import torch
from pytorch_lightning.accelerators.accelerator import Accelerator
from pytorch_lightning.utilities import device_parser
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _PSUTIL_AVAILABLE
from pytorch_lightning.utilities.types import _DEVICE
@@ -35,8 +36,8 @@ class CPUAccelerator(Accelerator):
raise MisconfigurationException(f"Device should be CPU, got {root_device} instead.")
def get_device_stats(self, device: _DEVICE) -> Dict[str, Any]:
"""CPU device stats aren't supported yet."""
return {}
"""Get CPU stats from ``psutil`` package."""
return get_cpu_stats()
@staticmethod
def parse_devices(devices: Union[int, str, List[int]]) -> int:
@@ -67,3 +68,24 @@ class CPUAccelerator(Accelerator):
cls,
description=f"{cls.__class__.__name__}",
)
# CPU device metrics
_CPU_VM_PERCENT = "cpu_vm_percent"
_CPU_PERCENT = "cpu_percent"
_CPU_SWAP_PERCENT = "cpu_swap_percent"
def get_cpu_stats() -> Dict[str, float]:
if not _PSUTIL_AVAILABLE:
raise ModuleNotFoundError(
"Fetching CPU device stats requires `psutil` to be installed."
" Install it by running `pip install -U psutil`."
)
import psutil
return {
_CPU_VM_PERCENT: psutil.virtual_memory().percent,
_CPU_PERCENT: psutil.cpu_percent(),
_CPU_SWAP_PERCENT: psutil.swap_memory().percent,
}
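A rough usage sketch of this helper (assuming ``psutil`` is installed; the values shown are hypothetical):

from pytorch_lightning.accelerators.cpu import get_cpu_stats

stats = get_cpu_stats()
print(stats)  # e.g. {'cpu_vm_percent': 41.7, 'cpu_percent': 12.3, 'cpu_swap_percent': 0.0}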


@@ -23,8 +23,9 @@ from typing import Any, Dict, Optional
import pytorch_lightning as pl
from pytorch_lightning.callbacks.base import Callback
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _PSUTIL_AVAILABLE
from pytorch_lightning.utilities.rank_zero import rank_zero_deprecation, rank_zero_warn
from pytorch_lightning.utilities.types import STEP_OUTPUT
from pytorch_lightning.utilities.warnings import rank_zero_deprecation
class DeviceStatsMonitor(Callback):
@@ -32,6 +33,13 @@ class DeviceStatsMonitor(Callback):
Automatically monitors and logs device stats during the training stage. ``DeviceStatsMonitor``
is a special callback as it requires a ``logger`` to be passed as an argument to the ``Trainer``.
Args:
cpu_stats: if ``None``, CPU stats are logged only when the accelerator is CPU.
A warning is raised if ``psutil`` is not installed (from v1.9.0 this will be an exception).
If ``True``, CPU stats are logged regardless of the accelerator, and an exception is
raised if ``psutil`` is not installed.
If ``False``, CPU stats are not logged, regardless of the accelerator.
Raises:
MisconfigurationException:
If ``Trainer`` has no logger.
@@ -43,45 +51,68 @@ class DeviceStatsMonitor(Callback):
>>> trainer = Trainer(callbacks=[device_stats]) # doctest: +SKIP
"""
def setup(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule", stage: Optional[str] = None) -> None:
if not trainer.loggers:
raise MisconfigurationException("Cannot use DeviceStatsMonitor callback with Trainer that has no logger.")
def __init__(self, cpu_stats: Optional[bool] = None) -> None:
self._cpu_stats = cpu_stats
def on_train_batch_start(
self, trainer: "pl.Trainer", pl_module: "pl.LightningModule", batch: Any, batch_idx: int
def setup(
self,
trainer: "pl.Trainer",
pl_module: "pl.LightningModule",
stage: Optional[str] = None,
) -> None:
if stage != "fit":
return
if not trainer.loggers:
raise MisconfigurationException("Cannot use `DeviceStatsMonitor` callback with `Trainer(logger=False)`.")
# warn in setup so the warning is only emitted once
device = trainer.strategy.root_device
if self._cpu_stats is None and device.type == "cpu" and not _PSUTIL_AVAILABLE:
# TODO: raise an exception from v1.9
rank_zero_warn(
"`DeviceStatsMonitor` will not log CPU stats as `psutil` is not installed."
" To install `psutil`, run `pip install psutil`."
" It will raise an exception if `psutil` is not installed post v1.9.0."
)
self._cpu_stats = False
def _get_and_log_device_stats(self, trainer: "pl.Trainer", key: str) -> None:
if not trainer._logger_connector.should_update_logs:
return
device = trainer.strategy.root_device
if self._cpu_stats is False and device.type == "cpu":
# cpu stats are disabled
return
device_stats = trainer.accelerator.get_device_stats(device)
if self._cpu_stats and device.type != "cpu":
# Don't query CPU stats twice if CPU is the accelerator
from pytorch_lightning.accelerators.cpu import get_cpu_stats
device_stats.update(get_cpu_stats())
for logger in trainer.loggers:
separator = logger.group_separator
prefixed_device_stats = _prefix_metric_keys(
device_stats, f"{self.__class__.__qualname__}.on_train_batch_start", separator
)
prefixed_device_stats = _prefix_metric_keys(device_stats, f"{self.__class__.__qualname__}.{key}", separator)
logger.log_metrics(prefixed_device_stats, step=trainer.fit_loop.epoch_loop._batches_that_stepped)
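# Rough illustration (hypothetical values): with a logger whose `group_separator` is "/",
# the prefixed keys take the form "DeviceStatsMonitor.<hook_name>/<stat_name>", e.g.
# {"DeviceStatsMonitor.on_train_batch_start/cpu_percent": 12.3}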
def on_train_batch_start(
self,
trainer: "pl.Trainer",
pl_module: "pl.LightningModule",
batch: Any,
batch_idx: int,
unused: Optional[int] = 0,
) -> None:
self._get_and_log_device_stats(trainer, "on_train_batch_start")
def on_train_batch_end(
self, trainer: "pl.Trainer", pl_module: "pl.LightningModule", outputs: STEP_OUTPUT, batch: Any, batch_idx: int
) -> None:
if not trainer.loggers:
raise MisconfigurationException("Cannot use `DeviceStatsMonitor` callback with `Trainer(logger=False)`.")
if not trainer._logger_connector.should_update_logs:
return
device = trainer.strategy.root_device
device_stats = trainer.accelerator.get_device_stats(device)
for logger in trainer.loggers:
separator = logger.group_separator
prefixed_device_stats = _prefix_metric_keys(
device_stats, f"{self.__class__.__qualname__}.on_train_batch_end", separator
)
logger.log_metrics(prefixed_device_stats, step=trainer.fit_loop.epoch_loop._batches_that_stepped)
self._get_and_log_device_stats(trainer, "on_train_batch_end")
def _prefix_metric_keys(metrics_dict: Dict[str, float], prefix: str, separator: str) -> Dict[str, float]:
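# Body truncated in this diff view. A hypothetical sketch consistent with the
# expectation in `test_prefix_metric_keys` below ({"foo.1": 1.0, ...}) would be:
#   return {prefix + separator + key: value for key, value in metrics_dict.items()}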


@@ -105,6 +105,7 @@ _FAIRSCALE_AVAILABLE = not _IS_WINDOWS and _module_available("fairscale.nn")
_FAIRSCALE_OSS_FP16_BROADCAST_AVAILABLE = _FAIRSCALE_AVAILABLE and _compare_version("fairscale", operator.ge, "0.3.3")
_FAIRSCALE_FULLY_SHARDED_AVAILABLE = _FAIRSCALE_AVAILABLE and _compare_version("fairscale", operator.ge, "0.3.4")
_GROUP_AVAILABLE = not _IS_WINDOWS and _module_available("torch.distributed.group")
_HABANA_FRAMEWORK_AVAILABLE = _package_available("habana_frameworks")
_HIVEMIND_AVAILABLE = _package_available("hivemind")
_HOROVOD_AVAILABLE = _module_available("horovod.torch")
_HYDRA_AVAILABLE = _package_available("hydra")
@@ -115,7 +116,7 @@ _NEPTUNE_AVAILABLE = _package_available("neptune")
_NEPTUNE_GREATER_EQUAL_0_9 = _NEPTUNE_AVAILABLE and _compare_version("neptune", operator.ge, "0.9.0")
_OMEGACONF_AVAILABLE = _package_available("omegaconf")
_POPTORCH_AVAILABLE = _package_available("poptorch")
_HABANA_FRAMEWORK_AVAILABLE = _package_available("habana_frameworks")
_PSUTIL_AVAILABLE = _package_available("psutil")
_RICH_AVAILABLE = _package_available("rich") and _compare_version("rich", operator.ge, "10.2.2")
_TORCH_QUANTIZE_AVAILABLE = bool([eg for eg in torch.backends.quantized.supported_engines if eg != "none"])
_TORCHTEXT_AVAILABLE = _package_available("torchtext")


@@ -12,4 +12,5 @@ pytest-forked
cloudpickle>=1.3
scikit-learn>0.22.1
onnxruntime
psutil # for `DeviceStatsMonitor`
pandas # needed in benchmarks


@@ -12,10 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Optional
from unittest import mock
from unittest.mock import Mock
import pytest
import torch
from pytorch_lightning import Trainer
from pytorch_lightning.accelerators.cpu import _CPU_PERCENT, _CPU_SWAP_PERCENT, _CPU_VM_PERCENT, get_cpu_stats
from pytorch_lightning.callbacks import DeviceStatsMonitor
from pytorch_lightning.callbacks.device_stats_monitor import _prefix_metric_keys
from pytorch_lightning.loggers import CSVLogger
@@ -34,9 +38,13 @@ def test_device_stats_gpu_from_torch(tmpdir):
class DebugLogger(CSVLogger):
@rank_zero_only
def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
fields = ["allocated_bytes.all.freed", "inactive_split.all.peak", "reserved_bytes.large_pool.peak"]
fields = [
"allocated_bytes.all.freed",
"inactive_split.all.peak",
"reserved_bytes.large_pool.peak",
]
for f in fields:
assert any(f in h for h in metrics.keys())
assert any(f in h for h in metrics)
trainer = Trainer(
default_root_dir=tmpdir,
@@ -54,6 +62,41 @@ def test_device_stats_gpu_from_torch(tmpdir):
trainer.fit(model)
@RunIf(psutil=True)
@pytest.mark.parametrize("cpu_stats", (None, True, False))
@mock.patch("pytorch_lightning.accelerators.cpu.get_cpu_stats", side_effect=get_cpu_stats)
def test_device_stats_cpu(cpu_stats_mock, tmpdir, cpu_stats):
"""Test CPU stats are logged when no accelerator is used."""
model = BoringModel()
CPU_METRIC_KEYS = (_CPU_VM_PERCENT, _CPU_SWAP_PERCENT, _CPU_PERCENT)
class DebugLogger(CSVLogger):
def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
enabled = cpu_stats is not False
for f in CPU_METRIC_KEYS:
has_cpu_metrics = any(f in h for h in metrics)
assert has_cpu_metrics if enabled else not has_cpu_metrics
device_stats = DeviceStatsMonitor(cpu_stats=cpu_stats)
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
limit_train_batches=2,
limit_val_batches=0,
log_every_n_steps=1,
callbacks=device_stats,
logger=DebugLogger(tmpdir),
enable_checkpointing=False,
enable_progress_bar=False,
accelerator="cpu",
)
trainer.fit(model)
expected = 4 if cpu_stats is not False else 0 # (batch_start + batch_end) * train_batches
assert cpu_stats_mock.call_count == expected
@pytest.mark.skipif(True, reason="TODO (@kaushikb11): fix this test, timeout")
@RunIf(tpu=True)
def test_device_stats_monitor_tpu(tmpdir):
"""Test TPU stats are logged using a logger."""
@@ -66,14 +109,14 @@ def test_device_stats_monitor_tpu(tmpdir):
def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
fields = ["avg. free memory (MB)", "avg. peak memory (MB)"]
for f in fields:
assert any(f in h for h in metrics.keys())
assert any(f in h for h in metrics)
trainer = Trainer(
default_root_dir=tmpdir,
max_epochs=1,
limit_train_batches=1,
limit_train_batches=2,
accelerator="tpu",
devices=8,
devices=1,
log_every_n_steps=1,
callbacks=[device_stats],
logger=DebugLogger(tmpdir),
@@ -99,7 +142,7 @@ def test_device_stats_monitor_no_logger(tmpdir):
enable_progress_bar=False,
)
with pytest.raises(MisconfigurationException, match="Trainer that has no logger."):
with pytest.raises(MisconfigurationException, match="Cannot use `DeviceStatsMonitor` callback."):
trainer.fit(model)
@@ -110,3 +153,16 @@ def test_prefix_metric_keys(tmpdir):
separator = "."
converted_metrics = _prefix_metric_keys(metrics, prefix, separator)
assert converted_metrics == {"foo.1": 1.0, "foo.2": 2.0, "foo.3": 3.0}
def test_device_stats_monitor_warning_when_psutil_not_available(monkeypatch):
"""Test that warning is raised when psutil is not available."""
import pytorch_lightning.callbacks.device_stats_monitor as imports
monkeypatch.setattr(imports, "_PSUTIL_AVAILABLE", False)
monitor = DeviceStatsMonitor()
trainer = Trainer()
assert trainer.strategy.root_device == torch.device("cpu")
# TODO: raise an exception from v1.9
with pytest.warns(UserWarning, match="psutil` is not installed"):
monitor.setup(trainer, Mock(), "fit")


@@ -20,7 +20,7 @@ import torch
from packaging.version import Version
from pkg_resources import get_distribution
from pytorch_lightning.utilities import (
from pytorch_lightning.utilities.imports import (
_APEX_AVAILABLE,
_BAGUA_AVAILABLE,
_DEEPSPEED_AVAILABLE,
@@ -31,6 +31,7 @@ from pytorch_lightning.utilities import (
_HPU_AVAILABLE,
_IPU_AVAILABLE,
_OMEGACONF_AVAILABLE,
_PSUTIL_AVAILABLE,
_RICH_AVAILABLE,
_TORCH_GREATER_EQUAL_1_10,
_TORCH_QUANTIZE_AVAILABLE,
@@ -85,6 +86,7 @@ class RunIf:
omegaconf: bool = False,
slow: bool = False,
bagua: bool = False,
psutil: bool = False,
hivemind: bool = False,
**kwargs,
):
@@ -113,6 +115,7 @@ class RunIf:
omegaconf: Require that omry/omegaconf is installed.
slow: Mark the test as slow, our CI will run it in a separate job.
bagua: Require that BaguaSys/bagua is installed.
psutil: Require that psutil is installed.
hivemind: Require that Hivemind is installed.
**kwargs: Any :class:`pytest.mark.skipif` keyword arguments.
"""
@@ -234,6 +237,10 @@
conditions.append(not _BAGUA_AVAILABLE or sys.platform in ("win32", "darwin"))
reasons.append("Bagua")
if psutil:
conditions.append(not _PSUTIL_AVAILABLE)
reasons.append("psutil")
if hivemind:
conditions.append(not _HIVEMIND_AVAILABLE or sys.platform in ("win32", "darwin"))
reasons.append("Hivemind")