"""Real-time voice agent: OpenAI Whisper (STT) + GPT-4o-mini (LLM) + TTS,
wrapped in a Gradio Blocks UI.

NOTE(review): this file was recovered from a whitespace/encoding-mangled
source. The inline HTML status snippets and the header markup were garbled
in the original; they have been reconstructed with equivalent markup using
the CSS classes this file defines — confirm styling against the deployed
version.
"""

import os
import tempfile
import uuid
from pathlib import Path

import gradio as gr
import numpy as np
import openai
from openai import OpenAI

# Reusable HTML snippets for the "listening indicator" slot. The original
# markup was garbled in the source; these are reconstructed equivalents
# built on the .status-box / .warning-box classes defined in the CSS below.
READY_STATUS = '<div class="status-box">🎤 Ready - Click microphone to speak</div>'
LISTENING_STATUS = (
    '<div class="status-box" style="animation: pulse 1.5s infinite;">'
    '🎙️ LISTENING - Speak now (continuous mode)</div>'
)
ERROR_STATUS = '<div class="warning-box">⚠️ Error occurred - Try again</div>'
NOT_INITIALIZED_STATUS = (
    '<div class="warning-box">⚠️ Not initialized - Click Initialize Agent</div>'
)


class RealtimeVoiceAgent:
    """Full voice pipeline: audio file -> Whisper text -> chat completion -> TTS mp3."""

    def __init__(self, api_key=None):
        """Initialize the voice agent with OpenAI.

        Args:
            api_key: OpenAI API key; falls back to the OPENAI_API_KEY env var.

        Raises:
            ValueError: if no API key is available from either source.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key not found. Set OPENAI_API_KEY environment variable.")

        self.client = OpenAI(api_key=self.api_key)
        self.conversation_history = []  # alternating {"role": "user"/"assistant"} dicts
        self.voice = "alloy"  # Default voice
        self.continuous_mode = False  # Continuous listening mode

    def transcribe_audio(self, audio_path):
        """Convert speech to text using the OpenAI Whisper API.

        Args:
            audio_path: filesystem path to the recorded audio file.

        Returns:
            The transcribed text.

        Raises:
            Exception: wrapping any underlying failure (missing/empty file,
                API error), with a human-readable message.
        """
        try:
            # Debug: fail fast with a clear message before hitting the API.
            if not os.path.exists(audio_path):
                raise Exception(f"Audio file not found at path: {audio_path}")

            file_size = os.path.getsize(audio_path)
            if file_size == 0:
                raise Exception("Audio file is empty (0 bytes)")

            print(f"[DEBUG] Transcribing audio: {audio_path} ({file_size} bytes)")

            with open(audio_path, "rb") as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    language="en",
                )

            print(f"[DEBUG] Transcription successful: {transcript.text[:50]}...")
            return transcript.text

        except FileNotFoundError as e:
            raise Exception(f"Audio file not found: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Transcription failed: {type(e).__name__} - {str(e)}") from e

    def get_llm_response(self, user_message):
        """Get a streamed chat completion for user_message and update history.

        The user turn is appended to conversation_history before the request
        (so it is part of the context) and the assistant turn after it.

        Returns:
            The full assistant response text.

        Raises:
            Exception: wrapping any API failure.
        """
        # Record the user turn first so it is included in the request context.
        self.conversation_history.append({
            "role": "user",
            "content": user_message,
        })
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",  # Fast and cost-effective
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a helpful, friendly voice assistant. "
                            "Keep responses concise and natural for voice "
                            "conversation (2-3 sentences max)."
                        ),
                    },
                    *self.conversation_history,
                ],
                max_tokens=150,
                temperature=0.7,
                stream=True,
            )

            # Collect the streamed chunks into the full response.
            full_response = ""
            for chunk in response:
                # Guard: some stream chunks (e.g. the final usage chunk) carry
                # an empty choices list — indexing [0] unguarded would raise.
                if chunk.choices and chunk.choices[0].delta.content:
                    full_response += chunk.choices[0].delta.content

            self.conversation_history.append({
                "role": "assistant",
                "content": full_response,
            })
            return full_response

        except Exception as e:
            # Bug fix: remove the dangling user turn on failure, otherwise the
            # user/assistant alternation that _format_history relies on breaks
            # and the failed turn poisons every later request's context.
            self.conversation_history.pop()
            raise Exception(f"LLM response failed: {str(e)}") from e

    def synthesize_speech(self, text):
        """Convert text to speech using OpenAI TTS.

        Returns:
            Path to a temporary .mp3 file containing the spoken audio.

        Raises:
            Exception: wrapping any API or file-write failure.
        """
        try:
            response = self.client.audio.speech.create(
                model="tts-1",  # Fast model (tts-1-hd for higher quality)
                voice=self.voice,  # Options: alloy, echo, fable, onyx, nova, shimmer
                input=text,
                speed=1.0,
            )

            # Bug fix: the previous hash(text) % 10000 filename collided easily
            # (and str hashing is salted per process); a uuid is unique, so a
            # new response can never clobber a file Gradio is still serving.
            output_path = os.path.join(
                tempfile.gettempdir(), f"tts_output_{uuid.uuid4().hex}.mp3"
            )
            with open(output_path, "wb") as f:
                f.write(response.content)

            return output_path

        except Exception as e:
            raise Exception(f"Speech synthesis failed: {str(e)}") from e

    def process_voice_input(self, audio_input, progress=gr.Progress()):
        """Full pipeline: Voice -> Text -> LLM -> Voice.

        Args:
            audio_input: filepath from the Gradio Audio component, or None.
            progress: Gradio progress tracker (injected by Gradio).

        Returns:
            A 5-tuple matching the Gradio outputs:
            (tts_audio_path, status_markdown, cleared_audio_input,
             chat_history_pairs, listening_indicator_html).
        """
        if audio_input is None:
            return (None, "⚠️ No audio detected. Please record your voice.",
                    None, self._format_history(), READY_STATUS)

        try:
            # Step 1: Speech to Text
            progress(0.2, desc="🎧 Transcribing your voice...")
            user_text = self.transcribe_audio(audio_input)

            if not user_text.strip():
                return (None, "⚠️ Could not understand audio. Please speak clearly.",
                        None, self._format_history(), READY_STATUS)

            # Step 2: Get LLM Response
            progress(0.5, desc="🤔 Thinking...")
            assistant_text = self.get_llm_response(user_text)

            # Step 3: Text to Speech
            progress(0.8, desc="🔊 Generating voice response...")
            audio_output = self.synthesize_speech(assistant_text)

            # Format status
            status = f"**You:** {user_text}\n\n**Assistant:** {assistant_text}"

            # Format conversation history
            chat_history = self._format_history()

            progress(1.0, desc="✓ Done!")

            # Listening status based on continuous mode
            listening_status = LISTENING_STATUS if self.continuous_mode else READY_STATUS

            return audio_output, status, None, chat_history, listening_status

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}\n\nPlease check your API key and try again."
            return None, error_msg, None, self._format_history(), ERROR_STATUS

    def _format_history(self):
        """Format conversation history as (user, assistant) pairs for gr.Chatbot."""
        formatted = []
        # History alternates user/assistant; walk it two entries at a time.
        for i in range(0, len(self.conversation_history), 2):
            if i + 1 < len(self.conversation_history):
                formatted.append((
                    self.conversation_history[i]["content"],
                    self.conversation_history[i + 1]["content"],
                ))
        return formatted

    def clear_conversation(self):
        """Clear conversation history and reset all UI output slots."""
        self.conversation_history = []
        return None, "Conversation cleared!", None, [], READY_STATUS

    def change_voice(self, voice_name):
        """Change the TTS voice used for subsequent responses."""
        self.voice = voice_name
        return f"✓ Voice changed to: **{voice_name}**"

    def toggle_continuous_mode(self, enabled):
        """Toggle continuous listening mode; returns a status message."""
        self.continuous_mode = enabled
        if enabled:
            return "🎙️ **Continuous Mode ON** - Microphone will auto-activate after each response"
        else:
            return "⏸️ **Continuous Mode OFF** - Manual recording required"


# Initialize agent (will use environment variable)
agent = None


def initialize_agent():
    """Initialize the global agent, checking that an API key is configured."""
    global agent
    api_key = os.getenv("OPENAI_API_KEY")

    if not api_key:
        return ("❌ OpenAI API key not found!\n\n"
                "Please set it in Hugging Face Space settings:\n"
                "Settings → Repository secrets → New secret\n"
                "Name: OPENAI_API_KEY\nValue: your-api-key")

    try:
        agent = RealtimeVoiceAgent(api_key=api_key)
        return "✅ Voice Agent initialized successfully!\n\n🎤 You can now start talking!"
    except Exception as e:
        return f"❌ Initialization failed: {str(e)}"


def process_audio_wrapper(audio, progress=gr.Progress()):
    """Wrapper to check if agent is initialized before processing audio."""
    if agent is None:
        return None, "⚠️ Please initialize the agent first!", None, [], NOT_INITIALIZED_STATUS
    return agent.process_voice_input(audio, progress)


def clear_wrapper():
    """Wrapper for the clear-history function."""
    if agent is None:
        return None, "⚠️ Please initialize the agent first!", None, [], NOT_INITIALIZED_STATUS
    return agent.clear_conversation()


def change_voice_wrapper(voice_name):
    """Wrapper for the voice-change function."""
    if agent is None:
        return "⚠️ Please initialize the agent first!"
    return agent.change_voice(voice_name)


def toggle_continuous_wrapper(enabled):
    """Wrapper for the continuous-mode toggle."""
    if agent is None:
        return "⚠️ Please initialize the agent first!"
    return agent.toggle_continuous_mode(enabled)


# Create Gradio Interface
with gr.Blocks(
    title="🎙️ Real-Time Voice Agent",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    css="""
    .main-header {text-align: center; padding: 30px;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white; border-radius: 10px; margin-bottom: 20px;}
    .status-box {background: #f8f9fa; padding: 15px; border-radius: 8px;
        border-left: 4px solid #667eea;}
    .warning-box {background: #fff3cd; padding: 15px; border-radius: 8px;
        border-left: 4px solid #ffc107;}
    @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.7; } }
    """,
) as demo:
    gr.Markdown("""
    <div class="main-header">
        <h1>🎙️ Real-Time Voice Agent</h1>
        <p>State-of-the-art voice conversation powered by OpenAI</p>
        <p><b>Whisper + GPT-4o-mini + TTS</b></p>
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("""
            ### 🚀 Quick Start
            1. **Initialize** the agent below
            2. **Click** the microphone 🎤
            3. **Speak** your question
            4. **Click stop** when done
            5. **Listen** to the AI response

            **💡 Pro Tip:** Enable Continuous Mode below for a more natural conversation flow!

            ---

            ### ⚙️ Settings
            """)

            init_button = gr.Button(
                "🤖 Initialize Voice Agent",
                variant="primary",
                size="lg",
            )

            init_status = gr.Markdown(
                '<div class="warning-box">⚠️ Click "Initialize Voice Agent" to start</div>'
            )

            gr.Markdown("---")

            voice_selector = gr.Dropdown(
                choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                value="alloy",
                label="🎵 AI Voice Style",
                info="Select the voice for AI responses",
            )

            voice_status = gr.Markdown("")

            gr.Markdown("---")

            continuous_toggle = gr.Checkbox(
                label="🔄 Continuous Listening Mode",
                value=False,
                info="Auto-activate microphone after each response",
            )

            continuous_status = gr.Markdown("")

            gr.Markdown("""
            ---

            ### 💡 Tips
            - 🎯 Speak clearly and naturally
            - ⏱️ Keep messages under 20 seconds
            - 🔇 Minimize background noise
            - 🌐 Use Chrome for best compatibility
            - 🔄 Enable Continuous Mode for hands-free conversation

            ### 🔄 Continuous Mode
            When enabled, the microphone automatically activates after each AI response - just speak and click stop!

            ### 🎤 Voice Styles
            - **Alloy**: Neutral, balanced
            - **Echo**: Male, clear
            - **Fable**: British, expressive
            - **Onyx**: Deep, authoritative
            - **Nova**: Female, friendly
            - **Shimmer**: Warm, engaging
            """)

        with gr.Column(scale=2):
            gr.Markdown("## 🎤 Voice Conversation")

            listening_indicator = gr.Markdown(READY_STATUS)

            audio_input = gr.Audio(
                sources=["microphone", "upload"],
                type="filepath",
                label="🎤 Click to Record Your Voice",
            )

            process_status = gr.Markdown(
                "**Status:** Ready to listen...",
                elem_classes=["status-box"],
            )

            audio_output = gr.Audio(
                label="🔊 AI Voice Response",
                type="filepath",
                autoplay=True,
            )

            with gr.Row():
                process_btn = gr.Button(
                    "💬 Process Voice",
                    variant="secondary",
                    size="lg",
                    scale=3,
                )
                clear_btn = gr.Button(
                    "🗑️ Clear History",
                    variant="stop",
                    scale=1,
                )

            gr.Markdown("---")
            gr.Markdown("## 💭 Conversation History")

            conversation_display = gr.Chatbot(
                label="Your Conversation",
                height=400,
                bubble_full_width=False,
                avatar_images=(None, "🤖"),
            )

    gr.Markdown("""
    ---
    ### 📊 Technical Stack
    - **Speech Recognition**: OpenAI Whisper (99%+ accuracy)
    - **Language Model**: GPT-4o-mini (fast, intelligent)
    - **Speech Synthesis**: OpenAI TTS (natural, expressive)
    - **Interface**: Gradio (real-time updates)

    ### 🔐 Privacy & Costs
    - Requires OpenAI API key (set in Space settings)
    - Approximate cost: $0.01-0.03 per conversation
    - Audio is processed through OpenAI's API
    - No data is stored permanently

    ### 🐛 Troubleshooting
    - **No audio?** Check browser microphone permissions
    - **API error?** Verify your OpenAI API key in Space settings
    - **Slow response?** Try shorter messages or upgrade to paid OpenAI plan

    ---
    Built with ❤️ using OpenAI APIs | Whisper | TTS | GPT-4
    """)

    # Event handlers
    init_button.click(
        fn=initialize_agent,
        outputs=[init_status],
    )

    process_btn.click(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input,
                 conversation_display, listening_indicator],
    )

    # Auto-process when recording stops
    audio_input.stop_recording(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input,
                 conversation_display, listening_indicator],
    )

    clear_btn.click(
        fn=clear_wrapper,
        outputs=[audio_output, process_status, audio_input,
                 conversation_display, listening_indicator],
    )

    voice_selector.change(
        fn=change_voice_wrapper,
        inputs=[voice_selector],
        outputs=[voice_status],
    )

    continuous_toggle.change(
        fn=toggle_continuous_wrapper,
        inputs=[continuous_toggle],
        outputs=[continuous_status],
    )


if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
    )