# (Hugging Face Spaces page header — "Spaces: Sleeping" — scrape artifact, not code.)
# Standard library
import io  # added: used by get_audio() but was never imported (NameError at runtime)
import os
import random
import textwrap
import time
import uuid
from collections import Counter
from datetime import datetime

# Third-party
import pandas as pd
import requests
import spacy
import torch
import torch.nn as nn
import torch.optim as optim
from flask import Flask, request, jsonify, send_file, after_this_request
from flask_cors import CORS
from googletrans import Translator
from gtts import gTTS
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
# ---- Chatbot dataset ----
# Pull the instruction/response CSV from Google Drive, drop rows missing
# either column, and coerce both columns to plain strings.
DATASET_URL = "https://drive.google.com/uc?id=1RCZShB5ohy1HdU-mogcP16TbeVv9txpY"

df = pd.read_csv(DATASET_URL)
df = df.dropna(subset=["instruction", "response"])
for _col in ("instruction", "response"):
    df[_col] = df[_col].astype(str)
# ---- Tokenizer built from scratch ----
class ScratchTokenizer:
    """Whitespace tokenizer with a vocabulary assembled on the fly.

    Reserved ids: 0=<PAD>, 1=<SOS>, 2=<EOS>, 3=<UNK>.
    """

    PAD, SOS, EOS, UNK = 0, 1, 2, 3

    def __init__(self):
        specials = ["<PAD>", "<SOS>", "<EOS>", "<UNK>"]
        self.word2idx = {tok: i for i, tok in enumerate(specials)}
        self.idx2word = dict(enumerate(specials))
        self.vocab_size = len(specials)

    def build_vocab(self, texts):
        """Assign the next free id to every unseen whitespace token."""
        for text in texts:
            for word in text.split():
                if word in self.word2idx:
                    continue
                idx = self.vocab_size
                self.word2idx[word] = idx
                self.idx2word[idx] = word
                self.vocab_size = idx + 1

    def encode(self, text, max_len=200):
        """Return <SOS> + ids (truncated) + <EOS>, right-padded to max_len."""
        ids = [self.word2idx.get(w, self.UNK) for w in text.split()]
        ids = [self.SOS] + ids[: max_len - 2] + [self.EOS]
        return ids + [self.PAD] * (max_len - len(ids))

    def decode(self, tokens):
        """Map ids back to words, skipping only <PAD> (id 0)."""
        words = [self.idx2word.get(i, "<UNK>") for i in tokens if i > 0]
        return " ".join(words)
# ---- Train/test split and tokenizer fitting ----
# 80/20 split with a fixed seed; the vocabulary is built from training data only.
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)

tokenizer = ScratchTokenizer()
_vocab_texts = train_data["instruction"].tolist() + train_data["response"].tolist()
tokenizer.build_vocab(_vocab_texts)
# ---- Dataset wrapper for the chatbot pairs ----
class TextDataset(Dataset):
    """(instruction, response) pairs encoded as fixed-length id tensors."""

    def __init__(self, data, tokenizer, max_len=200):
        self.data = data            # DataFrame with 'instruction'/'response' columns
        self.tokenizer = tokenizer  # object exposing encode(text)
        # NOTE(review): max_len is stored but never forwarded to encode(),
        # which uses its own default of 200 — confirm this is intentional.
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        encode = self.tokenizer.encode
        src = torch.tensor(encode(row["instruction"]), dtype=torch.long)
        tgt = torch.tensor(encode(row["response"]), dtype=torch.long)
        return src, tgt
# ---- DataLoaders for the chatbot model ----
_BATCH = 8
train_dataset = TextDataset(train_data, tokenizer)
test_dataset = TextDataset(test_data, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=_BATCH, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=_BATCH)
# ---- Decoder-only transformer for response generation ----
class GPTModel(nn.Module):
    """Transformer-decoder language model for instruction -> response.

    Despite the GPT name, the raw source embedding is fed to the decoder
    as its "memory", so each target position cross-attends to the
    (un-encoded) prompt embeddings.
    """

    def __init__(self, vocab_size, embed_size=256, num_heads=8, num_layers=6, max_len=200):
        super(GPTModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Learned absolute positional embeddings, one row per position.
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_size))
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads)
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, src, tgt):
        positions = self.pos_embedding
        src_emb = self.embedding(src) + positions[:, : src.size(1), :]
        tgt_emb = self.embedding(tgt) + positions[:, : tgt.size(1), :]
        # Causal mask keeps each target position from attending to later ones.
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        # nn.TransformerDecoder expects (seq, batch, embed), hence the permutes.
        decoded = self.transformer(
            tgt_emb.permute(1, 0, 2),
            src_emb.permute(1, 0, 2),
            tgt_mask=tgt_mask,
        )
        return self.fc_out(decoded.permute(1, 0, 2))
