Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import openai | |
| import os | |
| from pathlib import Path | |
| import tempfile | |
| import numpy as np | |
| from openai import OpenAI | |
class RealtimeVoiceAgent:
    """Voice-assistant pipeline: Whisper STT -> GPT chat -> OpenAI TTS.

    Holds the OpenAI client, the running conversation history (a flat list of
    alternating user/assistant message dicts), and UI preferences (TTS voice,
    continuous-listening flag).
    """

    # Status banners reused by several UI callbacks (HTML rendered via gr.Markdown).
    _READY_STATUS = '<div style="background: #e3f2fd; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold;">π€ Ready - Click microphone to speak</div>'
    _ERROR_STATUS = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">β οΈ Error occurred - Try again</div>'
    _LISTENING_STATUS = '<div style="background: #4caf50; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white; animation: pulse 1.5s infinite;">ποΈ LISTENING - Speak now (continuous mode)</div>'

    def __init__(self, api_key=None):
        """Initialize the voice agent with OpenAI.

        Args:
            api_key: OpenAI API key; falls back to the OPENAI_API_KEY env var.

        Raises:
            ValueError: if no API key is available from either source.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key not found. Set OPENAI_API_KEY environment variable.")
        self.client = OpenAI(api_key=self.api_key)
        self.conversation_history = []  # alternating {"role", "content"} dicts
        self.voice = "alloy"  # Default voice
        self.continuous_mode = False  # Continuous listening mode

    def transcribe_audio(self, audio_path):
        """Convert speech to text using the OpenAI Whisper API.

        Args:
            audio_path: filesystem path to the recorded audio clip.

        Returns:
            The transcribed text.

        Raises:
            Exception: if the file is missing/empty or the API call fails
                (message includes the underlying error for the UI).
        """
        try:
            # Fail early with actionable messages instead of an opaque API error.
            if not os.path.exists(audio_path):
                raise Exception(f"Audio file not found at path: {audio_path}")
            file_size = os.path.getsize(audio_path)
            if file_size == 0:
                raise Exception("Audio file is empty (0 bytes)")
            print(f"[DEBUG] Transcribing audio: {audio_path} ({file_size} bytes)")
            with open(audio_path, "rb") as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    language="en"
                )
            print(f"[DEBUG] Transcription successful: {transcript.text[:50]}...")
            return transcript.text
        except FileNotFoundError as e:
            raise Exception(f"Audio file not found: {str(e)}") from e
        except Exception as e:
            # `from e` preserves the original traceback for server-side debugging.
            raise Exception(f"Transcription failed: {type(e).__name__} - {str(e)}") from e

    def get_llm_response(self, user_message):
        """Get a response from OpenAI GPT (streamed, then joined).

        Args:
            user_message: transcribed user text.

        Returns:
            The assistant's full reply text.

        Raises:
            Exception: if the chat completion fails. On failure the user turn
                is removed from history so user/assistant pairing stays intact.
        """
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        try:
            # Stream so the first tokens arrive quickly; collect into one string.
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",  # Fast and cost-effective
                messages=[
                    {"role": "system", "content": "You are a helpful, friendly voice assistant. Keep responses concise and natural for voice conversation (2-3 sentences max)."},
                    *self.conversation_history
                ],
                max_tokens=150,
                temperature=0.7,
                stream=True
            )
            full_response = ""
            for chunk in response:
                if chunk.choices[0].delta.content:
                    full_response += chunk.choices[0].delta.content
            self.conversation_history.append({
                "role": "assistant",
                "content": full_response
            })
            return full_response
        except Exception as e:
            # Drop the orphaned user turn; otherwise _format_history() would
            # mis-pair every subsequent turn for the rest of the session.
            self.conversation_history.pop()
            raise Exception(f"LLM response failed: {str(e)}") from e

    def synthesize_speech(self, text):
        """Convert text to speech using OpenAI TTS.

        Args:
            text: assistant reply to speak.

        Returns:
            Path to an .mp3 file on disk (suitable for a filepath gr.Audio).

        Raises:
            Exception: if synthesis or file writing fails.
        """
        try:
            response = self.client.audio.speech.create(
                model="tts-1",  # Fast model (tts-1-hd for higher quality)
                voice=self.voice,  # Options: alloy, echo, fable, onyx, nova, shimmer
                input=text,
                speed=1.0
            )
            # NamedTemporaryFile(delete=False) guarantees a unique path; the old
            # pid+hash(text)%10000 scheme could collide and overwrite a file
            # Gradio was still serving.
            with tempfile.NamedTemporaryFile(prefix="tts_output_", suffix=".mp3", delete=False) as f:
                f.write(response.content)
            return f.name
        except Exception as e:
            raise Exception(f"Speech synthesis failed: {str(e)}") from e

    def process_voice_input(self, audio_input, progress=gr.Progress()):
        """Full pipeline: voice in -> text -> LLM -> voice out.

        Args:
            audio_input: filepath of the recorded clip, or None.
            progress: Gradio progress tracker for UI feedback.

        Returns:
            Tuple of (reply audio path, status markdown, cleared mic input,
            chatbot history pairs, listening-indicator HTML) — matching the
            five Gradio outputs wired to this callback.
        """
        if audio_input is None:
            return None, "β οΈ No audio detected. Please record your voice.", None, self._format_history(), self._READY_STATUS
        try:
            # Step 1: Speech to Text
            progress(0.2, desc="π§ Transcribing your voice...")
            user_text = self.transcribe_audio(audio_input)
            if not user_text.strip():
                return None, "β οΈ Could not understand audio. Please speak clearly.", None, self._format_history(), self._READY_STATUS
            # Step 2: Get LLM Response
            progress(0.5, desc="π€ Thinking...")
            assistant_text = self.get_llm_response(user_text)
            # Step 3: Text to Speech
            progress(0.8, desc="π Generating voice response...")
            audio_output = self.synthesize_speech(assistant_text)
            status = f"**You:** {user_text}\n\n**Assistant:** {assistant_text}"
            chat_history = self._format_history()
            progress(1.0, desc="β Done!")
            # Indicator tells the user whether the mic re-arms automatically.
            listening_status = self._LISTENING_STATUS if self.continuous_mode else self._READY_STATUS
            return audio_output, status, None, chat_history, listening_status
        except Exception as e:
            error_msg = f"β Error: {str(e)}\n\nPlease check your API key and try again."
            return None, error_msg, None, self._format_history(), self._ERROR_STATUS

    def _format_history(self):
        """Return history as (user, assistant) tuples for gr.Chatbot display."""
        formatted = []
        # History alternates user/assistant; pair them up, skipping a trailing
        # unanswered user turn if one exists.
        for i in range(0, len(self.conversation_history), 2):
            if i + 1 < len(self.conversation_history):
                formatted.append((
                    self.conversation_history[i]["content"],
                    self.conversation_history[i + 1]["content"]
                ))
        return formatted

    def clear_conversation(self):
        """Reset the conversation and return cleared values for all UI outputs."""
        self.conversation_history = []
        return None, "Conversation cleared!", None, [], self._READY_STATUS

    def change_voice(self, voice_name):
        """Change the TTS voice used for subsequent replies."""
        self.voice = voice_name
        return f"β Voice changed to: **{voice_name}**"

    def toggle_continuous_mode(self, enabled):
        """Toggle continuous listening mode; returns a status markdown string."""
        self.continuous_mode = enabled
        if enabled:
            return "ποΈ **Continuous Mode ON** - Microphone will auto-activate after each response"
        else:
            return "βΈοΈ **Continuous Mode OFF** - Manual recording required"
# Module-level agent instance; created lazily by initialize_agent() so the
# Space can start (and render the UI) before an API key is configured.
agent = None
def initialize_agent():
    """Create the global RealtimeVoiceAgent.

    Returns a markdown status string describing success or what went wrong
    (missing key, or a constructor failure).
    """
    global agent
    key = os.getenv("OPENAI_API_KEY")
    # Guard clause: without a key there is nothing to construct.
    if not key:
        return "β OpenAI API key not found!\n\nPlease set it in Hugging Face Space settings:\nSettings β Repository secrets β New secret\nName: OPENAI_API_KEY\nValue: your-api-key"
    try:
        agent = RealtimeVoiceAgent(api_key=key)
    except Exception as e:
        return f"β Initialization failed: {str(e)}"
    return "β Voice Agent initialized successfully!\n\nπ€ You can now start talking!"
def process_audio_wrapper(audio, progress=gr.Progress()):
    """Route a recorded clip to the agent, guarding against an uninitialized agent."""
    if agent is not None:
        return agent.process_voice_input(audio, progress)
    not_ready_banner = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">β οΈ Not initialized - Click Initialize Agent</div>'
    return None, "β οΈ Please initialize the agent first!", None, [], not_ready_banner
def clear_wrapper():
    """Clear the conversation, guarding against an uninitialized agent."""
    if agent is not None:
        return agent.clear_conversation()
    not_ready_banner = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">β οΈ Not initialized - Click Initialize Agent</div>'
    return None, "β οΈ Please initialize the agent first!", None, [], not_ready_banner
def change_voice_wrapper(voice_name):
    """Forward a voice-style change to the agent if it exists."""
    if agent is not None:
        return agent.change_voice(voice_name)
    return "β οΈ Please initialize the agent first!"
def toggle_continuous_wrapper(enabled):
    """Forward the continuous-mode checkbox state to the agent if it exists."""
    if agent is not None:
        return agent.toggle_continuous_mode(enabled)
    return "β οΈ Please initialize the agent first!"
# ---------------------------------------------------------------------------
# Gradio UI: layout, components, and event wiring.
# NOTE: component creation order defines the on-screen layout, so statements
# here are intentionally order-dependent.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="ποΈ Real-Time Voice Agent",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    css="""
    .main-header {text-align: center; padding: 30px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 20px;}
    .status-box {background: #f8f9fa; padding: 15px; border-radius: 8px; border-left: 4px solid #667eea;}
    .warning-box {background: #fff3cd; padding: 15px; border-radius: 8px; border-left: 4px solid #ffc107;}
    @keyframes pulse {
        0%, 100% { opacity: 1; }
        50% { opacity: 0.7; }
    }
    """
) as demo:
    # Page header banner.
    gr.Markdown("""
    <div class="main-header">
        <h1>ποΈ Real-Time Voice Agent</h1>
        <p>State-of-the-art voice conversation powered by OpenAI</p>
        <p><em>Whisper + GPT-4o-mini + TTS</em></p>
    </div>
    """)
    with gr.Row():
        # Left column: instructions, initialization, and settings.
        with gr.Column(scale=1):
            gr.Markdown("""
            ### π Quick Start
            1. **Initialize** the agent below
            2. **Click** the microphone π€
            3. **Speak** your question
            4. **Click stop** when done
            5. **Listen** to the AI response
            **π‘ Pro Tip:** Enable Continuous Mode below for a more natural conversation flow!
            ---
            ### βοΈ Settings
            """)
            init_button = gr.Button(
                "π€ Initialize Voice Agent",
                variant="primary",
                size="lg"
            )
            init_status = gr.Markdown(
                '<div class="warning-box">β οΈ Click "Initialize Voice Agent" to start</div>'
            )
            gr.Markdown("---")
            # TTS voice picker; values must match OpenAI's supported voices.
            voice_selector = gr.Dropdown(
                choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                value="alloy",
                label="π΅ AI Voice Style",
                info="Select the voice for AI responses"
            )
            voice_status = gr.Markdown("")
            gr.Markdown("---")
            continuous_toggle = gr.Checkbox(
                label="π Continuous Listening Mode",
                value=False,
                info="Auto-activate microphone after each response"
            )
            continuous_status = gr.Markdown("")
            gr.Markdown("""
            ---
            ### π‘ Tips
            - π― Speak clearly and naturally
            - β±οΈ Keep messages under 20 seconds
            - π Minimize background noise
            - π Use Chrome for best compatibility
            - π Enable Continuous Mode for hands-free conversation
            ### π Continuous Mode
            When enabled, the microphone automatically activates after each AI response - just speak and click stop!
            ### π€ Voice Styles
            - **Alloy**: Neutral, balanced
            - **Echo**: Male, clear
            - **Fable**: British, expressive
            - **Onyx**: Deep, authoritative
            - **Nova**: Female, friendly
            - **Shimmer**: Warm, engaging
            """)
        # Right column: the live conversation surface.
        with gr.Column(scale=2):
            gr.Markdown("## π€ Voice Conversation")
            # HTML banner updated by every callback to show listening state.
            listening_indicator = gr.Markdown(
                '<div style="background: #e3f2fd; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold;">π€ Ready - Click microphone to speak</div>'
            )
            # type="filepath" so callbacks receive a path on disk, not raw samples.
            audio_input = gr.Audio(
                sources=["microphone", "upload"],
                type="filepath",
                label="π€ Click to Record Your Voice"
            )
            process_status = gr.Markdown(
                '<div class="status-box">**Status:** Ready to listen...</div>',
                elem_classes=["status-box"]
            )
            # autoplay so the spoken reply starts without an extra click.
            audio_output = gr.Audio(
                label="π AI Voice Response",
                type="filepath",
                autoplay=True
            )
            with gr.Row():
                process_btn = gr.Button(
                    "π¬ Process Voice",
                    variant="secondary",
                    size="lg",
                    scale=3
                )
                clear_btn = gr.Button(
                    "ποΈ Clear History",
                    variant="stop",
                    scale=1
                )
            gr.Markdown("---")
            gr.Markdown("## π Conversation History")
            conversation_display = gr.Chatbot(
                label="Your Conversation",
                height=400,
                bubble_full_width=False,
                avatar_images=(None, "π€")
            )
    # Footer: stack, cost, and troubleshooting notes.
    gr.Markdown("""
    ---
    ### π Technical Stack
    - **Speech Recognition**: OpenAI Whisper (99%+ accuracy)
    - **Language Model**: GPT-4o-mini (fast, intelligent)
    - **Speech Synthesis**: OpenAI TTS (natural, expressive)
    - **Interface**: Gradio (real-time updates)
    ### π Privacy & Costs
    - Requires OpenAI API key (set in Space settings)
    - Approximate cost: $0.01-0.03 per conversation
    - Audio is processed through OpenAI's API
    - No data is stored permanently
    ### π Troubleshooting
    - **No audio?** Check browser microphone permissions
    - **API error?** Verify your OpenAI API key in Space settings
    - **Slow response?** Try shorter messages or upgrade to paid OpenAI plan
    ---
    <div style="text-align: center; color: #666;">
    Built with β€οΈ using OpenAI APIs |
    <a href="https://github.com/openai/whisper">Whisper</a> |
    <a href="https://platform.openai.com/docs/guides/text-to-speech">TTS</a> |
    <a href="https://platform.openai.com/docs/guides/chat">GPT-4</a>
    </div>
    """)
    # Event handlers -------------------------------------------------------
    init_button.click(
        fn=initialize_agent,
        outputs=[init_status]
    )
    # Manual trigger for a recorded/uploaded clip.
    process_btn.click(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    # Auto-process when recording stops (same outputs as the manual button).
    audio_input.stop_recording(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    clear_btn.click(
        fn=clear_wrapper,
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    voice_selector.change(
        fn=change_voice_wrapper,
        inputs=[voice_selector],
        outputs=[voice_status]
    )
    continuous_toggle.change(
        fn=toggle_continuous_wrapper,
        inputs=[continuous_toggle],
        outputs=[continuous_status]
    )
if __name__ == "__main__":
    # Bind to all interfaces so the app is reachable from outside the
    # container (required on Hugging Face Spaces); show_error surfaces
    # callback tracebacks in the browser UI.
    demo.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True
    )