'''
Generates a summary of a model's layers and dimensionality
'''

import gc
import logging as log
import os
import subprocess
from subprocess import PIPE

import numpy as np
import torch


class ModelSummary(object):
    '''
    Builds a text table describing the sub-modules of ``model``.

    The table is computed eagerly in ``__init__`` and is available both as
    ``self.summary`` and through ``str()`` / ``repr()``.
    '''

    def __init__(self, model, mode='full'):
        '''
        Generates summaries of model layers and dimensions.

        :param model: object exposing ``named_modules()``/``named_children()``
            and an ``example_input_array`` attribute (may be ``None``).
        :param mode: ``'full'`` lists every sub-module, ``'top'`` only the
            direct children; any other value yields an empty table.
        '''
        self.model = model
        self.mode = mode
        self.in_sizes = []
        self.out_sizes = []

        self.summarize()

    def __str__(self):
        return self.summary.__str__()

    def __repr__(self):
        return self.summary.__str__()

    def named_modules(self):
        '''Return the ``(name, module)`` pairs selected by ``self.mode``.'''
        if self.mode == 'full':
            mods = self.model.named_modules()
            mods = list(mods)[1:]  # do not include root module (LightningModule)
        elif self.mode == 'top':
            # the children are the top-level modules
            mods = self.model.named_children()
        else:
            mods = []

        return list(mods)

    def get_variable_sizes(self):
        '''
        Run sample input through each layer to get output sizes.

        The output of one module is fed as the input of the next one, in the
        order returned by ``named_modules``. Results are stored in
        ``self.in_sizes`` and ``self.out_sizes``.
        '''
        mods = self.named_modules()
        in_sizes = []
        out_sizes = []
        input_ = self.model.example_input_array

        if self.model.on_gpu:
            device = next(self.model.parameters()).get_device()
            # the example input may be a single tensor or a list/tuple of inputs
            if isinstance(input_, (list, tuple)):
                input_ = [input_i.cuda(device) if torch.is_tensor(input_i) else input_i
                          for input_i in input_]
            else:
                input_ = input_.cuda(device)

        if self.model.trainer.use_amp:
            # same list/tuple distinction as above, casting tensors to half
            if isinstance(input_, (list, tuple)):
                input_ = [input_i.half() if torch.is_tensor(input_i) else input_i
                          for input_i in input_]
            else:
                input_ = input_.half()

        with torch.no_grad():
            for _, m in mods:
                if isinstance(input_, (list, tuple)):  # pragma: no cover
                    out = m(*input_)
                    # nested lists are recorded by their length, tensors by size
                    in_size = [len(x) if isinstance(x, list) else x.size()
                               for x in input_]
                else:
                    out = m(input_)
                    in_size = np.array(input_.size())

                in_sizes.append(in_size)

                if isinstance(out, (list, tuple)):  # pragma: no cover
                    out_size = np.asarray([x.size() for x in out])
                else:
                    out_size = np.array(out.size())

                out_sizes.append(out_size)
                input_ = out

        self.in_sizes = in_sizes
        self.out_sizes = out_sizes
        assert len(in_sizes) == len(out_sizes)

    def get_layer_names(self):
        '''Collect layer names and class names of the selected modules.'''
        mods = self.named_modules()

        # str(cls) looks like "<class 'pkg.mod.Name'>": keep the last dotted
        # component and drop the trailing "'>"
        self.layer_names = [name for name, _ in mods]
        self.layer_types = [str(m.__class__).split('.')[-1][:-2] for _, m in mods]

    def get_parameter_sizes(self):
        '''Get sizes of all parameters in `model`, grouped per module.'''
        mods = self.named_modules()
        self.param_sizes = [[np.array(param.size()) for param in m.parameters()]
                            for _, m in mods]

    def get_parameter_nums(self):
        '''Get number of parameters in each layer (requires ``param_sizes``).'''
        self.param_nums = [sum(np.prod(p) for p in mod) for mod in self.param_sizes]

    def make_summary(self):
        '''
        Makes a summary listing with:

        Layer Name, Layer Type, Input Size, Output Size, Number of Parameters

        The size columns are only included when the model provides an
        ``example_input_array``. The result is stored in ``self.summary``.
        '''
        arrays = [['Name', self.layer_names],
                  ['Type', self.layer_types],
                  ['Params', list(map(get_human_readable_count, self.param_nums))]]
        if self.model.example_input_array is not None:
            arrays.append(['In sizes', self.in_sizes])
            arrays.append(['Out sizes', self.out_sizes])

        self.summary = _format_summary_table(*arrays)
        return

    def summarize(self):
        '''Compute every column and build the summary table.'''
        self.get_layer_names()
        self.get_parameter_sizes()
        self.get_parameter_nums()

        if self.model.example_input_array is not None:
            self.get_variable_sizes()
        self.make_summary()


def _cell_to_str(cell):
    '''Render one table cell as text (arrays as ``[a, b, ...]``).'''
    if isinstance(cell, np.ndarray):
        return '[' + ', '.join(str(j) for j in cell) + ']'
    # plain strings pass through unchanged; lists and other objects, which
    # cannot be padded with a format spec directly, are stringified first
    return str(cell)


