""" Professional Voice Agent - GPU Optimized High-quality voice assistant with speech recognition and synthesis Designed for best user experience on GPU hardware """ import gradio as gr import torch import numpy as np from transformers import ( pipeline, AutoModelForCausalLM, AutoTokenizer, WhisperProcessor, WhisperForConditionalGeneration, SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan ) from datasets import load_dataset import soundfile as sf import io import time import logging from typing import Tuple, Optional import warnings warnings.filterwarnings("ignore") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ProfessionalVoiceAgent: """High-quality voice agent optimized for GPU""" def __init__(self, use_large_models=True): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.use_large_models = use_large_models and torch.cuda.is_available() logger.info(f"Initializing on {self.device}") logger.info(f"GPU Available: {torch.cuda.is_available()}") if torch.cuda.is_available(): logger.info(f"GPU Name: {torch.cuda.get_device_name(0)}") logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB") # Model components self.whisper_model = None self.whisper_processor = None self.chat_model = None self.chat_tokenizer = None self.tts_model = None self.tts_processor = None self.vocoder = None self.speaker_embeddings = None # Load models self.load_all_models() def load_all_models(self): """Load all models with GPU optimization""" logger.info("Loading models... This will take a moment for best quality.") # Load Whisper for speech recognition self.load_whisper() # Load chat model self.load_chat_model() # Load TTS self.load_tts() logger.info("All models loaded successfully!") def load_whisper(self): """Load Whisper model for speech recognition""" try: # Use tiny model for speed - small is too slow model_name = "openai/whisper-tiny" logger.info(f"Loading Whisper Tiny for fast processing...") self.whisper_processor = WhisperProcessor.from_pretrained(model_name) self.whisper_model = WhisperForConditionalGeneration.from_pretrained( model_name, torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32, low_cpu_mem_usage=True ).to(self.device) # Set to eval mode for inference self.whisper_model.eval() logger.info(f"✓ Whisper loaded on {self.device}") except Exception as e: logger.error(f"Failed to load Whisper: {e}") # Fallback to pipeline self.whisper_model = pipeline( "automatic-speech-recognition", model="openai/whisper-tiny", device=0 if self.device.type == "cuda" else -1 ) def load_chat_model(self): """Load conversational AI model""" try: if self.use_large_models: # Use larger model for better conversations model_name = "microsoft/DialoGPT-medium" logger.info("Loading DialoGPT-medium for better conversations...") else: model_name = "microsoft/DialoGPT-small" logger.info("Loading DialoGPT-small...") self.chat_tokenizer = AutoTokenizer.from_pretrained(model_name) self.chat_model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32, low_cpu_mem_usage=True ).to(self.device) # Add padding token self.chat_tokenizer.pad_token = self.chat_tokenizer.eos_token # Set to eval mode self.chat_model.eval() logger.info(f"✓ Chat model loaded on {self.device}") except Exception as e: logger.error(f"Failed to load chat model: {e}") # Fallback self.chat_model = pipeline( 
"text-generation", model="microsoft/DialoGPT-small", device=0 if self.device.type == "cuda" else -1 ) def load_tts(self): """Load Text-to-Speech model""" try: logger.info("Loading SpeechT5 TTS model...") self.tts_processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts") self.tts_model = SpeechT5ForTextToSpeech.from_pretrained( "microsoft/speecht5_tts", torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32 ).to(self.device) self.vocoder = SpeechT5HifiGan.from_pretrained( "microsoft/speecht5_hifigan", torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32 ).to(self.device) # Set to eval mode self.tts_model.eval() self.vocoder.eval() # Load speaker embeddings for voice try: logger.info("Loading speaker embeddings dataset...") embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation") # Use a pleasant voice (you can experiment with different indices) self.speaker_embeddings = torch.tensor( embeddings_dataset[7306]["xvector"] ).unsqueeze(0).to(self.device) logger.info("✓ Speaker embeddings loaded from dataset") except Exception as e: logger.warning(f"Failed to load speaker embeddings from dataset: {e}") logger.info("Creating default speaker embeddings...") # Fallback: Create default speaker embeddings # SpeechT5 expects 512-dimensional speaker embeddings self.speaker_embeddings = torch.randn(1, 512).to(self.device) if self.device.type == "cuda": self.speaker_embeddings = self.speaker_embeddings.half() logger.info("✓ Using default speaker embeddings") logger.info("✓ TTS models loaded successfully") except Exception as e: logger.error(f"Failed to load TTS: {e}") self.tts_model = None def transcribe_audio(self, audio) -> str: """Convert speech to text using Whisper""" if audio is None: logger.warning("No audio input received") return "" try: # Handle Gradio 4.x audio format (dict with 'array' and 'sample_rate') if isinstance(audio, dict): sample_rate = audio.get("sample_rate", 16000) audio_data = audio.get("array", audio.get("data", None)) logger.info(f"Audio format: dict, sample_rate={sample_rate}, data shape={audio_data.shape if audio_data is not None else 'None'}") if audio_data is None: logger.error("Audio dict missing 'array' or 'data' key") return "Could not process audio format." elif isinstance(audio, tuple): sample_rate, audio_data = audio logger.info(f"Audio format: tuple, sample_rate={sample_rate}, data shape={audio_data.shape}") else: audio_data = audio sample_rate = 16000 logger.info(f"Audio format: raw array, shape={audio_data.shape}") # Ensure we have audio data if audio_data is None or len(audio_data) == 0: logger.warning("Empty audio data") return "No audio data received." 
    def transcribe_audio(self, audio) -> str:
        """Convert speech to text using Whisper"""
        if audio is None:
            logger.warning("No audio input received")
            return ""

        try:
            # Handle Gradio 4.x audio format (dict with 'array' and 'sample_rate')
            if isinstance(audio, dict):
                sample_rate = audio.get("sample_rate", 16000)
                audio_data = audio.get("array", audio.get("data", None))
                logger.info(f"Audio format: dict, sample_rate={sample_rate}, data shape={audio_data.shape if audio_data is not None else 'None'}")
                if audio_data is None:
                    logger.error("Audio dict missing 'array' or 'data' key")
                    return "Could not process audio format."
            elif isinstance(audio, tuple):
                sample_rate, audio_data = audio
                logger.info(f"Audio format: tuple, sample_rate={sample_rate}, data shape={audio_data.shape}")
            else:
                audio_data = audio
                sample_rate = 16000
                logger.info(f"Audio format: raw array, shape={audio_data.shape}")

            # Ensure we actually have audio data
            if audio_data is None or len(audio_data) == 0:
                logger.warning("Empty audio data")
                return "No audio data received."

            # Log audio stats
            duration_seconds = len(audio_data) / sample_rate
            logger.info(f"Audio duration: {duration_seconds:.2f}s, sample_rate: {sample_rate}Hz")

            # Convert to float32 if needed
            logger.info(f"Audio dtype before conversion: {audio_data.dtype}")
            if audio_data.dtype == np.int16:
                logger.info("Converting from int16 to float32")
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                logger.info("Converting from int32 to float32")
                audio_data = audio_data.astype(np.float32) / 2147483648.0
            elif audio_data.dtype == np.float64:
                logger.info("Converting from float64 to float32")
                audio_data = audio_data.astype(np.float32)
            logger.info(f"Audio dtype after conversion: {audio_data.dtype}")

            # Downmix stereo to mono
            if len(audio_data.shape) > 1 and audio_data.shape[1] > 1:
                audio_data = np.mean(audio_data, axis=1)
                logger.info(f"Converted stereo to mono, new shape: {audio_data.shape}")

            # Check audio statistics before resampling
            logger.info(f"Audio stats - min: {audio_data.min():.4f}, max: {audio_data.max():.4f}, mean: {audio_data.mean():.4f}")

            # Resample to 16 kHz if needed (Whisper requirement)
            if sample_rate != 16000:
                import librosa
                audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
                logger.info(f"Resampled to 16kHz, new length: {len(audio_data)} samples ({len(audio_data)/16000:.2f}s)")

            # Warn if the audio is suspiciously quiet or silent
            audio_abs_mean = np.abs(audio_data).mean()
            if audio_abs_mean < 0.001:
                logger.warning(f"Audio might be too quiet! Abs mean: {audio_abs_mean}")

            # Limit audio length for speed (max 30 seconds at 16 kHz)
            max_samples = 16000 * 30
            if len(audio_data) > max_samples:
                logger.warning(f"Audio trimmed from {len(audio_data)/16000:.1f}s to 30s")
                audio_data = audio_data[:max_samples]

            if self.whisper_processor and hasattr(self.whisper_model, 'generate'):
                # Use the loaded model directly
                input_features = self.whisper_processor(
                    audio_data,
                    sampling_rate=16000,
                    return_tensors="pt"
                ).input_features.to(self.device)
                # Match the input dtype to the fp16 weights on GPU
                if self.device.type == "cuda":
                    input_features = input_features.half()
                logger.info(f"Whisper input_features shape: {input_features.shape}, device: {input_features.device}")

                # Generate token ids - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        # Force English to avoid language-detection overhead
                        forced_decoder_ids = self.whisper_processor.get_decoder_prompt_ids(
                            language="en",
                            task="transcribe"
                        )
                        logger.info(f"Forced decoder IDs: {forced_decoder_ids}")

                        predicted_ids = self.whisper_model.generate(
                            input_features,
                            forced_decoder_ids=forced_decoder_ids,
                            max_new_tokens=64,   # Reduced for faster processing
                            num_beams=1,         # Greedy decoding for speed
                            do_sample=False      # Deterministic
                        )
                logger.info(f"Predicted token IDs shape: {predicted_ids.shape}, first 10 IDs: {predicted_ids[0][:10].tolist()}")

                # Decode token ids to text
                transcription = self.whisper_processor.batch_decode(
                    predicted_ids,
                    skip_special_tokens=True
                )[0]
            else:
                # Use the fallback pipeline
                transcription = self.whisper_model(audio_data)["text"]

            # Clear CUDA cache to prevent memory buildup
            if self.device.type == "cuda":
                torch.cuda.empty_cache()

            logger.info(f"Transcribed: {transcription}")
            return transcription.strip()

        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return "Could not transcribe audio. Please try again."
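    # Note: recent transformers versions deprecate forced_decoder_ids in
    # favor of passing the language and task straight to generate(). A sketch
    # of the newer call, depending on your installed version:
    #
    #   predicted_ids = self.whisper_model.generate(
    #       input_features, language="en", task="transcribe", max_new_tokens=64
    #   )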
    def generate_response(self, text: str, conversation_history: list = None, temperature: float = 0.8) -> str:
        """Generate AI response with conversation context"""
        if not text:
            return "I didn't catch that. Could you please repeat?"

        try:
            # Build conversation context from the most recent exchanges
            if conversation_history:
                context = ""
                for user_msg, bot_msg in conversation_history[-3:]:  # Last 3 exchanges
                    context += f"User: {user_msg}\nAssistant: {bot_msg}\n"
                context += f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | History entries: {len(conversation_history)}")
            else:
                context = f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | No history")
            logger.debug(f"Full context sent to model:\n{context}")

            if self.chat_tokenizer and hasattr(self.chat_model, 'generate'):
                # Tokenize input
                inputs = self.chat_tokenizer.encode(
                    context,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                ).to(self.device)

                # Generate response - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        outputs = self.chat_model.generate(
                            inputs,
                            max_new_tokens=50,   # Shorter for faster response
                            temperature=temperature,
                            top_p=0.9,
                            do_sample=temperature > 0,
                            pad_token_id=self.chat_tokenizer.eos_token_id,
                            eos_token_id=self.chat_tokenizer.eos_token_id,
                            num_beams=1          # Greedy for speed
                        )

                # Decode and strip the prompt from the output
                full_response = self.chat_tokenizer.decode(outputs[0], skip_special_tokens=True)
                logger.debug(f"Raw model output: '{full_response}'")
                response = full_response.replace(context, "").strip()
                logger.info(f"Generated response: '{response}'")
            else:
                # Use the fallback pipeline
                result = self.chat_model(
                    text,
                    max_new_tokens=100,
                    temperature=temperature,
                    do_sample=True
                )
                response = result[0]['generated_text'].replace(text, "").strip()

            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()

            return response if response else "I understand. Tell me more!"

        except Exception as e:
            logger.error(f"Generation error: {e}")
            return "I had a moment of confusion. Could you rephrase that?"
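    # Note: DialoGPT was trained on dialogue turns joined by the EOS token
    # rather than "User:/Assistant:" prefixes, so the canonical prompt format
    # from the model card looks like this sketch:
    #
    #   turn_ids = tokenizer.encode(user_text + tokenizer.eos_token, return_tensors="pt")
    #   model_input = torch.cat([history_ids, turn_ids], dim=-1)  # history_ids from prior turns
    #
    # The plain-text format above still works, but may yield weaker replies.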
    def synthesize_speech(self, text: str, speed: float = 1.0) -> Optional[Tuple[int, np.ndarray]]:
        """Convert text to speech"""
        if not text or not self.tts_model or self.speaker_embeddings is None:
            if not self.tts_model:
                logger.warning("TTS model not loaded")
            if self.speaker_embeddings is None:
                logger.warning("Speaker embeddings not available")
            return None

        try:
            logger.info(f"Synthesizing speech for text: '{text}'")

            # Truncate over-long text and warn
            max_chars = 600
            if len(text) > max_chars:
                logger.warning(f"Text truncated from {len(text)} to {max_chars} characters for TTS")
                text = text[:max_chars] + "..."

            # Prepare text input
            inputs = self.tts_processor(
                text=text,
                return_tensors="pt",
                truncation=True,
                max_length=600  # SpeechT5 input limit
            )
            input_ids = inputs["input_ids"].to(self.device)

            # Generate speech
            with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                with torch.no_grad():
                    speech = self.tts_model.generate_speech(
                        input_ids,
                        self.speaker_embeddings,
                        vocoder=self.vocoder
                    )

            # Convert to float32 numpy (fp16 arrays can trip up librosa/Gradio)
            speech_np = speech.cpu().float().numpy()

            # Apply speed adjustment if requested
            if speed != 1.0:
                import librosa
                speech_np = librosa.effects.time_stretch(speech_np, rate=speed)

            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()

            # SpeechT5 + HiFi-GAN produce 16 kHz audio
            return (16000, speech_np)

        except Exception as e:
            logger.error(f"TTS error: {e}")
            return None

    def process_voice_to_voice(self, audio, conversation_history=None,
                               temperature=0.8, speed=1.0) -> Tuple[str, str, Optional[Tuple[int, np.ndarray]]]:
        """Complete voice-to-voice pipeline"""
        start_time = time.time()

        # Step 1: Transcribe
        logger.info("Processing voice input...")
        user_text = self.transcribe_audio(audio)

        if "Could not transcribe" in user_text or "No audio data" in user_text:
            return user_text, "Please try speaking again.", None

        # Step 2: Generate response
        logger.info("Generating response...")
        response_text = self.generate_response(user_text, conversation_history, temperature)

        # Step 3: Synthesize speech
        logger.info("Generating voice output...")
        response_audio = self.synthesize_speech(response_text, speed)

        total_time = time.time() - start_time
        logger.info(f"Total processing time: {total_time:.2f}s")

        return user_text, response_text, response_audio


# Global instance
agent = ProfessionalVoiceAgent(use_large_models=True)
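# A minimal (hypothetical) standalone use of the agent without the Gradio UI,
# assuming a 16 kHz recording saved as "sample.wav":
#
#   data, sr = sf.read("sample.wav")
#   user_text, reply, audio_out = agent.process_voice_to_voice((sr, data))
#   if audio_out is not None:
#       sf.write("reply.wav", audio_out[1], audio_out[0])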

def create_professional_interface():
    """Create professional voice interface"""

    custom_css = """
    .container {max-width: 900px; margin: auto; padding: 20px;}
    .main-button {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        border: none;
        padding: 20px 40px;
        border-radius: 50px;
        font-size: 18px;
        font-weight: bold;
        cursor: pointer;
        color: white;
        transition: all 0.3s;
    }
    .main-button:hover {transform: scale(1.05);}
    .status-box {
        padding: 10px;
        border-radius: 10px;
        margin: 10px 0;
        text-align: center;
    }
    """

    with gr.Blocks(title="Professional Voice Agent", css=custom_css) as interface:
        # Store conversation history
        conversation_history = gr.State([])

        gr.HTML("""
            <div style="text-align: center;">
                <h1>🎙️ Professional Voice Assistant</h1>

                <p>GPU-powered voice agent with high-quality speech recognition and synthesis</p>
            </div>
        """)

""") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 🎤 Voice Input") audio_input = gr.Audio( sources=["microphone", "upload"], type="numpy", label="Click microphone to record", elem_classes=["audio-input"] ) with gr.Row(): clear_audio = gr.Button("🗑️ Clear", size="sm") process_btn = gr.Button("🚀 Process Voice", variant="primary", size="lg", elem_classes=["main-button"]) gr.Markdown(""" **Tips for best results:** - Speak clearly and naturally - Avoid background noise - Keep messages concise - Wait for complete processing """) with gr.Column(scale=1): gr.Markdown("### 💬 Conversation") user_text = gr.Textbox( label="You said:", lines=2, interactive=False ) response_text = gr.Textbox( label="Assistant response:", lines=3, interactive=False ) response_audio = gr.Audio( label="🔊 Voice Response", type="numpy", autoplay=True, elem_classes=["audio-output"] ) status = gr.Textbox( label="Status", value="Ready", interactive=False, elem_classes=["status-box"] ) # Conversation history display with gr.Row(): gr.Markdown("### 📝 Conversation History") chat_history = gr.Chatbot( height=300, bubble_full_width=False, avatar_images=["🧑", "🤖"] ) # Advanced settings with gr.Accordion("⚙️ Advanced Settings", open=False): with gr.Row(): temperature = gr.Slider(0.1, 1.0, 0.8, label="Response Creativity (Temperature)") voice_speed = gr.Slider(0.5, 2.0, 1.0, label="Voice Speed") clear_history = gr.Button("Clear History") # Processing pipeline def process_audio_pipeline(audio, history, temp, speed): if audio is None: return ( "", "Please record or upload audio first.", None, "No audio detected", history if history else [], history if history else [] ) # Initialize history if None if history is None: history = [] # Update status status_msg = "Processing... 🔄" # Process voice-to-voice user_text_result, bot_response, audio_response = agent.process_voice_to_voice( audio, history, temperature=temp, speed=speed ) # Update history history.append((user_text_result, bot_response)) # Format for chatbot display chat_display = [(u, b) for u, b in history] return ( user_text_result, bot_response, audio_response, "✅ Complete", history, chat_display ) process_btn.click( fn=process_audio_pipeline, inputs=[audio_input, conversation_history, temperature, voice_speed], outputs=[ user_text, response_text, response_audio, status, conversation_history, chat_history ] ) clear_audio.click( lambda: None, outputs=[audio_input] ) clear_history.click( lambda: ([], []), outputs=[conversation_history, chat_history] ) # Examples gr.Markdown("### 💡 Example Phrases") gr.Examples( examples=[ ["Hello, introduce yourself"], ["What's the weather like today?"], ["Tell me an interesting fact"], ["How can you help me?"], ["What are your capabilities?"] ], inputs=[user_text], examples_per_page=5 ) # System info with gr.Accordion("📊 System Information", open=False): system_info = f""" - **Device**: {agent.device} - **GPU Available**: {torch.cuda.is_available()} """ if torch.cuda.is_available(): system_info += f""" - **GPU Model**: {torch.cuda.get_device_name(0)} - **GPU Memory**: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB - **Models**: Large variants loaded for best quality """ else: system_info += "\n- **Note**: Running on CPU (slower performance)" gr.Markdown(system_info) return interface # Create the interface demo = create_professional_interface() if __name__ == "__main__": print("="*50) print("Professional Voice Agent - GPU Optimized") print("="*50) print(f"Device: {agent.device}") if torch.cuda.is_available(): print(f"GPU: 
if __name__ == "__main__":
    print("="*50)
    print("Professional Voice Agent - GPU Optimized")
    print("="*50)
    print(f"Device: {agent.device}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")
        print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
    print("="*50)
    print("Starting server...")

    demo.queue(max_size=5, default_concurrency_limit=1)  # Manage GPU memory
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        max_threads=2  # Limit for GPU memory
    )