| | |
| | import torch |
| | import torch.nn.functional as F |
| |
|
| |
|
def sequence_mask(length, max_length=None):
    """Build a boolean mask that is True on valid (non-padded) positions.

    Args:
        length: tensor of sequence lengths; a new axis is inserted at dim 1
            before the comparison.
        max_length: number of mask columns. When None, the largest value in
            `length` is used.

    Returns:
        A bool tensor where entry [i, j] is True iff j < length[i].
    """
    limit = length.max() if max_length is None else max_length
    # Positions are created with the dtype/device of `length` so the
    # comparison below needs no conversion.
    positions = torch.arange(limit, dtype=length.dtype, device=length.device)
    return positions[None] < length[:, None]
| |
|
| |
|
def make_pad_mask(lengths: torch.Tensor, max_len: int = 0) -> torch.Tensor:
    """Build a boolean padding mask from a batch of sequence lengths.

    Args:
        lengths:
            A 1-D tensor containing sentence lengths.
        max_len:
            Minimum number of mask columns. The mask is widened to
            `lengths.max()` when that is larger.

    Returns:
        A 2-D bool tensor of shape (len(lengths), width) in which padded
        positions are `True` and valid positions are `False`.

    Example:
        For lengths = torch.tensor([1, 3, 2, 5]) the result is

            [[False, True,  True,  True,  True ],
             [False, False, False, True,  True ],
             [False, False, True,  True,  True ],
             [False, False, False, False, False]]
    """
    assert lengths.ndim == 1, lengths.ndim
    width = max(max_len, lengths.max())
    positions = torch.arange(0, width, device=lengths.device)
    # Broadcasting (1, width) against (n, 1) yields the (n, width) mask.
    return positions[None, :] >= lengths[:, None]
| |
|
| |
|
| | |
def top_k_top_p_filtering(
    logits, top_k=0, top_p=1.0, filter_value=-float("Inf"), min_tokens_to_keep=1
):
    """Filter a distribution of logits using top-k and/or nucleus (top-p) filtering.

    The vocabulary is taken to be the last dimension of `logits`.

    NOTE: `logits` is modified in place and the same tensor is returned; pass
    a clone if the unfiltered values are still needed.

    Args:
        logits: logits distribution, typically of shape
            (batch size, vocabulary size); any leading shape is accepted.
        top_k: if > 0, keep only the top k tokens with highest probability
            (top-k filtering).
        top_p: if < 1.0, keep the top tokens with cumulative probability
            >= top_p (nucleus filtering). Nucleus filtering is described in
            Holtzman et al. (http://arxiv.org/abs/1904.09751)
        filter_value: value written into every removed position.
        min_tokens_to_keep: make sure we keep at least this many tokens per
            batch example in the output.

    Returns:
        The filtered `logits` tensor (same object as the input).

    From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317
    """
    if top_k > 0:
        # Safety check: never ask for more tokens than the vocabulary holds,
        # nor for fewer than min_tokens_to_keep.
        top_k = min(max(top_k, min_tokens_to_keep), logits.size(-1))
        # Remove all tokens scoring below the k-th best token.
        indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
        logits[indices_to_remove] = filter_value

    if top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

        # Remove tokens whose cumulative probability exceeds the threshold.
        sorted_indices_to_remove = cumulative_probs > top_p
        if min_tokens_to_keep > 1:
            # Protect the leading tokens (one more is protected by the shift below).
            sorted_indices_to_remove[..., :min_tokens_to_keep] = 0
        # Shift right so the first token that crosses the threshold is kept too.
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0

        # Scatter the mask back to vocabulary order. This must use the same
        # (last) dimension that sort/softmax/cumsum operated on; a hard-coded
        # dim of 1 breaks for 1-D and 3-D inputs.
        indices_to_remove = sorted_indices_to_remove.scatter(
            -1, sorted_indices, sorted_indices_to_remove
        )
        logits[indices_to_remove] = filter_value
    return logits
| |
|
| |
|
def topk_sampling(logits, top_k=10, top_p=1.0, temperature=1.0):
    """Sample one token per row after temperature, top-k and top-p filtering.

    Args:
        logits: unnormalised scores over the vocabulary (last dimension).
            NOTE(review): when `temperature` is exactly 1.0 this tensor is
            handed to `top_k_top_p_filtering` as-is, which writes the filter
            value into it in place.
        top_k: forwarded to `top_k_top_p_filtering`.
        top_p: forwarded to `top_k_top_p_filtering`.
        temperature: divisor applied to the logits before filtering; skipped
            when equal to 1.0.

    Returns:
        The tensor returned by `torch.multinomial(..., num_samples=1)`, i.e.
        one sampled vocabulary index per distribution.
    """
    scaled = logits if temperature == 1.0 else logits / temperature
    filtered = top_k_top_p_filtering(scaled, top_k=top_k, top_p=top_p)
    probs = F.softmax(filtered, dim=-1)
    return torch.multinomial(probs, num_samples=1)