def _format_summary_table(*cols):
    '''
    Takes in a number of arrays, each specifying a column in
    the summary table, and combines them all into one big
    string defining the summary table that is nicely formatted.

    :param cols: ``[header, values]`` pairs; every ``values`` sequence must
        have the same length as the first one.
    :return: the table as a single string (header, divider, one line per row).
    '''
    n_rows = len(cols[0][1])
    n_cols = 1 + len(cols)

    # Layer counter
    counter = [str(i) for i in range(n_rows)]
    # default keeps an empty table (model without sub-modules) from crashing
    counter_len = max((len(c) for c in counter), default=1)

    # Render every cell once, then derive each column's width from the
    # rendered text (the header length is the minimum width)
    rendered = [[_cell_to_str(a) for a in c[1]] for c in cols]
    length = [max([len(c[0])] + [len(text) for text in texts])
              for c, texts in zip(cols, rendered)]

    # Formatting
    s = '{:<{}}'
    full_length = sum(length) + 3 * n_cols
    header = [s.format(' ', counter_len)] + [s.format(c[0], l) for c, l in zip(cols, length)]

    # Summary = header + divider + Rest of table
    lines = [' | '.join(header), '-' * full_length]
    for i in range(n_rows):
        cells = [s.format(counter[i], counter_len)]
        cells.extend(s.format(texts[i], l) for texts, l in zip(rendered, length))
        lines.append(' | '.join(cells))

    return '\n'.join(lines)


def print_mem_stack():  # pragma: no cover
    '''Log the type and size of every tensor currently tracked by the gc.'''
    for obj in gc.get_objects():
        try:
            if torch.is_tensor(obj) or (hasattr(obj, 'data') and torch.is_tensor(obj.data)):
                # the message must be a format string; the values are its args
                log.info('%s %s', type(obj), obj.size())
        except Exception:
            # best effort: probing arbitrary gc-tracked objects may raise
            pass


def count_mem_items():  # pragma: no cover
    '''
    Count the tensors currently tracked by the gc.

    :return: ``(num_params, num_tensors)`` where an object counts as a
        parameter when ``'parameter'`` appears in its type string.
    '''
    num_params = 0
    num_tensors = 0
    for obj in gc.get_objects():
        try:
            if torch.is_tensor(obj) or (hasattr(obj, 'data') and torch.is_tensor(obj.data)):
                obj_type = str(type(obj))
                if 'parameter' in obj_type:
                    num_params += 1
                else:
                    num_tensors += 1
        except Exception:
            # best effort: probing arbitrary gc-tracked objects may raise
            pass

    return num_params, num_tensors


def get_memory_profile(mode):
    """
    'all' means return memory for all gpus
    'min_max' means return memory for max and min

    :param mode: ``'min_max'`` to keep only the least and most used gpu; any
        other value returns the full map.
    :return: dict mapping ``'gpu_<index>'`` to used memory in MB.
    """
    memory_map = get_gpu_memory_map()

    # an empty map has no min/max entry; return it unchanged
    if mode == 'min_max' and memory_map:
        min_index, min_memory = min(memory_map.items(), key=lambda item: item[1])
        max_index, max_memory = max(memory_map.items(), key=lambda item: item[1])

        memory_map = {min_index: min_memory, max_index: max_memory}

    return memory_map


def get_gpu_memory_map():
    """Get the current gpu usage.

    Returns
    -------
    usage: dict
        Keys are device labels of the form ``'gpu_<index>'``.
        Values are memory usage as integers in MB.

    Raises
    ------
    subprocess.CalledProcessError
        If ``nvidia-smi`` exits with a non-zero status.
    """
    result = subprocess.run(
        [
            'nvidia-smi',
            '--query-gpu=memory.used',
            '--format=csv,nounits,noheader',
        ],
        encoding='utf-8',
        # capture_output=True,  # valid for python version >=3.7
        stdout=PIPE, stderr=PIPE,  # for backward compatibility with python version 3.6
        check=True)
    # Convert lines into a dictionary. stdout is read in text mode, so line
    # endings are already '\n' on every platform: split on lines rather than
    # on os.linesep, and skip blank lines.
    gpu_memory = [int(x) for x in result.stdout.splitlines() if x.strip()]
    gpu_memory_map = {f'gpu_{index}': memory for index, memory in enumerate(gpu_memory)}
    return gpu_memory_map


def get_human_readable_count(number):
    """
    Abbreviates an integer number with K, M, B, T for thousands, millions,
    billions and trillions, respectively.

    Examples:
        123     -> 123
        1234    -> 1 K       (one thousand)
        2e6     -> 2 M       (two million)
        3e9     -> 3 B       (three billion)
        4e12    -> 4 T       (four trillion)
        5e15    -> 5,000 T

    :param number: a positive integer number
    :returns a string formatted according to the pattern described above.
    """
    assert number >= 0
    labels = [' ', 'K', 'M', 'B', 'T']
    num_digits = int(np.floor(np.log10(number)) + 1 if number > 0 else 1)
    num_groups = int(np.ceil(num_digits / 3))
    num_groups = min(num_groups, len(labels))  # don't abbreviate beyond trillions
    shift = -3 * (num_groups - 1)
    number = number * (10 ** shift)
    index = num_groups - 1
    return f'{int(number):,d} {labels[index]}'