"""Generates a summary of a model's layers and dimensionality."""

import gc
import logging
import os
import subprocess

import numpy as np
import pandas as pd
import torch


class ModelSummary(object):
    """Tabulates layer names, types and parameter counts for a model; when
    the model provides an ``example_input_array``, per-layer input/output
    sizes are recorded as well.
    """

    def __init__(self, model, mode='full'):
        """Generates summaries of model layers and dimensions.

        :param model: model (LightningModule) to summarize
        :param mode: 'full' walks every submodule,
            'top' lists only the direct children of ``model``
        """
        self.model = model
        self.mode = mode
        self.in_sizes = []
        self.out_sizes = []

        self.summarize()

    def __str__(self):
        return self.summary.__str__()

    def __repr__(self):
        return self.summary.__str__()

    def named_modules(self):
        """Return the (name, module) pairs selected by ``self.mode``.

        Unknown modes yield an empty list.
        """
        if self.mode == 'full':
            mods = self.model.named_modules()
            mods = list(mods)[1:]  # do not include root module (LightningModule)
        elif self.mode == 'top':
            # the children are the top-level modules
            mods = self.model.named_children()
        else:
            mods = []
        return list(mods)

    def get_variable_sizes(self):
        """Run sample input through each layer to get input/output sizes.

        Feeds ``model.example_input_array`` to the first module and chains
        every module's output into the next one, recording tensor sizes
        along the way into ``self.in_sizes`` / ``self.out_sizes``.
        """
        mods = self.named_modules()
        in_sizes = []
        out_sizes = []
        input_ = self.model.example_input_array

        if self.model.on_gpu:
            device = next(self.model.parameters()).get_device()
            # test if input is a list or a tuple
            if isinstance(input_, (list, tuple)):
                input_ = [input_i.cuda(device) if torch.is_tensor(input_i) else input_i
                          for input_i in input_]
            else:
                input_ = input_.cuda(device)

        if self.model.trainer.use_amp:
            # cast inputs to half precision to match the model weights
            if isinstance(input_, (list, tuple)):
                input_ = [input_i.half() if torch.is_tensor(input_i) else input_i
                          for input_i in input_]
            else:
                input_ = input_.half()

        with torch.no_grad():
            for _, m in mods:
                if isinstance(input_, (list, tuple)):  # pragma: no cover
                    out = m(*input_)
                else:
                    out = m(input_)

                if isinstance(input_, (list, tuple)):  # pragma: no cover
                    in_size = []
                    for x in input_:
                        if type(x) is list:
                            in_size.append(len(x))
                        else:
                            in_size.append(x.size())
                else:
                    in_size = np.array(input_.size())

                in_sizes.append(in_size)

                if isinstance(out, (list, tuple)):  # pragma: no cover
                    out_size = np.asarray([x.size() for x in out])
                else:
                    out_size = np.array(out.size())

                out_sizes.append(out_size)
                # each layer's output becomes the next layer's input
                input_ = out

        self.in_sizes = in_sizes
        self.out_sizes = out_sizes
        assert len(in_sizes) == len(out_sizes)
        return

    def get_layer_names(self):
        """Collect layer names and short class names."""
        mods = self.named_modules()
        names = []
        layers = []
        for name, m in mods:
            names += [name]
            layers += [str(m.__class__)]

        # "<class 'torch.nn.modules.linear.Linear'>" -> "Linear"
        layer_types = [x.split('.')[-1][:-2] for x in layers]

        self.layer_names = names
        self.layer_types = layer_types
        return

    def get_parameter_sizes(self):
        """Get sizes of all parameters in `model`, grouped per module."""
        mods = self.named_modules()
        sizes = []
        for _, m in mods:
            sizes.append([np.array(p.size()) for p in m.parameters()])

        self.param_sizes = sizes
        return

    def get_parameter_nums(self):
        """Get number of parameters in each layer."""
        param_nums = []
        for mod_sizes in self.param_sizes:
            # product of each parameter's shape, summed over the module
            param_nums.append(sum(np.prod(p) for p in mod_sizes))
        self.param_nums = param_nums
        return

    def make_summary(self):
        """
        Makes a summary listing with:

        Layer Name, Layer Type, Input Size, Output Size, Number of Parameters
        """
        cols = ['Name', 'Type', 'Params']
        if self.model.example_input_array is not None:
            cols.extend(['In_sizes', 'Out_sizes'])

        df = pd.DataFrame(np.zeros((len(self.layer_names), len(cols))))
        df.columns = cols

        df['Name'] = self.layer_names
        df['Type'] = self.layer_types
        df['Params'] = self.param_nums
        df['Params'] = df['Params'].map(get_human_readable_count)

        if self.model.example_input_array is not None:
            df['In_sizes'] = self.in_sizes
            df['Out_sizes'] = self.out_sizes

        self.summary = df
        return

    def summarize(self):
        """Build ``self.summary``; sizes are only traced when the model
        provides an ``example_input_array``."""
        self.get_layer_names()
        self.get_parameter_sizes()
        self.get_parameter_nums()

        if self.model.example_input_array is not None:
            self.get_variable_sizes()
        self.make_summary()