# ---- Chatbot model, optimizer, and loss ----
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = GPTModel(tokenizer.vocab_size).to(device)
optimizer = optim.AdamW(model.parameters(), lr=2e-4)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)  # smoothing regularizes the LM head
def load_model(model, path="gpt_model.pth"):
    """Load weights from `path` into `model` (in place) and switch to eval.

    A missing checkpoint is reported rather than raised, so the app can
    still start without trained weights.
    """
    if not os.path.exists(path):
        print("Model file not found!")
        return
    state = torch.load(path, map_location=device)
    model.load_state_dict(state)
    model.eval()
    print("Model loaded successfully.")


load_model(model)
# ---- Greedy response generation ----
def generate_response(model, query, max_length=200):
    """Greedy-decode a chatbot response for `query`.

    The encoded query acts as decoder memory; the target sequence grows
    one argmax token at a time until <EOS> (id 2) or `max_length` steps.
    """
    model.eval()
    with torch.no_grad():  # no autograd graph during inference
        src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
        generated = torch.tensor([[1]]).to(device)  # start from <SOS>
        for _ in range(max_length):
            logits = model(src, generated)
            next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            generated = torch.cat([generated, next_token], dim=1)
            if next_token.item() == 2:  # stop at <EOS>
                break
    return tokenizer.decode(generated.squeeze(0).tolist())
# ---- Translation model hyperparameters ----
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

MAX_LEN = 350             # max decode steps for translation
BATCH_SIZE = 8
EMB_SIZE = 128            # transformer d_model
NHEAD = 8                 # attention heads
FFN_HID_DIM = 256         # feed-forward hidden size
NUM_ENCODER_LAYERS = 4
NUM_DECODER_LAYERS = 4
NUM_EPOCHS = 18           # unused here: training is disabled below
MIN_FREQ = 2              # min word frequency to enter the vocab
# ==== Tokenizers ====
spacy_eng = spacy.load("en_core_web_sm")


def tokenize_en(text):
    """Lower-cased spaCy tokens for an English sentence."""
    return [token.text.lower() for token in spacy_eng.tokenizer(text)]
def tokenize_te(text):
    """Split a Telugu sentence on single spaces (no further normalization)."""
    stripped = text.strip()
    return stripped.split(" ")
# ==== Vocab Builder ====
def build_vocab(sentences, tokenizer, min_freq):
    """Build a token -> id map over `sentences`.

    Ids 0-3 are reserved for <pad>/<sos>/<eos>/<unk>; every token seen at
    least `min_freq` times gets the next free id, in first-seen order.
    """
    counts = Counter()
    for sentence in sentences:
        counts.update(tokenizer(sentence))

    vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
    for word in (w for w, c in counts.items() if c >= min_freq):
        vocab[word] = len(vocab)
    return vocab
# ==== Dataset ====
class TranslationDataset(Dataset):
    """English -> Telugu sentence pairs as id tensors (with <sos>/<eos>)."""

    def __init__(self, df, en_vocab, te_vocab):
        self.data = df          # DataFrame with 'response'/'translated_response'
        self.en_vocab = en_vocab
        self.te_vocab = te_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        en_tokens = ['<sos>', *tokenize_en(row['response']), '<eos>']
        te_tokens = ['<sos>', *tokenize_te(row['translated_response']), '<eos>']
        unk_en = self.en_vocab['<unk>']
        unk_te = self.te_vocab['<unk>']
        en_ids = [self.en_vocab.get(tok, unk_en) for tok in en_tokens]
        te_ids = [self.te_vocab.get(tok, unk_te) for tok in te_tokens]
        return torch.tensor(en_ids), torch.tensor(te_ids)
# ==== Collate Function ====
def collate_fn(batch):
    """Pad a list of (src, tgt) pairs into batch-first padded tensors."""
    sources, targets = zip(*batch)
    padded_src = pad_sequence(sources, padding_value=en_vocab['<pad>'], batch_first=True)
    padded_tgt = pad_sequence(targets, padding_value=te_vocab['<pad>'], batch_first=True)
    return padded_src, padded_tgt
