| | import torch |
| | import torch.nn as nn |
| | import torch.optim as optim |
| | import pandas as pd |
| | from torch.utils.data import Dataset, DataLoader |
| | from flask import Flask, request, jsonify |
| | from sklearn.model_selection import train_test_split |
| | import torch |
| | import os |
| |
|
| |
|
# Google Drive direct-download link for the CSV dataset. A plain string
# literal is enough here because nothing is interpolated into it.
url = "https://drive.google.com/uc?id=1RCZShB5ohy1HdU-mogcP16TbeVv9txpY"
# Downloaded once at import time; the resulting frame feeds both the
# train/test split and the /intents endpoint.
df = pd.read_csv(url)
| |
|
| | |
class ScratchTokenizer:
    """Whitespace tokenizer whose vocabulary is built from scratch.

    Ids 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``; every other word receives the next free id in
    order of first appearance.
    """

    PAD_ID = 0
    SOS_ID = 1
    EOS_ID = 2
    UNK_ID = 3

    def __init__(self):
        self.word2idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.idx2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.vocab_size = 4

    def build_vocab(self, texts):
        """Register every unseen whitespace-separated word found in ``texts``.

        Args:
            texts: Iterable of strings.
        """
        for text in texts:
            for word in text.split():
                if word not in self.word2idx:
                    self.word2idx[word] = self.vocab_size
                    self.idx2word[self.vocab_size] = word
                    self.vocab_size += 1

    def encode(self, text, max_len=200):
        """Convert ``text`` into exactly ``max_len`` token ids.

        The result is ``<SOS>``, the word ids (unknown words map to
        ``<UNK>``), ``<EOS>``, then ``<PAD>`` up to ``max_len``. Words that do
        not fit are dropped so that ``<EOS>`` is always present.

        Args:
            text: String to tokenize on whitespace.
            max_len: Length of the returned list; must be at least 2.

        Returns:
            A list of ``max_len`` integer ids.

        Raises:
            ValueError: If ``max_len`` leaves no room for ``<SOS>`` and
                ``<EOS>``.
        """
        if max_len < 2:
            # With fewer than two slots the slice below would go negative and
            # the result would silently come out longer than max_len.
            raise ValueError(f"max_len must be at least 2, got {max_len}")
        word_ids = [self.word2idx.get(word, self.UNK_ID) for word in text.split()]
        tokens = [self.SOS_ID, *word_ids[:max_len - 2], self.EOS_ID]
        return tokens + [self.PAD_ID] * (max_len - len(tokens))

    def decode(self, tokens):
        """Join the words for ``tokens`` with spaces, skipping ids <= 0.

        Padding is dropped; ``<SOS>``/``<EOS>`` are kept in the output and ids
        missing from the vocabulary are rendered as ``<UNK>``.
        """
        return " ".join(
            self.idx2word.get(idx, "<UNK>") for idx in tokens if idx > 0
        )
| |
|
| | |
# Hold out 20% of the rows for evaluation; the fixed seed makes the split
# repeatable from run to run.
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)

# The vocabulary is built from the training rows only: every instruction
# first, then every response.
tokenizer = ScratchTokenizer()
tokenizer.build_vocab(
    [
        text
        for column in ("instruction", "response")
        for text in train_data[column].tolist()
    ]
)
| |
|
| | |
class TextDataset(Dataset):
    """Dataset of (instruction, response) pairs encoded as id tensors.

    Args:
        data: pandas DataFrame with ``instruction`` and ``response`` columns.
        tokenizer: Object exposing ``encode(text, max_len=...)`` that returns
            a list of integer ids.
        max_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, data, tokenizer, max_len=200):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(src, tgt)`` long tensors for the row at position ``idx``."""
        row = self.data.iloc[idx]
        return self._encode(row["instruction"]), self._encode(row["response"])

    def _encode(self, text):
        """Encode ``text`` to a long tensor of ``self.max_len`` ids."""
        # Forward the configured length; relying on the tokenizer's own
        # default would silently ignore the max_len given to the constructor.
        ids = self.tokenizer.encode(text, max_len=self.max_len)
        return torch.tensor(ids, dtype=torch.long)
| |
|
| | |
# Wrap each split in a TextDataset that shares the one tokenizer.
train_dataset, test_dataset = (
    TextDataset(split, tokenizer) for split in (train_data, test_data)
)
# Batches of 8 for both splits; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=8)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=8)
| |
|
| | |
| |
|
class GPTModel(nn.Module):
    """Transformer decoder that attends over an embedded source sequence.

    Source and target tokens share one embedding table and one learned
    positional table of ``max_len`` positions, so neither sequence may be
    longer than ``max_len``.

    Args:
        vocab_size: Number of token ids (embedding rows and output logits).
        embed_size: Embedding / model width.
        num_heads: Attention heads per decoder layer.
        num_layers: Number of stacked decoder layers.
        max_len: Number of positions in the learned positional table.
    """

    def __init__(self, vocab_size, embed_size=256, num_heads=8, num_layers=6, max_len=200):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_size))
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads)
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, src, tgt):
        """Compute next-token logits for ``tgt`` conditioned on ``src``.

        Args:
            src: Long tensor of source ids, shape ``(batch, src_len)``.
            tgt: Long tensor of target ids, shape ``(batch, tgt_len)``.

        Returns:
            Tensor of shape ``(batch, tgt_len, vocab_size)``.
        """
        src_emb = self.embedding(src) + self.pos_embedding[:, :src.size(1), :]
        tgt_emb = self.embedding(tgt) + self.pos_embedding[:, :tgt.size(1), :]

        # Causal mask: position i may only attend to target positions <= i.
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        # The decoder layers are built without batch_first, so sequences are
        # moved to (seq, batch, embed) on the way in and back on the way out.
        output = self.transformer(
            tgt_emb.permute(1, 0, 2),
            src_emb.permute(1, 0, 2),
            tgt_mask=tgt_mask,
        )
        return self.fc_out(output.permute(1, 0, 2))
