Spaces:
Runtime error
Runtime error
| """ | |
| Defines the Encoder, Decoder and Sequence to Sequence models | |
| used in this projet | |
| """ | |
| import logging | |
| import torch | |
| import dataloader | |
| logging.basicConfig(level=logging.DEBUG) | |
| data1 = dataloader.Data("data_extract/train_extract.jsonl") | |
| words = data1.get_words() | |
| vectoriser = dataloader.Vectoriser(words) | |
| class Encoder(torch.nn.Module): | |
| def __init__( | |
| self, | |
| vocab_size: int, | |
| embeddings_dim: int, | |
| hidden_size: int, | |
| dropout: int, | |
| device, | |
| ): | |
| # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie | |
| super().__init__() | |
| self.device = device | |
| # On ajoute un mot supplémentaire au vocabulaire : | |
| # on s'en servira pour les mots inconnus | |
| self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim) | |
| self.embeddings.to(device) | |
| self.hidden = torch.nn.LSTM(embeddings_dim, hidden_size, dropout=dropout) | |
| # Comme on va calculer la log-vraisemblance, | |
| # c'est le log-softmax qui nous intéresse | |
| self.dropout = torch.nn.Dropout(dropout) | |
| self.dropout.to(self.device) | |
| # Dropout | |
| def forward(self, inpt): | |
| inpt.to(self.device) | |
| emb = self.dropout(self.embeddings(inpt)).to(self.device) | |
| emb = emb.to(self.device) | |
| output, (hidden, cell) = self.hidden(emb) | |
| output.to(self.device) | |
| hidden = hidden.to(self.device) | |
| cell = cell.to(self.device) | |
| return hidden, cell | |
| class Decoder(torch.nn.Module): | |
| def __init__( | |
| self, | |
| vocab_size: int, | |
| embeddings_dim: int, | |
| hidden_size: int, | |
| dropout: int, | |
| device, | |
| ): | |
| # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie | |
| super().__init__() | |
| self.device = device | |
| # On ajoute un mot supplémentaire au vocabulaire : | |
| # on s'en servira pour les mots inconnus | |
| self.vocab_size = vocab_size | |
| self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim) | |
| self.hidden = torch.nn.LSTM(embeddings_dim, hidden_size, dropout=dropout) | |
| self.output = torch.nn.Linear(hidden_size, vocab_size) | |
| # Comme on va calculer la log-vraisemblance, | |
| # c'est le log-softmax qui nous intéresse | |
| self.dropout = torch.nn.Dropout(dropout) | |
| def forward(self, input, hidden, cell): | |
| input = input.unsqueeze(0) | |
| input = input.to(self.device) | |
| emb = self.dropout(self.embeddings(input)).to(self.device) | |
| emb = emb.to(self.device) | |
| output, (hidden, cell) = self.hidden(emb, (hidden, cell)) | |
| output = output.to(self.device) | |
| out = self.output(output.squeeze(0)).to(self.device) | |
| return out, hidden, cell | |
| class EncoderDecoderModel(torch.nn.Module): | |
| def __init__(self, encoder, decoder, device): | |
| # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie | |
| super().__init__() | |
| self.encoder = encoder | |
| self.decoder = decoder | |
| self.device = device | |
| def forward(self, source, num_beams=3): | |
| # CHANGER LA TARGET LEN POUR QQCH DE MODULABLE | |
| target_len = int(1 * source.shape[0]) # Taille du texte que l'on recherche | |
| target_vocab_size = self.decoder.vocab_size # Taille du mot | |
| # tensor to store decoder outputs | |
| outputs = torch.zeros(target_len, target_vocab_size).to( | |
| self.device | |
| ) # Instenciation d'une matrice de zeros de taille (taille du texte, taille du mot) | |
| outputs.to( | |
| self.device | |
| ) # Une idiosyncrasie de torch pour mettre le tensor sur le GPU | |
| # last hidden state of the encoder is used as the initial hidden state of the decoder | |
| source.to( | |
| self.device | |
| ) # Une idiosyncrasie de torch pour mettre le tensor sur le GPU | |
| hidden, cell = self.encoder(source) # Encode le texte sous forme de vecteur | |
| hidden.to( | |
| self.device | |
| ) # Une idiosyncrasie de torch pour mettre le tensor sur le GPU | |
| cell.to( | |
| self.device | |
| ) # Une idiosyncrasie de torch pour mettre le tensor sur le GPU | |
| # first input to the decoder is the <start> token. | |
| input = vectoriser.encode("<start>") # Mot de départ du MOdèle | |
| input.to(self.device) # idiosyncrasie de torch pour mmettre sur GPU | |
| ### DÉBUT DE L'INSTANCIATION TEST ### | |
| # If you wonder, b stands for better | |
| values = None | |
| b_outputs = torch.zeros(target_len, target_vocab_size).to(self.device) | |
| b_outputs.to(self.device) | |
| for i in range( | |
| 1, target_len | |
| ): # On va déterminer autant de mot que la taille du texte souhaité | |
| # insert input token embedding, previous hidden and previous cell states | |
| # receive output tensor (predictions) and new hidden and cell states. | |
| # replace predictions in a tensor holding predictions for each token | |
| # logging.debug(f"output : {output}") | |
| ####### DÉBUT DU BEAM SEARCH ########## | |
| if values is None: | |
| # On calcule une première fois les premières probabilité de mot après <start> | |
| output, hidden, cell = self.decoder(input, hidden, cell) | |
| output.to(self.device) | |
| b_hidden = hidden | |
| b_cell = cell | |
| # On choisi les k meilleurs scores pour choisir la meilleure probabilité | |
| # sur deux itérations ensuite | |
| values, indices = output.topk(num_beams, sorted=True) | |
| else: | |
| # On instancie le dictionnaire qui contiendra les scores pour chaque possibilité | |
| scores = {} | |
| # Pour chacune des meilleures valeurs, on va calculer l'output | |
| for value, indice in zip(values, indices): | |
| indice.to(self.device) | |
| # On calcule l'output | |
| b_output, b_hidden, b_cell = self.decoder(indice, b_hidden, b_cell) | |
| # On empêche le modèle de se répéter d'un mot sur l'autre en mettant | |
| # de force la probabilité du mot précédent à 0 | |
| b_output[indice] = torch.zeros(1) | |
| # On choisit le meilleur résultat pour cette possibilité | |
| highest_value = torch.log(b_output).max() | |
| # On calcule le score des 2 itérations ensembles | |
| score = highest_value * torch.log(value) | |
| scores[score] = (b_output, b_hidden, b_cell) | |
| # On garde le meilleur score sur LES 2 ITÉRATIONS | |
| b_output, b_hidden, b_cell = scores.get(max(scores)) | |
| # Et du coup on rempli la place de i-1 à la place de i | |
| b_outputs[i - 1] = b_output.to(self.device) | |
| # On instancies nos nouvelles valeurs pour la prochaine itération | |
| values, indices = b_output.topk(num_beams, sorted=True) | |
| ################################## | |
| # outputs[i] = output.to(self.device) | |
| # input = output.argmax(dim=-1).to(self.device) | |
| # input.to(self.device) | |
| # logging.debug(f"{vectoriser.decode(outputs.argmax(dim=-1))}") | |
| return b_outputs.to(self.device) | |