Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """Network related utility tools.""" | |
| import logging | |
| from typing import Dict | |
| import numpy as np | |
| import torch | |
def to_device(m, x):
    """Send tensor into the device of the module.

    Args:
        m (torch.nn.Module or torch.Tensor): Reference object whose device
            is used as the destination.
        x (Tensor): Torch tensor.

    Returns:
        Tensor: Torch tensor located in the same place as the reference.
            If ``m`` is a module that has neither parameters nor buffers,
            there is no device to infer and ``x`` is returned unchanged.

    Raises:
        TypeError: If ``m`` is neither a torch.nn.Module nor a torch.Tensor.

    """
    if isinstance(m, torch.nn.Module):
        # A bare ``next(m.parameters())`` raises StopIteration for modules
        # without parameters (e.g. activations), so probe with a default
        # and fall back to the buffers.
        reference = next(m.parameters(), None)
        if reference is None:
            reference = next(m.buffers(), None)
        if reference is None:
            return x
        device = reference.device
    elif isinstance(m, torch.Tensor):
        device = m.device
    else:
        raise TypeError(
            "Expected torch.nn.Module or torch.tensor, " f"but got: {type(m)}"
        )
    return x.to(device)
def pad_list(xs, pad_value):
    """Stack variable-length tensors into one padded batch tensor.

    Args:
        xs (List): List of Tensors [(T_1, `*`), (T_2, `*`), ..., (T_B, `*`)].
        pad_value (float): Value written to every padded position.

    Returns:
        Tensor: Padded tensor (B, Tmax, `*`) with the dtype and device of
            the first element of ``xs``.

    Examples:
        >>> x = [torch.ones(4), torch.ones(2), torch.ones(1)]
        >>> x
        [tensor([1., 1., 1., 1.]), tensor([1., 1.]), tensor([1.])]
        >>> pad_list(x, 0)
        tensor([[1., 1., 1., 1.],
                [1., 1., 0., 0.],
                [1., 0., 0., 0.]])

    """
    batch_size = len(xs)
    longest = max(seq.size(0) for seq in xs)
    first = xs[0]
    # Trailing dimensions are taken from the first sequence only.
    out_shape = (batch_size, longest) + tuple(first.size()[1:])
    padded = first.new_empty(out_shape).fill_(pad_value)

    for row, seq in enumerate(xs):
        padded[row, : seq.size(0)] = seq

    return padded