def print_mem_stack():  # pragma: no cover
    """Log every gc-tracked tensor (or tensor-holding object) and its size."""
    for obj in gc.get_objects():
        try:
            if torch.is_tensor(obj) or (hasattr(obj, 'data') and torch.is_tensor(obj.data)):
                # use lazy %-formatting: passing type(obj) as the message
                # makes logging treat it as the format string and the
                # record fails to render
                logging.info('%s %s', type(obj), obj.size())
        except Exception:
            pass


def count_mem_items():  # pragma: no cover
    """Count gc-tracked parameters and plain tensors.

    :return: tuple ``(num_params, num_tensors)``
    """
    num_params = 0
    num_tensors = 0
    for obj in gc.get_objects():
        try:
            if torch.is_tensor(obj) or (hasattr(obj, 'data') and torch.is_tensor(obj.data)):
                obj_type = str(type(obj))
                if 'parameter' in obj_type:
                    num_params += 1
                else:
                    num_tensors += 1
        except Exception:
            pass

    return num_params, num_tensors


def get_memory_profile(mode):
    """
    'all' means return memory for all gpus
    'min_max' means return memory for max and min

    :param mode: 'all' or 'min_max'; any other value behaves like 'all'
    :return: dict mapping 'gpu_<id>' to used memory in MB
    """
    memory_map = get_gpu_memory_map()

    if mode == 'min_max':
        min_index, min_memory = min(memory_map.items(), key=lambda item: item[1])
        max_index, max_memory = max(memory_map.items(), key=lambda item: item[1])

        memory_map = {min_index: min_memory, max_index: max_memory}

    return memory_map


def get_gpu_memory_map():
    """Get the current gpu usage.

    Returns
    -------
    usage: dict
        Keys are 'gpu_<device id>' strings. Values are memory usage
        as integers in MB.
    """
    result = subprocess.run(
        [
            'nvidia-smi',
            '--query-gpu=memory.used',
            '--format=csv,nounits,noheader',
        ],
        encoding='utf-8',
        capture_output=True,
        check=True)

    # splitlines() is newline-agnostic; splitting on os.linesep would break
    # on Windows, where os.linesep is '\r\n' but subprocess text mode has
    # already translated the output to '\n'
    gpu_memory = [int(x) for x in result.stdout.strip().splitlines()]
    gpu_memory_map = {f'gpu_{index}': memory for index, memory in enumerate(gpu_memory)}
    return gpu_memory_map


def get_human_readable_count(number):
    """
    Abbreviates an integer number with K, M, B, T for thousands, millions,
    billions and trillions, respectively.
    Examples:
        123     -> 123
        1234    -> 1 K       (one thousand)
        2e6     -> 2 M       (two million)
        3e9     -> 3 B       (three billion)
        4e12    -> 4 T       (four trillion)
        5e15    -> 5,000 T

    :param number: a positive integer number
    :returns a string formatted according to the pattern described above.
    """
    assert number >= 0
    labels = [' ', 'K', 'M', 'B', 'T']
    num_digits = int(np.floor(np.log10(number)) + 1 if number > 0 else 1)
    num_groups = int(np.ceil(num_digits / 3))
    num_groups = min(num_groups, len(labels))  # don't abbreviate beyond trillions
    shift = -3 * (num_groups - 1)
    number = number * (10 ** shift)
    index = num_groups - 1
    return f'{int(number):,d} {labels[index]}'