# easy_voice / app.py — Hugging Face Space entry point (commit 19c6da1, by eduard76)
import gradio as gr
import openai
import os
from pathlib import Path
import tempfile
import numpy as np
from openai import OpenAI
class RealtimeVoiceAgent:
    """Voice conversation pipeline built on OpenAI APIs.

    Pipeline: recorded audio file -> Whisper transcription -> chat
    completion (gpt-4o-mini, streamed) -> TTS mp3 file that Gradio plays
    back. Conversation state lives in ``conversation_history`` as a flat
    list of ``{"role", "content"}`` dicts in strict user/assistant
    alternation.
    """

    # Idle banner HTML; hoisted to one place so every code path that shows
    # the "ready" state emits exactly the same markup.
    _READY_HTML = '<div style="background: #e3f2fd; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold;">🎀 Ready - Click microphone to speak</div>'

    def __init__(self, api_key=None):
        """Initialize the voice agent with OpenAI.

        Args:
            api_key: Explicit OpenAI key; falls back to the
                OPENAI_API_KEY environment variable.

        Raises:
            ValueError: If no API key can be found.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key not found. Set OPENAI_API_KEY environment variable.")
        self.client = OpenAI(api_key=self.api_key)
        self.conversation_history = []  # alternating user/assistant dicts
        self.voice = "alloy"  # Default voice
        self.continuous_mode = False  # Continuous listening mode

    def transcribe_audio(self, audio_path):
        """Convert speech to text using OpenAI Whisper API.

        Returns the transcript text; raises Exception with a descriptive
        message on any failure (missing/empty file, API error).
        """
        try:
            # Fail fast with clear messages before hitting the API.
            if not os.path.exists(audio_path):
                raise Exception(f"Audio file not found at path: {audio_path}")
            file_size = os.path.getsize(audio_path)
            if file_size == 0:
                raise Exception("Audio file is empty (0 bytes)")
            print(f"[DEBUG] Transcribing audio: {audio_path} ({file_size} bytes)")
            with open(audio_path, "rb") as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    language="en"
                )
            print(f"[DEBUG] Transcription successful: {transcript.text[:50]}...")
            return transcript.text
        except FileNotFoundError as e:
            # Race: file removed between the existence check and open().
            raise Exception(f"Audio file not found: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Transcription failed: {type(e).__name__} - {str(e)}") from e

    def get_llm_response(self, user_message):
        """Get a (streamed) chat response from OpenAI GPT.

        Appends the user turn, streams the completion, then appends the
        assistant turn. Bug fix: on failure the just-appended user message
        is rolled back so the history keeps the strict user/assistant
        alternation that _format_history relies on.
        """
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",  # Fast and cost-effective
                messages=[
                    {"role": "system", "content": "You are a helpful, friendly voice assistant. Keep responses concise and natural for voice conversation (2-3 sentences max)."},
                    *self.conversation_history
                ],
                max_tokens=150,
                temperature=0.7,
                stream=True
            )
            # Accumulate the streamed delta chunks into one string.
            full_response = ""
            for chunk in response:
                if chunk.choices[0].delta.content:
                    full_response += chunk.choices[0].delta.content
            self.conversation_history.append({
                "role": "assistant",
                "content": full_response
            })
            return full_response
        except Exception as e:
            # Roll back the unanswered user turn; previously the history was
            # left unbalanced here, corrupting later chat-pair formatting.
            self.conversation_history.pop()
            raise Exception(f"LLM response failed: {str(e)}") from e

    def synthesize_speech(self, text):
        """Convert text to speech using OpenAI TTS; return an mp3 path."""
        try:
            response = self.client.audio.speech.create(
                model="tts-1",  # Fast model (tts-1-hd for higher quality)
                voice=self.voice,  # Options: alloy, echo, fable, onyx, nova, shimmer
                input=text,
                speed=1.0
            )
            # Bug fix: a guaranteed-unique temp file replaces the previous
            # hash(text)-derived name, which could collide within a process
            # and overwrite an mp3 Gradio was still serving.
            with tempfile.NamedTemporaryFile(
                prefix="tts_output_", suffix=".mp3", delete=False
            ) as f:
                f.write(response.content)
                return f.name
        except Exception as e:
            raise Exception(f"Speech synthesis failed: {str(e)}") from e

    def process_voice_input(self, audio_input, progress=None):
        """Full pipeline: Voice -> Text -> LLM -> Voice.

        Args:
            audio_input: Filepath from gr.Audio, or None if nothing was
                recorded.
            progress: Optional Gradio progress tracker; created lazily so
                the class can be defined/used without an active Gradio app.

        Returns:
            5-tuple matching the Gradio outputs: (audio_path,
            status_markdown, cleared_mic_value, chat_pairs, banner_html).
        """
        if audio_input is None:
            return None, "⚠️ No audio detected. Please record your voice.", None, self._format_history(), self._READY_HTML
        if progress is None:
            # Lazy default (was a def-time gr.Progress()); this method is
            # only called via wrappers that pass progress explicitly.
            progress = gr.Progress()
        try:
            # Step 1: Speech to Text
            progress(0.2, desc="🎧 Transcribing your voice...")
            user_text = self.transcribe_audio(audio_input)
            if not user_text.strip():
                return None, "⚠️ Could not understand audio. Please speak clearly.", None, self._format_history(), self._READY_HTML
            # Step 2: Get LLM Response
            progress(0.5, desc="πŸ€” Thinking...")
            assistant_text = self.get_llm_response(user_text)
            # Step 3: Text to Speech
            progress(0.8, desc="πŸ”Š Generating voice response...")
            audio_output = self.synthesize_speech(assistant_text)
            # Markdown summary of the exchange for the status panel.
            status = f"**You:** {user_text}\n\n**Assistant:** {assistant_text}"
            chat_history = self._format_history()
            progress(1.0, desc="βœ“ Done!")
            # Banner reflects whether continuous mode keeps listening.
            if self.continuous_mode:
                listening_status = '<div style="background: #4caf50; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white; animation: pulse 1.5s infinite;">πŸŽ™οΈ LISTENING - Speak now (continuous mode)</div>'
            else:
                listening_status = self._READY_HTML
            return audio_output, status, None, chat_history, listening_status
        except Exception as e:
            error_msg = f"❌ Error: {str(e)}\n\nPlease check your API key and try again."
            error_status = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">⚠️ Error occurred - Try again</div>'
            return None, error_msg, None, self._format_history(), error_status

    def _format_history(self):
        """Pair consecutive (user, assistant) turns for gr.Chatbot tuples.

        A trailing unpaired message (user turn without a reply yet) is
        intentionally omitted.
        """
        formatted = []
        for i in range(0, len(self.conversation_history), 2):
            if i + 1 < len(self.conversation_history):
                formatted.append((
                    self.conversation_history[i]["content"],
                    self.conversation_history[i + 1]["content"]
                ))
        return formatted

    def clear_conversation(self):
        """Reset history; return values that blank out the UI widgets."""
        self.conversation_history = []
        return None, "Conversation cleared!", None, [], self._READY_HTML

    def change_voice(self, voice_name):
        """Change the TTS voice used by synthesize_speech."""
        self.voice = voice_name
        return f"βœ“ Voice changed to: **{voice_name}**"

    def toggle_continuous_mode(self, enabled):
        """Toggle continuous listening mode; return a status message."""
        self.continuous_mode = enabled
        if enabled:
            return "πŸŽ™οΈ **Continuous Mode ON** - Microphone will auto-activate after each response"
        return "⏸️ **Continuous Mode OFF** - Manual recording required"
# Module-level singleton; populated lazily by initialize_agent() once an
# API key is available.
agent = None


def initialize_agent():
    """Build the global RealtimeVoiceAgent; return a Markdown status string."""
    global agent
    key = os.getenv("OPENAI_API_KEY")
    if not key:
        return (
            "❌ OpenAI API key not found!\n\n"
            "Please set it in Hugging Face Space settings:\n"
            "Settings β†’ Repository secrets β†’ New secret\n"
            "Name: OPENAI_API_KEY\n"
            "Value: your-api-key"
        )
    try:
        agent = RealtimeVoiceAgent(api_key=key)
    except Exception as e:
        return f"❌ Initialization failed: {str(e)}"
    return "βœ… Voice Agent initialized successfully!\n\n🎀 You can now start talking!"
def process_audio_wrapper(audio, progress=gr.Progress()):
    """Forward the recording to the global agent; warn when uninitialized."""
    if agent is not None:
        return agent.process_voice_input(audio, progress)
    banner = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">⚠️ Not initialized - Click Initialize Agent</div>'
    return None, "⚠️ Please initialize the agent first!", None, [], banner
def clear_wrapper():
    """Clear conversation state via the global agent; warn when uninitialized."""
    if agent is not None:
        return agent.clear_conversation()
    banner = '<div style="background: #f44336; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold; color: white;">⚠️ Not initialized - Click Initialize Agent</div>'
    return None, "⚠️ Please initialize the agent first!", None, [], banner
def change_voice_wrapper(voice_name):
    """Switch the TTS voice on the global agent; warn when uninitialized."""
    if agent is not None:
        return agent.change_voice(voice_name)
    return "⚠️ Please initialize the agent first!"
def toggle_continuous_wrapper(enabled):
    """Flip continuous-listening mode on the global agent; warn when uninitialized."""
    if agent is not None:
        return agent.toggle_continuous_mode(enabled)
    return "⚠️ Please initialize the agent first!"
# Create Gradio Interface
# NOTE(review): layout nesting reconstructed from flattened source; widget
# definitions, strings, and event wiring are unchanged.
with gr.Blocks(
    title="πŸŽ™οΈ Real-Time Voice Agent",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    # The pulse keyframes animate the "LISTENING" banner HTML emitted by
    # RealtimeVoiceAgent.process_voice_input in continuous mode.
    css="""
