2021-06-18 12:54:59 +00:00
|
|
|
# Copyright The PyTorch Lightning team.
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
from collections import OrderedDict
|
2021-09-03 13:02:34 +00:00
|
|
|
from functools import lru_cache
|
2022-02-28 10:51:33 +00:00
|
|
|
from typing import Any, cast, Dict, Optional
|
2021-06-18 12:54:59 +00:00
|
|
|
|
|
|
|
from deprecate import void
|
2022-02-02 19:53:50 +00:00
|
|
|
from torch.utils.data import DataLoader
|
2021-06-18 12:54:59 +00:00
|
|
|
|
|
|
|
from pytorch_lightning.loops.base import Loop
|
2021-10-01 16:37:17 +00:00
|
|
|
from pytorch_lightning.trainer.progress import BatchProgress
|
2021-12-17 18:20:11 +00:00
|
|
|
from pytorch_lightning.trainer.states import TrainerFn
|
2021-11-30 20:28:55 +00:00
|
|
|
from pytorch_lightning.trainer.supporters import CombinedLoader
|
2021-11-25 17:31:53 +00:00
|
|
|
from pytorch_lightning.utilities.auto_restart import (
|
|
|
|
_collect_states_on_rank_zero_over_collection,
|
|
|
|
_reload_dataloader_state_dict,
|
|
|
|
)
|
2021-11-30 20:28:55 +00:00
|
|
|
from pytorch_lightning.utilities.exceptions import MisconfigurationException
|
2022-02-24 18:01:35 +00:00
|
|
|
from pytorch_lightning.utilities.fetching import AbstractDataFetcher, DataLoaderIterDataFetcher
|
2021-11-25 17:31:53 +00:00
|
|
|
from pytorch_lightning.utilities.imports import _fault_tolerant_training
|
2021-09-03 13:02:34 +00:00
|
|
|
from pytorch_lightning.utilities.model_helpers import is_overridden
|
|
|
|
from pytorch_lightning.utilities.types import EPOCH_OUTPUT, STEP_OUTPUT
|
2021-06-18 12:54:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
class EvaluationEpochLoop(Loop):
    """This is the loop performing the evaluation.

    It mainly loops over the given dataloader and runs the validation or test step (depending on the trainer's current
    state).
    """

    def __init__(self) -> None:
        super().__init__()
        self.batch_progress = BatchProgress()

        # step outputs kept for `{validation,test}_epoch_end`; only filled when that hook is overridden
        self._outputs: EPOCH_OUTPUT = []
        # number of completed batches at which `done` becomes True; set in `on_run_start`
        self._dl_max_batches = 0
        # fetcher of the current run; `on_save_checkpoint` reads its `dataloader_iter` state
        self._data_fetcher: Optional[AbstractDataFetcher] = None
        # dataloader state loaded from a checkpoint, held until a dataloader is available in `on_run_start`
        self._dataloader_state_dict: Dict[str, Any] = {}

    @property
    def done(self) -> bool:
        """Returns ``True`` if the current iteration count reaches the number of dataloader batches."""
        return self.batch_progress.current.completed >= self._dl_max_batches

    def reset(self) -> None:
        """Resets the loop's internal state."""
        self._dl_max_batches = 0
        self._data_fetcher = None
        self._outputs = []

        if not self.restarting:
            self.batch_progress.reset_on_run()
        else:
            self.batch_progress.reset_on_restart()
            # when restarting, if we are running `validate` or `test` twice, since there's no concept of `max_epochs` we
            # need to reset the current state when the loop has finished running
            if self.done and self.trainer.state.fn != TrainerFn.FITTING:
                self.batch_progress.reset_on_run()

    def on_run_start(  # type: ignore[override]
        self, data_fetcher: AbstractDataFetcher, dl_max_batches: int, kwargs: OrderedDict
    ) -> None:
        """Adds the passed arguments to the loop's state if necessary.

        Args:
            data_fetcher: the current data_fetcher wrapping the dataloader
            dl_max_batches: maximum number of batches the dataloader can produce
            kwargs: the kwargs passed down to the hooks (unused here).
        """
        void(kwargs)
        self._dl_max_batches = dl_max_batches
        # restore a checkpointed dataloader state (if any) before the iterator is created below
        self._reload_dataloader_state_dict(data_fetcher)
        # creates the iterator inside the fetcher but returns `self`
        self._data_fetcher = cast(AbstractDataFetcher, iter(data_fetcher))
        # add the previous `fetched` value to properly track `is_last_batch` with no prefetching
        data_fetcher.fetched += self.batch_progress.current.ready

    def advance(  # type: ignore[override]
        self,
        data_fetcher: AbstractDataFetcher,
        dl_max_batches: int,
        kwargs: OrderedDict,
    ) -> None:
        """Calls the evaluation step with the corresponding hooks and updates the logger connector.

        Args:
            data_fetcher: iterator over the dataloader
            dl_max_batches: maximum number of batches the dataloader can produce (unused here; the bound is
                stored by ``on_run_start``)
            kwargs: the kwargs passed down to the hooks.

        Raises:
            StopIteration: propagated from ``next(data_fetcher)`` once the fetcher has no more batches
        """
        void(dl_max_batches)

        if not isinstance(data_fetcher, DataLoaderIterDataFetcher):
            batch_idx = self.batch_progress.current.ready
            batch = next(data_fetcher)
        else:
            # this fetcher yields the batch index together with the batch
            batch_idx, batch = next(data_fetcher)
        self.batch_progress.is_last_batch = data_fetcher.done

        # configure step_kwargs
        kwargs = self._build_kwargs(kwargs, batch, batch_idx)

        self.batch_progress.increment_ready()

        # hook
        self._on_evaluation_batch_start(**kwargs)

        self.batch_progress.increment_started()

        # lightning module methods
        output = self._evaluation_step(**kwargs)
        output = self._evaluation_step_end(output)

        self.batch_progress.increment_processed()

        # hook
        self._on_evaluation_batch_end(output, **kwargs)

        self.batch_progress.increment_completed()

        # log batch metrics
        self.trainer.logger_connector.update_eval_step_metrics()

        # track epoch level outputs
        if self._should_track_batch_outputs_for_epoch_end() and output is not None:
            self._outputs.append(output)

        if self.trainer.move_metrics_to_cpu:
            # the evaluation step output is not moved as they are not considered "metrics"
            assert self.trainer._results is not None
            self.trainer._results.cpu()

        if not self.batch_progress.is_last_batch:
            # if fault tolerant is enabled and process has been notified, exit.
            self.trainer._exit_gracefully_on_signal()

    def on_run_end(self) -> EPOCH_OUTPUT:
        """Returns the outputs of the whole run and drops the loop's references to them and to the fetcher."""
        outputs, self._outputs = self._outputs, []  # free memory
        self._data_fetcher = None
        return outputs

    def teardown(self) -> None:
        # in case the model changes: the cached answer depends on which epoch-end hooks the model overrides
        self._should_track_batch_outputs_for_epoch_end.cache_clear()

    def on_save_checkpoint(self) -> Dict:
        """Extends the base state with the dataloader state when fault-tolerant mode is enabled and the run was
        interrupted after it started but before it finished."""
        state_dict = super().on_save_checkpoint()

        if (
            self.trainer is not None
            and self.trainer.state._fault_tolerant_mode.is_enabled
            and self._data_fetcher is not None
            and not self._num_completed_batches_reached()  # did not finish
            and self.batch_progress.current.ready  # did start
        ):
            state = CombinedLoader._state_dict_fn(self._data_fetcher.dataloader_iter, self._has_completed())
            if state:
                state_dict["dataloader_state_dict"] = _collect_states_on_rank_zero_over_collection(state)

        return state_dict

    def on_load_checkpoint(self, state_dict: Dict) -> None:
        """Caches this rank's dataloader state from ``state_dict`` when fault-tolerant training is enabled."""
        # cache the dataloader state dict until the dataloader objects are available
        # dataset states are collected across all ranks
        dataloader_state_dict = state_dict.get("dataloader_state_dict", None)
        if not _fault_tolerant_training() or not dataloader_state_dict:
            return
        self._dataloader_state_dict = dataloader_state_dict[self.trainer.global_rank]

    def _reload_dataloader_state_dict(self, data_fetcher: AbstractDataFetcher) -> None:
        """Applies the cached checkpoint dataloader state to the fetcher's dataloader, then clears the cache.

        Nothing happens during sanity checking or when no state is cached.

        Raises:
            MisconfigurationException: if the fetcher wraps a ``CombinedLoader``, which does not support reloading.
        """
        if self.trainer.sanity_checking or not self._dataloader_state_dict:
            return
        dataloader = data_fetcher.dataloader
        if isinstance(dataloader, CombinedLoader):
            raise MisconfigurationException(
                "Reloading support hasn't been implemented for `CombinedLoader`. You can request it by opening an issue"
                " in `https://github.com/PyTorchLightning/pytorch-lightning/issues`."
            )
        assert isinstance(dataloader, DataLoader)
        _reload_dataloader_state_dict(dataloader, self._dataloader_state_dict)
        # the state is applied only once
        self._dataloader_state_dict = {}

    def _num_completed_batches_reached(self) -> bool:
        """Whether the run has finished: either ``dl_max_batches`` batches completed, or the last batch was
        fetched and every fetched batch completed."""
        epoch_finished_on_completed = self.batch_progress.current.completed == self._dl_max_batches
        dataloader_consumed_successfully = self.batch_progress.is_last_batch and self._has_completed()
        return epoch_finished_on_completed or dataloader_consumed_successfully

    def _has_completed(self) -> bool:
        """Whether every batch marked ready has also been marked completed."""
        return self.batch_progress.current.ready == self.batch_progress.current.completed

    def _evaluation_step(self, **kwargs: Any) -> Optional[STEP_OUTPUT]:
        """The evaluation step (validation_step or test_step depending on the trainer's state).

        Args:
            kwargs: the step arguments, ordered with ``batch`` and ``batch_idx`` first (see ``_build_kwargs``)
                followed by whatever the caller passed down (presumably ``dataloader_idx`` when present).
                Their values are forwarded positionally to the strategy hook.

        Returns:
            the outputs of the step
        """
        hook_name = "test_step" if self.trainer.testing else "validation_step"
        return self.trainer._call_strategy_hook(hook_name, *kwargs.values())

    def _evaluation_step_end(self, *args: Any, **kwargs: Any) -> Optional[STEP_OUTPUT]:
        """Calls the `{validation/test}_step_end` hook.

        Both the LightningModule hook and the strategy hook are called. The module's return value wins unless
        it is ``None``, in which case the strategy's return value is used.
        """
        hook_name = "test_step_end" if self.trainer.testing else "validation_step_end"
        model_output = self.trainer._call_lightning_module_hook(hook_name, *args, **kwargs)
        strategy_output = self.trainer._call_strategy_hook(hook_name, *args, **kwargs)
        return strategy_output if model_output is None else model_output

    def _on_evaluation_batch_start(self, **kwargs: Any) -> None:
        """Calls the ``on_{validation/test}_batch_start`` hook.

        Args:
            kwargs: the step arguments (``batch``, ``batch_idx`` and any extra entries such as
                ``dataloader_idx``); their values are forwarded positionally to the callback and module hooks.
        """
        # the logger connector receives the kwargs before the `dataloader_idx` default below is inserted
        self.trainer.logger_connector.on_batch_start(**kwargs)

        # hooks always receive a `dataloader_idx`, defaulting to 0 when the caller did not provide one
        kwargs.setdefault("dataloader_idx", 0)  # TODO: the argument should be keyword for these
        hook_name = "on_test_batch_start" if self.trainer.testing else "on_validation_batch_start"
        self.trainer._call_callback_hooks(hook_name, *kwargs.values())
        self.trainer._call_lightning_module_hook(hook_name, *kwargs.values())

    def _on_evaluation_batch_end(self, output: Optional[STEP_OUTPUT], **kwargs: Any) -> None:
        """The ``on_{validation/test}_batch_end`` hook.

        Args:
            output: The output of the performed step
            kwargs: the step arguments (``batch``, ``batch_idx`` and any extra entries such as
                ``dataloader_idx``); their values are forwarded positionally after ``output``.
        """
        # hooks always receive a `dataloader_idx`, defaulting to 0 when the caller did not provide one
        kwargs.setdefault("dataloader_idx", 0)  # TODO: the argument should be keyword for these
        hook_name = "on_test_batch_end" if self.trainer.testing else "on_validation_batch_end"
        self.trainer._call_callback_hooks(hook_name, output, *kwargs.values())
        self.trainer._call_lightning_module_hook(hook_name, output, *kwargs.values())

        self.trainer.logger_connector.on_batch_end()

    def _build_kwargs(self, kwargs: OrderedDict, batch: Any, batch_idx: int) -> OrderedDict:
        """Helper function to build the arguments for the current step.

        The given ``kwargs`` is updated in place and returned.

        Args:
            kwargs: The kwargs passed down to the hooks.
            batch: The current batch to run through the step.
            batch_idx: The index of the current batch.

        Returns:
            The kwargs passed down to the hooks, with ``batch`` first and ``batch_idx`` second, followed by the
            pre-existing entries in their original order.
        """
        kwargs.update({"batch": batch, "batch_idx": batch_idx})
        # order matters: the values are later forwarded positionally to the step and hooks
        kwargs.move_to_end("batch_idx", last=False)
        kwargs.move_to_end("batch", last=False)
        return kwargs

    # NOTE: `lru_cache` on a method is keyed on `self` and keeps the cached instance alive; `teardown` clears it
    @lru_cache(1)
    def _should_track_batch_outputs_for_epoch_end(self) -> bool:
        """Whether the batch outputs should be stored for later usage (the epoch-end hook is overridden)."""
        model = self.trainer.lightning_module
        if self.trainer.testing:
            return is_overridden("test_epoch_end", model)
        return is_overridden("validation_epoch_end", model)
|