lightning/pytorch_lightning/trainer/connectors/data_connector.py

158 lines
6.6 KiB
Python
Raw Normal View History

# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
import pytorch_lightning as pl
from pytorch_lightning.trainer.supporters import prefetch_iterator
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.model_helpers import is_overridden
from pytorch_lightning.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
class DataConnector:
def __init__(self, trainer: "pl.Trainer", multiple_trainloader_mode: str = "max_size_cycle"):
self.trainer = trainer
self.multiple_trainloader_mode = multiple_trainloader_mode
def on_trainer_init(
self, check_val_every_n_epoch: int, reload_dataloaders_every_epoch: bool, prepare_data_per_node: bool
) -> None:
self.trainer.datamodule = None
self.trainer.prepare_data_per_node = prepare_data_per_node
if not isinstance(check_val_every_n_epoch, int):
raise MisconfigurationException(
f"check_val_every_n_epoch should be an integer. Found {check_val_every_n_epoch}"
)
self.trainer.check_val_every_n_epoch = check_val_every_n_epoch
self.trainer.reload_dataloaders_every_epoch = reload_dataloaders_every_epoch
self.trainer._is_data_prepared = False
def get_profiled_train_dataloader(self, train_dataloader):
profiled_dl = self.trainer.profiler.profile_iterable(
enumerate(prefetch_iterator(train_dataloader)), "get_train_batch"
)
return profiled_dl
def prepare_data(self, model):
# on multi-gpu jobs we only want to manipulate (download, etc) on node_rank=0, local_rank=0
# or in the case where each node needs to do its own manipulation in which case just local_rank=0
if self.can_prepare_data():
if self.trainer.datamodule is not None:
self.trainer.datamodule.prepare_data()
model.prepare_data()
self.trainer._is_data_prepared = True
def can_prepare_data(self):
should_call_dm_prepare_data = True
if self.trainer.datamodule is not None and is_overridden('prepare_data', self.trainer.datamodule):
should_call_dm_prepare_data = not self.trainer.datamodule.has_prepared_data
if self.trainer.prepare_data_per_node:
return self.trainer.local_rank == 0 and should_call_dm_prepare_data
return self.trainer.node_rank == 0 and self.trainer.local_rank == 0 and should_call_dm_prepare_data
def attach_data(
self,
model: 'pl.LightningModule',
train_dataloaders: Optional[TRAIN_DATALOADERS] = None,
val_dataloaders: Optional[EVAL_DATALOADERS] = None,
test_dataloaders: Optional[EVAL_DATALOADERS] = None,
predict_dataloaders: Optional[EVAL_DATALOADERS] = None,
datamodule: Optional['pl.LightningDataModule'] = None
) -> None:
# set up the passed in dataloaders (if needed)
self.attach_dataloaders(
model,
train_dataloaders=train_dataloaders,
val_dataloaders=val_dataloaders,
test_dataloaders=test_dataloaders,
predict_dataloaders=predict_dataloaders,
)
self.attach_datamodule(model, datamodule=datamodule)
# set local properties on the model
self.trainer.model_connector.copy_trainer_model_properties(model)
Add PredictLoop (#5752) * integrate distrib_type * sync changes * sync * fixes * add forgotten generators * add missing logic * update * import * missed imports * import fixes * isort * mv f * changelog * format * move helper to parallel plugin * d * add world size * clean up * duplicate * activate ddp_sharded and tpu * set nvidia flags * remove unused colab var * use_tpu <-> on_tpu attrs * make some ddp_cpu and clusterplugin tests pass * Ref/accelerator connector (#5742) * final cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * connector cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * trainer cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * accelerator cleanup + missing logic in accelerator connector Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * add missing changes to callbacks Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * reflect accelerator changes to lightning module Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * clean cluster envs Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * cleanup plugins Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * add broadcasting Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * yapf * remove plugin connector Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * plugins * add predict_loop * manual optimization * clean predictloop * update optimizer routing * add predict loop on new accelerator * resolve a bug * add rank to torchelastic * add predict_loop * add predict loop on new accelerator * resolve a bug * fix memory mixed precision * update * setstate on trainer for pickling in ddp spawn * add predict_loop * clean predictloop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * clean predictloop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * resolve tests * add predict method * add back commented accelerator code * adapt test for sync_batch_norm to new plugin * fix deprecated tests * fix ddp cpu choice when no num_processes are given * yapf format * skip a memory test that cannot pass anymore * remove sanetize * rename train to run_train * remove useless hooks * add misconfigurationException * remove wrong naming * resolve some legacy * udpate docstring * fix pickle error in spawn plugin * x * avoid * x * fix cyclic import in docs build * add support for sharded * update typing * add sharded and sharded_spawn to distributed types * make unwrap model default * refactor LightningShardedDataParallel similar to LightningDistributedDataParallel * update sharded spawn to reflect changes * update sharded to reflect changes * Merge 1.1.5 changes * fix merge * fix merge * yapf isort * fix merge * yapf isort * fix indentation in test * copy over reinit scheduler implementation from dev1.2 * fix apex tracking calls with dev_debugger * reduce diff to dev1.2, clean up * fix trainer config test when gpus>0 and num_processes >0 and ddp_cpu * sort plugin tests legacy/new * fix error handling for amp on cpu * fix merge fix merge fix merge * [Feat] Resolve manual_backward (#5837) * resolve manual_backward * resolve flake8 * update * resolve for ddp_spawn * resolve flake8 * resolve flake8 * resolve flake8 Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * fix tests/accelerator tests on cpu * [BugFix] Resolve manual optimization (#5852) * resolve manual_optimization * update * update Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * Remove copy trainer parameters to happen earlier within the loop and add safe guard to get ref model (#5856) * resovle a bug * Accelerator refactor sharded rpc (#5854) * rpc branch * merge * update handling of rpc * make devices etc. Optional in RPC * set devices etc. later if necessary * remove devices from sequential * make devices optional in rpc * fix import * uncomment everything * fix cluster selection Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * resolve bug * fix assert in rpc test * resolve a test * fix docs compilation * accelerator refactor - fix for sharded parity test (#5866) * fix memory issue with ddp_spawn * x x x x x x x x x * x * Remove DDP2 as this does not apply * Add missing pre optimizer hook to ensure lambda closure is called * fix apex docstring * [accelerator][BugFix] Resolve some test for 1 gpu (#5863) * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * update * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * revert init * update * resolve flake8 * update * update * update * update * update * all_gather * update * make plugins work, add misconfig for RPC * update * update * remove breaking test * resolve some tests * resolve flake8 * revert to ddp_spawn Co-authored-by: root <root@ip-172-31-88-60.ec2.internal> Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> Co-authored-by: Justus Schock <justus.schock@rwth-aachen.de> * yapf isort * resolve flake8 * fix apex doctests * fix apex doctests 2 * resolve docs * update drone * clean env * update * update * update * update * merge * Fix RPC related tests, clean out old API, update for new accelerator API [skip ci] (#5881) * Fix RPC related tests, clean out old API, update for new accelerator API * Move tests out of legacy folder, update paths and names * Update test_remove_1-4.py * Expose properties for tpu cores/gpus/num_gpus * Add root GPU property * Move properties to properties.py * move tests that were previously in drone * Fix root GPU property (#5908) * Move root GPU to property, remove horovod set as this is handled in horovod plugin, ensure we mock correctly to set GPU accelerator * Add missing tests back * fix best model path transfer when no checkpoint callback available * Fix setup hook order [wip] (#5858) * Call trainer setup hook before accelerator setup * Add test case * add new test * typo * fix callback order in test Co-authored-by: tchaton <thomas@grid.ai> Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * rename ddp sequential -> rpc sequential for special test * revert * fix stupid merge problem * Use property in connector for sampler (#5913) * merge the import conflicts * fix spawning of processes in slurm * [wip] Fix some bugs for TPU [skip ci] (#5878) * fixed for single tpu * fixed spawn * fixed spawn * update * update * wip * resolve bugs * resolve bug * update on comment * removed decorator * resolve comments * set to 4 * update * update * need cleaning * update * update * update * resolve flake8 * resolve bugs * exclude broadcast * resolve bugs * change test * update * update * skip if meet fails * properly raise trace * update * add catch * wrap test * resolve typo * update * typo Co-authored-by: Lezwon Castelino <lezwon@gmail.com> Co-authored-by: Your Name <you@example.com> * resolve some tests * update * fix imports * update * resolve flake8 * update azure pipeline * skip a sharded test on cpu that requires a gpu * resolve tpus * resolve bug * resolve flake8 * update * updat utils * revert permission change on files * suggestions from carlos Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * remove unrelated formatting changes * remove incomplete comment * Update pytorch_lightning/accelerators/__init__.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * remove unrelated formatting change * add types * warn 1.7 ddp manual backward only if ddp kwarg unset * yapf + isort * pep8 unused imports * fix cyclic import in docs * Apply suggestions from code review * typer in accelerator.py * typo * resolve flake8 * update code * update * Update pytorch_lightning/trainer/predict_loop.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * Update pytorch_lightning/trainer/predict_loop.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * fix merge * fix merge * reset legacy accelerator * add missing rename dispatch * rename post traning * update code * resolved comments * typo * typo * add flow description * resolve comments * update on comments * update flow * add backticks * resolve tpu Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> Co-authored-by: Justus Schock <12886177+justusschock@users.noreply.github.com> Co-authored-by: justusschock <justus.schock@posteo.de> Co-authored-by: Justus Schock <justus.schock@rwth-aachen.de> Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> Co-authored-by: Sean Naren <sean.narenthiran@gmail.com> Co-authored-by: SeanNaren <sean@grid.ai> Co-authored-by: root <root@ip-172-31-88-60.ec2.internal> Co-authored-by: Lezwon Castelino <lezwon@gmail.com> Co-authored-by: Your Name <you@example.com> Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> Co-authored-by: Jirka Borovec <Borda@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2021-02-16 22:11:56 +00:00
def attach_dataloaders(
self,
model: 'pl.LightningModule',
train_dataloaders: Optional[TRAIN_DATALOADERS] = None,
val_dataloaders: Optional[EVAL_DATALOADERS] = None,
test_dataloaders: Optional[EVAL_DATALOADERS] = None,
predict_dataloaders: Optional[EVAL_DATALOADERS] = None,
) -> None:
# when dataloader is passed via fit, patch the train_dataloader
# functions to overwrite with these implementations
if train_dataloaders is not None:
model.train_dataloader = _PatchDataLoader(train_dataloaders)
if val_dataloaders is not None:
model.val_dataloader = _PatchDataLoader(val_dataloaders)
if test_dataloaders is not None:
model.test_dataloader = _PatchDataLoader(test_dataloaders)
Add PredictLoop (#5752) * integrate distrib_type * sync changes * sync * fixes * add forgotten generators * add missing logic * update * import * missed imports * import fixes * isort * mv f * changelog * format * move helper to parallel plugin * d * add world size * clean up * duplicate * activate ddp_sharded and tpu * set nvidia flags * remove unused colab var * use_tpu <-> on_tpu attrs * make some ddp_cpu and clusterplugin tests pass * Ref/accelerator connector (#5742) * final cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * connector cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * trainer cleanup Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * accelerator cleanup + missing logic in accelerator connector Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * add missing changes to callbacks Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * reflect accelerator changes to lightning module Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * clean cluster envs Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * cleanup plugins Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * add broadcasting Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * yapf * remove plugin connector Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * plugins * add predict_loop * manual optimization * clean predictloop * update optimizer routing * add predict loop on new accelerator * resolve a bug * add rank to torchelastic * add predict_loop * add predict loop on new accelerator * resolve a bug * fix memory mixed precision * update * setstate on trainer for pickling in ddp spawn * add predict_loop * clean predictloop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * add predict_loop * clean predictloop * add predict loop on new accelerator * resolve a bug * add predict_loop * add predict loop on new accelerator * resolve a bug * resolve tests * add predict method * add back commented accelerator code * adapt test for sync_batch_norm to new plugin * fix deprecated tests * fix ddp cpu choice when no num_processes are given * yapf format * skip a memory test that cannot pass anymore * remove sanetize * rename train to run_train * remove useless hooks * add misconfigurationException * remove wrong naming * resolve some legacy * udpate docstring * fix pickle error in spawn plugin * x * avoid * x * fix cyclic import in docs build * add support for sharded * update typing * add sharded and sharded_spawn to distributed types * make unwrap model default * refactor LightningShardedDataParallel similar to LightningDistributedDataParallel * update sharded spawn to reflect changes * update sharded to reflect changes * Merge 1.1.5 changes * fix merge * fix merge * yapf isort * fix merge * yapf isort * fix indentation in test * copy over reinit scheduler implementation from dev1.2 * fix apex tracking calls with dev_debugger * reduce diff to dev1.2, clean up * fix trainer config test when gpus>0 and num_processes >0 and ddp_cpu * sort plugin tests legacy/new * fix error handling for amp on cpu * fix merge fix merge fix merge * [Feat] Resolve manual_backward (#5837) * resolve manual_backward * resolve flake8 * update * resolve for ddp_spawn * resolve flake8 * resolve flake8 * resolve flake8 Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * fix tests/accelerator tests on cpu * [BugFix] Resolve manual optimization (#5852) * resolve manual_optimization * update * update Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * Remove copy trainer parameters to happen earlier within the loop and add safe guard to get ref model (#5856) * resovle a bug * Accelerator refactor sharded rpc (#5854) * rpc branch * merge * update handling of rpc * make devices etc. Optional in RPC * set devices etc. later if necessary * remove devices from sequential * make devices optional in rpc * fix import * uncomment everything * fix cluster selection Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> * resolve bug * fix assert in rpc test * resolve a test * fix docs compilation * accelerator refactor - fix for sharded parity test (#5866) * fix memory issue with ddp_spawn * x x x x x x x x x * x * Remove DDP2 as this does not apply * Add missing pre optimizer hook to ensure lambda closure is called * fix apex docstring * [accelerator][BugFix] Resolve some test for 1 gpu (#5863) * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * update * update * revert init * resolve a bug * update * resolve flake8 * update * update * update * revert init * update * resolve flake8 * update * update * update * update * update * all_gather * update * make plugins work, add misconfig for RPC * update * update * remove breaking test * resolve some tests * resolve flake8 * revert to ddp_spawn Co-authored-by: root <root@ip-172-31-88-60.ec2.internal> Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> Co-authored-by: Justus Schock <justus.schock@rwth-aachen.de> * yapf isort * resolve flake8 * fix apex doctests * fix apex doctests 2 * resolve docs * update drone * clean env * update * update * update * update * merge * Fix RPC related tests, clean out old API, update for new accelerator API [skip ci] (#5881) * Fix RPC related tests, clean out old API, update for new accelerator API * Move tests out of legacy folder, update paths and names * Update test_remove_1-4.py * Expose properties for tpu cores/gpus/num_gpus * Add root GPU property * Move properties to properties.py * move tests that were previously in drone * Fix root GPU property (#5908) * Move root GPU to property, remove horovod set as this is handled in horovod plugin, ensure we mock correctly to set GPU accelerator * Add missing tests back * fix best model path transfer when no checkpoint callback available * Fix setup hook order [wip] (#5858) * Call trainer setup hook before accelerator setup * Add test case * add new test * typo * fix callback order in test Co-authored-by: tchaton <thomas@grid.ai> Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> * rename ddp sequential -> rpc sequential for special test * revert * fix stupid merge problem * Use property in connector for sampler (#5913) * merge the import conflicts * fix spawning of processes in slurm * [wip] Fix some bugs for TPU [skip ci] (#5878) * fixed for single tpu * fixed spawn * fixed spawn * update * update * wip * resolve bugs * resolve bug * update on comment * removed decorator * resolve comments * set to 4 * update * update * need cleaning * update * update * update * resolve flake8 * resolve bugs * exclude broadcast * resolve bugs * change test * update * update * skip if meet fails * properly raise trace * update * add catch * wrap test * resolve typo * update * typo Co-authored-by: Lezwon Castelino <lezwon@gmail.com> Co-authored-by: Your Name <you@example.com> * resolve some tests * update * fix imports * update * resolve flake8 * update azure pipeline * skip a sharded test on cpu that requires a gpu * resolve tpus * resolve bug * resolve flake8 * update * updat utils * revert permission change on files * suggestions from carlos Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * remove unrelated formatting changes * remove incomplete comment * Update pytorch_lightning/accelerators/__init__.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * remove unrelated formatting change * add types * warn 1.7 ddp manual backward only if ddp kwarg unset * yapf + isort * pep8 unused imports * fix cyclic import in docs * Apply suggestions from code review * typer in accelerator.py * typo * resolve flake8 * update code * update * Update pytorch_lightning/trainer/predict_loop.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * Update pytorch_lightning/trainer/predict_loop.py Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> * fix merge * fix merge * reset legacy accelerator * add missing rename dispatch * rename post traning * update code * resolved comments * typo * typo * add flow description * resolve comments * update on comments * update flow * add backticks * resolve tpu Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com> Co-authored-by: Justus Schock <12886177+justusschock@users.noreply.github.com> Co-authored-by: justusschock <justus.schock@posteo.de> Co-authored-by: Justus Schock <justus.schock@rwth-aachen.de> Co-authored-by: Ubuntu <ubuntu@ip-172-31-88-60.ec2.internal> Co-authored-by: Sean Naren <sean.narenthiran@gmail.com> Co-authored-by: SeanNaren <sean@grid.ai> Co-authored-by: root <root@ip-172-31-88-60.ec2.internal> Co-authored-by: Lezwon Castelino <lezwon@gmail.com> Co-authored-by: Your Name <you@example.com> Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com> Co-authored-by: Jirka Borovec <Borda@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2021-02-16 22:11:56 +00:00
if predict_dataloaders is not None:
model.predict_dataloader = _PatchDataLoader(predict_dataloaders)
def attach_datamodule(
self, model: 'pl.LightningModule', datamodule: Optional['pl.LightningDataModule'] = None
) -> None:
# If we have a datamodule, attach necessary hooks + dataloaders
if datamodule is None:
return
# Override loader hooks
dl_methods = ('train_dataloader', 'val_dataloader', 'test_dataloader', 'predict_dataloader')
for method in dl_methods:
if is_overridden(method, datamodule):
setattr(model, method, getattr(datamodule, method))
# Override data transfer hooks if dataset-specific to_device logic has been defined in datamodule
batch_transfer_hooks = ('on_before_batch_transfer', 'transfer_batch_to_device', 'on_after_batch_transfer')
for hook in batch_transfer_hooks:
if is_overridden(hook, datamodule):
setattr(model, hook, getattr(datamodule, hook))
self.trainer.datamodule = datamodule
datamodule.trainer = self.trainer
# experimental feature for Flash
if hasattr(datamodule, "data_pipeline"):
model.data_pipeline = datamodule.data_pipeline
class _PatchDataLoader:
r"""
Callable object for patching dataloaders passed into trainer.fit().
Use this class to override model.*_dataloader() and be pickle-compatible.
Args:
dataloader: Dataloader object to return when called.
"""
def __init__(self, dataloader: Union[TRAIN_DATALOADERS, EVAL_DATALOADERS]) -> None:
self.dataloader = dataloader
# cannot pickle __code__ so cannot verify if PatchDataloader
# exists which shows dataloader methods have been overwritten.
# so, we hack it by using the string representation
self.patch_loader_code = str(self.__call__.__code__)
def __call__(self) -> Union[TRAIN_DATALOADERS, EVAL_DATALOADERS]:
return self.dataloader