Spaces:
Sleeping
Sleeping
| import textwrap | |
| import torch | |
| from datetime import datetime | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import spacy | |
| import random | |
| import pandas as pd | |
| from torch.utils.data import Dataset, DataLoader | |
| from torch.nn.utils.rnn import pad_sequence | |
| from sklearn.model_selection import train_test_split | |
| from flask import Flask, request, jsonify, send_file, after_this_request, Response, stream_with_context | |
| from collections import Counter | |
| from flask_cors import CORS | |
| import requests | |
| from gtts import gTTS | |
| import uuid | |
| import os | |
| import time | |
| import json | |
| import io | |
# Let PyTorch use every CPU core the OS reports. os.cpu_count() returns None
# when the count cannot be determined and torch.set_num_threads() needs a
# positive int, so fall back to a single thread in that case.
torch.set_num_threads(os.cpu_count() or 1)
# torch.set_num_interop_threads(os.cpu_count())
# Enable oneDNN fusion in the PyTorch JIT for better performance
torch.jit.enable_onednn_fusion(True)
# Load Dataset
# The CSV is fetched over the network; any failure (download, parsing, missing
# columns) falls back to a two-row sample so the rest of the module still runs.
try:
    df = (
        pd.read_csv("https://drive.google.com/uc?id=1RCZShB5ohy1HdU-mogcP16TbeVv9txpY")
        .dropna(subset=['instruction', 'response'])
        # Ensure all entries are strings
        .astype({'instruction': str, 'response': str})
    )
    print("Main dataset loaded successfully")
except Exception as e:
    print(f"Error loading main dataset: {e}")
    # Create a dummy dataset for testing
    df = pd.DataFrame(
        {
            'instruction': ['Hello', 'How are you?'],
            'response': ['Hi there!', 'I am doing well, thank you!'],
            'intent': ['greeting', 'greeting'],
        }
    )
# Tokenizer (Scratch)
class ScratchTokenizer:
    """Whitespace tokenizer whose vocabulary grows as new words are seen.

    Ids 0-3 are reserved for "<PAD>", "< SOS >", "<EOS>" and "<UNK>".
    """

    def __init__(self):
        reserved = ["<PAD>", "< SOS >", "<EOS>", "<UNK>"]
        self.idx2word = dict(enumerate(reserved))
        self.word2idx = {token: index for index, token in self.idx2word.items()}
        self.vocab_size = len(reserved)

    def build_vocab(self, texts):
        """Give the next free id to every unseen whitespace-separated word in ``texts``."""
        for sentence in texts:
            for token in sentence.split():
                if token in self.word2idx:
                    continue
                new_id = self.vocab_size
                self.word2idx[token] = new_id
                self.idx2word[new_id] = token
                self.vocab_size = new_id + 1

    def encode(self, text, max_len=200):
        """Return ids framed by SOS (1) and EOS (2), right-padded with 0 to ``max_len``.

        Unknown words map to 3 and at most ``max_len - 2`` words are kept.
        """
        unk_id = 3
        word_ids = [self.word2idx.get(token, unk_id) for token in text.split()]
        framed = [1, *word_ids[:max_len - 2], 2]
        padding = [0] * (max_len - len(framed))
        return framed + padding

    def decode(self, tokens):
        """Join the words of every id greater than 0; unmapped ids become "<UNK>"."""
        words = []
        for idx in tokens:
            if idx > 0:
                words.append(self.idx2word.get(idx, "<UNK>"))
        return " ".join(words)
