Spaces:
Runtime error
Runtime error
| """ | |
| Taken from ESPNet, modified by Florian Lux | |
| """ | |
| import os | |
| from abc import ABC | |
| import torch | |
| def cumsum_durations(durations): | |
| out = [0] | |
| for duration in durations: | |
| out.append(duration + out[-1]) | |
| centers = list() | |
| for index, _ in enumerate(out): | |
| if index + 1 < len(out): | |
| centers.append((out[index] + out[index + 1]) / 2) | |
| return out, centers | |
| def delete_old_checkpoints(checkpoint_dir, keep=5): | |
| checkpoint_list = list() | |
| for el in os.listdir(checkpoint_dir): | |
| if el.endswith(".pt") and el != "best.pt": | |
| checkpoint_list.append(int(el.split(".")[0].split("_")[1])) | |
| if len(checkpoint_list) <= keep: | |
| return | |
| else: | |
| checkpoint_list.sort(reverse=False) | |
| checkpoints_to_delete = [os.path.join(checkpoint_dir, "checkpoint_{}.pt".format(step)) for step in checkpoint_list[:-keep]] | |
| for old_checkpoint in checkpoints_to_delete: | |
| os.remove(os.path.join(old_checkpoint)) | |
| def get_most_recent_checkpoint(checkpoint_dir, verbose=True): | |
| checkpoint_list = list() | |
| for el in os.listdir(checkpoint_dir): | |
| if el.endswith(".pt") and el != "best.pt": | |
| checkpoint_list.append(int(el.split(".")[0].split("_")[1])) | |
| if len(checkpoint_list) == 0: | |
| print("No previous checkpoints found, cannot reload.") | |
| return None | |
| checkpoint_list.sort(reverse=True) | |
| if verbose: | |
| print("Reloading checkpoint_{}.pt".format(checkpoint_list[0])) | |
| return os.path.join(checkpoint_dir, "checkpoint_{}.pt".format(checkpoint_list[0])) | |
| def make_pad_mask(lengths, xs=None, length_dim=-1, device=None): | |
| """ | |
| Make mask tensor containing indices of padded part. | |
| Args: | |
| lengths (LongTensor or List): Batch of lengths (B,). | |
| xs (Tensor, optional): The reference tensor. | |
| If set, masks will be the same shape as this tensor. | |
| length_dim (int, optional): Dimension indicator of the above tensor. | |
| See the example. | |
| Returns: | |
| Tensor: Mask tensor containing indices of padded part. | |
| dtype=torch.uint8 in PyTorch 1.2- | |
| dtype=torch.bool in PyTorch 1.2+ (including 1.2) | |
| """ | |
| if length_dim == 0: | |
| raise ValueError("length_dim cannot be 0: {}".format(length_dim)) | |
| if not isinstance(lengths, list): | |
| lengths = lengths.tolist() | |
| bs = int(len(lengths)) | |
| if xs is None: | |
| maxlen = int(max(lengths)) | |
| else: | |
| maxlen = xs.size(length_dim) | |
| if device is not None: | |
| seq_range = torch.arange(0, maxlen, dtype=torch.int64, device=device) | |
| else: | |
| seq_range = torch.arange(0, maxlen, dtype=torch.int64) | |
| seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen) | |
| seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1) | |
| mask = seq_range_expand >= seq_length_expand | |
| if xs is not None: | |
| assert xs.size(0) == bs, (xs.size(0), bs) | |
| if length_dim < 0: | |
| length_dim = xs.dim() + length_dim | |
| # ind = (:, None, ..., None, :, , None, ..., None) | |
| ind = tuple(slice(None) if i in (0, length_dim) else None for i in range(xs.dim())) | |
| mask = mask[ind].expand_as(xs).to(xs.device) | |
| return mask | |
| def make_non_pad_mask(lengths, xs=None, length_dim=-1, device=None): | |
| """ | |
| Make mask tensor containing indices of non-padded part. | |
| Args: | |
| lengths (LongTensor or List): Batch of lengths (B,). | |
| xs (Tensor, optional): The reference tensor. | |
| If set, masks will be the same shape as this tensor. | |
| length_dim (int, optional): Dimension indicator of the above tensor. | |
| See the example. | |
| Returns: | |
| ByteTensor: mask tensor containing indices of padded part. | |
| dtype=torch.uint8 in PyTorch 1.2- | |
| dtype=torch.bool in PyTorch 1.2+ (including 1.2) | |
| """ | |
| return ~make_pad_mask(lengths, xs, length_dim, device=device) | |
| def initialize(model, init): | |
| """ | |
| Initialize weights of a neural network module. | |
| Parameters are initialized using the given method or distribution. | |
| Args: | |
| model: Target. | |
| init: Method of initialization. | |
| """ | |
| # weight init | |
| for p in model.parameters(): | |
| if p.dim() > 1: | |
| if init == "xavier_uniform": | |
| torch.nn.init.xavier_uniform_(p.data) | |
| elif init == "xavier_normal": | |
| torch.nn.init.xavier_normal_(p.data) | |
| elif init == "kaiming_uniform": | |
| torch.nn.init.kaiming_uniform_(p.data, nonlinearity="relu") | |
| elif init == "kaiming_normal": | |
| torch.nn.init.kaiming_normal_(p.data, nonlinearity="relu") | |
| else: | |
| raise ValueError("Unknown initialization: " + init) | |
| # bias init | |
| for p in model.parameters(): | |
| if p.dim() == 1: | |
| p.data.zero_() | |
| # reset some modules with default init | |
| for m in model.modules(): | |
| if isinstance(m, (torch.nn.Embedding, torch.nn.LayerNorm)): | |
| m.reset_parameters() | |
| def pad_list(xs, pad_value): | |
| """ | |
| Perform padding for the list of tensors. | |
| Args: | |
| xs (List): List of Tensors [(T_1, `*`), (T_2, `*`), ..., (T_B, `*`)]. | |
| pad_value (float): Value for padding. | |
| Returns: | |
| Tensor: Padded tensor (B, Tmax, `*`). | |
| """ | |
| n_batch = len(xs) | |
| max_len = max(x.size(0) for x in xs) | |
| pad = xs[0].new(n_batch, max_len, *xs[0].size()[1:]).fill_(pad_value) | |
| for i in range(n_batch): | |
| pad[i, : xs[i].size(0)] = xs[i] | |
| return pad | |
| def subsequent_mask(size, device="cpu", dtype=torch.bool): | |
| """ | |
| Create mask for subsequent steps (size, size). | |
| :param int size: size of mask | |
| :param str device: "cpu" or "cuda" or torch.Tensor.device | |
| :param torch.dtype dtype: result dtype | |
| :rtype | |
| """ | |
| ret = torch.ones(size, size, device=device, dtype=dtype) | |
| return torch.tril(ret, out=ret) | |
| class ScorerInterface: | |
| """ | |
| Scorer interface for beam search. | |
| The scorer performs scoring of the all tokens in vocabulary. | |
| Examples: | |
| * Search heuristics | |
| * :class:`espnet.nets.scorers.length_bonus.LengthBonus` | |
| * Decoder networks of the sequence-to-sequence models | |
| * :class:`espnet.nets.pytorch_backend.nets.transformer.decoder.Decoder` | |
| * :class:`espnet.nets.pytorch_backend.nets.rnn.decoders.Decoder` | |
| * Neural language models | |
| * :class:`espnet.nets.pytorch_backend.lm.transformer.TransformerLM` | |
| * :class:`espnet.nets.pytorch_backend.lm.default.DefaultRNNLM` | |
| * :class:`espnet.nets.pytorch_backend.lm.seq_rnn.SequentialRNNLM` | |
| """ | |
| def init_state(self, x): | |
| """ | |
| Get an initial state for decoding (optional). | |
| Args: | |
| x (torch.Tensor): The encoded feature tensor | |
| Returns: initial state | |
| """ | |
| return None | |
| def select_state(self, state, i, new_id=None): | |
| """ | |
| Select state with relative ids in the main beam search. | |
| Args: | |
| state: Decoder state for prefix tokens | |
| i (int): Index to select a state in the main beam search | |
| new_id (int): New label index to select a state if necessary | |
| Returns: | |
| state: pruned state | |
| """ | |
| return None if state is None else state[i] | |
| def score(self, y, state, x): | |
| """ | |
| Score new token (required). | |
| Args: | |
| y (torch.Tensor): 1D torch.int64 prefix tokens. | |
| state: Scorer state for prefix tokens | |
| x (torch.Tensor): The encoder feature that generates ys. | |
| Returns: | |
| tuple[torch.Tensor, Any]: Tuple of | |
| scores for next token that has a shape of `(n_vocab)` | |
| and next state for ys | |
| """ | |
| raise NotImplementedError | |
| def final_score(self, state): | |
| """ | |
| Score eos (optional). | |
| Args: | |
| state: Scorer state for prefix tokens | |
| Returns: | |
| float: final score | |
| """ | |
| return 0.0 | |
| class BatchScorerInterface(ScorerInterface, ABC): | |
| def batch_init_state(self, x): | |
| """ | |
| Get an initial state for decoding (optional). | |
| Args: | |
| x (torch.Tensor): The encoded feature tensor | |
| Returns: initial state | |
| """ | |
| return self.init_state(x) | |
| def batch_score(self, ys, states, xs): | |
| """ | |
| Score new token batch (required). | |
| Args: | |
| ys (torch.Tensor): torch.int64 prefix tokens (n_batch, ylen). | |
| states (List[Any]): Scorer states for prefix tokens. | |
| xs (torch.Tensor): | |
| The encoder feature that generates ys (n_batch, xlen, n_feat). | |
| Returns: | |
| tuple[torch.Tensor, List[Any]]: Tuple of | |
| batchfied scores for next token with shape of `(n_batch, n_vocab)` | |
| and next state list for ys. | |
| """ | |
| scores = list() | |
| outstates = list() | |
| for i, (y, state, x) in enumerate(zip(ys, states, xs)): | |
| score, outstate = self.score(y, state, x) | |
| outstates.append(outstate) | |
| scores.append(score) | |
| scores = torch.cat(scores, 0).view(ys.shape[0], -1) | |
| return scores, outstates | |
| def to_device(m, x): | |
| """Send tensor into the device of the module. | |
| Args: | |
| m (torch.nn.Module): Torch module. | |
| x (Tensor): Torch tensor. | |
| Returns: | |
| Tensor: Torch tensor located in the same place as torch module. | |
| """ | |
| if isinstance(m, torch.nn.Module): | |
| device = next(m.parameters()).device | |
| elif isinstance(m, torch.Tensor): | |
| device = m.device | |
| else: | |
| raise TypeError( | |
| "Expected torch.nn.Module or torch.tensor, " f"bot got: {type(m)}" | |
| ) | |
| return x.to(device) | |