| | |
| | """ Model definition functions and weight loading. |
| | """ |
| |
|
| | from __future__ import print_function, division, unicode_literals |
| |
|
| | from os.path import exists |
| |
|
| | import torch |
| | import torch.nn as nn |
| | from torch.autograd import Variable |
| | from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, PackedSequence |
| |
|
| | from torchmoji.lstm import LSTMHardSigmoid |
| | from torchmoji.attlayer import Attention |
| | from torchmoji.global_variables import NB_TOKENS, NB_EMOJI_CLASSES |
| |
|
| |
|
def torchmoji_feature_encoding(weight_path, return_attention=False):
    """ Builds a torchMoji feature extractor from pretrained weights.

    The model is created with feature_output=True and without a
    classification head (nb_classes=None), so a forward pass yields the
    representation produced by the attention layer instead of class
    predictions. Saved parameters whose name contains 'output_layer' are
    skipped when the weights are loaded.

    # Arguments:
        weight_path: Path to model weights to be loaded.
        return_attention: If true, output will include weight of each input
            token used for the prediction.

    # Returns:
        Pretrained model for encoding text into feature vectors.
    """
    encoder = TorchMoji(
        nb_classes=None,
        nb_tokens=NB_TOKENS,
        feature_output=True,
        return_attention=return_attention,
    )
    # The encoder has no output layer, so its saved weights must be skipped.
    skipped_layers = ['output_layer']
    load_specific_weights(encoder, weight_path, exclude_names=skipped_layers)
    return encoder
| |
|
| |
|
def torchmoji_emojis(weight_path, return_attention=False):
    """ Builds the pretrained torchMoji emoji prediction model.

    The model is created with NB_EMOJI_CLASSES output classes and the
    complete saved state dict (output layer included) is loaded into it
    with load_state_dict.

    # Arguments:
        weight_path: Path to model weights to be loaded.
        return_attention: If true, output will include weight of each input
            token used for the prediction.

    # Returns:
        Pretrained model predicting over the NB_EMOJI_CLASSES emoji classes.
    """
    emoji_model = TorchMoji(
        nb_classes=NB_EMOJI_CLASSES,
        nb_tokens=NB_TOKENS,
        return_attention=return_attention,
    )
    pretrained_state = torch.load(weight_path)
    emoji_model.load_state_dict(pretrained_state)
    return emoji_model
| |
|
| |
|
def torchmoji_transfer(nb_classes, weight_path=None, extend_embedding=0,
                       embed_dropout_rate=0.1, final_dropout_rate=0.5):
    """ Builds a torchMoji model for finetuning/transfer learning.

    The model outputs logits (output_logits=True). When weight_path is
    given, pretrained weights are loaded for every layer except the output
    layer; when it is None the model keeps its freshly initialised weights.

    Note that if you are planning to use class average F1 for evaluation,
    nb_classes should be set to 2 instead of the actual number of classes
    in the dataset, since binary classification will be performed on each
    class individually.

    Note that for the 'new' method, weight_path should be left as None.

    # Arguments:
        nb_classes: Number of classes in the dataset.
        weight_path: Path to model weights to be loaded, or None to skip
            loading.
        extend_embedding: Number of tokens that have been added to the
            vocabulary on top of NB_TOKENS. If this number is larger than 0,
            the embedding layer's dimensions are adjusted accordingly, with
            the additional weights being set to random values.
        embed_dropout_rate: Dropout rate for the embedding layer.
        final_dropout_rate: Dropout rate for the final Softmax layer.

    # Returns:
        Model with the given parameters.
    """
    vocabulary_size = NB_TOKENS + extend_embedding
    transfer_model = TorchMoji(
        nb_classes=nb_classes,
        nb_tokens=vocabulary_size,
        embed_dropout_rate=embed_dropout_rate,
        final_dropout_rate=final_dropout_rate,
        output_logits=True,
    )

    # Nothing to load: hand back the randomly initialised model.
    if weight_path is None:
        return transfer_model

    load_specific_weights(
        transfer_model,
        weight_path,
        exclude_names=['output_layer'],
        extend_embedding=extend_embedding,
    )
    return transfer_model
