"""Seq2Seq question-answering model with attention: vocabulary, training and greedy decoding."""

import json
from collections import Counter
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn

try:
    from sklearn.model_selection import train_test_split
except ImportError:  # scikit-learn is only needed by train_model(); inference works without it
    train_test_split = None

# Reserved vocabulary entries. Their position in SPECIAL_TOKENS is their id, so
# padding is id 0 (matching np.zeros padding and CrossEntropyLoss(ignore_index=0)).
# NOTE(review): the token spellings were restored by this revision (the literals
# were empty strings, collapsing all four onto one key) -- confirm they match any
# previously saved vocab file.
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"
SOS_TOKEN = "<SOS>"
EOS_TOKEN = "<EOS>"
SPECIAL_TOKENS = (PAD_TOKEN, UNK_TOKEN, SOS_TOKEN, EOS_TOKEN)

DEFAULT_VOCAB_PATH = "./model/ma_vocab.json"


def build_vocab(texts, vocab_path=DEFAULT_VOCAB_PATH):
    """Build a word -> id mapping from *texts* and save it as JSON.

    Words are lower-cased and whitespace-split. Ids 0-3 are reserved for
    <PAD>, <UNK>, <SOS> and <EOS>; corpus words are numbered from 4 in order
    of first appearance.

    Args:
        texts: Iterable of strings to collect words from.
        vocab_path: Destination JSON file; missing parent directories are
            created.

    Returns:
        dict mapping each token (special tokens included) to its integer id.
    """
    counts = Counter()
    for text in texts:
        counts.update(text.lower().split())

    vocab = {token: idx for idx, token in enumerate(SPECIAL_TOKENS)}
    offset = len(SPECIAL_TOKENS)  # +4 to reserve 0 for padding, 1 for unknown, 2 for <SOS>, 3 for <EOS>
    for idx, word in enumerate(counts):
        vocab[word] = idx + offset

    target = Path(vocab_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        json.dump(vocab, f, indent=4)
    return vocab


def tokenize(text, vocab):
    """Convert *text* to a list of ids wrapped in <SOS> ... <EOS>.

    Words are whitespace-split and lower-cased; words missing from *vocab*
    map to the <UNK> id.
    """
    unk_id = vocab[UNK_TOKEN]
    word_ids = [vocab.get(word.lower(), unk_id) for word in text.split()]
    return [vocab[SOS_TOKEN], *word_ids, vocab[EOS_TOKEN]]


def pad_sequences(sequences, max_len):
    """Right-pad id sequences with 0 into a ``(len(sequences), max_len)`` array.

    Sequences longer than *max_len* are truncated instead of raising a
    broadcast error, so an over-long question can still be encoded.

    Returns:
        numpy array (default float dtype, as before); callers cast to long.
    """
    padded = np.zeros((len(sequences), max_len))
    for row, seq in enumerate(sequences):
        clipped = seq[:max_len]
        padded[row, : len(clipped)] = clipped
    return padded


def evaluate_model(model, test_questions, test_answers, vocab, max_len):
    """Return exact-match accuracy of greedy-decoded answers, printing each case.

    Comparison is case-insensitive. The reference answer is whitespace
    normalised the same way generated answers are (single-space joined), so
    stray double spaces do not count as a miss.

    Returns:
        Fraction of questions answered exactly; 0.0 for an empty test set.
    """
    if not test_questions:
        return 0.0

    correct = 0
    for question, true_answer in zip(test_questions, test_answers):
        generated_answer = generate_answer(model, question, vocab, max_len)
        print(f"Question: {question}")
        print(f"True Answer: {true_answer}")
        print(f"Generated Answer: {generated_answer}")
        if generated_answer.lower() == " ".join(true_answer.lower().split()):
            correct += 1
    return correct / len(test_questions)


# Define Attention Layer
class Attention(nn.Module):
    """Scores each encoder position against a single decoder/encoder hidden state."""

    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)  # Attention layer
        self.v = nn.Parameter(torch.rand(hidden_dim))  # Weight for attention

    def forward(self, hidden, encoder_outputs):
        """Return softmax attention weights over dim 1 of *encoder_outputs*.

        *hidden* is repeated along a new dim 1 to the encoder sequence length
        and concatenated with *encoder_outputs* on dim 2 before scoring.
        """
        seq_len = encoder_outputs.size(1)
        # Repeat hidden state to match encoder output sequence length
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = torch.sum(self.v * energy, dim=2)  # Sum across hidden dim
        return torch.softmax(attention, dim=1)