# Train-Test Split: 20% of the rows are held out, with a fixed seed
train_data, test_data = train_test_split(df, random_state=42, test_size=0.2)
# Initialize Tokenizer from the training instructions followed by the training responses
tokenizer = ScratchTokenizer()
tokenizer.build_vocab([*train_data["instruction"], *train_data["response"]])
# Dataset Class
class TextDataset(Dataset):
    """Torch dataset yielding (instruction, response) id tensors from a DataFrame.

    Args:
        data: DataFrame with "instruction" and "response" columns.
        tokenizer: object exposing ``encode(text, max_len=...)`` that returns a
            list of ints.
        max_len: length handed to ``tokenizer.encode`` for both sequences.
    """

    def __init__(self, data, tokenizer, max_len=200):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(src, tgt)`` long tensors for the row at position ``idx``."""
        row = self.data.iloc[idx]
        # Forward the configured length; previously it was stored but ignored,
        # so encode() always used its own default.
        src_ids = self.tokenizer.encode(row["instruction"], max_len=self.max_len)
        tgt_ids = self.tokenizer.encode(row["response"], max_len=self.max_len)
        src = torch.tensor(src_ids, dtype=torch.long)
        tgt = torch.tensor(tgt_ids, dtype=torch.long)
        return src, tgt
# Load Dataset
# Wrap both splits in TextDataset (default max_len) and batch them eight rows
# at a time; only the training loader shuffles.
train_dataset = TextDataset(train_data, tokenizer)
test_dataset = TextDataset(test_data, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8)
# Function to detect model architecture from saved file
def detect_model_architecture(model_path):
    """Infer ``(embed_size, feedforward_dim)`` from a saved state dict.

    The first key containing ``transformer.layers.0.linear1.weight`` is looked
    up and its shape is read as ``(feedforward_dim, embed_size)``.

    Returns:
        ``(embed_size, feedforward_dim)``, or ``(256, 1024)`` when no such key
        exists or anything raises while loading or inspecting the file.
    """
    fallback = (256, 1024)  # Default values
    marker = 'transformer.layers.0.linear1.weight'
    try:
        # NOTE: torch.load unpickles the file; only point it at trusted checkpoints.
        state = torch.load(model_path, map_location='cpu')
        matched_key = next((name for name in state.keys() if marker in name), None)
        if matched_key is None:
            return fallback
        weight_shape = state[matched_key].shape
        ff_dim = weight_shape[0]  # Output dimension of first linear layer
        emb_dim = weight_shape[1]  # Input dimension (embed_size)
        return emb_dim, ff_dim
    except Exception as e:
        print(f"Could not detect architecture: {e}")
        return fallback
# Improved GPT-Style Transformer Model with configurable architecture
class GPTModel(nn.Module):
    """Token plus learned positional embeddings feeding an ``nn.TransformerDecoder``.

    ``forward(src, tgt)`` embeds both id tensors, passes the embedded ``src``
    as the decoder memory, applies a causal mask over ``tgt`` and projects the
    decoder output to vocabulary logits.
    """

    def __init__(self, vocab_size, embed_size=256, num_heads=8, num_layers=6, max_len=200, feedforward_dim=None):
        super().__init__()
        # Without an explicit value the feed-forward width is 4x the embedding width.
        ff_width = embed_size * 4 if feedforward_dim is None else feedforward_dim
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_size))
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=ff_width,  # Use detected or provided feedforward dimension
            dropout=0.1,
            batch_first=True,  # Inputs are laid out as (batch, seq, embed)
        )
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        # Initialize weights for better training
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Xavier-uniform weights and zero bias for Linear; N(0, 0.02) for Embedding."""
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, src, tgt):
        """Return logits for every position of ``tgt`` given ``src``."""
        src_len, tgt_len = src.size(1), tgt.size(1)
        memory = self.embedding(src) + self.pos_embedding[:, :src_len, :]
        queries = self.embedding(tgt) + self.pos_embedding[:, :tgt_len, :]
        causal_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(tgt.device)
        hidden = self.transformer(queries, memory, tgt_mask=causal_mask)
        return self.fc_out(hidden)
# Initialize Model with proper architecture detection
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Detect architecture from saved model
model_path = "gpt_model.pth"
if not os.path.exists(model_path):
    # No checkpoint on disk: build the model with its default sizes.
    model = GPTModel(tokenizer.vocab_size).to(device)
else:
    embed_size, feedforward_dim = detect_model_architecture(model_path)
    print(f"Detected model architecture: embed_size={embed_size}, feedforward_dim={feedforward_dim}")
    model = GPTModel(
        tokenizer.vocab_size,
        embed_size=embed_size,
        feedforward_dim=feedforward_dim,
    ).to(device)
