lightning/pytorch_lightning/overrides/data_parallel.py

335 lines
12 KiB
Python

# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import threading
from collections.abc import Mapping, Iterable
from itertools import chain
import torch
from torch.cuda._utils import _get_device_index
from torch.nn import DataParallel
from torch.nn.parallel import DistributedDataParallel
from torch.nn.parallel._functions import Gather
from pytorch_lightning.core.step_result import Result
from pytorch_lightning.utilities.warning_utils import WarningCache
def _find_tensors(obj): # pragma: no-cover
r"""
Recursively find all tensors contained in the specified object.
"""
if isinstance(obj, torch.Tensor):
return [obj]
if isinstance(obj, (list, tuple)):
return itertools.chain(*map(_find_tensors, obj))
if isinstance(obj, dict):
return itertools.chain(*map(_find_tensors, obj.values()))
return []
def get_a_var(obj): # pragma: no-cover
if isinstance(obj, torch.Tensor):
return obj
if isinstance(obj, (list, tuple)):
for result in map(get_a_var, obj):
if isinstance(result, torch.Tensor):
return result
if isinstance(obj, dict):
for result in map(get_a_var, obj.items()):
if isinstance(result, torch.Tensor):
return result
return None
warning_cache = WarningCache()
class LightningDataParallel(DataParallel):
"""
Override the forward call in lightning so it goes to training and validation step respectively
"""
def forward(self, *inputs, **kwargs):
if not self.device_ids:
return self.module(*inputs, **kwargs)
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
if len(self.device_ids) == 1:
# lightning
if self.module.training:
return self.module.training_step(*inputs[0], **kwargs[0])
if self.module.testing:
return self.module.test_step(*inputs[0], **kwargs[0])
return self.module.validation_step(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
if isinstance(outputs[0], Result):
outputs = self.__gather_structured_result(outputs)
else:
outputs = self.gather(outputs)
return outputs
def __gather_structured_result(self, outputs):
prototype_output = outputs[0]
original_class = prototype_output.__class__
outputs = [dict(x) for x in outputs]
# remove all the meta info
meta = outputs[0]['meta']
for i, output in enumerate(outputs):
del output['meta']
outputs = self.gather(outputs)
# pass minimize to constructor for TrainResult
if 'minimize' in outputs:
result = original_class(outputs['minimize'])
else:
result = original_class()
result.update(outputs)
result['meta'] = meta
return result
def gather(self, outputs):
r"""
Override the gather method to support python scalars as well.
"""
def gather_map(outputs):
elem = outputs[0]
elem_type = type(elem)
if isinstance(elem, torch.Tensor):
return Gather.apply(self.output_device, self.dim, *outputs)
if elem is None:
return None
if isinstance(elem, Mapping):
if not all((len(elem) == len(d) for d in outputs)):
raise ValueError('All dicts must have the same number of keys')
return elem_type(((k, gather_map([d[k] for d in outputs]))
for k in elem))
if isinstance(elem, Iterable) and not isinstance(elem, str):
return elem_type(map(gather_map, zip(*outputs)))
return outputs
# Recursive function calls like this create reference cycles.
# Setting the function to None clears the refcycle.
try:
res = gather_map(outputs)
finally:
gather_map = None
return res
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
class LightningDistributedDataParallel(DistributedDataParallel):
"""
Override the forward call in lightning so it goes to training and validation step respectively
"""
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def forward(self, *inputs, **kwargs): # pragma: no-cover
self._sync_params()
fx_called: str = ''
if self.device_ids:
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
if len(self.device_ids) == 1:
# --------------
# LIGHTNING MOD
# --------------
# normal
# output = self.module(*inputs[0], **kwargs[0])
# lightning
if self.module.training:
output = self.module.training_step(*inputs[0], **kwargs[0])
fx_called = 'training_step'
elif self.module.testing:
output = self.module.test_step(*inputs[0], **kwargs[0])
fx_called = 'test_step'
else:
output = self.module.validation_step(*inputs[0], **kwargs[0])
fx_called = 'validation_step'
else:
outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
output = self.gather(outputs, self.output_device)
else:
# output = self.module(*inputs, **kwargs)
# normal lightning (ddp_cpu)
if self.module.training:
output = self.module.training_step(*inputs, **kwargs)
elif self.module.testing:
output = self.module.test_step(*inputs, **kwargs)
else:
output = self.module.validation_step(*inputs, **kwargs)
if torch.is_grad_enabled():
# We'll return the output object verbatim since it is a freeform
# object. We need to find any tensors in this object, though,
# because we need to figure out which parameters were used during
# this forward pass, to ensure we short circuit reduction for any
# unused parameters. Only if `find_unused_parameters` is set.
if self.find_unused_parameters:
self.reducer.prepare_for_backward(list(_find_tensors(output)))
else:
self.reducer.prepare_for_backward([])
if output is None:
warn_missing_output(f'{fx_called} returned None. Did you forget to re')
return output
def warn_missing_output(fx_called):
if fx_called == 'training_step':
warning_cache.warn("Your training_step returned None. You should instead do:\n"
"`return loss`\n or\n `return TrainResult`")
elif fx_called in ['validation_step', 'test_step']:
warning_cache.warn(f"Your {fx_called} returned None. You should instead do:\n `return EvalResult")
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None): # pragma: no-cover
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
fx_called: str = ''
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
module = module.to(device)
# ---------------
# CHANGE
if module.training:
output = module.training_step(*input, **kwargs)
fx_called = 'training_step'
elif module.testing:
output = module.test_step(*input, **kwargs)
fx_called = 'test_step'
else:
output = module.validation_step(*input, **kwargs)
fx_called = 'validation_step'
if output is None:
warn_missing_output(fx_called)
if output is not None and (module.use_dp or module.use_ddp2):
auto_squeeze_dim_zeros(output)
# ---------------
with lock:
results[i] = output
except Exception as ex:
with lock:
results[i] = ex
# TODO: fix hack (maybe not a hack)
# make sure each module knows what training state it's in...
# fixes weird bug where copies are out of sync
root_m = modules[0]
for m in modules[1:]:
m.training = root_m.training
m.testing = root_m.testing
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs = []
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
def auto_squeeze_dim_zeros(output):
"""
In DP or DDP2 we need to unsqueeze dim 0
:param output:
:return:
"""
if isinstance(output, torch.Tensor):
output = output.unsqueeze(0)
return output
for k, v in output.items():
if not isinstance(v, torch.Tensor):
continue
is_scalar = v.dim() == 0
if is_scalar:
output[k] = output[k].unsqueeze(0)