# Define the Seq2Seq Model with Attention
class Seq2Seq(nn.Module):
    """LSTM encoder/decoder sharing one embedding, fused with an attention context."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.attn = Attention(hidden_dim)  # Attention mechanism
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, trg):
        """Return vocabulary logits for every position of *trg* (teacher forcing)."""
        # Encoder
        embedded_src = self.dropout(self.embedding(src))
        encoder_outputs, (hidden, cell) = self.encoder(embedded_src)

        # Attention of the final encoder hidden state over all encoder outputs
        attn_weights = self.attn(hidden[-1], encoder_outputs)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs).squeeze(1)

        # Decoder, initialised with the encoder's final state
        embedded_trg = self.dropout(self.embedding(trg))
        outputs, _ = self.decoder(embedded_trg, (hidden, cell))

        # Simple fusion: the same context vector is added at every decoder step
        outputs = outputs + context.unsqueeze(1)
        return self.fc(outputs)

    def generate(self, question, vocab, max_len):
        """Greedy-decode an answer string for *question* (switches to eval mode)."""
        return _greedy_decode(self, question, vocab, max_len)


def _greedy_decode(model, question, vocab, max_len):
    """Shared greedy decoder behind Seq2Seq.generate and generate_answer."""
    model.eval()
    pad_id, sos_id, eos_id = vocab[PAD_TOKEN], vocab[SOS_TOKEN], vocab[EOS_TOKEN]

    # Build inputs on the model's device so a GPU-resident model also works.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    padded_question = pad_sequences([tokenize(question, vocab)], max_len)
    src = torch.tensor(padded_question, dtype=torch.long, device=device)

    # Decoder input starts as <SOS> followed by padding and is filled step by step.
    trg = torch.full((1, max_len), pad_id, dtype=torch.long, device=device)
    trg[0, 0] = sos_id

    with torch.no_grad():
        for step in range(1, max_len):
            logits = model(src, trg[:, :step])
            next_id = logits.argmax(2)[:, -1].item()
            trg[0, step] = next_id
            if next_id == eos_id:
                break

    # Invert the vocab once instead of scanning it for every token.
    id_to_word = {idx: word for word, idx in vocab.items()}
    skipped = {pad_id, sos_id, eos_id}
    return " ".join(id_to_word[tok] for tok in trg[0].tolist() if tok not in skipped)


def train_model(
    file, *, epochs=800, embedding_dim=64, hidden_dim=128, learning_rate=0.001
):
    """Train a Seq2Seq model on a JSON list of ``{"question", "answer"}`` items.

    The data is split 75/25 (random_state=42); the vocabulary is built from
    the training split only and saved by build_vocab. Training is full-batch
    with teacher forcing, then exact-match accuracy on the test split is
    printed.

    Args:
        file: Path to the JSON training file.
        epochs: Number of full-batch optimisation steps.
        embedding_dim: Embedding size.
        hidden_dim: LSTM hidden size.
        learning_rate: Adam learning rate.

    Returns:
        ``(model, vocab, max_len, vocab_size, embedding_dim, hidden_dim)``.

    Raises:
        ImportError: if scikit-learn is not installed.
    """
    if train_test_split is None:
        raise ImportError("scikit-learn is required for train_model()")

    with open(file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Extract questions and answers
    questions = [item["question"] for item in data]
    answers = [item["answer"] for item in data]

    # Split data into train and test sets
    train_questions, test_questions, train_answers, test_answers = train_test_split(
        questions, answers, test_size=0.25, random_state=42
    )

    # Build vocabulary and tokenize data
    vocab = build_vocab(train_questions + train_answers)
    tokenized_train_questions = [tokenize(q, vocab) for q in train_questions]
    tokenized_train_answers = [tokenize(a, vocab) for a in train_answers]
    # Test texts are tokenized only so max_len also covers them at evaluation.
    tokenized_test = [tokenize(t, vocab) for t in test_questions + test_answers]

    # Find the maximum sequence length
    max_len = max(
        len(seq)
        for seq in tokenized_train_questions + tokenized_train_answers + tokenized_test
    )
    print(f"Using max_len: {max_len}")

    # Pad sequences and convert to PyTorch tensors
    train_src = torch.tensor(
        pad_sequences(tokenized_train_questions, max_len), dtype=torch.long
    )
    train_trg = torch.tensor(
        pad_sequences(tokenized_train_answers, max_len), dtype=torch.long
    )

    vocab_size = len(vocab)
    model = Seq2Seq(vocab_size, embedding_dim, hidden_dim)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=vocab[PAD_TOKEN])  # Ignore padding
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop with teacher forcing
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(train_src, train_trg[:, :-1])  # Exclude last token from target
        # CrossEntropyLoss wants (batch, classes, seq); labels drop the first token
        loss = criterion(output.transpose(1, 2), train_trg[:, 1:])
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

    accuracy = evaluate_model(model, test_questions, test_answers, vocab, max_len)
    print(f"Test Accuracy: {accuracy * 100:.2f}%")
    return model, vocab, max_len, vocab_size, embedding_dim, hidden_dim


def generate_answer(model, question, vocab, max_len=34):
    """Greedy-decode an answer string for *question* with *model*.

    Puts *model* in eval mode. Decoding stops at <EOS> or after *max_len*
    tokens; <PAD>, <SOS> and <EOS> are removed from the returned text.
    Questions longer than *max_len* tokens are truncated.
    """
    return _greedy_decode(model, question, vocab, max_len)