# AdamW with weight decay, and cross-entropy with label smoothing
optimizer = optim.AdamW(model.parameters(), weight_decay=0.01, lr=2e-4)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
def load_model(model, path="gpt_model.pth"):
    """Load the state dict stored at ``path`` into ``model`` and switch it to eval mode.

    Returns:
        True when the weights were loaded; False when the file is missing or
        loading raised. A message is printed in every case.
    """
    if not os.path.exists(path):
        print("GPT Model file not found!")
        return False
    try:
        state_dict = torch.load(path, map_location=device)
        model.load_state_dict(state_dict)
        model.eval()
        print("GPT Model loaded successfully.")
        return True
    except Exception as e:
        print(f"Error loading GPT model: {e}")
        return False
# Try to restore saved GPT weights into `model`; load_model prints the outcome
# and its boolean result is not used here.
load_model(model)
# Translation model parameters
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # same selection rule as `device`
MAX_LEN = 350  # default `max_len` of translate()
BATCH_SIZE = 8  # not referenced in the visible code
EMB_SIZE = 128  # `emb_size` passed to Seq2SeqTransformer
NHEAD = 8  # `nhead` passed to Seq2SeqTransformer
FFN_HID_DIM = 256  # `dim_feedforward` passed to Seq2SeqTransformer
NUM_ENCODER_LAYERS = 4
NUM_DECODER_LAYERS = 4
NUM_EPOCHS = 18  # not referenced in the visible code
MIN_FREQ = 2  # `min_freq` passed to build_vocab()
# ==== Tokenizers ====
# spaCy's English pipeline is optional: when it cannot be loaded, spacy_eng is
# None and tokenize_en() falls back to whitespace splitting.
try:
    spacy_eng = spacy.load("en_core_web_sm")
    print("Spacy English model loaded successfully")
except OSError:
    print("Warning: Spacy English model not found. Using simple tokenizer.")
    spacy_eng = None


def tokenize_en(text):
    """Return lower-cased English tokens for ``text``.

    Uses spaCy's tokenizer when the pipeline was loaded, otherwise a plain
    whitespace split of the lower-cased text.
    """
    if not spacy_eng:
        # Simple fallback tokenizer
        return text.lower().split()
    return [token.text.lower() for token in spacy_eng.tokenizer(text)]
def tokenize_te(text):
    """Split ``text`` on single spaces after trimming surrounding whitespace.

    Consecutive inner spaces therefore produce empty-string tokens.
    """
    trimmed = text.strip()
    return trimmed.split(" ")
# ==== Vocab Builder ====
def build_vocab(sentences, tokenizer, min_freq):
    """Build a token -> id mapping from ``sentences``.

    Args:
        sentences: iterable of strings.
        tokenizer: callable turning one sentence into an iterable of tokens.
        min_freq: minimum number of occurrences for a token to get an id.

    Returns:
        dict with '<pad>', '<sos>', '<eos>', '<unk>' at ids 0-3, followed by
        every sufficiently frequent token in first-seen order.
    """
    counts = Counter()
    for sentence in sentences:
        counts.update(tokenizer(sentence))
    vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
    for word, freq in counts.items():
        # A corpus token spelled like a reserved token must keep the reserved
        # id; re-assigning it would move e.g. '<unk>' off id 3 and let the next
        # word collide with the id it was moved to.
        if freq >= min_freq and word not in vocab:
            vocab[word] = len(vocab)
    return vocab
# ==== Dataset ====
class TranslationDataset(Dataset):
    """Pairs the 'response' and 'translated_response' columns as id tensors.

    Each item is ``(en_ids, te_ids)``: both sequences are wrapped in
    '<sos>'/'<eos>' and tokens missing from a vocabulary map to its '<unk>' id.
    """

    def __init__(self, df, en_vocab, te_vocab):
        self.data = df
        self.en_vocab = en_vocab
        self.te_vocab = te_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        english, telugu = row['response'], row['translated_response']
        en_tokens = ['<sos>', *tokenize_en(english), '<eos>']
        te_tokens = ['<sos>', *tokenize_te(telugu), '<eos>']
        en_unk = self.en_vocab['<unk>']
        en_ids = [self.en_vocab.get(token, en_unk) for token in en_tokens]
        te_unk = self.te_vocab['<unk>']
        te_ids = [self.te_vocab.get(token, te_unk) for token in te_tokens]
        return torch.tensor(en_ids), torch.tensor(te_ids)