| |
|
| | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pickled model object, remapping its tensors onto `device`.
# NOTE: torch.load unpickles arbitrary Python objects, so only trusted
# checkpoint files must be used here.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules; confirm against the installed version.
model = torch.load('gpt_model.pt', map_location=device)
model.eval()

# The optimizer is created after loading so that it tracks the parameters of
# the model actually in use, not those of a discarded placeholder instance.
optimizer = optim.AdamW(model.parameters(), lr=2e-4)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
| |
|
| | |
def train_epoch(model, loader, optimizer, criterion, device):
    """Run one teacher-forced training pass over ``loader``.

    Args:
        model: Module called as ``model(src, tgt_in)`` that returns logits of
            shape ``(batch, seq, vocab)``.
        loader: Iterable of ``(src, tgt)`` tensor batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(logits_2d, targets_1d)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The mean per-batch loss as a float, or ``0.0`` when ``loader`` yields
        no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for src, tgt in loader:
        src, tgt = src.to(device), tgt.to(device)
        optimizer.zero_grad()
        # Teacher forcing: feed every target token but the last and score the
        # prediction against the same sequence shifted left by one.
        logits = model(src, tgt[:, :-1])
        # Flatten using the logits' own vocabulary dimension instead of a
        # module-level tokenizer, so any model/loader pairing works.
        loss = criterion(logits.reshape(-1, logits.size(-1)), tgt[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    if num_batches == 0:
        # Nothing to average; avoid dividing by zero.
        return 0.0
    return total_loss / num_batches
| |
|
| |
|
| |
|
| | |
def generate_response(model, query, max_length=200):
    """Greedily decode a reply to ``query`` one token at a time.

    Args:
        model: Module called as ``model(src, tgt)`` that returns per-position
            vocabulary logits.
        query: Input text, encoded with the module-level ``tokenizer``.
        max_length: Upper bound on the number of generated tokens.

    Returns:
        The decoded token sequence as a space-joined string. The tokenizer's
        ``decode`` only drops padding, so the ``<SOS>``/``<EOS>`` markers
        remain in the text.
    """
    model.eval()
    src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
    # Start the target with the <SOS> id (1).
    tgt = torch.tensor([[1]]).to(device)

    # Inference only: skip autograd bookkeeping so repeated forward passes do
    # not accumulate computation graphs.
    with torch.no_grad():
        for _ in range(max_length):
            output = model(src, tgt)
            # Most likely token at the last position, kept as shape (1, 1).
            next_word = output.argmax(-1)[:, -1].unsqueeze(1)
            tgt = torch.cat([tgt, next_word], dim=1)
            if next_word.item() == 2:  # <EOS> id
                break

    return tokenizer.decode(tgt.squeeze(0).tolist())
# Debug output: the distinct values found in the dataset's "intent" column.
# NOTE(review): treated as a module-level statement; the original indentation
# was lost, so confirm it was not meant to sit inside the function above.
print({*df["intent"]})

# Flask application object that the route decorators below register on.
app = Flask(__name__)
| |
|
@app.route("/")
def home():
    """Root endpoint: return a fixed banner showing the service is up."""
    banner = "Transformer-based Response Generator API is running!"
    return banner
| |
|
@app.route("/query", methods=["POST"])
def query_model():
    """Generate a model response for the ``query`` field of a JSON body.

    Returns:
        ``{"query": ..., "response": ...}`` on success, or a JSON error with
        HTTP status 400 when the body is not a JSON object or the query is
        missing, not a string, or blank.
    """
    data = request.get_json()
    # get_json() can yield None (e.g. a literal JSON null) or a non-mapping
    # such as a list; calling .get on those would surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    query = data.get("query", "")
    if not isinstance(query, str):
        # The tokenizer splits the query as text, so reject other JSON types.
        return jsonify({"error": "Query must be a string"}), 400
    if not query.strip():
        return jsonify({"error": "Query cannot be empty"}), 400

    response = generate_response(model, query)
    return jsonify({"query": query, "response": response})
| |
|
@app.route("/intents", methods=["GET"])
def get_intents():
    """Return the distinct values of the dataset's ``intent`` column as JSON."""
    distinct_intents = {*df["intent"]}
    return jsonify({"intents": [*distinct_intents]})
| |
|
if __name__ == "__main__":
    # Only when executed as a script: serve on port 7120, listening on all
    # network interfaces (0.0.0.0).
    app.run(port=7120, host="0.0.0.0")