# ==== Transformer Model ====
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder transformer for English -> Telugu translation.

    NOTE(review): a causal (square subsequent) mask is applied to the
    *source* as well as the target. That is unusual for an encoder, but
    the saved checkpoint was trained this way, so it is kept as-is.
    """

    def __init__(self, num_encoder_layers, num_decoder_layers,
                 emb_size, src_vocab_size, tgt_vocab_size,
                 nhead, dim_feedforward=512, dropout=0.1):
        super().__init__()
        self.transformer = nn.Transformer(d_model=emb_size, nhead=nhead,
                                          num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          dim_feedforward=dim_feedforward, dropout=dropout)
        self.src_tok_emb = nn.Embedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = nn.Embedding(tgt_vocab_size, emb_size)
        self.fc_out = nn.Linear(emb_size, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, tgt):
        """src/tgt: (batch, seq) id tensors -> (batch, tgt_seq, tgt_vocab) logits."""
        # Fix: build the masks on the inputs' device instead of depending on
        # the module-level DEVICE global (inputs are already moved by callers).
        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        src_emb = self.dropout(self.src_tok_emb(src))
        tgt_emb = self.dropout(self.tgt_tok_emb(tgt))
        # nn.Transformer defaults to (seq, batch, embed) layout, hence the permutes.
        outs = self.transformer(src_emb.permute(1, 0, 2), tgt_emb.permute(1, 0, 2),
                                src_mask=src_mask, tgt_mask=tgt_mask)
        return self.fc_out(outs.permute(1, 0, 2))
def translate(model, sentence, en_vocab, te_vocab, te_inv_vocab, max_len=MAX_LEN):
    """Greedy-translate an English `sentence` into Telugu.

    Encodes the sentence with the English vocab, then decodes one argmax
    token at a time until <eos> or `max_len` steps, and returns the
    detokenized Telugu string without the <sos>/<eos> markers.
    """
    model.eval()
    tokens = ['<sos>'] + tokenize_en(sentence) + ['<eos>']
    unk = en_vocab['<unk>']
    src_ids = torch.tensor([[en_vocab.get(t, unk) for t in tokens]]).to(DEVICE)
    tgt_ids = torch.tensor([[te_vocab['<sos>']]]).to(DEVICE)
    eos_id = te_vocab['<eos>']
    # Fix: inference should not build autograd graphs (memory blow-up on long decodes).
    with torch.no_grad():
        for _ in range(max_len):
            out = model(src_ids, tgt_ids)
            next_token = out.argmax(-1)[:, -1].item()
            tgt_ids = torch.cat([tgt_ids, torch.tensor([[next_token]]).to(DEVICE)], dim=1)
            if next_token == eos_id:
                break
    translated = [te_inv_vocab[idx.item()] for idx in tgt_ids[0][1:]]
    # Fix: guard against an empty decode (max_len <= 0) before peeking at [-1],
    # which previously raised IndexError.
    if translated and translated[-1] == '<eos>':
        translated = translated[:-1]
    return ' '.join(translated)
# ==== Load Data ====
# columns: 'response' (English), 'translated_response' (Telugu)
df_telugu = pd.read_csv("merged_translated_responses.csv")

# Drop rows missing either side of the pair, then force both to str.
df_telugu = df_telugu.dropna(subset=['response', 'translated_response'])
for _col in ('response', 'translated_response'):
    df_telugu[_col] = df_telugu[_col].astype(str)

# Vocabularies: English via spaCy tokens, Telugu via whitespace tokens.
en_vocab = build_vocab(df_telugu['response'], tokenize_en, MIN_FREQ)
te_vocab = build_vocab(df_telugu['translated_response'], tokenize_te, MIN_FREQ)
te_inv_vocab = {idx: tok for tok, idx in te_vocab.items()}

# Dataset & DataLoader for the translation pairs.
dataset = TranslationDataset(df_telugu, en_vocab, te_vocab)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
# ==== Translation model setup ====
# NOTE(review): criterion_telugu/optimizer_telugu are unused (training is
# disabled below), and optimizer_telugu is built over the *chatbot* model's
# parameters, not the translation model's — confirm before re-enabling training.
pad_idx = te_vocab['<pad>']
criterion_telugu = nn.CrossEntropyLoss(ignore_index=pad_idx)
optimizer_telugu = optim.Adam(model.parameters(), lr=0.0005)

# ==== Training (disabled; pretrained weights are loaded instead) ====
# for epoch in range(NUM_EPOCHS):
#     loss = train(model, dataloader, optimizer, criterion)
#     print(f"Epoch {epoch+1}, Loss: {loss:.4f}")

# ==== Build the translation model and load saved weights ====
model_telugu = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                  len(en_vocab), len(te_vocab), NHEAD, FFN_HID_DIM).to(DEVICE)
model_telugu.load_state_dict(torch.load("english_telugu_transformer.pth",
                                        map_location=torch.device('cpu')))
model_telugu.eval()
app = Flask(__name__)
CORS(app)  # allow cross-origin requests from the frontend


# NOTE(review): no @app.route decorator appears here (or on the other view
# functions below) — presumably lost in transcription; confirm the routes.
def home():
    """Health-check endpoint: greet with the current server time."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return jsonify({"message": f"Welcome to TRAVIS API, Time : {now}"})