# ==== Transformer Model ====
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder ``nn.Transformer`` with separate source/target embeddings.

    Args:
        num_encoder_layers, num_decoder_layers: depth of each stack.
        emb_size: embedding width, also used as ``d_model``.
        src_vocab_size, tgt_vocab_size: sizes of the two embedding tables;
            ``tgt_vocab_size`` is also the width of the output logits.
        nhead: number of attention heads.
        dim_feedforward: feed-forward width inside each layer.
        dropout: dropout probability for the transformer and the embeddings.
    """

    def __init__(self, num_encoder_layers, num_decoder_layers,
                 emb_size, src_vocab_size, tgt_vocab_size,
                 nhead, dim_feedforward=512, dropout=0.1):
        super().__init__()
        self.transformer = nn.Transformer(d_model=emb_size, nhead=nhead,
                                          num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          dim_feedforward=dim_feedforward, dropout=dropout)
        self.src_tok_emb = nn.Embedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = nn.Embedding(tgt_vocab_size, emb_size)
        self.fc_out = nn.Linear(emb_size, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, tgt):
        """Return logits shaped (batch, tgt_len, tgt_vocab_size).

        ``src`` and ``tgt`` are (batch, seq) id tensors. A square subsequent
        mask is applied to the target and also to the source.
        """
        # Build the masks on the inputs' own device rather than a module-level
        # global, so the model works wherever it and its inputs are placed.
        device = src.device
        # NOTE(review): masking the encoder input causally is unusual; it is
        # kept because saved weights were presumably trained this way — confirm.
        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(device)
        src_emb = self.dropout(self.src_tok_emb(src))
        tgt_emb = self.dropout(self.tgt_tok_emb(tgt))
        # The transformer was built without batch_first, so swap the batch and
        # sequence axes on the way in and back again on the way out.
        outs = self.transformer(src_emb.permute(1, 0, 2), tgt_emb.permute(1, 0, 2),
                                src_mask=src_mask, tgt_mask=tgt_mask)
        return self.fc_out(outs.permute(1, 0, 2))
def translate(model, sentence, en_vocab, te_vocab, te_inv_vocab, max_len=MAX_LEN):
    """Greedy-decode a translation of ``sentence`` with ``model``.

    Args:
        model: callable as ``model(src_ids, tgt_ids)`` returning logits of
            shape (1, tgt_len, vocab).
        sentence: English text; tokenized with ``tokenize_en``.
        en_vocab: source token -> id mapping containing '<unk>'.
        te_vocab: target token -> id mapping containing '<sos>' and '<eos>'.
        te_inv_vocab: target id -> token mapping.
        max_len: maximum number of tokens to generate.

    Returns:
        The generated tokens joined by spaces, without a trailing '<eos>'.

    Raises:
        KeyError: if a reserved token is missing from a vocabulary or a
            generated id is missing from ``te_inv_vocab``.
    """
    model.eval()
    tokens = ['<sos>'] + tokenize_en(sentence) + ['<eos>']
    unk_id = en_vocab['<unk>']
    src_ids = torch.tensor([[en_vocab.get(t, unk_id) for t in tokens]]).to(DEVICE)
    eos_id = te_vocab['<eos>']
    tgt_ids = torch.tensor([[te_vocab['<sos>']]]).to(DEVICE)
    # Inference only: without no_grad every decoding step would record an
    # autograd graph over the ever-growing target sequence.
    with torch.no_grad():
        for _ in range(max_len):
            out = model(src_ids, tgt_ids)
            # Most likely token at the last position, kept as shape (1, 1) so
            # it can be appended without building a new tensor.
            next_token = out[:, -1].argmax(-1, keepdim=True)
            tgt_ids = torch.cat([tgt_ids, next_token], dim=1)
            if next_token.item() == eos_id:
                break
    translated = [te_inv_vocab[idx] for idx in tgt_ids[0, 1:].tolist()]
    # `translated` is empty when max_len is 0; guard before peeking at the end.
    if translated and translated[-1] == '<eos>':
        translated.pop()
    return ' '.join(translated)
# Initialize vocabularies from model checkpoint
def placeholder_vocab(size):
    """Return a stand-in vocabulary with ``size`` entries (for ``size >= 4``).

    Ids 0-3 keep the reserved '<pad>', '<sos>', '<eos>' and '<unk>' tokens,
    which translate() looks up by name; every other id is named 'word_<id>'.
    """
    vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
    vocab.update({f'word_{i}': i for i in range(len(vocab), size)})
    return vocab


translation_available = False
telugu_model_loaded = False
en_vocab = None
te_vocab = None
te_inv_vocab = None
model_telugu = None
# Load translation model and extract vocabularies
model_path = "english_telugu_transformer.pth"
if os.path.exists(model_path):
    try:
        print("Loading Telugu translation model...")
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        checkpoint = torch.load(model_path, map_location='cpu')
        # Extract vocabulary sizes from the saved model
        if 'src_tok_emb.weight' in checkpoint:
            saved_en_vocab_size = checkpoint['src_tok_emb.weight'].shape[0]
            saved_te_vocab_size = checkpoint['tgt_tok_emb.weight'].shape[0]
            print(f"Saved model vocabs - EN: {saved_en_vocab_size}, TE: {saved_te_vocab_size}")
            # Create model with correct vocabulary sizes
            model_telugu = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                              saved_en_vocab_size, saved_te_vocab_size,
                                              NHEAD, FFN_HID_DIM).to(DEVICE)
            model_telugu.load_state_dict(checkpoint)
            model_telugu.eval()
            # Try to load translation data to build vocabularies
            use_placeholders = False
            try:
                df_telugu = pd.read_csv("merged_translated_responses.csv")
                df_telugu = df_telugu.dropna(subset=['response', 'translated_response'])
                df_telugu['response'] = df_telugu['response'].astype(str)
                df_telugu['translated_response'] = df_telugu['translated_response'].astype(str)
                print("Building vocabularies from data...")
                en_vocab = build_vocab(df_telugu['response'], tokenize_en, MIN_FREQ)
                te_vocab = build_vocab(df_telugu['translated_response'], tokenize_te, MIN_FREQ)
                # Check if vocabulary sizes match
                if len(en_vocab) == saved_en_vocab_size and len(te_vocab) == saved_te_vocab_size:
                    print("Telugu translation model loaded successfully")
                    print(f"English vocab size: {len(en_vocab)}, Telugu vocab size: {len(te_vocab)}")
                else:
                    print(f"Vocabulary size mismatch - Data EN: {len(en_vocab)}, TE: {len(te_vocab)}")
                    use_placeholders = True
            except Exception as e:
                print(f"Error loading Telugu dataset: {e}")
                use_placeholders = True
            if use_placeholders:
                print("Creating placeholder vocabularies...")
                # Placeholders match the checkpoint's sizes and keep the
                # reserved tokens; without them translate() raised KeyError on
                # every request while the service was flagged as available.
                en_vocab = placeholder_vocab(saved_en_vocab_size)
                te_vocab = placeholder_vocab(saved_te_vocab_size)
                print("Warning: placeholder vocabularies in use; translations will consist of 'word_<id>' tokens")
            te_inv_vocab = {idx: tok for tok, idx in te_vocab.items()}
            translation_available = True
            telugu_model_loaded = True
        else:
            # Previously this case was skipped without any message.
            print("Telugu checkpoint has no 'src_tok_emb.weight' entry; translation disabled")
    except Exception as e:
        print(f"Error loading Telugu translation model: {e}")
        translation_available = False
        telugu_model_loaded = False
else:
    print("Telugu model file not found!")
# Flask App
# NOTE(review): none of the handler functions that follow carries a visible
# @app.route decorator in this view — confirm how they are registered with `app`.
app = Flask(__name__)
CORS(app)  # apply flask_cors to the whole app with its default options
def home():
    """Return a JSON welcome message stamped with the current local time."""
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    return jsonify({"message": f"Welcome to TRAVIS API, Time : {stamp}"})
def intents():
    """Return the distinct non-null values of the dataset's 'intent' column as JSON.

    The values are stringified and sorted so the response is stable between
    calls. An empty list is returned when the loaded dataset has no 'intent'
    column, instead of failing with a KeyError.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    if 'intent' not in df.columns:
        return jsonify({"intents": []})
    labels = sorted(df['intent'].dropna().astype(str).unique())
    return jsonify({"intents": labels})
