# test-ma-model / main_model.py
# Page header residue: DP27 · "Upload 5 files" · c66a046 (verified)
import numpy as np
from collections import Counter
import torch
import torch.nn as nn
import json
from sklearn.model_selection import train_test_split
def build_vocab(texts, vocab_path="./model/ma_vocab.json"):
    """Build a word-to-index vocabulary from ``texts`` and save it as JSON.

    Each text is lower-cased and split on whitespace. Words receive ids
    starting at 4 in first-seen order; ids 0-3 are reserved for the special
    tokens ``<PAD>``, ``<UNK>``, ``<SOS>`` and ``<EOS>``.

    Args:
        texts: Iterable of strings to collect words from.
        vocab_path: Where the vocabulary JSON is written. Missing parent
            directories are created.

    Returns:
        Dict mapping each word (and the four special tokens) to its id.
    """
    import os

    vocab = {}
    for text in texts:
        for word in text.lower().split():
            if word not in vocab:
                # +4 keeps ids 0-3 free for the special tokens added below.
                vocab[word] = len(vocab) + 4

    vocab["<PAD>"] = 0
    vocab["<UNK>"] = 1
    vocab["<SOS>"] = 2
    vocab["<EOS>"] = 3

    # Create the target directory first; open() alone fails if it is missing.
    directory = os.path.dirname(vocab_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(vocab_path, "w", encoding="utf-8") as f:
        json.dump(vocab, f, indent=4)
    return vocab
# Tokenize function
def tokenize(text, vocab):
    """Convert ``text`` into a list of token ids framed by <SOS> and <EOS>.

    The text is split on whitespace and each word is looked up lower-cased;
    words missing from ``vocab`` map to the ``<UNK>`` id.
    """
    token_ids = [vocab["<SOS>"]]
    for word in text.split():
        token_ids.append(vocab.get(word.lower(), vocab["<UNK>"]))
    token_ids.append(vocab["<EOS>"])
    return token_ids
# Pad sequences
def pad_sequences(sequences, max_len):
    """Right-pad (or truncate) token-id sequences to a fixed length.

    Args:
        sequences: List of token-id lists.
        max_len: Number of columns in the result.

    Returns:
        A float ``numpy`` array of shape ``(len(sequences), max_len)``.
        Positions past the end of a sequence hold 0. Sequences longer than
        ``max_len`` keep only their first ``max_len`` ids instead of raising
        a shape-mismatch error.
    """
    padded = np.zeros((len(sequences), max_len))
    for row, seq in enumerate(sequences):
        kept = seq[:max_len]  # truncate so the slice assignment always fits
        padded[row, : len(kept)] = kept
    return padded
def evaluate_model(model, test_questions, test_answers, vocab, max_len):
    """Return the exact-match accuracy of ``model`` on a question/answer set.

    For every question the model's ``generate(question, vocab, max_len)``
    method is called and the result is compared case-insensitively with the
    reference answer. Each question, reference and prediction is printed.

    Args:
        model: Object exposing ``generate(question, vocab, max_len) -> str``.
        test_questions: Sequence of question strings.
        test_answers: Sequence of reference answers, same length as
            ``test_questions``.
        vocab: Vocabulary passed through to ``model.generate``.
        max_len: Maximum sequence length passed through to ``model.generate``.

    Returns:
        Fraction of questions answered exactly (0.0 when there are none).

    Raises:
        ValueError: If the two sequences differ in length.
    """
    if len(test_questions) != len(test_answers):
        raise ValueError(
            "test_questions and test_answers must have the same length"
        )
    if not test_questions:
        # Avoid dividing by zero on an empty evaluation set.
        return 0.0

    correct = 0
    for question, true_answer in zip(test_questions, test_answers):
        generated_answer = model.generate(question, vocab, max_len)
        print(f"Question: {question}")
        print(f"True Answer: {true_answer}")
        print(f"Generated Answer: {generated_answer}")
        if generated_answer.lower() == true_answer.lower():
            correct += 1
    return correct / len(test_questions)
# Define Attention Layer
class Attention(nn.Module):
    """Additive attention that scores encoder positions against one state.

    A linear layer followed by ``tanh`` turns each (state, encoder output)
    pair into an energy vector; a learned vector ``v`` reduces it to a scalar
    score, and a softmax over the sequence dimension yields the weights.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Maps the concatenated [state; encoder output] back to hidden_dim.
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        # Learned projection that collapses each energy vector to a score.
        self.v = nn.Parameter(torch.rand(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over dim 1 of ``encoder_outputs``.

        ``hidden`` is treated as 2-D and broadcast along a new dim 1 so it can
        be concatenated with the 3-D ``encoder_outputs`` on the last dim.
        """
        steps = encoder_outputs.size(1)
        # Give every encoder position its own copy of the state.
        tiled_hidden = hidden.unsqueeze(1).expand(-1, steps, -1)
        paired = torch.cat((tiled_hidden, encoder_outputs), dim=2)
        energy = torch.tanh(self.attn(paired))
        # Weighted sum over the hidden dim leaves one score per position.
        scores = (self.v * energy).sum(dim=2)
        return torch.softmax(scores, dim=1)
# Define the Seq2Seq Model with Attention
class Seq2Seq(nn.Module):
    """LSTM encoder-decoder with a single attention context vector.

    Source and target tokens share one embedding table. The encoder's final
    hidden state is used both to initialise the decoder and to attend over
    the encoder outputs; the resulting context vector is added to every
    decoder output before the vocabulary projection.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.attn = Attention(hidden_dim)  # Attention mechanism
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, src, trg):
        """Return vocabulary logits for every position of ``trg``.

        Args:
            src: Long tensor of source token ids, batch-first.
            trg: Long tensor of decoder input token ids, batch-first.

        Returns:
            Tensor of shape ``(batch, trg_len, vocab_size)``.
        """
        # Encoder
        embedded_src = self.dropout(self.embedding(src))
        encoder_outputs, (hidden, cell) = self.encoder(embedded_src)

        # Attention is computed once, from the encoder's final hidden state.
        attn_weights = self.attn(hidden[-1], encoder_outputs)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs).squeeze(1)

        # Decoder starts from the encoder's final (hidden, cell) state.
        embedded_trg = self.dropout(self.embedding(trg))
        outputs, _ = self.decoder(embedded_trg, (hidden, cell))

        # Simple fusion: the same context vector is added at every step.
        outputs = outputs + context.unsqueeze(1)

        return self.fc(outputs)

    @staticmethod
    def _decode_tokens(token_ids, vocab):
        """Turn token ids into a space-joined string, dropping PAD/SOS/EOS."""
        # Build the reverse mapping once instead of scanning the vocabulary
        # for every token; setdefault keeps the first word for a repeated id.
        id_to_word = {}
        for word, idx in vocab.items():
            id_to_word.setdefault(idx, word)
        skipped = {vocab["<PAD>"], vocab["<SOS>"], vocab["<EOS>"]}
        return " ".join(
            id_to_word.get(token, "<UNK>")
            for token in token_ids
            if token not in skipped
        )

    def generate(self, question, vocab, max_len):
        """Greedily decode an answer string for ``question``.

        Switches the module to eval mode (and leaves it there), feeds the
        growing target prefix back through ``forward`` and picks the arg-max
        token at each step until ``<EOS>`` or ``max_len`` tokens.

        Args:
            question: Input text.
            vocab: Word-to-id mapping containing the four special tokens.
            max_len: Length the question is padded/truncated to, and the
                maximum number of decoder positions.

        Returns:
            The generated words joined by single spaces.
        """
        self.eval()
        # Keep inputs on the same device as the model's weights.
        first_param = next(self.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

        # Truncate so a question longer than max_len cannot break padding.
        question_ids = tokenize(question, vocab)[:max_len]
        padded_question = pad_sequences([question_ids], max_len)
        src = torch.tensor(padded_question, dtype=torch.long, device=device)

        trg = torch.zeros((1, max_len), dtype=torch.long, device=device)
        trg[0, 0] = vocab["<SOS>"]
        eos_id = vocab["<EOS>"]

        with torch.no_grad():
            for step in range(1, max_len):
                logits = self(src, trg[:, :step])
                next_id = logits[0, -1].argmax().item()
                trg[0, step] = next_id
                if next_id == eos_id:
                    break

        return self._decode_tokens(trg[0].tolist(), vocab)
def train_model(file, epochs=800, embedding_dim=64, hidden_dim=128, lr=0.001):
    """Train a ``Seq2Seq`` model on question/answer pairs from a JSON file.

    The file must contain a list of objects with ``"question"`` and
    ``"answer"`` keys. The data is split 75/25 into train and test sets, the
    vocabulary is built from the training split only (and written to disk by
    ``build_vocab``), and the model is trained full-batch with teacher
    forcing. The test split is then scored with ``evaluate_model``.

    Args:
        file: Path to the JSON dataset.
        epochs: Number of full-batch optimisation steps.
        embedding_dim: Size of the token embeddings.
        hidden_dim: Size of the LSTM hidden state.
        lr: Adam learning rate.

    Returns:
        Tuple ``(model, vocab, max_len, vocab_size, embedding_dim,
        hidden_dim)``.
    """
    with open(file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Extract questions and answers
    questions = [item["question"] for item in data]
    answers = [item["answer"] for item in data]

    # Split data into train and test sets
    train_questions, test_questions, train_answers, test_answers = train_test_split(
        questions, answers, test_size=0.25, random_state=42
    )

    # Build vocabulary from the training split and tokenize everything.
    vocab = build_vocab(train_questions + train_answers)
    tokenized_train_questions = [tokenize(q, vocab) for q in train_questions]
    tokenized_train_answers = [tokenize(a, vocab) for a in train_answers]
    tokenized_test_questions = [tokenize(q, vocab) for q in test_questions]
    tokenized_test_answers = [tokenize(a, vocab) for a in test_answers]

    # The longest sequence across both splits fixes the padded length.
    max_len = max(
        len(seq)
        for seq in (
            tokenized_train_questions
            + tokenized_train_answers
            + tokenized_test_questions
            + tokenized_test_answers
        )
    )
    print(f"Using max_len: {max_len}")

    # Only the training split is needed as tensors; evaluation works on the
    # raw test strings.
    train_src = torch.tensor(
        pad_sequences(tokenized_train_questions, max_len), dtype=torch.long
    )
    train_trg = torch.tensor(
        pad_sequences(tokenized_train_answers, max_len), dtype=torch.long
    )

    vocab_size = len(vocab)
    model = Seq2Seq(vocab_size, embedding_dim, hidden_dim)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding tokens
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Training loop with teacher forcing
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        # Decoder input drops the last token; the target drops the first, so
        # position t is trained to predict token t+1.
        output = model(train_src, train_trg[:, :-1])
        # CrossEntropyLoss expects the class dimension second.
        loss = criterion(output.transpose(1, 2), train_trg[:, 1:])
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

    # NOTE(review): the original indentation was lost; evaluation is assumed
    # to run once after training rather than inside the epoch loop - confirm.
    accuracy = evaluate_model(model, test_questions, test_answers, vocab, max_len)
    print(f"Test Accuracy: {accuracy * 100:.2f}%")
    return model, vocab, max_len, vocab_size, embedding_dim, hidden_dim
def generate_answer(model, question, vocab, max_len=34):
    """Greedily decode an answer to ``question`` with a trained model.

    The model is switched to eval mode, the question is tokenized and
    padded (or truncated) to ``max_len``, and target tokens are produced one
    at a time by taking the arg-max of the last position until ``<EOS>`` is
    emitted or ``max_len`` positions are filled.

    Args:
        model: Module callable as ``model(src, trg)`` returning logits of
            shape ``(batch, trg_len, vocab_size)``.
        question: Input text.
        vocab: Word-to-id mapping containing the four special tokens.
        max_len: Padded source length and maximum number of decoder
            positions.

    Returns:
        The generated words joined by single spaces, with ``<PAD>``,
        ``<SOS>`` and ``<EOS>`` removed.
    """
    model.eval()
    # Keep inputs on the same device as the model's weights.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Truncate so a question longer than max_len cannot break padding.
    question_ids = tokenize(question, vocab)[:max_len]
    padded_question = pad_sequences([question_ids], max_len)
    src = torch.tensor(padded_question, dtype=torch.long, device=device)

    # Initialize decoder input with <SOS> token
    trg = torch.zeros((1, max_len), dtype=torch.long, device=device)
    trg[0, 0] = vocab["<SOS>"]
    eos_id = vocab["<EOS>"]

    with torch.no_grad():
        for step in range(1, max_len):
            logits = model(src, trg[:, :step])
            next_id = logits[0, -1].argmax().item()
            trg[0, step] = next_id
            if next_id == eos_id:
                break

    # Build the reverse mapping once instead of scanning the vocabulary for
    # every token; setdefault keeps the first word for a repeated id.
    id_to_word = {}
    for word, idx in vocab.items():
        id_to_word.setdefault(idx, word)
    skipped = {vocab["<PAD>"], vocab["<SOS>"], vocab["<EOS>"]}
    return " ".join(
        id_to_word.get(token, "<UNK>")
        for token in trg[0].tolist()
        if token not in skipped
    )