"""Streamlit voice bot: record speech, transcribe with Whisper,
answer with GPT-4, and speak the reply with OpenAI TTS."""

import base64  # NOTE(review): unused in this file — kept in case another module relies on re-export
import hashlib
import io      # NOTE(review): unused in this file — kept for backward compatibility
import os
import tempfile

import streamlit as st
from audio_recorder_streamlit import audio_recorder
from openai import OpenAI

# Page configuration
st.set_page_config(
    page_title="Voice Bot",
    layout="wide",
    initial_sidebar_state="collapsed"
)

# Configuration
TEMP_AUDIO_FILE = "temp_audio.wav"  # NOTE(review): unused — transcription uses tempfile instead


@st.cache_resource
def init_openai_client():
    """Create and cache the OpenAI client.

    Resolves the API key from (1) the environment — Hugging Face Spaces
    secrets surface as env vars — then (2) Streamlit secrets for local
    development. Halts the app with setup instructions if no key is found.
    """
    try:
        # 1. Try environment variable first (HF Spaces secrets appear as env vars)
        api_key = os.environ.get("OPENAI_API_KEY")

        # 2. Try Streamlit secrets (for local development)
        if not api_key:
            try:
                api_key = st.secrets["OPENAI_API_KEY"]
            except (KeyError, FileNotFoundError):
                # No secrets.toml or key missing from it — fall through to the error below.
                pass

        # 3. Check if we found the key
        if not api_key:
            st.error("âš ī¸ OpenAI API key not found!")
            st.markdown("""
**For Hugging Face Spaces:**
1. Go to your Space settings
2. Click on "Repository secrets"
3. Add a new secret with name: `OPENAI_API_KEY`
4. Restart your Space

**For local development:**
Create `.streamlit/secrets.toml` with:
```
OPENAI_API_KEY = "your-key-here"
```
""")
            st.stop()

        return OpenAI(api_key=api_key)
    except Exception as e:
        st.error(f"Error initializing OpenAI client: {str(e)}")
        st.stop()


client = init_openai_client()


def init_session_state():
    """Ensure every session-state key the app reads exists with a default."""
    if 'conversation_history' not in st.session_state:
        st.session_state.conversation_history = []
    if 'context' not in st.session_state:
        st.session_state.context = load_context()
    if 'processing' not in st.session_state:
        st.session_state.processing = False
    if 'last_audio_hash' not in st.session_state:
        st.session_state.last_audio_hash = None


def load_context():
    """Load the bot persona context from context.txt or return a default.

    Returns:
        str: the stripped file contents, the built-in default if the file
        is absent, or a short fallback persona if reading fails.
    """
    try:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        context_path = os.path.join(base_dir, 'context.txt')
        if os.path.exists(context_path):
            with open(context_path, "r", encoding='utf-8') as f:
                return f.read().strip()
        # Default context if file doesn't exist
        return """I am Prakhar, an AI assistant. I can help you with general questions and conversations. I aim to be helpful, harmless, and honest in all my interactions."""
    except Exception as e:
        st.error(f"Error loading context: {str(e)}")
        return "I am Prakhar, an AI assistant."


def save_context(context_text):
    """Persist *context_text* to context.txt next to this script.

    Returns:
        bool: True on success, False if the write failed (error shown in UI).
    """
    try:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        context_path = os.path.join(base_dir, 'context.txt')
        with open(context_path, "w", encoding='utf-8') as f:
            f.write(context_text)
        return True
    except Exception as e:
        st.error(f"Error saving context: {str(e)}")
        return False


def transcribe_audio(audio_bytes):
    """Transcribe recorded audio bytes using the Whisper API.

    Args:
        audio_bytes: raw WAV bytes from the recorder widget.

    Returns:
        str | None: the stripped transcript, or None on failure.
    """
    tmp_file_path = None
    try:
        # Whisper needs a real file handle, so spill the bytes to a temp file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        # Transcribe using OpenAI Whisper
        with open(tmp_file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en"  # You can remove this to auto-detect language
            )
        return transcript.text.strip()
    except Exception as e:
        st.error(f"Error transcribing audio: {str(e)}")
        return None
    finally:
        # BUGFIX: clean up the temp file on every path, not just on success.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)


def get_ai_response(user_text, context):
    """Get a GPT-4 chat completion for *user_text* grounded in *context*.

    Returns:
        str: the assistant reply, or a canned apology string on error.
    """
    try:
        system_prompt = f"""You are Prakhar. You should respond naturally and helpfully. 

Context about you: {context}

Instructions:
- Use the context above to inform your responses
- If asked about something not covered in the context, you can use your general knowledge
- If you're not sure about something specific to your context, say "I'm not sure about that based on what I know about myself"
- Keep responses conversational and natural
- Be helpful and engaging"""

        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text}
            ],
            max_tokens=500,
            temperature=0.7
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        st.error(f"Error getting AI response: {str(e)}")
        return "I'm sorry, I encountered an error while processing your request."


