'''
Author: Qiguang Chen
Date: 2023-01-11 10:39:26
LastEditors: Qiguang Chen
LastEditTime: 2023-02-17 21:08:19
Description: non-pretrained encoder model
'''
import math

import einops
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

from common.utils import HiddenData, InputData
from model.encoder.base_encoder import BaseEncoder
class NonPretrainedEncoder(BaseEncoder):
    """
    Encoder structure based on a bidirectional LSTM and self-attention.
    """

    def __init__(self, **config):
        """ init non-pretrained encoder

        Args:
            config (dict):
                embedding (dict):
                    embedding_dim (int): embedding dimension.
                    dropout_rate (float): dropout rate.
                    load_embedding_name (str): None if no pretrained embedding is used, otherwise an embedding name like "glove.6B.300d.txt".
                    embedding_matrix (Tensor, Optional): embedding matrix tensor. Enabled if load_embedding_name is not None.
                    vocab_size (int, Optional): vocabulary size. Enabled if load_embedding_name is None.
                lstm (dict):
                    output_dim (int): lstm output dim.
                    bidirectional (bool): whether to use a BiLSTM instead of a unidirectional LSTM.
                    layer_num (int): number of layers.
                    dropout_rate (float): dropout rate.
                attention (dict, Optional):
                    dropout_rate (float): dropout rate.
                    hidden_dim (int): attention hidden dim.
                    output_dim (int): attention output dim.
                unflat_attention (dict, Optional): Enabled if attention is not None.
                    dropout_rate (float): dropout rate.
        """
        super(NonPretrainedEncoder, self).__init__()
        self.config = config
        # Embedding Initialization
        embed_config = config["embedding"]
        self.__embedding_dim = embed_config["embedding_dim"]
        if embed_config.get("load_embedding_name") and embed_config.get("embedding_matrix") is not None:
            self.__embedding_layer = nn.Embedding.from_pretrained(embed_config["embedding_matrix"], padding_idx=0)
        else:
            self.__embedding_layer = nn.Embedding(
                embed_config["vocab_size"], self.__embedding_dim
            )
        self.__embedding_dropout_layer = nn.Dropout(embed_config["dropout_rate"])

        # LSTM Initialization
        lstm_config = config["lstm"]
        self.__hidden_size = lstm_config["output_dim"]
        self.__lstm_layer = nn.LSTM(
            input_size=self.__embedding_dim,
            hidden_size=lstm_config["output_dim"] // 2,
            batch_first=True,
            bidirectional=lstm_config["bidirectional"],
            dropout=lstm_config["dropout_rate"],
            num_layers=lstm_config["layer_num"]
        )

        if self.config.get("attention"):
            # Attention Initialization
            att_config = config["attention"]
            self.__attention_dropout_layer = nn.Dropout(att_config["dropout_rate"])
            self.__attention_layer = QKVAttention(
                self.__embedding_dim, self.__embedding_dim, self.__embedding_dim,
                att_config["hidden_dim"], att_config["output_dim"], att_config["dropout_rate"]
            )
            if self.config.get("unflat_attention"):
                unflat_att_config = config["unflat_attention"]
                self.__sentattention = UnflatSelfAttention(
                    lstm_config["output_dim"] + att_config["output_dim"],
                    unflat_att_config["dropout_rate"]
                )
    def forward(self, inputs: InputData):
        """ Forward process for Non-Pretrained Encoder.

        Args:
            inputs: padded input ids and attention masks.
        Returns:
            encoded hidden vectors.
        """
        # LSTM Encoder
        # Padded text should be an instance of LongTensor.
        embedded_text = self.__embedding_layer(inputs.input_ids)
        dropout_text = self.__embedding_dropout_layer(embedded_text)
        seq_lens = inputs.attention_mask.sum(-1).detach().cpu()

        # Pack and pad process for input of variable length.
        packed_text = pack_padded_sequence(dropout_text, seq_lens, batch_first=True, enforce_sorted=False)
        lstm_hiddens, (h_last, c_last) = self.__lstm_layer(packed_text)
        padded_hiddens, _ = pad_packed_sequence(lstm_hiddens, batch_first=True)

        if self.config.get("attention"):
            # Attention Encoder
            dropout_text = self.__attention_dropout_layer(embedded_text)
            attention_hiddens = self.__attention_layer(
                dropout_text, dropout_text, dropout_text, mask=inputs.attention_mask
            )

            # Attention + LSTM
            hiddens = torch.cat([attention_hiddens, padded_hiddens], dim=-1)
            hidden = HiddenData(None, hiddens)
            if self.config.get("return_with_input"):
                hidden.add_input(inputs)
            if self.config.get("return_sentence_level_hidden"):
                if self.config.get("unflat_attention"):
                    sentence = self.__sentattention(hiddens, seq_lens)
                else:
                    sentence = hiddens[:, 0, :]
                hidden.update_intent_hidden_state(sentence)
        else:
            sentence_hidden = None
            if self.config.get("return_sentence_level_hidden"):
                # Concatenate the final hidden and cell states from both directions of the top LSTM layer.
                sentence_hidden = torch.cat((h_last[-1], h_last[-2], c_last[-1], c_last[-2]), dim=-1)
            hidden = HiddenData(sentence_hidden, padded_hiddens)
            if self.config.get("return_with_input"):
                hidden.add_input(inputs)
        return hidden
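
# A rough usage sketch for this encoder (assuming `inputs` is an InputData whose
# `input_ids` and `attention_mask` are padded (batch, seq_len) tensors, as used in
# forward() above; the hyper-parameters below are hypothetical):
#
#   encoder = NonPretrainedEncoder(
#       embedding={"embedding_dim": 256, "dropout_rate": 0.5, "vocab_size": 10000},
#       lstm={"output_dim": 256, "bidirectional": True, "layer_num": 1, "dropout_rate": 0.5},
#       return_sentence_level_hidden=True,
#   )
#   hidden = encoder(inputs)  # HiddenData wrapping token-level (and sentence-level) states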

class QKVAttention(nn.Module):
    """
    Attention mechanism based on the Query-Key-Value architecture. In
    particular, when query == key == value, it is self-attention.
    """

    def __init__(self, query_dim, key_dim, value_dim, hidden_dim, output_dim, dropout_rate):
        super(QKVAttention, self).__init__()

        # Record hyper-parameters.
        self.__query_dim = query_dim
        self.__key_dim = key_dim
        self.__value_dim = value_dim
        self.__hidden_dim = hidden_dim
        self.__output_dim = output_dim
        self.__dropout_rate = dropout_rate

        # Declare network structures.
        self.__query_layer = nn.Linear(self.__query_dim, self.__hidden_dim)
        self.__key_layer = nn.Linear(self.__key_dim, self.__hidden_dim)
        self.__value_layer = nn.Linear(self.__value_dim, self.__output_dim)
        self.__dropout_layer = nn.Dropout(p=self.__dropout_rate)

    def forward(self, input_query, input_key, input_value, mask=None):
        """ The forward propagation of attention.

        Here we require the sequence dimensions of the input key
        and value to be equal.

        Args:
            input_query: query tensor, (b, n, d_q)
            input_key: key tensor, (b, m, d_k)
            input_value: value tensor, (b, m, d_v)
        Returns:
            attention-based tensor, (b, n, d_output)
        """
        # Linear transform to fine-tune dimension.
        linear_query = self.__query_layer(input_query)
        linear_key = self.__key_layer(input_key)
        linear_value = self.__value_layer(input_value)

        score_tensor = torch.matmul(
            linear_query,
            linear_key.transpose(-2, -1)
        ) / math.sqrt(self.__hidden_dim)
        if mask is not None:
            # Mask out padded key positions so that no attention mass falls on padding.
            attn_mask = einops.repeat((mask == 0), "b l -> b h l", h=score_tensor.shape[-2])
            score_tensor = score_tensor.masked_fill_(attn_mask, -float(1e20))
        score_tensor = F.softmax(score_tensor, dim=-1)
        forced_tensor = torch.matmul(score_tensor, linear_value)
        forced_tensor = self.__dropout_layer(forced_tensor)

        return forced_tensor
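
# QKVAttention computes standard scaled dot-product attention,
# softmax(Q K^T / sqrt(hidden_dim)) V, over linearly projected inputs.
# A self-attention sketch with hypothetical shapes:
#
#   att = QKVAttention(query_dim=256, key_dim=256, value_dim=256,
#                      hidden_dim=1024, output_dim=128, dropout_rate=0.5)
#   x = torch.randn(8, 32, 256)                  # (batch, seq_len, embedding_dim)
#   mask = torch.ones(8, 32, dtype=torch.long)   # 1 = real token, 0 = padding
#   out = att(x, x, x, mask=mask)                # -> (8, 32, 128)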

class UnflatSelfAttention(nn.Module):
    """
    Scores each element of the sequence with a linear layer and uses the
    normalized scores to compute a context vector over the sequence.
    """

    def __init__(self, d_hid, dropout=0.):
        super().__init__()
        self.scorer = nn.Linear(d_hid, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, inp, lens):
        batch_size, seq_len, d_feat = inp.size()
        inp = self.dropout(inp)
        # Score every time step, then mask out positions beyond each true length.
        scores = self.scorer(inp.contiguous().view(-1, d_feat)).view(batch_size, seq_len)
        max_len = max(lens)
        for i, l in enumerate(lens):
            if l < max_len:
                scores.data[i, l:] = -np.inf
        scores = F.softmax(scores, dim=1)
        # Normalized scores weight each time step; summing gives one context vector per sequence.
        context = scores.unsqueeze(2).expand_as(inp).mul(inp).sum(1)
        return context
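
# UnflatSelfAttention pools a padded sequence into a single context vector.
# A usage sketch with hypothetical shapes:
#
#   pool = UnflatSelfAttention(d_hid=384, dropout=0.5)
#   h = torch.randn(8, 32, 384)                      # (batch, seq_len, d_hid)
#   lens = torch.full((8,), 32, dtype=torch.long)    # true length of each sequence
#   context = pool(h, lens)                          # -> (8, 384)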