def intents():
    """Return the distinct, non-null intents found in the chatbot dataset."""
    # NOTE(review): assumes df has an 'intent' column — it is never touched
    # by the cleaning above; confirm it exists in the CSV.
    unique_intents = set(df['intent'].dropna())
    return jsonify({"intents": list(unique_intents)})
def translate_text():
    """POST {'text': ...} -> {'english', 'telugu', 'time'} via the translator."""
    payload = request.get_json()
    text = payload.get("text", "")
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print("Entered '/translate' at time: ", now)
    if not text:
        return jsonify({"error": "Text cannot be empty"}), 400

    english_response = text  # input is already the English response
    started = time.time()
    telugu_response = translate(model_telugu, english_response, en_vocab, te_vocab, te_inv_vocab)
    elapsed = time.time() - started

    return jsonify({
        "english": english_response,
        "telugu": telugu_response,
        "time": elapsed,
    })
def generate_text():
    """POST {'query': ...} -> {'response', 'time'} from the chatbot model."""
    payload = request.get_json()
    query = payload.get("query", "")
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print("Entered '/generate' at time: ", now)
    if not query:
        return jsonify({"error": "Query cannot be empty"}), 400

    started = time.time()
    raw = generate_response(model, query)
    elapsed = time.time() - started

    # Strip the tokenizer's sentinel markers from the decoded text.
    cleaned = raw.replace("<EOS>", "").replace("<SOS>", "").strip()
    return jsonify({
        "response": cleaned,
        "time": elapsed,
    })
def query_model():
    """POST {'query': ...} -> English chatbot response plus its Telugu translation."""
    # NOTE(review): audio_telugu_response is written here but get_audio()
    # reads its text from the request instead — confirm it is still needed.
    global audio_telugu_response
    payload = request.get_json()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print("Entered '/query' at time: ", now)
    query = payload.get("query", "")
    if not query:
        return jsonify({"error": "Query cannot be empty"}), 400

    eng_started = time.time()
    response = generate_response(model, query)
    eng_elapsed = time.time() - eng_started

    # Strip the tokenizer's sentinel markers before translating.
    response = response.replace("<EOS>", "").replace("<SOS>", "").strip()

    te_started = time.time()
    telugu_response = translate(model_telugu, response, en_vocab, te_vocab, te_inv_vocab)
    te_elapsed = time.time() - te_started

    audio_telugu_response = telugu_response
    return jsonify({
        "telugu": telugu_response,
        "english": response,
        "eng_time": eng_elapsed,
        "telugu_time": te_elapsed,
    })
def get_audio():
    """POST {'text': ...} -> streamed MP3 of the text spoken in Telugu.

    The MP3 is synthesized with gTTS into an in-memory buffer (no temp
    files on disk). Returns 400 when no text is supplied.
    """
    data = request.get_json()
    text = data.get("text")
    start_te = time.time()
    if not text:
        return jsonify({"error": "No Response To convert to speech"}), 400

    # Fix: `io` was referenced here but never imported anywhere in the
    # module, so every call raised NameError; it is now imported with the
    # other standard-library modules at the top of the file.
    speech = gTTS(text=text, lang="te")
    audio_io = io.BytesIO()
    speech.write_to_fp(audio_io)
    audio_io.seek(0)

    end_te = time.time()
    print("telugu_time: ", (end_te - start_te))
    return send_file(audio_io, mimetype="audio/mpeg", as_attachment=False)