| |
|
| |
|
| | from typing import Optional, Tuple |
| |
|
| |
|
def multinomial_sample_one_no_sync(
    probs_sort,
):
    """Draw one index along the last dimension without `torch.multinomial`.

    Each probability is divided by independent Exponential(1) noise and the
    argmax of the result is taken, which picks index i with probability
    proportional to `probs_sort[..., i]`.

    Args:
        probs_sort: tensor of non-negative weights; categories are laid out
            along the last dimension.

    Returns:
        A `torch.int` tensor of chosen indices with the last dimension kept
        at size 1.
    """
    noise = torch.empty_like(probs_sort).exponential_(1)
    winner = (probs_sort / noise).argmax(dim=-1, keepdim=True)
    return winner.to(dtype=torch.int)
| |
|
| |
|
def logits_to_probs(
    logits,
    previous_tokens: Optional[torch.Tensor] = None,
    temperature: float = 1.0,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    repetition_penalty: float = 1.0,
):
    """Turn raw logits into a sampling distribution.

    Processing order: repetition penalty -> top-p (nucleus) filtering ->
    temperature scaling -> top-k filtering -> softmax.

    The repetition-penalty and top-p steps index along dim 0, so `logits` is
    expected to be a 1-D tensor over the vocabulary.
    NOTE(review): confirm no caller passes batched logits.

    The caller's `logits` tensor is not modified.

    Args:
        logits: unnormalised scores over the vocabulary.
        previous_tokens: indices of already generated tokens; squeezed and
            cast to long before use. Only consulted when
            `repetition_penalty != 1.0`. May be None.
        temperature: divisor applied to the logits (clamped to >= 1e-5).
        top_k: if given, keep only the `top_k` highest-scoring tokens.
        top_p: if given and < 1.0, drop tokens once the cumulative
            probability of the sorted distribution exceeds `top_p`; the
            single best token is always kept.
        repetition_penalty: factor that shrinks positive scores and
            magnifies negative scores of tokens in `previous_tokens`.

    Returns:
        Probabilities (softmax over the last dimension) with filtered tokens
        at probability 0.
    """
    # Guard first: previous_tokens defaults to None, so it must not be
    # touched before this check.
    if previous_tokens is not None and repetition_penalty != 1.0:
        previous_tokens = previous_tokens.squeeze().long()
        score = torch.gather(logits, dim=0, index=previous_tokens)
        score = torch.where(
            score < 0, score * repetition_penalty, score / repetition_penalty
        )
        # Out-of-place scatter so the caller's tensor is left untouched.
        logits = logits.scatter(dim=0, index=previous_tokens, src=score)

    if top_p is not None and top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cum_probs = torch.cumsum(
            torch.nn.functional.softmax(sorted_logits, dim=-1), dim=-1
        )
        sorted_indices_to_remove = cum_probs > top_p
        # Always keep the most likely token.
        sorted_indices_to_remove[0] = False
        # Map the mask from sorted order back to vocabulary order.
        indices_to_remove = sorted_indices_to_remove.scatter(
            dim=0, index=sorted_indices, src=sorted_indices_to_remove
        )
        logits = logits.masked_fill(indices_to_remove, -float("Inf"))

    # Clamp to avoid division by zero for temperature == 0.
    logits = logits / max(temperature, 1e-5)

    if top_k is not None:
        v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
        # Smallest score that is still inside the top-k set.
        pivot = v.select(-1, -1).unsqueeze(-1)
        logits = logits.masked_fill(logits < pivot, -float("Inf"))

    probs = torch.nn.functional.softmax(logits, dim=-1)
    return probs
| |
|
| |
|
def sample(
    logits,
    previous_tokens: Optional[torch.Tensor] = None,
    **sampling_kwargs,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Convert logits to probabilities and draw one token from them.

    Args:
        logits: unnormalised scores, forwarded to `logits_to_probs`.
        previous_tokens: already generated token indices, forwarded to
            `logits_to_probs`.
        **sampling_kwargs: extra keyword arguments passed through unchanged
            to `logits_to_probs`.

    Returns:
        A `(idx_next, probs)` tuple: the index drawn by
        `multinomial_sample_one_no_sync` and the distribution it was drawn
        from.
    """
    distribution = logits_to_probs(
        logits=logits, previous_tokens=previous_tokens, **sampling_kwargs
    )
    return multinomial_sample_one_no_sync(distribution), distribution
| |
|