import os
import re
import time
import random
import numpy as np
import math
import shutil
# import base64 # Not directly needed for Gradio filepath output

# Torch and Audio
import torch
import torch.nn as nn
# import torch.optim as optim # Not needed for inference
# from torch.utils.data import Dataset, DataLoader # Not needed for inference
import torch.nn.functional as F
import torchaudio
import librosa
# import librosa.display # Not used in pipeline

# Text and Audio Processing
from unidecode import unidecode
# from inflect import engine # Not explicitly used in pipeline, consider removing
# import pydub # Not explicitly used in pipeline, consider removing
import soundfile as sf

# Transformers
from transformers import (
    WhisperProcessor, WhisperForConditionalGeneration,
    MarianTokenizer, MarianMTModel,
)
from huggingface_hub import hf_hub_download

# Gradio and Hugging Face Spaces
import gradio as gr
import spaces  # required for the @spaces.GPU decorator on Hugging Face ZeroGPU Spaces
# --- Global Configuration & Device Setup ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"--- Initializing on device: {DEVICE} ---")  # runs once when the Space builds/starts

# --- Part 1: TTS Model Components (Custom TransformerTTS) ---
# Model definitions for the custom TTS stack: Hyperparams, text_to_seq, the TTS audio
# transforms, and the network modules (EncoderBlock, DecoderBlock, EncoderPreNet,
# PostNet, DecoderPreNet, TransformerTTS).
class Hyperparams:
    seed = 42
    # Dataset paths are unused at inference time but kept for hp object integrity
    csv_path = "path/to/metadata.csv"
    wav_path = "path/to/wavs"
    symbols = [
        'EOS', ' ', '!', ',', '-', '.', ';', '?', 'a', 'b', 'c', 'd', 'e', 'f',
        'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
        't', 'u', 'v', 'w', 'x', 'y', 'z', 'à', 'â', 'è', 'é', 'ê', 'ü',
        '’', '“', '”'
    ]
    sr = 22050
    n_fft = 2048
    n_stft = int((n_fft // 2) + 1)
    hop_length = int(n_fft / 8.0)
    win_length = int(n_fft / 2.0)
    mel_freq = 128
    max_mel_time = 1024
    power = 2.0
    text_num_embeddings = 2 * len(symbols)
    embedding_size = 256
    encoder_embedding_size = 512
    dim_feedforward = 1024
    postnet_embedding_size = 1024
    encoder_kernel_size = 3
    postnet_kernel_size = 5
    ampl_multiplier = 10.0
    ampl_amin = 1e-10
    db_multiplier = 1.0
    ampl_ref = 1.0
    ampl_power = 1.0
    max_db = 100
    scale_db = 10

hp = Hyperparams()
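# For reference, the derived STFT values above work out to: hop_length = 256 samples
# (~11.6 ms at 22,050 Hz), win_length = 1024, and n_stft = 1025 frequency bins.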
# Text to Sequence
symbol_to_id = {s: i for i, s in enumerate(hp.symbols)}

def text_to_seq(text):
    text = text.lower()
    seq = []
    for s in text:
        _id = symbol_to_id.get(s, None)
        if _id is not None:
            seq.append(_id)
    seq.append(symbol_to_id["EOS"])
    return torch.IntTensor(seq)
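# Quick sanity check (illustrative; characters outside hp.symbols are silently dropped):
#   text_to_seq("hello world!")  # -> 13 ids: one per character plus a trailing EOS (id 0)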
# Audio Processing
spec_transform = torchaudio.transforms.Spectrogram(n_fft=hp.n_fft, win_length=hp.win_length, hop_length=hp.hop_length, power=hp.power)
mel_scale_transform = torchaudio.transforms.MelScale(n_mels=hp.mel_freq, sample_rate=hp.sr, n_stft=hp.n_stft)
mel_inverse_transform = torchaudio.transforms.InverseMelScale(n_mels=hp.mel_freq, sample_rate=hp.sr, n_stft=hp.n_stft).to(DEVICE)
griffinlim_transform = torchaudio.transforms.GriffinLim(n_fft=hp.n_fft, win_length=hp.win_length, hop_length=hp.hop_length).to(DEVICE)
def pow_to_db_mel_spec(mel_spec):
    mel_spec = torchaudio.functional.amplitude_to_DB(mel_spec, multiplier=hp.ampl_multiplier, amin=hp.ampl_amin, db_multiplier=hp.db_multiplier, top_db=hp.max_db)
    mel_spec = mel_spec / hp.scale_db
    return mel_spec

def db_to_power_mel_spec(mel_spec):
    mel_spec = mel_spec * hp.scale_db
    mel_spec = torchaudio.functional.DB_to_amplitude(mel_spec, ref=hp.ampl_ref, power=hp.ampl_power)
    return mel_spec
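# These two functions are near-inverses: the TTS model works on scaled-dB mel
# spectrograms, and db_to_power_mel_spec maps them back to the power domain before
# vocoding (inversion is exact only up to the top_db clipping and db_multiplier offset).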
def inverse_mel_spec_to_wav(mel_spec):
    power_mel_spec = db_to_power_mel_spec(mel_spec.to(DEVICE))
    spectrogram = mel_inverse_transform(power_mel_spec)
    pseudo_wav = griffinlim_transform(spectrogram)
    return pseudo_wav
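# Note: Griffin-Lim only estimates phase iteratively from the magnitude spectrogram,
# so the result is an approximate ("pseudo") waveform; a neural vocoder such as
# HiFi-GAN would typically sound better, but none is used here.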
def mask_from_seq_lengths(sequence_lengths: torch.Tensor, max_length: int) -> torch.BoolTensor:
    ones = sequence_lengths.new_ones(sequence_lengths.size(0), max_length)
    range_tensor = ones.cumsum(dim=1)
    return sequence_lengths.unsqueeze(1) >= range_tensor
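# Example: mask_from_seq_lengths(torch.tensor([2, 4]), max_length=4)
#   -> tensor([[ True,  True, False, False],
#              [ True,  True,  True,  True]])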
# --- TransformerTTS Model Architecture (copied from the training notebook) ---
class EncoderBlock(nn.Module):
    def __init__(self):
        super(EncoderBlock, self).__init__()
        self.norm_1 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.attn = torch.nn.MultiheadAttention(embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True)
        self.dropout_1 = torch.nn.Dropout(0.1)
        self.norm_2 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.linear_1 = nn.Linear(hp.embedding_size, hp.dim_feedforward)
        self.dropout_2 = torch.nn.Dropout(0.1)
        self.linear_2 = nn.Linear(hp.dim_feedforward, hp.embedding_size)
        self.dropout_3 = torch.nn.Dropout(0.1)

    def forward(self, x, attn_mask=None, key_padding_mask=None):
        x_out = self.norm_1(x)
        x_out, _ = self.attn(query=x_out, key=x_out, value=x_out, attn_mask=attn_mask, key_padding_mask=key_padding_mask)
        x_out = self.dropout_1(x_out)
        x = x + x_out
        x_out = self.norm_2(x)
        x_out = self.linear_1(x_out)
        x_out = F.relu(x_out)
        x_out = self.dropout_2(x_out)
        x_out = self.linear_2(x_out)
        x_out = self.dropout_3(x_out)
        x = x + x_out
        return x
class DecoderBlock(nn.Module):
    def __init__(self):
        super(DecoderBlock, self).__init__()
        self.norm_1 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.self_attn = torch.nn.MultiheadAttention(embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True)
        self.dropout_1 = torch.nn.Dropout(0.1)
        self.norm_2 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.attn = torch.nn.MultiheadAttention(embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True)
        self.dropout_2 = torch.nn.Dropout(0.1)
        self.norm_3 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.linear_1 = nn.Linear(hp.embedding_size, hp.dim_feedforward)
        self.dropout_3 = torch.nn.Dropout(0.1)
        self.linear_2 = nn.Linear(hp.dim_feedforward, hp.embedding_size)
        self.dropout_4 = torch.nn.Dropout(0.1)

    def forward(self, x, memory, x_attn_mask=None, x_key_padding_mask=None, memory_attn_mask=None, memory_key_padding_mask=None):
        x_out, _ = self.self_attn(query=x, key=x, value=x, attn_mask=x_attn_mask, key_padding_mask=x_key_padding_mask)
        x_out = self.dropout_1(x_out)
        x = self.norm_1(x + x_out)
        x_out, _ = self.attn(query=x, key=memory, value=memory, attn_mask=memory_attn_mask, key_padding_mask=memory_key_padding_mask)
        x_out = self.dropout_2(x_out)
        x = self.norm_2(x + x_out)
        x_out = self.linear_1(x)
        x_out = F.relu(x_out)
        x_out = self.dropout_3(x_out)
        x_out = self.linear_2(x_out)
        x_out = self.dropout_4(x_out)
        x = self.norm_3(x + x_out)
        return x
class EncoderPreNet(nn.Module):
    def __init__(self):
        super(EncoderPreNet, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=hp.text_num_embeddings, embedding_dim=hp.encoder_embedding_size)
        self.linear_1 = nn.Linear(hp.encoder_embedding_size, hp.encoder_embedding_size)
        self.linear_2 = nn.Linear(hp.encoder_embedding_size, hp.embedding_size)
        self.conv_1 = nn.Conv1d(hp.encoder_embedding_size, hp.encoder_embedding_size, kernel_size=hp.encoder_kernel_size, stride=1, padding=int((hp.encoder_kernel_size - 1) / 2), dilation=1)
        self.bn_1 = nn.BatchNorm1d(hp.encoder_embedding_size)
        self.dropout_1 = torch.nn.Dropout(0.5)
        self.conv_2 = nn.Conv1d(hp.encoder_embedding_size, hp.encoder_embedding_size, kernel_size=hp.encoder_kernel_size, stride=1, padding=int((hp.encoder_kernel_size - 1) / 2), dilation=1)
        self.bn_2 = nn.BatchNorm1d(hp.encoder_embedding_size)
        self.dropout_2 = torch.nn.Dropout(0.5)
        self.conv_3 = nn.Conv1d(hp.encoder_embedding_size, hp.encoder_embedding_size, kernel_size=hp.encoder_kernel_size, stride=1, padding=int((hp.encoder_kernel_size - 1) / 2), dilation=1)
        self.bn_3 = nn.BatchNorm1d(hp.encoder_embedding_size)
        self.dropout_3 = torch.nn.Dropout(0.5)

    def forward(self, text):
        x = self.embedding(text)
        x = self.linear_1(x)
        x = x.transpose(2, 1)
        x = self.conv_1(x)
        x = self.bn_1(x)
        x = F.relu(x)
        x = self.dropout_1(x)
        x = self.conv_2(x)
        x = self.bn_2(x)
        x = F.relu(x)
        x = self.dropout_2(x)
        x = self.conv_3(x)
        x = self.bn_3(x)
        x = F.relu(x)
        x = self.dropout_3(x)
        x = x.transpose(1, 2)
        x = self.linear_2(x)
        return x
class PostNet(nn.Module):
    def __init__(self):
        super(PostNet, self).__init__()
        self.conv_1 = nn.Conv1d(hp.mel_freq, hp.postnet_embedding_size, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_1 = nn.BatchNorm1d(hp.postnet_embedding_size)
        self.dropout_1 = torch.nn.Dropout(0.5)
        self.conv_2 = nn.Conv1d(hp.postnet_embedding_size, hp.postnet_embedding_size, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_2 = nn.BatchNorm1d(hp.postnet_embedding_size)
        self.dropout_2 = torch.nn.Dropout(0.5)
        self.conv_3 = nn.Conv1d(hp.postnet_embedding_size, hp.postnet_embedding_size, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_3 = nn.BatchNorm1d(hp.postnet_embedding_size)
        self.dropout_3 = torch.nn.Dropout(0.5)
        self.conv_4 = nn.Conv1d(hp.postnet_embedding_size, hp.postnet_embedding_size, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_4 = nn.BatchNorm1d(hp.postnet_embedding_size)
        self.dropout_4 = torch.nn.Dropout(0.5)
        self.conv_5 = nn.Conv1d(hp.postnet_embedding_size, hp.postnet_embedding_size, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_5 = nn.BatchNorm1d(hp.postnet_embedding_size)
        self.dropout_5 = torch.nn.Dropout(0.5)
        self.conv_6 = nn.Conv1d(hp.postnet_embedding_size, hp.mel_freq, kernel_size=hp.postnet_kernel_size, stride=1, padding=int((hp.postnet_kernel_size - 1) / 2), dilation=1)
        self.bn_6 = nn.BatchNorm1d(hp.mel_freq)
        self.dropout_6 = torch.nn.Dropout(0.5)

    def forward(self, x):
        x = x.transpose(2, 1)
        x = self.conv_1(x)
        x = self.bn_1(x); x = torch.tanh(x); x = self.dropout_1(x)
        x = self.conv_2(x)
        x = self.bn_2(x); x = torch.tanh(x); x = self.dropout_2(x)
        x = self.conv_3(x)
        x = self.bn_3(x); x = torch.tanh(x); x = self.dropout_3(x)
        x = self.conv_4(x)
        x = self.bn_4(x); x = torch.tanh(x); x = self.dropout_4(x)
        x = self.conv_5(x)
        x = self.bn_5(x); x = torch.tanh(x); x = self.dropout_5(x)
        x = self.conv_6(x)
        x = self.bn_6(x); x = self.dropout_6(x)
        x = x.transpose(1, 2)
        return x
class DecoderPreNet(nn.Module):
    def __init__(self):
        super(DecoderPreNet, self).__init__()
        self.linear_1 = nn.Linear(hp.mel_freq, hp.embedding_size)
        self.linear_2 = nn.Linear(hp.embedding_size, hp.embedding_size)

    def forward(self, x):
        x = self.linear_1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=True)
        x = self.linear_2(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=True)
        return x
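# Note on DecoderPreNet: F.dropout(..., training=True) keeps prenet dropout active
# even in eval mode, following the Tacotron 2 convention of applying decoder-prenet
# dropout at inference time to introduce output variation.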
class TransformerTTS(nn.Module):
    def __init__(self, device=DEVICE):
        super(TransformerTTS, self).__init__()
        self.encoder_prenet = EncoderPreNet()
        self.decoder_prenet = DecoderPreNet()
        self.postnet = PostNet()
        self.pos_encoding = nn.Embedding(num_embeddings=hp.max_mel_time, embedding_dim=hp.embedding_size)
        self.encoder_block_1 = EncoderBlock()
        self.encoder_block_2 = EncoderBlock()
        self.encoder_block_3 = EncoderBlock()
        self.decoder_block_1 = DecoderBlock()
        self.decoder_block_2 = DecoderBlock()
        self.decoder_block_3 = DecoderBlock()
        self.linear_1 = nn.Linear(hp.embedding_size, hp.mel_freq)
        self.linear_2 = nn.Linear(hp.embedding_size, 1)
        self.norm_memory = nn.LayerNorm(normalized_shape=hp.embedding_size)

    def forward(self, text, text_len, mel, mel_len):
        N = text.shape[0]
        S = text.shape[1]
        TIME = mel.shape[1]
        self.src_key_padding_mask = torch.zeros((N, S), device=text.device).masked_fill(~mask_from_seq_lengths(text_len, max_length=S), float("-inf"))
        self.src_mask = torch.zeros((S, S), device=text.device).masked_fill(torch.triu(torch.full((S, S), True, dtype=torch.bool), diagonal=1).to(text.device), float("-inf"))
        self.tgt_key_padding_mask = torch.zeros((N, TIME), device=mel.device).masked_fill(~mask_from_seq_lengths(mel_len, max_length=TIME), float("-inf"))
        self.tgt_mask = torch.zeros((TIME, TIME), device=mel.device).masked_fill(torch.triu(torch.full((TIME, TIME), True, device=mel.device, dtype=torch.bool), diagonal=1), float("-inf"))
        self.memory_mask = torch.zeros((TIME, S), device=mel.device).masked_fill(torch.triu(torch.full((TIME, S), True, device=mel.device, dtype=torch.bool), diagonal=1), float("-inf"))
        text_x = self.encoder_prenet(text)
        pos_codes = self.pos_encoding(torch.arange(hp.max_mel_time).to(mel.device))
        S = text_x.shape[1]
        text_x = text_x + pos_codes[:S]
        text_x = self.encoder_block_1(text_x, attn_mask=self.src_mask, key_padding_mask=self.src_key_padding_mask)
        text_x = self.encoder_block_2(text_x, attn_mask=self.src_mask, key_padding_mask=self.src_key_padding_mask)
        text_x = self.encoder_block_3(text_x, attn_mask=self.src_mask, key_padding_mask=self.src_key_padding_mask)
        text_x = self.norm_memory(text_x)
        mel_x = self.decoder_prenet(mel)
        mel_x = mel_x + pos_codes[:TIME]
        mel_x = self.decoder_block_1(x=mel_x, memory=text_x, x_attn_mask=self.tgt_mask, x_key_padding_mask=self.tgt_key_padding_mask, memory_attn_mask=self.memory_mask, memory_key_padding_mask=self.src_key_padding_mask)
        mel_x = self.decoder_block_2(x=mel_x, memory=text_x, x_attn_mask=self.tgt_mask, x_key_padding_mask=self.tgt_key_padding_mask, memory_attn_mask=self.memory_mask, memory_key_padding_mask=self.src_key_padding_mask)
        mel_x = self.decoder_block_3(x=mel_x, memory=text_x, x_attn_mask=self.tgt_mask, x_key_padding_mask=self.tgt_key_padding_mask, memory_attn_mask=self.memory_mask, memory_key_padding_mask=self.src_key_padding_mask)
        mel_linear = self.linear_1(mel_x)
        mel_postnet = self.postnet(mel_linear)
        mel_postnet = mel_linear + mel_postnet
        stop_token = self.linear_2(mel_x)
        bool_mel_mask = self.tgt_key_padding_mask.ne(0).unsqueeze(-1).repeat(1, 1, hp.mel_freq)
        mel_linear = mel_linear.masked_fill(bool_mel_mask, 0)
        mel_postnet = mel_postnet.masked_fill(bool_mel_mask, 0)
        stop_token = stop_token.masked_fill(bool_mel_mask[:, :, 0].unsqueeze(-1), 1e3).squeeze(2)
        return mel_postnet, mel_linear, stop_token
    def inference(self, text, max_length=800, gate_threshold=1e-5, with_tqdm=True):
        self.eval()
        text_lengths = torch.tensor(text.shape[1]).unsqueeze(0).to(DEVICE)
        N = 1
        SOS = torch.zeros((N, 1, hp.mel_freq), device=DEVICE)
        mel_padded = SOS
        mel_lengths = torch.tensor(1).unsqueeze(0).to(DEVICE)
        stop_token_outputs = torch.FloatTensor([]).to(text.device)
        if with_tqdm:
            from tqdm import tqdm
            iters = tqdm(range(max_length))
        else:
            iters = range(max_length)
        frames_generated = 0
        for i in iters:
            mel_postnet, mel_linear, stop_token = self(
                text,
                text_lengths,
                mel_padded,
                mel_lengths
            )
            # Append the newly predicted frame
            mel_padded = torch.cat(
                [
                    mel_padded,
                    mel_postnet[:, -1:, :]
                ],
                dim=1
            )
            frames_generated += 1
            # Check the stop condition, but ensure a minimum amount of generation
            stop_prob = torch.sigmoid(stop_token[:, -1])
            if stop_prob > gate_threshold and frames_generated > 50:  # at least 50 frames
                print(f"TTS: Stopping at frame {frames_generated}, stop_prob: {stop_prob.item():.6f}")
                break
            else:
                stop_token_outputs = torch.cat([stop_token_outputs, stop_token[:, -1:]], dim=1)
            mel_lengths = torch.tensor(mel_padded.shape[1]).unsqueeze(0).to(DEVICE)
        print(f"TTS: Generated {frames_generated} frames, final mel shape: {list(mel_postnet.shape)}")
        return mel_postnet, stop_token_outputs
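# Illustrative usage of inference() (a sketch, not executed at import time):
#   model = TransformerTTS().to(DEVICE)                   # weights loaded in Part 2 below
#   seq = text_to_seq("hello world").unsqueeze(0).to(DEVICE)
#   mel, stops = model.inference(seq, with_tqdm=False)    # mel: (1, T, hp.mel_freq)
#   wav = inverse_mel_spec_to_wav(mel.detach().squeeze(0).transpose(0, 1))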
# --- (End of model definitions) ---

# --- Part 2: Model Loading ---
TTS_MODEL_HUB_ID = "MoHamdyy/transformer-tts-ljspeech"
ASR_HUB_ID = "MoHamdyy/whisper-stt-model"
MARIAN_HUB_ID = "MoHamdyy/marian-ar-en-translation"

# Models are loaded once at module import time. On Spaces this is fine (and preferred),
# since it happens exactly once per container start; wrap it in a function instead if
# loading ever needs to be delayed or re-triggered.
print("--- Starting Model Loading ---")
try:
    print("Loading TTS model...")
    # Download the .pt checkpoint from its repo
    tts_model_path = hf_hub_download(repo_id=TTS_MODEL_HUB_ID, filename="train_SimpleTransfromerTTS.pt")
    state = torch.load(tts_model_path, map_location=DEVICE)
    TTS_MODEL = TransformerTTS().to(DEVICE)
    # Handle the different layouts a checkpoint may use
    if "model" in state:
        TTS_MODEL.load_state_dict(state["model"])
    elif "state_dict" in state:
        TTS_MODEL.load_state_dict(state["state_dict"])
    else:
        TTS_MODEL.load_state_dict(state)  # assume the whole file is the state_dict
    TTS_MODEL.eval()
    # Try torch.compile for an additional speedup (PyTorch 2.0+)
    try:
        TTS_MODEL = torch.compile(TTS_MODEL, mode="reduce-overhead")
        print("TTS model compiled successfully (with torch.compile).")
    except Exception as compile_error:
        print(f"torch.compile not available: {compile_error}, using standard model.")
    print("TTS model loaded successfully.")
except Exception as e:
    print(f"Error loading TTS model: {e}")
    TTS_MODEL = None
# Load STT (Whisper) model from the Hub
try:
    print("Loading STT (Whisper) model...")
    stt_processor = WhisperProcessor.from_pretrained(ASR_HUB_ID)
    stt_model = WhisperForConditionalGeneration.from_pretrained(ASR_HUB_ID).to(DEVICE).eval()
    print("STT model loaded successfully.")
except Exception as e:
    print(f"Error loading STT model: {e}")
    stt_processor = None
    stt_model = None

# Load TTT (MarianMT) model from the Hub
try:
    print("Loading TTT (MarianMT) model...")
    mt_tokenizer = MarianTokenizer.from_pretrained(MARIAN_HUB_ID)
    mt_model = MarianMTModel.from_pretrained(MARIAN_HUB_ID).to(DEVICE).eval()
    print("TTT model loaded successfully.")
except Exception as e:
    print(f"Error loading TTT model: {e}")
    mt_tokenizer = None
    mt_model = None

print("--- Model Loading Complete ---")
# --- Part 3: Full Pipeline Function for Gradio ---
@spaces.GPU  # request the ZeroGPU execution context for this call (why `spaces` is imported)
def full_speech_translation_pipeline(audio_input_path: str):
| print(f"--- PIPELINE START: Processing {audio_input_path} ---") | |
| if audio_input_path is None or not os.path.exists(audio_input_path): | |
| msg = "Error: Audio file not provided or not found." | |
| print(msg) | |
| # Return empty/default values | |
| return "Error: No file", "", (hp.sr, np.zeros(hp.sr, dtype=np.float32)) # 1 second of silence | |
| # STT Stage | |
| arabic_transcript = "STT Error: Processing failed." | |
| try: | |
| print("STT: Loading and resampling audio...") | |
| wav, sr = torchaudio.load(audio_input_path) | |
| if wav.size(0) > 1: wav = wav.mean(dim=0, keepdim=True) | |
| target_sr_stt = stt_processor.feature_extractor.sampling_rate | |
| if sr != target_sr_stt: wav = torchaudio.transforms.Resample(sr, target_sr_stt)(wav) | |
| audio_array_stt = wav.squeeze().cpu().numpy() | |
| print("STT: Extracting features and transcribing...") | |
| inputs = stt_processor(audio_array_stt, sampling_rate=target_sr_stt, return_tensors="pt").input_features.to(DEVICE) | |
| with torch.no_grad(): | |
| # Generate without forced_decoder_ids to avoid compatibility issues | |
| generated_ids = stt_model.generate( | |
| inputs, | |
| max_length=448, | |
| language="arabic", | |
| task="transcribe" | |
| ) | |
| # Use batch_decode for robustness | |
| arabic_transcript = stt_processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip() | |
| print(f"STT Output: {arabic_transcript}") | |
| except Exception as e: | |
| print(f"STT Error: {e}") | |
    # TTT stage
    english_translation = "TTT Error: Processing failed."
    if arabic_transcript and not arabic_transcript.startswith("STT Error"):
        try:
            print("TTT: Translating to English...")
            batch = mt_tokenizer(arabic_transcript, return_tensors="pt", padding=True).to(DEVICE)
            with torch.no_grad():
                translated_ids = mt_model.generate(**batch, max_length=512)
            english_translation = mt_tokenizer.batch_decode(translated_ids, skip_special_tokens=True)[0].strip()
            print(f"TTT Output: {english_translation}")
        except Exception as e:
            print(f"TTT Error: {e}")
    else:
        english_translation = "(Skipped TTT due to STT failure)"
        print(english_translation)
    # TTS stage
    synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)  # default: 1 second of silence
    if english_translation and not english_translation.startswith("TTT Error") and TTS_MODEL is not None:
        try:
            print("TTS: Synthesizing English speech...")
            sequence = text_to_seq(english_translation).unsqueeze(0).to(DEVICE)
            generated_mel, _ = TTS_MODEL.inference(sequence, max_length=hp.max_mel_time - 50, gate_threshold=1e-4, with_tqdm=False)
            print(f"TTS: Generated mel shape: {generated_mel.shape if generated_mel is not None else 'None'}")
            if generated_mel is not None and generated_mel.numel() > 128:  # require at least one full frame
                mel_for_vocoder = generated_mel.detach().squeeze(0).transpose(0, 1)
                # Safety check on mel dimensions before vocoding
                if mel_for_vocoder.numel() > 0 and mel_for_vocoder.shape[0] > 10:
                    audio_tensor = inverse_mel_spec_to_wav(mel_for_vocoder)
                    synthesized_audio_np = audio_tensor.cpu().numpy()
                    print(f"TTS: Synthesized audio shape: {synthesized_audio_np.shape}")
                    # Ensure the audio is not empty
                    if synthesized_audio_np.size == 0:
                        print("TTS: Generated audio is empty, using silence")
                        synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)
                else:
                    print("TTS: Generated mel too small, using silence")
                    synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)
            else:
                print("TTS: Generated mel is empty or too small, using silence")
                synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)
        except Exception as e:
            print(f"TTS Error: {e}")
            synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)  # fall back to silence
    else:
        print("TTS: Skipped due to TTT failure, empty translation, or missing TTS model")
        synthesized_audio_np = np.zeros(hp.sr, dtype=np.float32)
| print(f"--- PIPELINE END ---") | |
| return arabic_transcript, english_translation, (hp.sr, synthesized_audio_np) | |
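# Quick local smoke test (a sketch; "sample.wav" is a hypothetical input file):
#   ar, en, (sr, audio) = full_speech_translation_pipeline("sample.wav")
#   sf.write("pipeline_output.wav", audio, sr)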
# --- Part 4: Gradio Interface Definition ---
iface = gr.Interface(
    fn=full_speech_translation_pipeline,
    inputs=[
        gr.Audio(type="filepath", label="Upload Arabic Speech")
    ],
    outputs=[
        gr.Textbox(label="Arabic Transcript (STT)"),
        gr.Textbox(label="English Translation (TTT)"),
        # type="numpy" so the (sample_rate, np.ndarray) tuple returned above is accepted
        gr.Audio(label="Synthesized English Speech (TTS)", type="numpy")
    ],
    title="Arabic to English Speech Translation (ZeroGPU)",
    description="Upload an Arabic audio file. It is transcribed with Whisper (STT), translated to English with MarianMT (TTT), and synthesized into English speech with a custom TransformerTTS.",
    allow_flagging="never",
    # examples=[["sample.wav"]]  # if you add a sample.wav to the repo
)
# --- Part 5: Launch for Spaces (and local testing) ---
if __name__ == '__main__':
    # Clean up temp audio files from previous local runs
    for f_name in os.listdir("."):
        if f_name.startswith("output_audio_") and f_name.endswith(".wav"):
            try:
                os.remove(f_name)
            except OSError:
                pass  # ignore if the file is already gone or locked
    print("Starting Gradio interface locally with debug mode...")
    iface.launch(debug=True, share=False)  # share=False locally; Spaces handles the public URL