.main-header {text-align: center; padding: 30px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 20px;}
.status-box {background: #f8f9fa; padding: 15px; border-radius: 8px; border-left: 4px solid #667eea;}
.warning-box {background: #fff3cd; padding: 15px; border-radius: 8px; border-left: 4px solid #ffc107;}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.7; }
}
"""
) as demo:
    # Page header banner.
    gr.Markdown("""
<div class="main-header">
<h1>πŸŽ™οΈ Real-Time Voice Agent</h1>
<p>State-of-the-art voice conversation powered by OpenAI</p>
<p><em>Whisper + GPT-4o-mini + TTS</em></p>
</div>
""")
    with gr.Row():
        # Left column: onboarding instructions and settings.
        with gr.Column(scale=1):
            gr.Markdown("""
### πŸš€ Quick Start
1. **Initialize** the agent below
2. **Click** the microphone 🎀
3. **Speak** your question
4. **Click stop** when done
5. **Listen** to the AI response
**πŸ’‘ Pro Tip:** Enable Continuous Mode below for a more natural conversation flow!
---
### βš™οΈ Settings
""")
            init_button = gr.Button(
                "πŸ€– Initialize Voice Agent",
                variant="primary",
                size="lg"
            )
            init_status = gr.Markdown(
                '<div class="warning-box">⚠️ Click "Initialize Voice Agent" to start</div>'
            )
            gr.Markdown("---")
            # Choices map directly onto the voices accepted by OpenAI TTS.
            voice_selector = gr.Dropdown(
                choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                value="alloy",
                label="🎡 AI Voice Style",
                info="Select the voice for AI responses"
            )
            voice_status = gr.Markdown("")
            gr.Markdown("---")
            continuous_toggle = gr.Checkbox(
                label="πŸ”„ Continuous Listening Mode",
                value=False,
                info="Auto-activate microphone after each response"
            )
            continuous_status = gr.Markdown("")
            gr.Markdown("""
---
### πŸ’‘ Tips
- 🎯 Speak clearly and naturally
- ⏱️ Keep messages under 20 seconds
- πŸ”‡ Minimize background noise
- 🌐 Use Chrome for best compatibility
- πŸ”„ Enable Continuous Mode for hands-free conversation
### πŸ”„ Continuous Mode
When enabled, the microphone automatically activates after each AI response - just speak and click stop!
### 🎀 Voice Styles
- **Alloy**: Neutral, balanced
- **Echo**: Male, clear
- **Fable**: British, expressive
- **Onyx**: Deep, authoritative
- **Nova**: Female, friendly
- **Shimmer**: Warm, engaging
""")
        # Right column: the conversation workspace.
        with gr.Column(scale=2):
            gr.Markdown("## 🎀 Voice Conversation")
            # Banner HTML is swapped by the 5th return value of the handlers.
            listening_indicator = gr.Markdown(
                '<div style="background: #e3f2fd; padding: 10px; border-radius: 5px; text-align: center; font-weight: bold;">🎀 Ready - Click microphone to speak</div>'
            )
            audio_input = gr.Audio(
                sources=["microphone", "upload"],
                type="filepath",  # handlers receive a temp-file path
                label="🎀 Click to Record Your Voice"
            )
            process_status = gr.Markdown(
                '<div class="status-box">**Status:** Ready to listen...</div>',
                elem_classes=["status-box"]
            )
            audio_output = gr.Audio(
                label="πŸ”Š AI Voice Response",
                type="filepath",
                autoplay=True  # play the TTS reply as soon as it arrives
            )
            with gr.Row():
                process_btn = gr.Button(
                    "πŸ’¬ Process Voice",
                    variant="secondary",
                    size="lg",
                    scale=3
                )
                clear_btn = gr.Button(
                    "πŸ—‘οΈ Clear History",
                    variant="stop",
                    scale=1
                )
            gr.Markdown("---")
            gr.Markdown("## πŸ’­ Conversation History")
            conversation_display = gr.Chatbot(
                label="Your Conversation",
                height=400,
                bubble_full_width=False,
                # NOTE(review): avatar_images normally expects image
                # paths/URLs; an emoji string may not render — confirm.
                avatar_images=(None, "πŸ€–")
            )
    # Page footer: stack, cost, and troubleshooting notes.
    gr.Markdown("""
---
### πŸ“Š Technical Stack
- **Speech Recognition**: OpenAI Whisper (99%+ accuracy)
- **Language Model**: GPT-4o-mini (fast, intelligent)
- **Speech Synthesis**: OpenAI TTS (natural, expressive)
- **Interface**: Gradio (real-time updates)
### πŸ” Privacy & Costs
- Requires OpenAI API key (set in Space settings)
- Approximate cost: $0.01-0.03 per conversation
- Audio is processed through OpenAI's API
- No data is stored permanently
### πŸ› Troubleshooting
- **No audio?** Check browser microphone permissions
- **API error?** Verify your OpenAI API key in Space settings
- **Slow response?** Try shorter messages or upgrade to paid OpenAI plan
---
<div style="text-align: center; color: #666;">
Built with ❀️ using OpenAI APIs |
<a href="https://github.com/openai/whisper">Whisper</a> |
<a href="https://platform.openai.com/docs/guides/text-to-speech">TTS</a> |
<a href="https://platform.openai.com/docs/guides/chat">GPT-4</a>
</div>
""")
    # Event handlers
    init_button.click(
        fn=initialize_agent,
        outputs=[init_status]
    )
    # Manual trigger for processing the current recording.
    process_btn.click(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    # Auto-process when recording stops
    audio_input.stop_recording(
        fn=process_audio_wrapper,
        inputs=[audio_input],
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    clear_btn.click(
        fn=clear_wrapper,
        outputs=[audio_output, process_status, audio_input, conversation_display, listening_indicator]
    )
    voice_selector.change(
        fn=change_voice_wrapper,
        inputs=[voice_selector],
        outputs=[voice_status]
    )
    continuous_toggle.change(
        fn=toggle_continuous_wrapper,
        inputs=[continuous_toggle],
        outputs=[continuous_status]
    )
# Launch the app only when run as a script (HF Spaces executes app.py).
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces (required inside containers)
        share=False,
        show_error=True  # surface handler tracebacks in the UI
    )