def text_to_speech(text):
    """Convert *text* to speech audio with OpenAI TTS.

    Returns:
        bytes | None: MP3 audio content, or None on failure.
    """
    try:
        response = client.audio.speech.create(
            model="tts-1",
            voice="onyx",  # Available voices: alloy, echo, fable, onyx, nova, shimmer
            input=text,
            speed=1.0
        )
        return response.content
    except Exception as e:
        st.error(f"Error generating speech: {str(e)}")
        return None


def process_audio(audio_bytes):
    """Run the full pipeline: transcribe -> chat completion -> TTS.

    Returns:
        tuple: (user_text, ai_response, speech_audio); all None when the
        input is empty or transcription fails.
    """
    if not audio_bytes:
        return None, None, None

    # Transcribe audio
    with st.spinner("đŸŽ¯ Transcribing audio..."):
        user_text = transcribe_audio(audio_bytes)

    if not user_text:
        return None, None, None

    # Get AI response
    with st.spinner("🤖 Generating response..."):
        ai_response = get_ai_response(user_text, st.session_state.context)

    # Convert to speech
    with st.spinner("🔊 Converting to speech..."):
        speech_audio = text_to_speech(ai_response)

    return user_text, ai_response, speech_audio


def main():
    """Render the two-column UI: recorder on the left, conversation on the right."""
    st.title("đŸŽ™ī¸ Voice Bot")
    st.markdown("*Talk to Prakhar using your voice!*")

    # Initialize session state
    init_session_state()

    # Create main layout
    col1, col2 = st.columns([1, 1], gap="large")

    with col1:
        st.subheader("🎤 Voice Input")

        # Audio recorder
        audio_bytes = audio_recorder(
            text="Click to record",
            recording_color="#e74c3c",
            neutral_color="#34495e",
            icon_name="microphone",
            icon_size="2x",
            pause_threshold=2.0,
            sample_rate=44100
        )

        # Show current recording
        if audio_bytes:
            st.audio(audio_bytes, format="audio/wav")

        # Process audio when new recording is available
        if audio_bytes and not st.session_state.processing:
            # Hash the audio so Streamlit reruns don't reprocess the same clip.
            # (md5 is fine here: dedup only, not security.)
            audio_hash = hashlib.md5(audio_bytes).hexdigest()

            # Only process if this is a new recording
            if audio_hash != st.session_state.last_audio_hash:
                st.session_state.processing = True
                st.session_state.last_audio_hash = audio_hash

                user_text, ai_response, speech_audio = process_audio(audio_bytes)

                if user_text and ai_response:
                    # Add to conversation history
                    st.session_state.conversation_history.append({
                        "user": user_text,
                        "ai": ai_response,
                        "speech": speech_audio
                    })

                # Reset processing flag before rerun
                st.session_state.processing = False

                # Force a rerun to update the conversation display
                if user_text and ai_response:
                    st.rerun()

    with col2:
        st.subheader("đŸ’Ŧ Conversation")

        # Display conversation history
        if st.session_state.conversation_history:
            # Show the most recent conversation
            latest = st.session_state.conversation_history[-1]

            st.markdown("**You said:**")
            st.info(latest["user"])

            st.markdown("**Prakhar replied:**")
            st.success(latest["ai"])
            # NOTE(review): defensive reset kept from original; processing is
            # normally already False by the time this renders.
            st.session_state.processing = False

            # Play AI response audio
            if latest["speech"]:
                st.audio(latest["speech"], format="audio/mp3")

            # Show conversation history
            if len(st.session_state.conversation_history) > 1:
                with st.expander("📜 Previous conversations"):
                    for i, conv in enumerate(reversed(st.session_state.conversation_history[:-1])):
                        st.markdown(f"**Conversation {len(st.session_state.conversation_history) - i - 1}:**")
                        st.markdown(f"👤 You: {conv['user']}")
                        st.markdown(f"🤖 Prakhar: {conv['ai']}")
                        if conv["speech"]:
                            st.audio(conv["speech"], format="audio/mp3")
                        st.divider()
        else:
            st.info("👋 Start by recording your voice message above!")

    # Context display section
    st.divider()
    with st.expander("â„šī¸ Context", expanded=False):
        st.info(st.session_state.context)

    # Only keep the clear conversation button
    if st.button("đŸ—‘ī¸ Clear Conversation"):
        st.session_state.conversation_history = []
        st.rerun()

    # Status indicators
    if st.session_state.processing:
        st.info("🔄 Processing your request...")


if __name__ == "__main__":
    main()