def translate_text():
    """Translate the request's "text" field to Telugu and stream it word by word.

    Expects a JSON object body with a non-empty "text" value.

    Returns:
        * 503 JSON error when the translation model is not available.
        * 400 JSON error when the body is missing, is not a JSON object, or
          "text" is empty.
        * Otherwise a ``text/event-stream`` response with one ``data:`` event
          per translated word, or a single error event if translation raises.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    if not translation_available or not telugu_model_loaded:
        return jsonify({"error": "Translation service not available"}), 503
    # silent=True yields None instead of raising on a missing or invalid body;
    # anything that is not a JSON object is treated as "no text".
    data = request.get_json(silent=True)
    text = data.get("text", "") if isinstance(data, dict) else ""
    if not text:
        return jsonify({"error": "Text cannot be empty"}), 400

    def generate():
        try:
            start = time.time()
            # The whole sentence is translated first; only the emission of the
            # result is word by word.
            telugu_response = translate(model_telugu, text, en_vocab, te_vocab, te_inv_vocab)
            # Stream each word of the translation
            for word_count, word in enumerate(telugu_response.split(), start=1):
                response_data = {
                    "word": word,
                    "timestamp": time.time() - start,
                    "word_count": word_count,
                    "type": "telugu"
                }
                yield f"data: {json.dumps(response_data)}\n\n"
        except Exception as e:
            # Headers are already sent once streaming starts, so failures are
            # reported as an event rather than an HTTP status.
            error_data = {"error": str(e), "type": "error"}
            yield f"data: {json.dumps(error_data)}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }
    )
def generate_text():
    """Stream a greedy English generation for the request's "query" field.

    Expects a JSON object body with a non-empty "query" value.

    Returns:
        * 400 JSON error when the body is missing, is not a JSON object, or
          "query" is empty.
        * Otherwise a ``text/event-stream`` response with one ``data:`` event
          per generated word, or a single error event if generation raises.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    # silent=True yields None instead of raising on a missing or invalid body;
    # anything that is not a JSON object is treated as "no query".
    data = request.get_json(silent=True)
    query = data.get("query", "") if isinstance(data, dict) else ""
    print("entered /generate")
    if not query:
        return jsonify({"error": "Query cannot be empty"}), 400

    def generate():
        try:
            start = time.time()
            word_count = 0
            model.eval()
            with torch.no_grad():
                src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
                tgt = torch.tensor([[1]]).to(device)  # < SOS >
                for _ in range(200):  # max_length
                    output = model(src, tgt)
                    next_token = output[:, -1, :].argmax(dim=-1, keepdim=True)
                    tgt = torch.cat([tgt, next_token], dim=1)
                    # Read the id from the tensor once per step.
                    token_id = next_token.item()
                    if token_id == 2:  # <EOS>
                        break
                    word = tokenizer.idx2word.get(token_id, "<UNK>")
                    if word not in ("<PAD>", "<EOS>", "< SOS >"):
                        word_count += 1
                        response_data = {
                            "word": word.strip(),
                            "timestamp": time.time() - start,
                            "word_count": word_count,
                            "type": "english"
                        }
                        yield f"data: {json.dumps(response_data)}\n\n"
        except Exception as e:
            # Headers are already sent once streaming starts, so failures are
            # reported as an event rather than an HTTP status.
            error_data = {"error": str(e), "type": "error"}
            yield f"data: {json.dumps(error_data)}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }
    )
