# Voice-Bot — src/streamlit_app.py
# Author: Daaku-C5 — commit c3e2346 (verified): "Update src/streamlit_app.py"
# (Header reconstructed as comments: the raw scrape residue was not valid Python.)
import streamlit as st
from openai import OpenAI
import io
import base64
import os
import tempfile
from audio_recorder_streamlit import audio_recorder
# Page configuration — must be the first Streamlit call in the script.
st.set_page_config(
    page_title="Voice Bot",
    layout="wide",
    initial_sidebar_state="collapsed"
)

# Configuration
# NOTE(review): TEMP_AUDIO_FILE is not referenced anywhere in this file —
# transcribe_audio() uses tempfile.NamedTemporaryFile instead. Candidate for
# removal; kept for now in case an external module imports it.
TEMP_AUDIO_FILE = "temp_audio.wav"
# Initialize OpenAI client
@st.cache_resource
def init_openai_client():
    """Build a cached OpenAI client from the API key, halting the app if absent.

    Key lookup order:
        1. ``OPENAI_API_KEY`` environment variable (HF Spaces secrets appear
           as env vars).
        2. Streamlit secrets file (local development).

    Returns:
        An ``OpenAI`` client instance. Never returns without a key: the
        script is stopped with an on-page error instead.
    """
    # 1. Try environment variable first (HF Spaces secrets appear as env vars)
    api_key = os.environ.get("OPENAI_API_KEY")
    # 2. Try Streamlit secrets (for local development)
    if not api_key:
        try:
            api_key = st.secrets["OPENAI_API_KEY"]
        except (KeyError, FileNotFoundError):
            pass
    # 3. Check if we found the key.
    # Bug fix: this branch used to live inside a broad `try/except Exception`.
    # Streamlit implements st.stop() by raising StopException (an Exception
    # subclass), so the handler swallowed it and displayed a spurious
    # "Error initializing OpenAI client" message. st.stop() must run outside
    # any broad except block.
    if not api_key:
        st.error("⚠️ OpenAI API key not found!")
        st.markdown("""
**For Hugging Face Spaces:**
1. Go to your Space settings
2. Click on "Repository secrets"
3. Add a new secret with name: `OPENAI_API_KEY`
4. Restart your Space
**For local development:**
Create `.streamlit/secrets.toml` with:
```
OPENAI_API_KEY = "your-key-here"
```
""")
        st.stop()
    # Only the client construction itself is guarded now.
    try:
        return OpenAI(api_key=api_key)
    except Exception as e:
        st.error(f"Error initializing OpenAI client: {str(e)}")
        st.stop()

client = init_openai_client()
# Initialize session state variables
def init_session_state():
    """Seed every session-state key this app relies on, if not yet present."""
    # Cheap defaults go through a table; context is loaded lazily because it
    # may hit the filesystem.
    simple_defaults = {
        'conversation_history': [],
        'processing': False,
        'last_audio_hash': None,
    }
    for key, default in simple_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
    if 'context' not in st.session_state:
        st.session_state.context = load_context()
def load_context():
    """Load the persona context from context.txt beside this module.

    Returns:
        The stripped file contents when context.txt exists, a built-in
        default persona when it does not, and a short fallback string if
        reading fails for any other reason.
    """
    default_text = """I am Prakhar, an AI assistant. I can help you with general questions and conversations.
I aim to be helpful, harmless, and honest in all my interactions."""
    try:
        here = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(here, 'context.txt')
        if not os.path.exists(path):
            # No context file shipped alongside the app — use the default.
            return default_text
        with open(path, "r", encoding='utf-8') as fh:
            return fh.read().strip()
    except Exception as exc:
        st.error(f"Error loading context: {str(exc)}")
        return "I am Prakhar, an AI assistant."
def save_context(context_text):
    """Persist *context_text* to context.txt beside this module.

    Returns:
        True on success, False if writing failed (error shown in the UI).
    """
    try:
        target = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), 'context.txt'
        )
        with open(target, "w", encoding='utf-8') as fh:
            fh.write(context_text)
    except Exception as exc:
        st.error(f"Error saving context: {str(exc)}")
        return False
    return True
def transcribe_audio(audio_bytes):
    """Transcribe recorded audio via the OpenAI Whisper API.

    Args:
        audio_bytes: Raw WAV bytes from the in-browser recorder.

    Returns:
        The stripped transcript text, or None on failure (error shown in UI).
    """
    tmp_file_path = None
    try:
        # The API wants a real file object, so spill the bytes to disk first.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name
        # Transcribe using OpenAI Whisper
        with open(tmp_file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en"  # You can remove this to auto-detect language
            )
        return transcript.text.strip()
    except Exception as e:
        st.error(f"Error transcribing audio: {str(e)}")
        return None
    finally:
        # Bug fix: the original only unlinked on the success path, leaking a
        # temp file every time transcription raised. Clean up unconditionally.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
def get_ai_response(user_text, context):
    """Ask GPT-4 for a conversational reply to *user_text*, grounded in *context*.

    Returns:
        The model's reply text, or a canned apology string on any API error.
    """
    # Persona prompt: the bot answers as "Prakhar", using the supplied context.
    system_prompt = f"""You are Prakhar. You should respond naturally and helpfully.
Context about you:
{context}
Instructions:
- Use the context above to inform your responses
- If asked about something not covered in the context, you can use your general knowledge
- If you're not sure about something specific to your context, say "I'm not sure about that based on what I know about myself"
- Keep responses conversational and natural
- Be helpful and engaging"""
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_text},
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-4",
            messages=conversation,
            max_tokens=500,
            temperature=0.7,
        )
    except Exception as exc:
        st.error(f"Error getting AI response: {str(exc)}")
        return "I'm sorry, I encountered an error while processing your request."
    return completion.choices[0].message.content.strip()