| |
|
| |
|
class TorchMoji(nn.Module):
    """ Embedding -> two stacked bidirectional LSTMs -> attention -> optional
    classification head.
    """

    def __init__(self, nb_classes, nb_tokens, feature_output=False, output_logits=False,
                 embed_dropout_rate=0, final_dropout_rate=0, return_attention=False):
        """
        torchMoji model.
        IMPORTANT: The model is loaded in evaluation mode by default (self.eval())

        # Arguments:
            nb_classes: Number of classes in the dataset. Not used when
                feature_output is True (may then be None).
            nb_tokens: Number of tokens in the dataset (i.e. vocabulary size).
            feature_output: If True the model returns the penultimate
                            feature vector rather than Softmax probabilities
                            (defaults to False).
            output_logits:  If True the model returns logits rather than probabilities
                            (defaults to False).
            embed_dropout_rate: Dropout rate for the embedding layer.
            final_dropout_rate: Dropout rate for the final Softmax layer.
            return_attention: If True the model also returns attention weights over the sentence
                              (defaults to False).
        """
        super(TorchMoji, self).__init__()

        embedding_dim = 256
        hidden_size = 512
        # forward() concatenates the outputs of both LSTMs (2 * hidden_size
        # each) with the embedded tokens before the attention layer.
        attention_size = 4 * hidden_size + embedding_dim

        self.feature_output = feature_output
        self.embed_dropout_rate = embed_dropout_rate
        self.final_dropout_rate = final_dropout_rate
        self.return_attention = return_attention
        self.hidden_size = hidden_size
        self.output_logits = output_logits
        self.nb_classes = nb_classes

        self.add_module('embed', nn.Embedding(nb_tokens, embedding_dim))
        self.add_module('embed_dropout', nn.Dropout2d(embed_dropout_rate))
        self.add_module('lstm_0', LSTMHardSigmoid(embedding_dim, hidden_size,
                                                  batch_first=True, bidirectional=True))
        self.add_module('lstm_1', LSTMHardSigmoid(hidden_size * 2, hidden_size,
                                                  batch_first=True, bidirectional=True))
        self.add_module('attention_layer', Attention(attention_size=attention_size,
                                                     return_attention=return_attention))
        if not feature_output:
            self.add_module('final_dropout', nn.Dropout(final_dropout_rate))
            # Up to two classes are handled with one output unit (paired with
            # a Sigmoid below); more classes get one unit per class.
            multi_class = self.nb_classes > 2
            output_dim = nb_classes if multi_class else 1
            if output_logits:
                self.add_module('output_layer',
                                nn.Sequential(nn.Linear(attention_size, output_dim)))
            else:
                self.add_module('output_layer',
                                nn.Sequential(nn.Linear(attention_size, output_dim),
                                              nn.Softmax() if multi_class else nn.Sigmoid()))
        self.init_weights()
        # Put model in evaluation mode by default
        self.eval()

    def init_weights(self):
        """
        Here we reproduce Keras default initialization weights for consistency with Keras version
        """
        ih = (param.data for name, param in self.named_parameters() if 'weight_ih' in name)
        hh = (param.data for name, param in self.named_parameters() if 'weight_hh' in name)
        b = (param.data for name, param in self.named_parameters() if 'bias' in name)
        nn.init.uniform(self.embed.weight.data, a=-0.5, b=0.5)
        for t in ih:
            nn.init.xavier_uniform(t)
        for t in hh:
            nn.init.orthogonal(t)
        # Matches every parameter with 'bias' in its name, not only LSTM biases.
        for t in b:
            nn.init.constant(t, 0)
        if not self.feature_output:
            nn.init.xavier_uniform(self.output_layer[0].weight.data)

    def forward(self, input_seqs):
        """ Forward pass.

        # Arguments:
            input_seqs: Can be one of Numpy array, Torch.LongTensor, Torch.Variable, Torch.PackedSequence.
                For the non-packed formats rows are sequences and token id 0
                is treated as padding when computing sequence lengths.

        # Return:
            Same format as input format (except for PackedSequence returned as Variable).
            When return_attention is True, a tuple (outputs, attention weights)
            whose rows are both in the order of the input rows.
        """
        is_packed = isinstance(input_seqs, PackedSequence)

        # Remember the input container so the output can be returned in the
        # same format. A PackedSequence is not a Variable, so it must be
        # excluded here or it would be mistaken for a Numpy array.
        return_numpy = False
        return_tensor = False
        if not is_packed:
            if isinstance(input_seqs, (torch.LongTensor, torch.cuda.LongTensor)):
                input_seqs = Variable(input_seqs)
                return_tensor = True
            elif not isinstance(input_seqs, Variable):
                input_seqs = Variable(torch.from_numpy(input_seqs.astype('int64')).long())
                return_numpy = True

        reorder_output = False
        perm_idx = None
        if not is_packed:
            batch_size = input_seqs.size()[0]

            # Length of a row = position of its last non-zero token + 1.
            # NOTE(review): a row made only of zeros gives an empty nonzero()
            # and torch.max fails on it - confirm callers never pass one.
            input_lengths = torch.LongTensor(
                [torch.max(input_seqs[i, :].data.nonzero()) + 1 for i in range(batch_size)])
            # pack_padded_sequence is fed sequences sorted by decreasing
            # length; perm_idx is kept to undo the sort on the way out.
            input_lengths, perm_idx = input_lengths.sort(0, descending=True)
            input_seqs = input_seqs[perm_idx][:, :input_lengths.max()]

            packed_input = pack_padded_sequence(input_seqs, input_lengths.cpu().numpy(),
                                                batch_first=True)
            reorder_output = True
        else:
            # The first entry of batch_sizes counts the sequences present at
            # time step 0, i.e. the batch size.
            batch_size = int(input_seqs.batch_sizes[0])
            # Per-sequence lengths are recovered when the sequence is unpacked.
            input_lengths = None
            packed_input = input_seqs

        # Zero initial hidden/cell states (2 = one per direction), allocated
        # with the same tensor type as the LSTM weights.
        ho = self.lstm_0.weight_hh_l0.data.new(2, batch_size, self.hidden_size).zero_()
        co = self.lstm_0.weight_hh_l0.data.new(2, batch_size, self.hidden_size).zero_()
        hidden = (Variable(ho, requires_grad=False), Variable(co, requires_grad=False))

        # Embed the flat packed token ids, then squash with tanh.
        x = self.embed(packed_input.data)
        x = nn.Tanh()(x)

        x = self.embed_dropout(x)

        # Re-wrap the embedded tokens with the original packing layout.
        packed_input = PackedSequence(x, packed_input.batch_sizes)

        # Both LSTMs start from the same zero state.
        lstm_0_output, _ = self.lstm_0(packed_input, hidden)
        lstm_1_output, _ = self.lstm_1(lstm_0_output, hidden)

        # Skip connections: attention sees both LSTM outputs and the embeddings.
        packed_input = PackedSequence(torch.cat((lstm_1_output.data,
                                                 lstm_0_output.data,
                                                 packed_input.data), dim=1),
                                      packed_input.batch_sizes)

        input_seqs, unpacked_lengths = pad_packed_sequence(packed_input, batch_first=True)
        if input_lengths is None:
            # Packed input: give the attention layer per-sequence lengths in
            # the same form (LongTensor) the non-packed branch provides.
            input_lengths = torch.LongTensor([int(n) for n in unpacked_lengths])

        x, att_weights = self.attention_layer(input_seqs, input_lengths)

        if not self.feature_output:
            x = self.final_dropout(x)
            outputs = self.output_layer(x)
        else:
            outputs = x

        # Undo the length sort so row i of every result matches input row i.
        if reorder_output:
            reordered = Variable(outputs.data.new(outputs.size()))
            reordered[perm_idx] = outputs
            outputs = reordered
            # The attention weights come from the same sorted batch and need
            # the same treatment.
            # NOTE(review): assumes att_weights is indexed by batch row on
            # its first dimension - confirm against Attention.
            if self.return_attention and att_weights is not None:
                reordered_att = Variable(att_weights.data.new(att_weights.size()))
                reordered_att[perm_idx] = att_weights
                att_weights = reordered_att

        # Hand the outputs back in the caller's format.
        if return_tensor:
            outputs = outputs.data
        if return_numpy:
            outputs = outputs.data.numpy()

        if self.return_attention:
            return outputs, att_weights
        else:
            return outputs
