| """ | |
| Defines the Encoder, Decoder and Sequence to Sequence models | |
| used in this projet | |
| """ | |
| import logging | |
| import torch | |
| logging.basicConfig(level=logging.DEBUG) | |


class Encoder(torch.nn.Module):
    def __init__(
        self,
        vocab_size: int,
        embeddings_dim: int,
        hidden_size: int,
        dropout: float,
        device,
    ):
        # A torch idiosyncrasy, so it can work its magic
        super().__init__()
        self.device = device
        # One extra word is added to the vocabulary:
        # it will be used for unknown words
        self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim)
        self.embeddings.to(device)
        # Note: LSTM dropout only acts between stacked layers,
        # so it is a no-op on this single-layer LSTM
        self.hidden = torch.nn.LSTM(
            embeddings_dim, hidden_size, dropout=dropout)
        # Dropout applied to the embeddings
        self.dropout = torch.nn.Dropout(dropout)
        self.dropout.to(self.device)

    def forward(self, inpt):
        # Tensor.to() returns a copy rather than moving in place,
        # so the result must be reassigned
        inpt = inpt.to(self.device)
        emb = self.dropout(self.embeddings(inpt)).to(self.device)
        output, (hidden, cell) = self.hidden(emb)
        hidden = hidden.to(self.device)
        cell = cell.to(self.device)
        return hidden, cell
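
# A minimal usage sketch for the Encoder (all sizes below are
# illustrative assumptions, not values prescribed by this project);
# the module expects a 1-D LongTensor of token ids:
#
#     enc = Encoder(vocab_size=1000, embeddings_dim=32,
#                   hidden_size=64, dropout=0.1, device="cpu")
#     hidden, cell = enc(torch.randint(0, 1000, (12,)))
#     # hidden and cell each have shape (1, 64)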


class Decoder(torch.nn.Module):
    def __init__(
        self,
        vocab_size: int,
        embeddings_dim: int,
        hidden_size: int,
        dropout: float,
        device,
    ):
        # A torch idiosyncrasy, so it can work its magic
        super().__init__()
        self.device = device
        # One extra word is added to the vocabulary:
        # it will be used for unknown words
        self.vocab_size = vocab_size
        self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim)
        self.hidden = torch.nn.LSTM(
            embeddings_dim, hidden_size, dropout=dropout)
        # Since we will compute the log-likelihood,
        # the log-softmax is what we are after
        self.output = torch.nn.Linear(hidden_size, vocab_size)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        # Add a length-1 sequence dimension: one word is decoded at a time
        input = input.unsqueeze(0)
        input = input.to(self.device)
        emb = self.dropout(self.embeddings(input)).to(self.device)
        output, (hidden, cell) = self.hidden(emb, (hidden, cell))
        out = self.output(output.squeeze(0)).to(self.device)
        return out, hidden, cell
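
# A minimal sketch of a single decoding step (sizes are the same
# illustrative assumptions as above); the decoder consumes one token id
# plus the current LSTM states and returns one score per vocabulary word:
#
#     dec = Decoder(vocab_size=1000, embeddings_dim=32,
#                   hidden_size=64, dropout=0.1, device="cpu")
#     out, hidden, cell = dec(torch.tensor(0), hidden, cell)
#     # out has shape (1000,)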


class EncoderDecoderModel(torch.nn.Module):
    def __init__(self, encoder, decoder, vectoriser, device):
        # A torch idiosyncrasy, so it can work its magic
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.vectoriser = vectoriser
        self.device = device

    def forward(self, source, num_beams=3, summary_len=0.2):
        """
        :param source: tensor
            the input text
        :param num_beams: int
            the number of candidates kept at each beam-search step
        :param summary_len: float
            length of the summary, as a ratio of the input length
        """
        # The ratio must be below 1 so that the text is actually compressed
        assert summary_len < 1, f"ratio less than 1 expected, got {summary_len}"
        # Expected summary length (in words)
        target_len = int(summary_len * source.shape[0])
        # Size of the output distribution: one score per vocabulary word
        target_vocab_size = self.decoder.vocab_size
        # Output of the right shape (expected summary length x vocabulary
        # size) filled with zeros. On each iteration, we will replace one
        # row of this matrix with the chosen word's scores
        outputs = torch.zeros(target_len, target_vocab_size)
        # Move the tensors to the device (useless on CPU but very useful
        # on GPU). Tensor.to() is not in place, so reassign the result
        outputs = outputs.to(self.device)
        source = source.to(self.device)
        # The last hidden state of the encoder is used
        # as the initial hidden state of the decoder.
        # Encode the input text
        hidden, cell = self.encoder(source)
        # Encode the first word of the summary
        input = self.vectoriser.encode("<start>")
        # Move the tensors to the device
        hidden = hidden.to(self.device)
        cell = cell.to(self.device)
        input = input.to(self.device)

        # BEAM SEARCH #
        # If you wonder, b stands for better
        values = None
        b_outputs = torch.zeros(target_len, target_vocab_size).to(self.device)

        for i in range(1, target_len):
            # Predict as many words as the desired summary length:
            # feed the input token and the previous hidden and cell states,
            # receive an output tensor (the predictions) and the new hidden
            # and cell states, then store the predictions in the tensor
            # holding one row per produced token.

            ####### START OF BEAM SEARCH ##########
            if values is None:
                # First pass: compute the probabilities of the first
                # word following <start>
                output, hidden, cell = self.decoder(input, hidden, cell)
                output = output.to(self.device)
                b_hidden = hidden
                b_cell = cell
                # Keep the k best scores, so that the best probability
                # over two consecutive iterations can be chosen next
                values, indices = output.topk(num_beams, sorted=True)
            else:
                # Dictionary that will hold the score of each candidate
                scores = {}
                # For each of the best values, compute the output
                for value, indice in zip(values, indices):
                    indice = indice.to(self.device)
                    # Compute the output
                    b_output, b_hidden, b_cell = self.decoder(
                        indice, b_hidden, b_cell)
                    # Keep the model from repeating the previous word by
                    # forcing that word's score to 0
                    b_output[indice] = torch.zeros(1)
                    # Pick the best result for this candidate
                    highest_value = torch.log(b_output).max()
                    # Combine the scores of the 2 iterations
                    score = highest_value * torch.log(value)
                    scores[score] = (b_output, b_hidden, b_cell)
                # Keep the best score over THE 2 ITERATIONS
                b_output, b_hidden, b_cell = scores.get(max(scores))
                # Hence row i-1 is filled rather than row i
                b_outputs[i - 1] = b_output.to(self.device)
                # Set up the new values for the next iteration
                values, indices = b_output.topk(num_beams, sorted=True)
            ##################################
            # Greedy decoding alternative, kept for reference:
            # outputs[i] = output.to(self.device)
            # input = output.argmax(dim=-1).to(self.device)
        return b_outputs.to(self.device)
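

if __name__ == "__main__":
    # Minimal smoke test. All sizes are illustrative assumptions, and the
    # stub vectoriser below is hypothetical: only an encode() method
    # returning a token-id tensor is assumed, since the real vectoriser
    # is defined elsewhere in the project.
    class _StubVectoriser:
        def encode(self, word):
            # Hypothetical stub: map every word, including <start>, to id 0
            return torch.tensor(0)

    device = "cpu"
    encoder = Encoder(vocab_size=1000, embeddings_dim=32,
                      hidden_size=64, dropout=0.1, device=device)
    decoder = Decoder(vocab_size=1000, embeddings_dim=32,
                      hidden_size=64, dropout=0.1, device=device)
    model = EncoderDecoderModel(encoder, decoder, _StubVectoriser(), device)
    # A fake 50-word document as a 1-D tensor of token ids
    source = torch.randint(0, 1000, (50,))
    summary_scores = model(source, num_beams=3, summary_len=0.2)
    logging.debug(f"summary scores shape: {summary_scores.shape}")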