def text_to_speech(text):
    """Synthesize *text* with OpenAI TTS and return the audio bytes.

    Returns:
        The rendered speech content (MP3 bytes per the TTS default), or
        None if synthesis failed (error shown in the UI).
    """
    try:
        rendered = client.audio.speech.create(
            model="tts-1",
            input=text,
            # Available voices: alloy, echo, fable, onyx, nova, shimmer
            voice="onyx",
            speed=1.0,
        )
    except Exception as exc:
        st.error(f"Error generating speech: {str(exc)}")
        return None
    return rendered.content
def process_audio(audio_bytes):
    """Drive the full pipeline: transcription -> chat reply -> speech.

    Args:
        audio_bytes: Raw recording bytes, possibly empty/None.

    Returns:
        (user_text, ai_response, speech_audio); all three are None when the
        input is empty or transcription produced nothing.
    """
    nothing = (None, None, None)
    if not audio_bytes:
        return nothing
    # Step 1: speech -> text
    with st.spinner("🎯 Transcribing audio..."):
        transcript = transcribe_audio(audio_bytes)
    if not transcript:
        return nothing
    # Step 2: text -> reply (uses the persona context from session state)
    with st.spinner("πŸ€– Generating response..."):
        reply = get_ai_response(transcript, st.session_state.context)
    # Step 3: reply -> audio
    with st.spinner("πŸ”Š Converting to speech..."):
        spoken = text_to_speech(reply)
    return transcript, reply, spoken
def main():
    """Render the voice-bot UI: record in col1, show the conversation in col2.

    Streamlit re-runs this whole function on every interaction, so the
    processing flag and the audio hash in session state are what prevent the
    same recording from being processed twice.
    """
    st.title("πŸŽ™οΈ Voice Bot")
    st.markdown("*Talk to Prakhar using your voice!*")
    # Initialize session state
    init_session_state()
    # Create main layout
    col1, col2 = st.columns([1, 1], gap="large")
    with col1:
        st.subheader("🎀 Voice Input")
        # Audio recorder widget: returns the latest recording's bytes (or
        # None) on each rerun; recording stops after 2s of silence.
        audio_bytes = audio_recorder(
            text="Click to record",
            recording_color="#e74c3c",
            neutral_color="#34495e",
            icon_name="microphone",
            icon_size="2x",
            pause_threshold=2.0,
            sample_rate=44100
        )
        # Show current recording so the user can replay what they said.
        if audio_bytes:
            st.audio(audio_bytes, format="audio/wav")
        # Process audio when a new recording is available and nothing else
        # is already in flight.
        if audio_bytes and not st.session_state.processing:
            # Hash the audio to detect new recordings: the widget re-returns
            # the same bytes on every rerun, so dedup is required.
            import hashlib
            audio_hash = hashlib.md5(audio_bytes).hexdigest()
            # Only process if this is a new recording
            if audio_hash != st.session_state.last_audio_hash:
                st.session_state.processing = True
                st.session_state.last_audio_hash = audio_hash
                user_text, ai_response, speech_audio = process_audio(audio_bytes)
                if user_text and ai_response:
                    # Add to conversation history (speech may be None if TTS failed)
                    st.session_state.conversation_history.append({
                        "user": user_text,
                        "ai": ai_response,
                        "speech": speech_audio
                    })
                # Reset processing flag before rerun so the next run is not blocked.
                st.session_state.processing = False
                # Force a rerun to update the conversation display
                if user_text and ai_response:
                    st.rerun()
    with col2:
        st.subheader("πŸ’¬ Conversation")
        # Display conversation history
        if st.session_state.conversation_history:
            # Show the most recent conversation
            latest = st.session_state.conversation_history[-1]
            st.markdown("**You said:**")
            st.info(latest["user"])
            st.markdown("**Prakhar replied:**")
            st.success(latest["ai"])
            # NOTE(review): redundant reset — processing was already cleared
            # in col1 before the rerun; kept as a defensive unstick.
            st.session_state.processing = False
            # Play AI response audio
            if latest["speech"]:
                st.audio(latest["speech"], format="audio/mp3")
            # Show older exchanges, newest first, in a collapsible section.
            if len(st.session_state.conversation_history) > 1:
                with st.expander("πŸ“œ Previous conversations"):
                    for i, conv in enumerate(reversed(st.session_state.conversation_history[:-1])):
                        st.markdown(f"**Conversation {len(st.session_state.conversation_history) - i - 1}:**")
                        st.markdown(f"πŸ‘€ You: {conv['user']}")
                        st.markdown(f"πŸ€– Prakhar: {conv['ai']}")
                        if conv["speech"]:
                            st.audio(conv["speech"], format="audio/mp3")
                        st.divider()
        else:
            st.info("πŸ‘‹ Start by recording your voice message above!")
    # Context display section (read-only view of the persona context)
    st.divider()
    with st.expander("ℹ️ Context", expanded=False):
        st.info(st.session_state.context)
    # Only keep the clear conversation button
    if st.button("πŸ—‘οΈ Clear Conversation"):
        st.session_state.conversation_history = []
        st.rerun()
    # Status indicators
    if st.session_state.processing:
        st.info("πŸ”„ Processing your request...")


if __name__ == "__main__":
    main()