| import numpy as np
|
| from collections import Counter
|
| import torch
|
| import torch.nn as nn
|
| import json
|
| from sklearn.model_selection import train_test_split
|
|
|
|
|
def build_vocab(texts):
    """Build a word-to-index vocabulary from raw texts and save it as JSON.

    Every text is lower-cased and split on whitespace. Distinct words are
    numbered from 4 upwards in order of first appearance; indices 0-3 are
    reserved for the special tokens ``<PAD>``, ``<UNK>``, ``<SOS>`` and
    ``<EOS>``.

    Args:
        texts: Iterable of strings to collect words from.

    Returns:
        dict mapping each token (str) to its integer index.

    Side effects:
        Writes the vocabulary to ``./model/ma_vocab.json``. The ``./model``
        directory is created first when it does not exist yet.
    """
    import os

    word_counts = Counter()
    for text in texts:
        word_counts.update(text.lower().split())

    # Counter keeps first-insertion order, so the numbering is deterministic.
    vocab = {word: idx + 4 for idx, word in enumerate(word_counts)}

    # Words were lower-cased above, so they can never collide with these
    # upper-case special-token keys.
    vocab["<PAD>"] = 0
    vocab["<UNK>"] = 1
    vocab["<SOS>"] = 2
    vocab["<EOS>"] = 3

    # Without this, a fresh checkout lacking ./model fails with
    # FileNotFoundError when the vocabulary is written.
    os.makedirs("./model", exist_ok=True)
    with open("./model/ma_vocab.json", "w", encoding="utf-8") as f:
        json.dump(vocab, f, indent=4)

    return vocab
|
|
|
|
|
|
|
def tokenize(text, vocab):
    """Convert a text into a list of token ids framed by <SOS> and <EOS>.

    The text is split on whitespace and each word is lower-cased before the
    lookup; words missing from ``vocab`` map to the ``<UNK>`` id.

    Args:
        text: String to encode.
        vocab: dict mapping tokens to integer ids; must contain ``<SOS>``
            and ``<EOS>``, and ``<UNK>`` whenever the text has any word.

    Returns:
        list of integer ids: ``[<SOS>, word ids..., <EOS>]``.
    """
    token_ids = [vocab["<SOS>"]]
    for word in text.split():
        token_ids.append(vocab.get(word.lower(), vocab["<UNK>"]))
    token_ids.append(vocab["<EOS>"])
    return token_ids
|
|
|
|
|
|
|
def pad_sequences(sequences, max_len):
    """Right-pad (or truncate) token-id sequences to a common length.

    Args:
        sequences: Sequence of token-id sequences (each supporting ``len``
            and slicing).
        max_len: Number of columns in the result.

    Returns:
        numpy.ndarray of shape ``(len(sequences), max_len)`` with NumPy's
        default float dtype. Row ``i`` holds the first ``max_len`` items of
        ``sequences[i]``; unused trailing positions stay 0.
    """
    padded = np.zeros((len(sequences), max_len))
    for row, seq in enumerate(sequences):
        # Clip over-long sequences: assigning more than max_len values into
        # the row would otherwise raise a broadcast ValueError.
        clipped = seq[:max_len]
        padded[row, : len(clipped)] = clipped
    return padded
|
|
|
|
|
def evaluate_model(model, test_questions, test_answers, vocab, max_len):
    """Measure exact-match accuracy of generated answers on a test set.

    For every question the model's ``generate`` method is called, the
    question, reference answer and generated answer are printed, and the
    two answers are compared case-insensitively after collapsing runs of
    whitespace.

    Args:
        model: Object exposing ``generate(question, vocab, max_len)`` that
            returns the answer as a string.
        test_questions: Sequence of question strings.
        test_answers: Sequence of reference answers, parallel to
            ``test_questions``.
        vocab: Token-to-id mapping forwarded to ``model.generate``.
        max_len: Sequence length forwarded to ``model.generate``.

    Returns:
        Fraction of questions answered exactly (float in [0, 1]); ``0.0``
        when there are no test questions.
    """
    total = len(test_questions)
    if total == 0:
        # Nothing to score; avoid dividing by zero.
        return 0.0

    correct = 0
    for i, question in enumerate(test_questions):
        true_answer = test_answers[i]
        generated_answer = model.generate(question, vocab, max_len)
        print(f"Question: {question}")
        print(f"True Answer: {true_answer}")
        print(f"Generated Answer: {generated_answer}")
        # Generated text is rebuilt from whitespace-split tokens joined by
        # single spaces, so normalise whitespace on both sides before
        # comparing; otherwise references with extra spaces never match.
        generated_norm = " ".join(generated_answer.lower().split())
        reference_norm = " ".join(true_answer.lower().split())
        if generated_norm == reference_norm:
            correct += 1

    return correct / total
