import textwrap import torch import torch.nn as nn import torch.optim as optim import spacy import random import pandas as pd from torch.utils.data import Dataset, DataLoader from torch.nn.utils.rnn import pad_sequence from sklearn.model_selection import train_test_split from flask import Flask ,request, jsonify,send_file,after_this_request from collections import Counter from flask_cors import CORS import requests from gtts import gTTS from googletrans import Translator import uuid import os # Load Dataset df = pd.read_csv("https://drive.google.com/uc?id=1RCZShB5ohy1HdU-mogcP16TbeVv9txpY") df = df.dropna(subset=['instruction', 'response']) # Ensure all entries are strings df['instruction'] = df['instruction'].astype(str) df['response'] = df['response'].astype(str) # Tokenizer (Scratch) class ScratchTokenizer: def __init__(self): self.word2idx = {"": 0, "": 1, "": 2, "": 3} self.idx2word = {0: "", 1: "", 2: "", 3: ""} self.vocab_size = 4 def build_vocab(self, texts): for text in texts: for word in text.split(): if word not in self.word2idx: self.word2idx[word] = self.vocab_size self.idx2word[self.vocab_size] = word self.vocab_size += 1 def encode(self, text, max_len=200): tokens = [self.word2idx.get(word, 3) for word in text.split()] tokens = [1] + tokens[:max_len - 2] + [2] return tokens + [0] * (max_len - len(tokens)) def decode(self, tokens): return " ".join([self.idx2word.get(idx, "") for idx in tokens if idx > 0]) # Train-Test Split train_data, test_data = train_test_split(df, test_size=0.2, random_state=42) # Initialize Tokenizer tokenizer = ScratchTokenizer() tokenizer.build_vocab(train_data["instruction"].tolist() + train_data["response"].tolist()) # Dataset Class class TextDataset(Dataset): def __init__(self, data, tokenizer, max_len=200): self.data = data self.tokenizer = tokenizer self.max_len = max_len def __len__(self): return len(self.data) def __getitem__(self, idx): src_text = self.data.iloc[idx]["instruction"] tgt_text = self.data.iloc[idx]["response"] src = torch.tensor(self.tokenizer.encode(src_text), dtype=torch.long) tgt = torch.tensor(self.tokenizer.encode(tgt_text), dtype=torch.long) return src, tgt # Load Dataset train_dataset = TextDataset(train_data, tokenizer) test_dataset = TextDataset(test_data, tokenizer) train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=8) # Improved GPT-Style Transformer Model class GPTModel(nn.Module): def __init__(self, vocab_size, embed_size=256, num_heads=8, num_layers=6, max_len=200): super(GPTModel, self).__init__() self.embedding = nn.Embedding(vocab_size, embed_size) self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_size)) # The problem was here, setting num_encoder_layers to 0 # makes the model try to access a non-existent layer. # The solution is to remove the encoder completely. self.transformer = nn.TransformerDecoder(nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads), num_layers=num_layers) self.fc_out = nn.Linear(embed_size, vocab_size) def forward(self, src, tgt): src_emb = self.embedding(src) + self.pos_embedding[:, :src.size(1), :] tgt_emb = self.embedding(tgt) + self.pos_embedding[:, :tgt.size(1), :] # Causal Mask for Auto-Regressive Decoding tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device) output = self.transformer(tgt_emb.permute(1, 0, 2), src_emb.permute(1, 0, 2), tgt_mask=tgt_mask) return self.fc_out(output.permute(1, 0, 2)) # Initialize Model device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = GPTModel(tokenizer.vocab_size).to(device) optimizer = optim.AdamW(model.parameters(), lr=2e-4) criterion = nn.CrossEntropyLoss(label_smoothing=0.1) def load_model(model, path="gpt_model.pth"): if os.path.exists(path): model.load_state_dict(torch.load(path, map_location=device)) model.eval() print("Model loaded successfully.") else: print("Model file not found!") load_model(model) # Generate Response def generate_response(model, query, max_length=200): model.eval() with torch.no_grad(): # Disable gradient tracking src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device) tgt = torch.tensor([[1]]).to(device) # for _ in range(max_length): output = model(src, tgt) next_token = output[:, -1, :].argmax(dim=-1, keepdim=True) tgt = torch.cat([tgt, next_token], dim=1) if next_token.item() == 2: # break return tokenizer.decode(tgt.squeeze(0).tolist()) DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') MAX_LEN = 350 BATCH_SIZE = 8 EMB_SIZE = 128 NHEAD = 8 FFN_HID_DIM = 256 NUM_ENCODER_LAYERS = 4 NUM_DECODER_LAYERS = 4 NUM_EPOCHS = 18 MIN_FREQ = 2 # ==== Tokenizers ==== spacy_eng = spacy.load("en_core_web_sm") def tokenize_en(text): return [tok.text.lower() for tok in spacy_eng.tokenizer(text)] def tokenize_te(text): return text.strip().split(" ") # ==== Vocab Builder ==== def build_vocab(sentences, tokenizer, min_freq): counter = Counter() for sent in sentences: counter.update(tokenizer(sent)) vocab = {'': 0, '': 1, '': 2, '': 3} for word, freq in counter.items(): if freq >= min_freq: vocab[word] = len(vocab) return vocab # ==== Dataset ==== class TranslationDataset(Dataset): def __init__(self, df, en_vocab, te_vocab): self.data = df self.en_vocab = en_vocab self.te_vocab = te_vocab def __len__(self): return len(self.data) def __getitem__(self, idx): en = self.data.iloc[idx]['response'] te = self.data.iloc[idx]['translated_response'] en_tokens = [''] + tokenize_en(en) + [''] te_tokens = [''] + tokenize_te(te) + [''] en_ids = [self.en_vocab.get(tok, self.en_vocab['']) for tok in en_tokens] te_ids = [self.te_vocab.get(tok, self.te_vocab['']) for tok in te_tokens] return torch.tensor(en_ids), torch.tensor(te_ids) # ==== Collate Function ==== def collate_fn(batch): src_batch, tgt_batch = zip(*batch) src_batch = pad_sequence(src_batch, padding_value=en_vocab[''], batch_first=True) tgt_batch = pad_sequence(tgt_batch, padding_value=te_vocab[''], batch_first=True) return src_batch, tgt_batch # ==== Transformer Model ==== class Seq2SeqTransformer(nn.Module): def __init__(self, num_encoder_layers, num_decoder_layers, emb_size, src_vocab_size, tgt_vocab_size, nhead, dim_feedforward=512, dropout=0.1): super().__init__() self.transformer = nn.Transformer(d_model=emb_size, nhead=nhead, num_encoder_layers=num_encoder_layers, num_decoder_layers=num_decoder_layers, dim_feedforward=dim_feedforward, dropout=dropout) self.src_tok_emb = nn.Embedding(src_vocab_size, emb_size) self.tgt_tok_emb = nn.Embedding(tgt_vocab_size, emb_size) self.fc_out = nn.Linear(emb_size, tgt_vocab_size) self.dropout = nn.Dropout(dropout) def forward(self, src, tgt): src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(DEVICE) tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(DEVICE) src_emb = self.dropout(self.src_tok_emb(src)) tgt_emb = self.dropout(self.tgt_tok_emb(tgt)) outs = self.transformer(src_emb.permute(1,0,2), tgt_emb.permute(1,0,2), src_mask=src_mask, tgt_mask=tgt_mask) return self.fc_out(outs.permute(1,0,2)) def translate(model, sentence, en_vocab, te_vocab, te_inv_vocab, max_len=MAX_LEN): model.eval() tokens = [''] + tokenize_en(sentence) + [''] src_ids = torch.tensor([[en_vocab.get(t, en_vocab['']) for t in tokens]]).to(DEVICE) tgt_ids = torch.tensor([[te_vocab['']]]).to(DEVICE) for i in range(max_len): out = model(src_ids, tgt_ids) next_token = out.argmax(-1)[:, -1].item() tgt_ids = torch.cat([tgt_ids, torch.tensor([[next_token]]).to(DEVICE)], dim=1) if next_token == te_vocab['']: break translated = [te_inv_vocab[idx.item()] for idx in tgt_ids[0][1:]] return ' '.join(translated[:-1]) if translated[-1] == '' else ' '.join(translated) # ==== Load Data ==== df_telugu = pd.read_csv("merged_translated_responses.csv") # columns: 'en', 'te' # Clean NaN or non-string entries df_telugu = df_telugu.dropna(subset=['response', 'translated_response']) # Ensure all entries are strings df_telugu['response'] = df_telugu['response'].astype(str) df_telugu['translated_response'] = df_telugu['translated_response'].astype(str) # Build vocabularies en_vocab = build_vocab(df_telugu['response'], tokenize_en, MIN_FREQ) te_vocab = build_vocab(df_telugu['translated_response'], tokenize_te, MIN_FREQ) te_inv_vocab = {idx: tok for tok, idx in te_vocab.items()} # Prepare Dataset & DataLoader dataset = TranslationDataset(df_telugu, en_vocab, te_vocab) dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) # Initialize Model # model = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE, # len(en_vocab), len(te_vocab), NHEAD, FFN_HID_DIM).to(DEVICE) pad_idx = te_vocab[''] criterion_telugu = nn.CrossEntropyLoss(ignore_index=pad_idx) optimizer_telugu = optim.Adam(model.parameters(), lr=0.0005) # ==== Training ==== # for epoch in range(NUM_EPOCHS): # loss = train(model, dataloader, optimizer, criterion) # print(f"Epoch {epoch+1}, Loss: {loss:.4f}") # ==== Try Translation ==== model_telugu = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,len(en_vocab), len(te_vocab), NHEAD, FFN_HID_DIM).to(DEVICE) # Load saved weights model_telugu.load_state_dict(torch.load("english_telugu_transformer.pth",map_location = torch.device('cpu'))) model_telugu.eval() app=Flask(__name__) CORS(app) @app.route("/intent") def home(): return jsonify({"intents" :list(set(df['intent'].dropna()))}) @app.route("/query", methods=["POST"]) def query_model(): global audio_telugu_response data = request.get_json() query = data.get("query", "") if not query: return jsonify({"error": "Query cannot be empty"}), 400 # Assuming `generate_response` is a function that processes the query response = generate_response(model, query) def clean_response(response): return response.replace("", "").replace("", "").strip() response=clean_response(response) telugu_response = translate(model_telugu, response, en_vocab, te_vocab, te_inv_vocab) audio_telugu_response=telugu_response return jsonify({"telugu":(telugu_response),"english":(response)}) @app.route("/audio", methods=["POST"]) def get_audio(): data = request.get_json() text = data.get("text") # text=audio_telugu_response if not text: return jsonify({"error": "No Response To convert to speech"}), 400 filename = f"speech_{uuid.uuid4().hex}.mp3" filepath = os.path.join("audio_temp", filename) os.makedirs("audio_temp", exist_ok=True) # Convert text to Telugu speech speech = gTTS(text=text, lang="te") speech.save(filepath) # Automatically delete the file after sending @after_this_request def cleanup(response): try: os.remove(filepath) except Exception as e: print(f"Cleanup error: {e}") return response return send_file(filepath, mimetype="audio/mpeg", as_attachment=False)