Spaces:
Sleeping
Sleeping
File size: 4,991 Bytes
0c89f3d 72f5aa7 0c89f3d 72f5aa7 0c89f3d 72f5aa7 e0030a1 0c89f3d 3c819a7 0c89f3d 3c819a7 0c89f3d 14c7516 0c89f3d e0030a1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from flask import Flask, request, jsonify
from sklearn.model_selection import train_test_split
import os
# Load data
url = "https://drive.google.com/uc?id=1RCZShB5ohy1HdU-mogcP16TbeVv9txpY"
df = pd.read_csv(url)
# Tokenizer
class ScratchTokenizer:
def __init__(self):
self.word2idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
self.idx2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
self.vocab_size = 4
def build_vocab(self, texts):
for text in texts:
for word in text.split():
if word not in self.word2idx:
self.word2idx[word] = self.vocab_size
self.idx2word[self.vocab_size] = word
self.vocab_size += 1
def encode(self, text, max_len=200):
tokens = [self.word2idx.get(word, 3) for word in text.split()]
tokens = [1] + tokens[:max_len - 2] + [2]
return tokens + [0] * (max_len - len(tokens))
def decode(self, tokens):
return " ".join([self.idx2word.get(idx, "<UNK>") for idx in tokens if idx > 0])
# Train-Test Split
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)
# Initialize Tokenizer
tokenizer = ScratchTokenizer()
tokenizer.build_vocab(train_data["instruction"].tolist() + train_data["response"].tolist())
# Dataset Class
class TextDataset(Dataset):
def __init__(self, data, tokenizer, max_len=200):
self.data = data
self.tokenizer = tokenizer
self.max_len = max_len
def _len_(self):
return len(self.data)
def _getitem_(self, idx):
src_text = self.data.iloc[idx]["instruction"]
tgt_text = self.data.iloc[idx]["response"]
src = torch.tensor(self.tokenizer.encode(src_text), dtype=torch.long)
tgt = torch.tensor(self.tokenizer.encode(tgt_text), dtype=torch.long)
return src, tgt
# Model
class GPTModel(nn.Module):
def __init__(self, vocab_size, embed_size=256, num_heads=8, num_layers=6, max_len=200):
super(GPTModel, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_size)
self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_size))
self.transformer = nn.TransformerDecoder(
nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads),
num_layers=num_layers
)
self.fc_out = nn.Linear(embed_size, vocab_size)
def forward(self, src, tgt):
src_emb = self.embedding(src) + self.pos_embedding[:, :src.size(1), :]
tgt_emb = self.embedding(tgt) + self.pos_embedding[:, :tgt.size(1), :]
tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
output = self.transformer(tgt_emb.permute(1, 0, 2), src_emb.permute(1, 0, 2), tgt_mask=tgt_mask)
return self.fc_out(output.permute(1, 0, 2))
# Load model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GPTModel(tokenizer.vocab_size).to(device)
def load_model(model, path="gpt_model.pth"):
if os.path.exists(path):
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
print("Model loaded successfully.")
else:
print("Model file not found!")
load_model(model)
# # Generate Response
# def generate_response(model, query, max_length=200):
# model.eval()
# src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
# tgt = torch.tensor([[1]]).to(device) # <SOS>
# for _ in range(max_length):
# output = model(src, tgt)
# next_word = output.argmax(-1)[:, -1].unsqueeze(1)
# tgt = torch.cat([tgt, next_word], dim=1)
# if next_word.item() == 2: # <EOS>
# break
# return tokenizer.decode(tgt.squeeze(0).tolist())
def generate_response(model, query, max_length=200):
model.eval()
with torch.no_grad(): # Disable gradient tracking
src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
tgt = torch.tensor([[1]]).to(device) # <SOS>
for _ in range(max_length):
output = model(src, tgt)
next_token = output[:, -1, :].argmax(dim=-1, keepdim=True)
tgt = torch.cat([tgt, next_token], dim=1)
if next_token.item() == 2: # <EOS>
break
return tokenizer.decode(tgt.squeeze(0).tolist())
# Flask App
app = Flask(__name__)
@app.route("/")
def home():
return {"message": "Transformer-based Response Generator API is running!"}
@app.route("/query", methods=["POST"])
def query_model():
data = request.get_json()
query = data.get("query", "")
if not query:
return jsonify({"error": "Query cannot be empty"}), 400
response = generate_response(model, query)
return jsonify({"query": query, "response": response})
# DO NOT ADD app.run() |