|
|
|
|
|
|
|
class Attention(nn.Module):
    """Additive attention that scores encoder states against one hidden state.

    Args:
        hidden_dim: Size of the scoring vector and of the linear layer's
            output; the linear layer takes inputs of size ``2 * hidden_dim``.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Creation order (linear layer first, then v) is kept so that seeded
        # initialisation yields the same parameters.
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        self.v = nn.Parameter(torch.rand(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the sequence axis.

        Args:
            hidden: 2-D tensor ``(batch, features)``; it is copied along a
                new sequence axis to line up with ``encoder_outputs``.
            encoder_outputs: 3-D tensor ``(batch, seq_len, features)``.

        Returns:
            Tensor ``(batch, seq_len)`` whose rows are softmax-normalised.
        """
        steps = encoder_outputs.size(1)
        tiled_hidden = hidden.unsqueeze(1).repeat(1, steps, 1)
        combined = torch.cat((tiled_hidden, encoder_outputs), dim=2)
        energy = torch.tanh(self.attn(combined))
        scores = (self.v * energy).sum(dim=2)
        return scores.softmax(dim=1)
|
|
|
|
|
|
|
class Seq2Seq(nn.Module):
    """LSTM encoder-decoder with one attention context per input sequence.

    Source and target tokens share a single embedding table. The attention
    context is computed once from the encoder's final hidden state and is
    added to every decoder output before the vocabulary projection.

    Args:
        vocab_size: Number of rows in the embedding table and size of the
            output projection.
        embedding_dim: Embedding width.
        hidden_dim: Hidden size of both LSTMs and of the attention module.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.attn = Attention(hidden_dim)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, src, trg):
        """Compute vocabulary logits for every target position.

        Args:
            src: Long tensor ``(batch, src_len)`` of source token ids.
            trg: Long tensor ``(batch, trg_len)`` of decoder input ids.

        Returns:
            Tensor ``(batch, trg_len, vocab_size)`` of unnormalised scores.
        """
        embedded_src = self.dropout(self.embedding(src))
        encoder_outputs, (hidden, cell) = self.encoder(embedded_src)

        # Attention weights come from the last layer's final hidden state;
        # the weighted sum of encoder outputs is the context vector.
        attn_weights = self.attn(hidden[-1], encoder_outputs)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs).squeeze(1)

        # The decoder starts from the encoder's final (hidden, cell) state.
        embedded_trg = self.dropout(self.embedding(trg))
        outputs, _ = self.decoder(embedded_trg, (hidden, cell))

        # The same context vector is broadcast over all decoder time steps.
        outputs = outputs + context.unsqueeze(1)

        return self.fc(outputs)

    def generate(self, question, vocab, max_len):
        """Greedily decode an answer string for ``question``.

        Switches the module to eval mode (it is not switched back). The
        question is tokenised, clipped to ``max_len`` ids and zero-padded to
        exactly ``max_len``; decoding stops at ``<EOS>`` or once ``max_len``
        target positions are filled.

        Args:
            question: Question text.
            vocab: dict mapping tokens to integer ids, including the
                ``<PAD>``, ``<SOS>`` and ``<EOS>`` entries.
            max_len: Length of the padded source and maximum target length.

        Returns:
            The decoded words joined by single spaces, without ``<PAD>``,
            ``<SOS>`` and ``<EOS>``. Ids that have no entry in ``vocab`` are
            rendered as ``<UNK>``.
        """
        self.eval()

        # Build inputs on the model's own device so decoding also works when
        # the parameters are not on the CPU.
        first_param = next(self.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

        # Clip so that a question longer than max_len cannot break padding.
        question_ids = tokenize(question, vocab)[:max_len]
        padded_question = pad_sequences([question_ids], max_len)
        src = torch.tensor(padded_question, dtype=torch.long, device=device)

        trg = torch.zeros((1, max_len), dtype=torch.long, device=device)
        trg[0, 0] = vocab["<SOS>"]
        eos_id = vocab["<EOS>"]

        with torch.no_grad():
            for i in range(1, max_len):
                # Call the module (not .forward) so registered hooks run.
                output = self(src, trg[:, :i])
                next_id = output.argmax(2)[:, -1].item()
                trg[0, i] = next_id
                if next_id == eos_id:
                    break

        # Invert the vocabulary once instead of scanning it for every token;
        # setdefault keeps the first word seen for an id.
        id_to_word = {}
        for word, idx in vocab.items():
            id_to_word.setdefault(idx, word)

        skipped = {vocab["<PAD>"], vocab["<SOS>"], eos_id}
        return " ".join(
            id_to_word.get(token, "<UNK>")
            for token in trg[0].tolist()
            if token not in skipped
        )
|
|
|
|
|
def train_model(file, epochs=800, embedding_dim=64, hidden_dim=128, lr=0.001):
    """Train a Seq2Seq model on question/answer pairs stored in a JSON file.

    The file must contain a JSON array of objects with ``"question"`` and
    ``"answer"`` keys. The pairs are split 75/25 into train and test sets
    (fixed ``random_state=42``), the vocabulary is built from the training
    texts only, and the model is trained full-batch with Adam and
    cross-entropy loss that ignores padding. After training, exact-match
    accuracy on the test split is printed.

    Args:
        file: Path to the JSON data file.
        epochs: Number of full-batch optimisation steps.
        embedding_dim: Embedding width passed to ``Seq2Seq``.
        hidden_dim: LSTM hidden size passed to ``Seq2Seq``.
        lr: Adam learning rate.

    Returns:
        Tuple ``(model, vocab, max_len, vocab_size, embedding_dim,
        hidden_dim)``.
    """
    with open(file, "r", encoding="utf-8") as f:
        data = json.load(f)

    questions = [item["question"] for item in data]
    answers = [item["answer"] for item in data]

    train_questions, test_questions, train_answers, test_answers = train_test_split(
        questions, answers, test_size=0.25, random_state=42
    )

    vocab = build_vocab(train_questions + train_answers)
    tokenized_train_questions = [tokenize(q, vocab) for q in train_questions]
    tokenized_train_answers = [tokenize(a, vocab) for a in train_answers]

    # Test texts are tokenised only so that max_len also covers them; the
    # evaluation below works on the raw strings.
    tokenized_test = [tokenize(t, vocab) for t in test_questions + test_answers]

    max_len = max(
        len(seq)
        for seq in tokenized_train_questions + tokenized_train_answers + tokenized_test
    )

    print(f"Using max_len: {max_len}")

    train_src = torch.tensor(
        pad_sequences(tokenized_train_questions, max_len), dtype=torch.long
    )
    train_trg = torch.tensor(
        pad_sequences(tokenized_train_answers, max_len), dtype=torch.long
    )

    vocab_size = len(vocab)
    model = Seq2Seq(vocab_size, embedding_dim, hidden_dim)

    # 0 is the fill value written by pad_sequences, so padded target
    # positions do not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Teacher forcing: the decoder sees the target without its last token
    # and is scored against the target shifted left by one.
    decoder_input = train_trg[:, :-1]
    decoder_target = train_trg[:, 1:]

    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(train_src, decoder_input)
        # CrossEntropyLoss expects the class axis second: (batch, vocab, seq).
        loss = criterion(output.transpose(1, 2), decoder_target)
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

    accuracy = evaluate_model(model, test_questions, test_answers, vocab, max_len)
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

    return model, vocab, max_len, vocab_size, embedding_dim, hidden_dim
|
|
|
|
|
def generate_answer(model, question, vocab, max_len=34):
    """Greedily decode an answer string for ``question`` with ``model``.

    Puts the model in eval mode (it is not switched back). The question is
    tokenised, clipped to ``max_len`` ids and zero-padded to exactly
    ``max_len``; decoding stops at ``<EOS>`` or once ``max_len`` target
    positions are filled.

    Args:
        model: Module called as ``model(src, trg)`` that returns logits of
            shape ``(batch, trg_len, vocab_size)``.
        question: Question text.
        vocab: dict mapping tokens to integer ids, including the ``<PAD>``,
            ``<SOS>`` and ``<EOS>`` entries.
        max_len: Length of the padded source and maximum target length.
            NOTE(review): presumably this should equal the ``max_len`` used
            during training - confirm with the caller.

    Returns:
        The decoded words joined by single spaces, without ``<PAD>``,
        ``<SOS>`` and ``<EOS>``. Ids that have no entry in ``vocab`` are
        rendered as ``<UNK>``.
    """
    model.eval()

    # Build inputs on the model's own device so decoding also works when the
    # parameters are not on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Clip so that a question longer than max_len cannot break padding.
    question_ids = tokenize(question, vocab)[:max_len]
    padded_question = pad_sequences([question_ids], max_len)
    src = torch.tensor(padded_question, dtype=torch.long, device=device)

    trg = torch.zeros((1, max_len), dtype=torch.long, device=device)
    trg[0, 0] = vocab["<SOS>"]
    eos_id = vocab["<EOS>"]

    with torch.no_grad():
        for i in range(1, max_len):
            output = model(src, trg[:, :i])
            next_id = output.argmax(2)[:, -1].item()
            trg[0, i] = next_id
            if next_id == eos_id:
                break

    # Invert the vocabulary once instead of scanning it for every token;
    # setdefault keeps the first word seen for an id.
    id_to_word = {}
    for word, idx in vocab.items():
        id_to_word.setdefault(idx, word)

    skipped = {vocab["<PAD>"], vocab["<SOS>"], eos_id}
    return " ".join(
        id_to_word.get(token, "<UNK>")
        for token in trg[0].tolist()
        if token not in skipped
    )