def query_model():
    """Stream an English answer for "query", then its Telugu translation.

    Expects a JSON object body with a non-empty "query" value.

    Returns:
        * 400 JSON error when the body is missing, is not a JSON object, or
          "query" is empty.
        * Otherwise a ``text/event-stream`` response: one ``data:`` event per
          generated English word (type "english"), followed — when the
          translation model is available — by one event per Telugu word
          (type "telugu"). ``word_count`` keeps increasing across both
          phases. A single error event is emitted if anything raises.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    # silent=True yields None instead of raising on a missing or invalid body;
    # anything that is not a JSON object is treated as "no query".
    data = request.get_json(silent=True)
    query = data.get("query", "") if isinstance(data, dict) else ""
    if not query:
        return jsonify({"error": "Query cannot be empty"}), 400

    def generate():
        try:
            start = time.time()
            word_count = 0
            model.eval()
            with torch.no_grad():
                # Generate English response
                src = torch.tensor(tokenizer.encode(query)).unsqueeze(0).to(device)
                tgt = torch.tensor([[1]]).to(device)  # < SOS >
                english_words = []
                for _ in range(200):  # max_length
                    output = model(src, tgt)
                    next_token = output[:, -1, :].argmax(dim=-1, keepdim=True)
                    tgt = torch.cat([tgt, next_token], dim=1)
                    # Read the id from the tensor once per step.
                    token_id = next_token.item()
                    if token_id == 2:  # <EOS>
                        break
                    word = tokenizer.idx2word.get(token_id, "<UNK>")
                    if word not in ("<PAD>", "<EOS>", "< SOS >"):
                        english_words.append(word.strip())
                        word_count += 1
                        response_data = {
                            "word": word.strip(),
                            "timestamp": time.time() - start,
                            "word_count": word_count,
                            "type": "english"
                        }
                        yield f"data: {json.dumps(response_data)}\n\n"
                # Translate to Telugu if available
                if translation_available and telugu_model_loaded:
                    english_response = " ".join(english_words)
                    telugu_response = translate(model_telugu, english_response, en_vocab, te_vocab, te_inv_vocab)
                    for word in telugu_response.split():
                        word_count += 1
                        response_data = {
                            "word": word,
                            "timestamp": time.time() - start,
                            "word_count": word_count,
                            "type": "telugu"
                        }
                        yield f"data: {json.dumps(response_data)}\n\n"
        except Exception as e:
            # Headers are already sent once streaming starts, so failures are
            # reported as an event rather than an HTTP status.
            error_data = {"error": str(e), "type": "error"}
            yield f"data: {json.dumps(error_data)}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }
    )
def get_audio():
    """Convert the request's "text" field to Telugu speech and return it as MP3.

    Expects a JSON object body with a non-empty "text" value.

    Returns:
        * 400 JSON error when the body is missing, is not a JSON object, or
          "text" is empty.
        * 500 JSON error when speech synthesis raises.
        * Otherwise the audio bytes with mimetype ``audio/mpeg``.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view — confirm it is registered with the app.
    # silent=True yields None instead of raising on a missing or invalid body;
    # anything that is not a JSON object is treated as "no text".
    data = request.get_json(silent=True)
    text = data.get("text") if isinstance(data, dict) else None
    if not text:
        return jsonify({"error": "No Response To convert to speech"}), 400
    try:
        start_te = time.time()
        # Convert text to Telugu speech using in-memory file
        speech = gTTS(text=text, lang="te")
        audio_io = io.BytesIO()
        speech.write_to_fp(audio_io)
        audio_io.seek(0)  # rewind so send_file reads from the beginning
        end_te = time.time()
        print("telugu_time: ", (end_te - start_te))
        return send_file(audio_io, mimetype="audio/mpeg", as_attachment=False)
    except Exception as e:
        return jsonify({"error": f"Audio generation failed: {str(e)}"}), 500
if __name__ == "__main__":
    print("Starting Flask application...")
    print(f"Translation service available: {translation_available}")
    print(f"Telugu model loaded: {telugu_model_loaded}")
    # The server listens on every interface, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code. Debug mode is
    # therefore off unless FLASK_DEBUG is explicitly set to a truthy value.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)