def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example. It must not refer to the batch dimension,
            i.e. neither ``0`` nor ``-xs.dim()``.
        maxlen (int, optional): Explicit length of the mask along the time
            axis. Only allowed when ``xs`` is not given, and it must be at
            least ``max(lengths)``. When omitted, the length is taken from
            ``xs.size(length_dim)`` if ``xs`` is given, else ``max(lengths)``.

    Returns:
        Tensor: Mask tensor containing indices of padded part.
                dtype=torch.uint8 in PyTorch 1.2-
                dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Raises:
        ValueError: If ``length_dim`` refers to the batch dimension.

    Examples:
        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_pad_mask(lengths)
        masks = [[0, 0, 0, 0 ,0],
                 [0, 0, 0, 1, 1],
                 [0, 0, 1, 1, 1]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0],
                 [0, 0, 0, 0]],
                [[0, 0, 0, 1],
                 [0, 0, 0, 1]],
                [[0, 0, 1, 1],
                 [0, 0, 1, 1]]], dtype=torch.uint8)
        >>> xs = torch.zeros((3, 2, 6))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1]],
                [[0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1]],
                [[0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1]]], dtype=torch.uint8)

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_pad_mask(lengths, xs, 1)
        tensor([[[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1]],
                [[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1]],
                [[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1]]], dtype=torch.uint8)
        >>> make_pad_mask(lengths, xs, 2)
        tensor([[[0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1]],
                [[0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1]],
                [[0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1]]], dtype=torch.uint8)

    """
    # Dimension 0 is the batch axis. ``-xs.dim()`` is the same axis spelled
    # negatively and would otherwise fail later inside ``expand_as``.
    if length_dim == 0 or (xs is not None and length_dim == -xs.dim()):
        raise ValueError("length_dim cannot be 0: {}".format(length_dim))

    if not isinstance(lengths, list):
        lengths = lengths.long().tolist()
    bs = int(len(lengths))

    if maxlen is None:
        if xs is None:
            maxlen = int(max(lengths))
        else:
            maxlen = xs.size(length_dim)
    else:
        # An explicit maxlen and a reference tensor are mutually exclusive.
        assert xs is None
        assert maxlen >= int(max(lengths))

    # (B, maxlen) grid of time indices compared against each length.
    seq_range = torch.arange(0, maxlen, dtype=torch.int64)
    seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
    seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1)
    mask = seq_range_expand >= seq_length_expand

    if xs is not None:
        assert xs.size(0) == bs, (xs.size(0), bs)

        if length_dim < 0:
            length_dim = xs.dim() + length_dim
        # ind = (:, None, ..., None, :, , None, ..., None)
        # Keep the batch and length axes, insert singleton axes elsewhere
        # so the 2-D mask can be broadcast to the shape of ``xs``.
        ind = tuple(
            slice(None) if i in (0, length_dim) else None for i in range(xs.dim())
        )
        mask = mask[ind].expand_as(xs).to(xs.device)
    return mask
def make_non_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of non-padded part.

    This is the element-wise inverse of ``make_pad_mask`` called with the
    same arguments.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example.
        maxlen (int, optional): Explicit mask length, forwarded unchanged
            to ``make_pad_mask``.

    Returns:
        ByteTensor: mask tensor containing indices of non-padded part.
                    dtype=torch.uint8 in PyTorch 1.2-
                    dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Examples:
        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_non_pad_mask(lengths)
        masks = [[1, 1, 1, 1 ,1],
                 [1, 1, 1, 0, 0],
                 [1, 1, 0, 0, 0]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_non_pad_mask(lengths, xs)
        tensor([[[1, 1, 1, 1],
                 [1, 1, 1, 1]],
                [[1, 1, 1, 0],
                 [1, 1, 1, 0]],
                [[1, 1, 0, 0],
                 [1, 1, 0, 0]]], dtype=torch.uint8)
        >>> xs = torch.zeros((3, 2, 6))
        >>> make_non_pad_mask(lengths, xs)
        tensor([[[1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0]],
                [[1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0]],
                [[1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0]]], dtype=torch.uint8)

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_non_pad_mask(lengths, xs, 1)
        tensor([[[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0]],
                [[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0]],
                [[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0]]], dtype=torch.uint8)
        >>> make_non_pad_mask(lengths, xs, 2)
        tensor([[[1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0]],
                [[1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0]],
                [[1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0]]], dtype=torch.uint8)

    """
    return ~make_pad_mask(lengths, xs, length_dim, maxlen)
def mask_by_length(xs, lengths, fill=0):
    """Keep the leading part of each batch entry and fill the remainder.

    Args:
        xs (Tensor): Batch of input tensor (B, `*`).
        lengths (LongTensor or List): Batch of lengths (B,).
        fill (int or float): Value written to every masked position.

    Returns:
        Tensor: Batch of masked input tensor (B, `*`). This is a new tensor;
            ``xs`` itself is not modified.

    Examples:
        >>> x = torch.arange(5).repeat(3, 1) + 1
        >>> x
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5]])
        >>> lengths = [5, 3, 2]
        >>> mask_by_length(x, lengths)
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 0, 0],
                [1, 2, 0, 0, 0]])

    """
    assert xs.size(0) == len(lengths)

    # Start from a tensor that is entirely `fill`, then copy the valid
    # prefix of every row over it.
    masked = xs.data.new_empty(xs.size()).fill_(fill)
    for idx, valid_len in enumerate(lengths):
        masked[idx, :valid_len] = xs[idx, :valid_len]
    return masked