| |
|
| |
|
def load_specific_weights(model, weight_path, exclude_names=None, extend_embedding=0, verbose=True):
    """ Loads model weights from the given file path, excluding any
        given layers.

    Each saved parameter is copied in place into the parameter of the same
    name in the model. Parameters present in the model but absent from the
    file are left untouched.

    # Arguments:
        model: Model whose weights should be loaded.
        weight_path: Path to file containing model weights.
        exclude_names: List of layer names whose weights should not be loaded.
            A saved parameter is skipped when its key contains any of these
            names. Defaults to excluding nothing.
        extend_embedding: Number of new words being added to vocabulary.
        verbose: Verbosity flag.

    # Raises:
        ValueError if the file at weight_path does not exist, or if
            extend_embedding is set while 'embed' is in exclude_names.
        KeyError if the file holds a parameter the model does not have.
    """
    # None instead of a mutable [] default, which would be shared across calls.
    if exclude_names is None:
        exclude_names = []

    if not exists(weight_path):
        raise ValueError('ERROR (load_weights): The weights file at {} does '
                         'not exist. Refer to the README for instructions.'
                         .format(weight_path))

    if extend_embedding and 'embed' in exclude_names:
        raise ValueError('ERROR (load_weights): Cannot extend a vocabulary '
                         'without loading the embedding weights.')

    # SECURITY: torch.load unpickles the file; only load weights from a
    # trusted source.
    weights = torch.load(weight_path)

    # Built once: its tensors share storage with the model's parameters, so
    # copying into them updates the model.
    model_state = model.state_dict()

    for key, weight in weights.items():
        if any(excluded in key for excluded in exclude_names):
            if verbose:
                print('Ignoring weights for {}'.format(key))
            continue

        try:
            model_w = model_state[key]
        except KeyError:
            raise KeyError("Weights had parameters {},".format(key)
                           + " but could not find this parameters in model.")

        if verbose:
            print('Loading weights for {}'.format(key))

        # The saved embedding only covers the first NB_TOKENS rows; the rows
        # for the added tokens keep the model's current values.
        if 'embed' in key and extend_embedding > 0:
            weight = torch.cat((weight, model_w[NB_TOKENS:, :]), dim=0)
            if verbose:
                print('Extended vocabulary for embedding layer ' +
                      'from {} to {} tokens.'.format(
                          NB_TOKENS, NB_TOKENS + extend_embedding))
        try:
            model_w.copy_(weight)
        except Exception:
            # Report which parameter failed before re-raising.
            print('While copying the weigths named {}, whose dimensions in the model are'
                  ' {} and whose dimensions in the saved file are {}, ...'.format(
                      key, model_w.size(), weight.size()))
            raise
| |
|