| """ | |
| Professional Voice Agent - GPU Optimized | |
| High-quality voice assistant with speech recognition and synthesis | |
| Designed for best user experience on GPU hardware | |
| """ | |
import gradio as gr
import torch
import numpy as np
from transformers import (
    pipeline,
    AutoModelForCausalLM,
    AutoTokenizer,
    WhisperProcessor,
    WhisperForConditionalGeneration,
    SpeechT5Processor,
    SpeechT5ForTextToSpeech,
    SpeechT5HifiGan
)
from datasets import load_dataset
import soundfile as sf
import time
import logging
from typing import Tuple, Optional
import warnings

warnings.filterwarnings("ignore")

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ProfessionalVoiceAgent:
    """High-quality voice agent optimized for GPU"""

    def __init__(self, use_large_models=True):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.use_large_models = use_large_models and torch.cuda.is_available()
        logger.info(f"Initializing on {self.device}")
        logger.info(f"GPU Available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            logger.info(f"GPU Name: {torch.cuda.get_device_name(0)}")
            logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

        # Model components
        self.whisper_model = None
        self.whisper_processor = None
        self.chat_model = None
        self.chat_tokenizer = None
        self.tts_model = None
        self.tts_processor = None
        self.vocoder = None
        self.speaker_embeddings = None

        # Load models
        self.load_all_models()

    def load_all_models(self):
        """Load all models with GPU optimization"""
        logger.info("Loading models... This will take a moment for best quality.")
        # Load Whisper for speech recognition
        self.load_whisper()
        # Load chat model
        self.load_chat_model()
        # Load TTS
        self.load_tts()
        logger.info("All models loaded successfully!")

    def load_whisper(self):
        """Load Whisper model for speech recognition"""
        try:
            # Use the tiny model to keep latency low; whisper-small is too slow here
            model_name = "openai/whisper-tiny"
            logger.info("Loading Whisper Tiny for fast processing...")
            self.whisper_processor = WhisperProcessor.from_pretrained(model_name)
            self.whisper_model = WhisperForConditionalGeneration.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
                low_cpu_mem_usage=True
            ).to(self.device)
            # Set to eval mode for inference
            self.whisper_model.eval()
            logger.info(f"✓ Whisper loaded on {self.device}")
        except Exception as e:
            logger.error(f"Failed to load Whisper: {e}")
            # Fallback to pipeline
            self.whisper_model = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-tiny",
                device=0 if self.device.type == "cuda" else -1
            )

    def load_chat_model(self):
        """Load conversational AI model"""
        try:
            if self.use_large_models:
                # Use the larger model for better conversations
                model_name = "microsoft/DialoGPT-medium"
                logger.info("Loading DialoGPT-medium for better conversations...")
            else:
                model_name = "microsoft/DialoGPT-small"
                logger.info("Loading DialoGPT-small...")
            self.chat_tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.chat_model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
                low_cpu_mem_usage=True
            ).to(self.device)
            # Add padding token (DialoGPT has none by default; reuse EOS)
            self.chat_tokenizer.pad_token = self.chat_tokenizer.eos_token
            # Set to eval mode
            self.chat_model.eval()
            logger.info(f"✓ Chat model loaded on {self.device}")
        except Exception as e:
            logger.error(f"Failed to load chat model: {e}")
            # Fallback
            self.chat_model = pipeline(
                "text-generation",
                model="microsoft/DialoGPT-small",
                device=0 if self.device.type == "cuda" else -1
            )

    def load_tts(self):
        """Load Text-to-Speech model"""
        try:
            logger.info("Loading SpeechT5 TTS model...")
            self.tts_processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
            self.tts_model = SpeechT5ForTextToSpeech.from_pretrained(
                "microsoft/speecht5_tts",
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32
            ).to(self.device)
            self.vocoder = SpeechT5HifiGan.from_pretrained(
                "microsoft/speecht5_hifigan",
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32
            ).to(self.device)
            # Set to eval mode
            self.tts_model.eval()
            self.vocoder.eval()
            # Load speaker embeddings for the voice
            try:
                logger.info("Loading speaker embeddings dataset...")
                embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
                # Use a pleasant voice (you can experiment with different indices).
                # Cast to the model's dtype so fp16 inference doesn't hit a dtype mismatch.
                self.speaker_embeddings = torch.tensor(
                    embeddings_dataset[7306]["xvector"]
                ).unsqueeze(0).to(self.device, dtype=self.tts_model.dtype)
                logger.info("✓ Speaker embeddings loaded from dataset")
            except Exception as e:
                logger.warning(f"Failed to load speaker embeddings from dataset: {e}")
                logger.info("Creating default speaker embeddings...")
                # Fallback: random 512-dim x-vector (SpeechT5 expects 512-dimensional
                # speaker embeddings); the resulting voice will be arbitrary
                self.speaker_embeddings = torch.randn(1, 512).to(self.device)
                if self.device.type == "cuda":
                    self.speaker_embeddings = self.speaker_embeddings.half()
                logger.info("✓ Using default speaker embeddings")
            logger.info("✓ TTS models loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load TTS: {e}")
            self.tts_model = None

    def transcribe_audio(self, audio) -> str:
        """Convert speech to text using Whisper"""
        if audio is None:
            logger.warning("No audio input received")
            return ""
        try:
            # Gradio's numpy audio arrives as a (sample_rate, array) tuple;
            # also accept dict payloads with 'array'/'data' keys for robustness
            if isinstance(audio, dict):
                sample_rate = audio.get("sample_rate", 16000)
                audio_data = audio.get("array", audio.get("data", None))
                logger.info(f"Audio format: dict, sample_rate={sample_rate}")
                if audio_data is None:
                    logger.error("Audio dict missing 'array' or 'data' key")
                    return "Could not process audio format."
            elif isinstance(audio, tuple):
                sample_rate, audio_data = audio
                logger.info(f"Audio format: tuple, sample_rate={sample_rate}, data shape={audio_data.shape}")
            else:
                audio_data = audio
                sample_rate = 16000
                logger.info(f"Audio format: raw array, shape={audio_data.shape}")
            # Ensure we have audio data
            if audio_data is None or len(audio_data) == 0:
                logger.warning("Empty audio data")
                return "No audio data received."
            audio_data = np.asarray(audio_data)  # tolerate plain lists from dict payloads
            # Log audio stats
            duration_seconds = len(audio_data) / sample_rate
            logger.info(f"Audio duration: {duration_seconds:.2f}s, sample_rate: {sample_rate}Hz")
            # Convert to float32 if needed
            logger.info(f"Audio dtype before conversion: {audio_data.dtype}")
            if audio_data.dtype == np.int16:
                logger.info("Converting from int16 to float32")
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                logger.info("Converting from int32 to float32")
                audio_data = audio_data.astype(np.float32) / 2147483648.0
            elif audio_data.dtype == np.float64:
                logger.info("Converting from float64 to float32")
                audio_data = audio_data.astype(np.float32)
            logger.info(f"Audio dtype after conversion: {audio_data.dtype}")
            # Downmix stereo to mono
            if len(audio_data.shape) > 1 and audio_data.shape[1] > 1:
                audio_data = np.mean(audio_data, axis=1)
                logger.info(f"Converted stereo to mono, new shape: {audio_data.shape}")
            # Check audio statistics before resampling
            logger.info(f"Audio stats - min: {audio_data.min():.4f}, max: {audio_data.max():.4f}, mean: {audio_data.mean():.4f}")
            # Resample to 16kHz if needed (Whisper requirement)
            if sample_rate != 16000:
                import librosa
                audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
                logger.info(f"Resampled to 16kHz, new length: {len(audio_data)} samples ({len(audio_data)/16000:.2f}s)")
            # Warn if the audio is very quiet or silent
            audio_abs_mean = np.abs(audio_data).mean()
            if audio_abs_mean < 0.001:
                logger.warning(f"Audio might be too quiet! Abs mean: {audio_abs_mean}")
            # Limit audio length for speed (max 30 seconds)
            max_samples = 16000 * 30  # 30 seconds at 16kHz
            if len(audio_data) > max_samples:
                logger.warning(f"Audio trimmed from {len(audio_data)/16000:.1f}s to 30s")
                audio_data = audio_data[:max_samples]
            if self.whisper_processor and hasattr(self.whisper_model, 'generate'):
                # Use the loaded model
                input_features = self.whisper_processor(
                    audio_data,
                    sampling_rate=16000,
                    return_tensors="pt"
                ).input_features.to(self.device)
                logger.info(f"Whisper input_features shape: {input_features.shape}, device: {input_features.device}")
                # Generate token ids - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        # Force English to skip language-detection overhead
                        # (newer transformers releases also accept language=/task=
                        # keyword arguments to generate() directly)
                        forced_decoder_ids = self.whisper_processor.get_decoder_prompt_ids(
                            language="en",
                            task="transcribe"
                        )
                        logger.info(f"Forced decoder IDs: {forced_decoder_ids}")
                        predicted_ids = self.whisper_model.generate(
                            input_features,
                            forced_decoder_ids=forced_decoder_ids,
                            max_new_tokens=64,  # reduced for faster processing
                            num_beams=1,        # greedy decoding for speed
                            do_sample=False     # deterministic
                        )
                logger.info(f"Predicted token IDs shape: {predicted_ids.shape}, first 10 IDs: {predicted_ids[0][:10].tolist()}")
                # Decode token ids to text
                transcription = self.whisper_processor.batch_decode(
                    predicted_ids,
                    skip_special_tokens=True
                )[0]
            else:
                # Use pipeline fallback
                transcription = self.whisper_model(audio_data)["text"]
            # Clear CUDA cache to prevent memory buildup
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            logger.info(f"Transcribed: {transcription}")
            return transcription.strip()
        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return "Could not transcribe audio. Please try again."

    def generate_response(self, text: str, conversation_history: list = None, temperature: float = 0.8) -> str:
        """Generate AI response with conversation context"""
        if not text:
            return "I didn't catch that. Could you please repeat?"
        try:
            # Build conversation context
            if conversation_history:
                context = ""
                for user_msg, bot_msg in conversation_history[-3:]:  # last 3 exchanges
                    context += f"User: {user_msg}\nAssistant: {bot_msg}\n"
                context += f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | History entries: {len(conversation_history)}")
            else:
                context = f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | No history")
            logger.debug(f"Full context sent to model:\n{context}")
            if self.chat_tokenizer and hasattr(self.chat_model, 'generate'):
                # Tokenize input
                inputs = self.chat_tokenizer.encode(
                    context,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                ).to(self.device)
                # Generate response - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        outputs = self.chat_model.generate(
                            inputs,
                            max_new_tokens=50,  # shorter for faster response
                            temperature=temperature,
                            top_p=0.9,
                            do_sample=temperature > 0,
                            pad_token_id=self.chat_tokenizer.eos_token_id,
                            eos_token_id=self.chat_tokenizer.eos_token_id,
                            num_beams=1  # greedy for speed
                        )
                full_response = self.chat_tokenizer.decode(outputs[0], skip_special_tokens=True)
                logger.debug(f"Raw model output: '{full_response}'")
                # Decode only the newly generated tokens; string-replacing the prompt
                # is unreliable because decoding does not round-trip exactly
                response = self.chat_tokenizer.decode(
                    outputs[0][inputs.shape[-1]:],
                    skip_special_tokens=True
                ).strip()
                # Drop any follow-on dialogue turns the model invented
                response = response.split("User:")[0].strip()
                logger.info(f"Generated response: '{response}'")
            else:
                # Use pipeline fallback
                result = self.chat_model(
                    text,
                    max_new_tokens=100,
                    temperature=temperature,
                    do_sample=True
                )
                response = result[0]['generated_text'].replace(text, "").strip()
            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            return response if response else "I understand. Tell me more!"
        except Exception as e:
            logger.error(f"Generation error: {e}")
            return "I had a moment of confusion. Could you rephrase that?"

    def synthesize_speech(self, text: str, speed: float = 1.0) -> Optional[Tuple[int, np.ndarray]]:
        """Convert text to speech"""
        if not text or not self.tts_model or self.speaker_embeddings is None:
            if not self.tts_model:
                logger.warning("TTS model not loaded")
            if self.speaker_embeddings is None:
                logger.warning("Speaker embeddings not available")
            return None
        try:
            logger.info(f"Synthesizing speech for text: '{text}'")
            # Truncate overly long text and warn
            max_chars = 600
            if len(text) > max_chars:
                logger.warning(f"Text truncated from {len(text)} to {max_chars} characters for TTS")
                text = text[:max_chars] + "..."
            # Prepare text input
            inputs = self.tts_processor(
                text=text,
                return_tensors="pt",
                truncation=True,
                max_length=600  # SpeechT5 limit
            )
            input_ids = inputs["input_ids"].to(self.device)
            # Generate speech
            with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                with torch.no_grad():
                    speech = self.tts_model.generate_speech(
                        input_ids,
                        self.speaker_embeddings,
                        vocoder=self.vocoder
                    )
            # Convert to float32 numpy (fp16 arrays can trip up librosa and Gradio)
            speech_np = speech.cpu().float().numpy()
            # Apply speed adjustment if needed
            if speed != 1.0:
                import librosa
                speech_np = librosa.effects.time_stretch(speech_np, rate=speed)
            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            # SpeechT5/HiFi-GAN output is 16 kHz
            return (16000, speech_np)
        except Exception as e:
            logger.error(f"TTS error: {e}")
            return None

    def process_voice_to_voice(self, audio, conversation_history=None, temperature=0.8, speed=1.0) -> Tuple[str, str, Optional[Tuple[int, np.ndarray]]]:
        """Complete voice-to-voice pipeline"""
        start_time = time.time()
        # Step 1: Transcribe
        logger.info("Processing voice input...")
        user_text = self.transcribe_audio(audio)
        if "Could not transcribe" in user_text or "No audio data" in user_text:
            return user_text, "Please try speaking again.", None
        # Step 2: Generate response
        logger.info("Generating response...")
        response_text = self.generate_response(user_text, conversation_history, temperature)
        # Step 3: Synthesize speech
        logger.info("Generating voice output...")
        response_audio = self.synthesize_speech(response_text, speed)
        total_time = time.time() - start_time
        logger.info(f"Total processing time: {total_time:.2f}s")
        return user_text, response_text, response_audio
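
# Minimal headless usage sketch (an illustration, not part of the UI): runs the
# full voice-to-voice pipeline on a local recording. `sample.wav` is a
# hypothetical path; any WAV that soundfile can read should work.
def _smoke_test(voice_agent: "ProfessionalVoiceAgent", wav_path: str = "sample.wav") -> None:
    data, sr = sf.read(wav_path)  # soundfile returns (samples, sample_rate)
    heard, reply, audio = voice_agent.process_voice_to_voice((sr, data))
    print(f"Heard: {heard!r}")
    print(f"Reply: {reply!r}")
    if audio is not None:
        out_sr, wav = audio
        sf.write("reply.wav", wav, out_sr)  # save the synthesized answer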

# Global instance
agent = ProfessionalVoiceAgent(use_large_models=True)

def create_professional_interface():
    """Create professional voice interface"""
    custom_css = """
    .container {max-width: 900px; margin: auto; padding: 20px;}
    .main-button {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        border: none;
        padding: 20px 40px;
        border-radius: 50px;
        font-size: 18px;
        font-weight: bold;
        cursor: pointer;
        color: white;
        transition: all 0.3s;
    }
    .main-button:hover {transform: scale(1.05);}
    .status-box {
        padding: 10px;
        border-radius: 10px;
        margin: 10px 0;
        text-align: center;
    }
    """
    with gr.Blocks(title="Professional Voice Agent", css=custom_css) as interface:
        # Store conversation history
        conversation_history = gr.State([])
        gr.HTML("""
            <div class="container">
                <h1 style="text-align: center;">🎙️ Professional Voice Assistant</h1>
                <p style="text-align: center;">GPU-powered voice agent with high-quality speech recognition and synthesis</p>
            </div>
        """)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 🎤 Voice Input")
                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="numpy",
                    label="Click microphone to record",
                    elem_classes=["audio-input"]
                )
                with gr.Row():
                    clear_audio = gr.Button("🗑️ Clear", size="sm")
                    process_btn = gr.Button("🚀 Process Voice", variant="primary", size="lg", elem_classes=["main-button"])
                gr.Markdown("""
                **Tips for best results:**
                - Speak clearly and naturally
                - Avoid background noise
                - Keep messages concise
                - Wait for processing to complete
                """)
            with gr.Column(scale=1):
                gr.Markdown("### 💬 Conversation")
                user_text = gr.Textbox(
                    label="You said:",
                    lines=2,
                    interactive=False
                )
                response_text = gr.Textbox(
                    label="Assistant response:",
                    lines=3,
                    interactive=False
                )
                response_audio = gr.Audio(
                    label="🔊 Voice Response",
                    type="numpy",
                    autoplay=True,
                    elem_classes=["audio-output"]
                )
                status = gr.Textbox(
                    label="Status",
                    value="Ready",
                    interactive=False,
                    elem_classes=["status-box"]
                )
        # Conversation history display
        with gr.Row():
            gr.Markdown("### 📝 Conversation History")
        chat_history = gr.Chatbot(
            height=300,
            bubble_full_width=False,
            avatar_images=["🧑", "🤖"]
        )
        # Advanced settings
        with gr.Accordion("⚙️ Advanced Settings", open=False):
            with gr.Row():
                temperature = gr.Slider(0.1, 1.0, 0.8, label="Response Creativity (Temperature)")
                voice_speed = gr.Slider(0.5, 2.0, 1.0, label="Voice Speed")
            clear_history = gr.Button("Clear History")

        # Processing pipeline
        def process_audio_pipeline(audio, history, temp, speed):
            if audio is None:
                return (
                    "",
                    "Please record or upload audio first.",
                    None,
                    "No audio detected",
                    history if history else [],
                    history if history else []
                )
            # Initialize history if None
            if history is None:
                history = []
            # Process voice-to-voice (status is reported via the return value)
            user_text_result, bot_response, audio_response = agent.process_voice_to_voice(
                audio,
                history,
                temperature=temp,
                speed=speed
            )
            # Update history
            history.append((user_text_result, bot_response))
            # Copy for the chatbot display
            chat_display = [(u, b) for u, b in history]
            return (
                user_text_result,
                bot_response,
                audio_response,
                "✅ Complete",
                history,
                chat_display
            )
        process_btn.click(
            fn=process_audio_pipeline,
            inputs=[audio_input, conversation_history, temperature, voice_speed],
            outputs=[
                user_text,
                response_text,
                response_audio,
                status,
                conversation_history,
                chat_history
            ]
        )
        clear_audio.click(
            lambda: None,
            outputs=[audio_input]
        )
        clear_history.click(
            lambda: ([], []),
            outputs=[conversation_history, chat_history]
        )

        # Examples
        gr.Markdown("### 💡 Example Phrases")
        gr.Examples(
            examples=[
                ["Hello, introduce yourself"],
                ["What's the weather like today?"],
                ["Tell me an interesting fact"],
                ["How can you help me?"],
                ["What are your capabilities?"]
            ],
            inputs=[user_text],
            examples_per_page=5
        )
        # System info
        with gr.Accordion("📊 System Information", open=False):
            system_info = f"""
            - **Device**: {agent.device}
            - **GPU Available**: {torch.cuda.is_available()}
            """
            if torch.cuda.is_available():
                system_info += f"""
            - **GPU Model**: {torch.cuda.get_device_name(0)}
            - **GPU Memory**: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB
            - **Models**: Larger chat model (DialoGPT-medium) loaded for better quality
            """
            else:
                system_info += "\n- **Note**: Running on CPU (slower performance)"
            gr.Markdown(system_info)
    return interface

# Create the interface
demo = create_professional_interface()

if __name__ == "__main__":
    print("=" * 50)
    print("Professional Voice Agent - GPU Optimized")
    print("=" * 50)
    print(f"Device: {agent.device}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")
        print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
    print("=" * 50)
    print("Starting server...")
    demo.queue(max_size=5, default_concurrency_limit=1)  # manage GPU memory
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        max_threads=2  # limit for GPU memory
    )