def th_accuracy(pad_outputs, pad_targets, ignore_label):
    """Calculate accuracy.

    Args:
        pad_outputs (Tensor): Prediction tensors (B * Lmax, D).
        pad_targets (LongTensor): Target label tensors (B, Lmax).
        ignore_label (int): Ignore label id.

    Returns:
        float: Accuracy value (0.0 - 1.0), computed only over positions whose
            target differs from ``ignore_label``. If no such position exists,
            0.0 is returned.

    """
    # Restore the (B, Lmax, D) layout and take the best class per position.
    pad_pred = pad_outputs.view(
        pad_targets.size(0), pad_targets.size(1), pad_outputs.size(1)
    ).argmax(2)
    mask = pad_targets != ignore_label
    denominator = int(torch.sum(mask))
    if denominator == 0:
        # Every target is ignored; avoid dividing by zero.
        return 0.0
    numerator = torch.sum(
        pad_pred.masked_select(mask) == pad_targets.masked_select(mask)
    )
    return float(numerator) / float(denominator)
def to_torch_tensor(x):
    """Convert the input to torch.Tensor or ComplexTensor.

    Args:
        x: Inputs. It should be one of numpy.ndarray, Tensor, ComplexTensor,
            and dict (with 'real' and 'imag' entries).

    Returns:
        Tensor or ComplexTensor: Type converted inputs. Tensor and
            ComplexTensor inputs are returned as they are.

    Raises:
        ValueError: If ``x`` is a dict lacking the 'real' or 'imag' key, or
            if ``x`` is of an unsupported type.

    Examples:
        >>> xs = np.ones(3, dtype=np.float32)
        >>> xs = to_torch_tensor(xs)
        tensor([1., 1., 1.])
        >>> xs = torch.ones(3)
        >>> assert to_torch_tensor(xs) is xs
        >>> xs = {'real': xs, 'imag': xs}
        >>> to_torch_tensor(xs)
        ComplexTensor(
        Real:
        tensor([1., 1., 1.])
        Imag;
        tensor([1., 1., 1.])
        )

    """
    # numpy input: real arrays go through torch, complex ones are wrapped.
    if isinstance(x, np.ndarray):
        if x.dtype.kind != "c":
            return torch.from_numpy(x)
        # torch_complex is imported on demand, only where it is needed.
        from torch_complex.tensor import ComplexTensor

        return ComplexTensor(x)

    # {'real': ..., 'imag': ...} is assembled into a ComplexTensor.
    if isinstance(x, dict):
        from torch_complex.tensor import ComplexTensor

        has_both_parts = "real" in x and "imag" in x
        if not has_both_parts:
            raise ValueError("has 'real' and 'imag' keys: {}".format(list(x)))
        return ComplexTensor(x["real"], x["imag"])

    # torch.Tensor passes through untouched.
    if isinstance(x, torch.Tensor):
        return x

    error = (
        "x must be numpy.ndarray, torch.Tensor or a dict like "
        "{{'real': torch.Tensor, 'imag': torch.Tensor}}, "
        "but got {}".format(type(x))
    )
    try:
        from torch_complex.tensor import ComplexTensor
    except Exception:
        # torch_complex is unavailable, so x cannot be a ComplexTensor.
        raise ValueError(error)

    if isinstance(x, ComplexTensor):
        return x
    raise ValueError(error)
def _encoder_subsample(train_args, n_layers, enc_idx=None):
    """Build the subsampling factors of a single encoder.

    Args:
        train_args: argument Namespace containing options.
        n_layers (int): Number of encoder layers; the result has one more
            entry (for the input) than this.
        enc_idx (int, optional): Encoder index for multi-encoder options,
            where ``etype`` and ``subsample`` are indexed per encoder.
            ``None`` means the options are plain (non-indexed) values.

    Returns:
        np.ndarray: subsampling factors of length ``n_layers + 1``.

    """
    etype = train_args.etype if enc_idx is None else train_args.etype[enc_idx]
    subsample = np.ones(n_layers + 1, dtype=np.int64)
    if etype.endswith("p") and not etype.startswith("vgg"):
        # The option is only read in this branch, as in the original code.
        spec = (
            train_args.subsample
            if enc_idx is None
            else train_args.subsample[enc_idx]
        )
        # zip() stops at the shorter of (available slots, given factors).
        for j, factor in zip(range(n_layers + 1), spec.split("_")):
            subsample[j] = int(factor)
    elif enc_idx is None:
        logging.warning(
            "Subsampling is not performed for vgg*. "
            "It is performed in max pooling layers at CNN."
        )
    else:
        logging.warning(
            "Encoder %d: Subsampling is not performed for vgg*. "
            "It is performed in max pooling layers at CNN.",
            enc_idx + 1,
        )
    logging.info("subsample: " + " ".join([str(x) for x in subsample]))
    return subsample


def get_subsample(train_args, mode, arch):
    """Parse the subsampling factors from the args for the specified `mode` and `arch`.

    Args:
        train_args: argument Namespace containing options.
        mode: one of ('asr', 'mt', 'st')
        arch: one of ('rnn', 'rnn-t', 'rnn_mix', 'rnn_mulenc', 'transformer')

    Returns:
        np.ndarray / List[np.ndarray]: subsampling factors. A list (one
            array per encoder) is returned only for arch 'rnn_mulenc'.

    Raises:
        ValueError: If the combination of ``mode`` and ``arch`` is not
            supported.

    """
    if arch == "transformer":
        return np.array([1])

    elif mode == "mt" and arch == "rnn":
        # +1 means input (+1) and layers outputs (train_args.elayer)
        subsample = np.ones(train_args.elayers + 1, dtype=np.int64)
        logging.warning("Subsampling is not performed for machine translation.")
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample

    # NOTE: (mode == "mt", arch == "rnn") is already handled above, so it is
    # not repeated in this condition.
    elif (mode == "asr" and arch in ("rnn", "rnn-t")) or (
        mode == "st" and arch == "rnn"
    ):
        return _encoder_subsample(train_args, train_args.elayers)

    elif mode == "asr" and arch == "rnn_mix":
        return _encoder_subsample(
            train_args, train_args.elayers_sd + train_args.elayers
        )

    elif mode == "asr" and arch == "rnn_mulenc":
        return [
            _encoder_subsample(train_args, train_args.elayers[idx], enc_idx=idx)
            for idx in range(train_args.num_encs)
        ]

    else:
        raise ValueError("Invalid options: mode={}, arch={}".format(mode, arch))
def rename_state_dict(
    old_prefix: str, new_prefix: str, state_dict: Dict[str, torch.Tensor]
):
    """Replace keys of old prefix with new prefix in state dict.

    The dictionary is modified in place and nothing is returned. Only the
    leading ``old_prefix`` of a key is rewritten; later occurrences of the
    same text inside the key are left untouched.

    Args:
        old_prefix (str): Prefix to look for at the start of each key.
        new_prefix (str): Prefix that replaces ``old_prefix``.
        state_dict (Dict[str, torch.Tensor]): State dict to update in place.

    """
    # need this list not to break the dict iterator
    old_keys = [k for k in state_dict if k.startswith(old_prefix)]
    if len(old_keys) > 0:
        logging.warning(f"Rename: {old_prefix} -> {new_prefix}")
    for k in old_keys:
        v = state_dict.pop(k)
        # Slice instead of str.replace: replace() would also rewrite any
        # later occurrence of the prefix text inside the key.
        new_k = new_prefix + k[len(old_prefix):]
        state_dict[new_k] = v
def get_activation(act):
    """Return a new activation module instance selected by name.

    Args:
        act (str): Name of the activation; one of 'hardtanh', 'tanh',
            'relu', 'selu' and 'swish'.

    Returns:
        A freshly constructed instance of the selected activation class.

    Raises:
        KeyError: If ``act`` is not one of the supported names.

    """
    # Imported here rather than at module level, so the module itself does
    # not carry the import.
    from espnet.nets.pytorch_backend.conformer.swish import Swish

    name_to_class = dict(
        hardtanh=torch.nn.Hardtanh,
        tanh=torch.nn.Tanh,
        relu=torch.nn.ReLU,
        selu=torch.nn.SELU,
        swish=Swish,
    )
    activation_class = name_to_class[act]
    return activation_class()