|
|
import streamlit as st |
|
|
from openai import OpenAI |
|
|
import io |
|
|
import base64 |
|
|
import os |
|
|
import tempfile |
|
|
from audio_recorder_streamlit import audio_recorder |
|
|
|
|
|
|
|
|
# Page configuration must be the first Streamlit call in the script.
st.set_page_config(
    page_title="Voice Bot",
    layout="wide",
    initial_sidebar_state="collapsed"
)

# NOTE(review): this constant is not referenced anywhere in this file —
# audio is handled via tempfile.NamedTemporaryFile instead. Candidate for
# removal; confirm no external module imports it.
TEMP_AUDIO_FILE = "temp_audio.wav"
|
|
|
|
|
|
|
|
@st.cache_resource
def init_openai_client():
    """Create and cache a single OpenAI client for the whole app session.

    API-key resolution order:
    1. ``OPENAI_API_KEY`` environment variable (how Hugging Face Spaces
       exposes repository secrets).
    2. Streamlit secrets (``.streamlit/secrets.toml``); ``FileNotFoundError``
       is caught because Streamlit raises it when no secrets file exists.

    If no key is found, renders setup instructions and halts the script run
    with ``st.stop()``. Any other failure is reported and also halts.

    Returns:
        OpenAI: a configured client instance.
    """
    try:
        api_key = None

        # Environment variable takes precedence.
        api_key = os.environ.get("OPENAI_API_KEY")

        # Fall back to Streamlit's secrets store.
        if not api_key:
            try:
                api_key = st.secrets["OPENAI_API_KEY"]
            except (KeyError, FileNotFoundError):
                pass

        # No key anywhere: show platform-specific setup help and stop the run.
        if not api_key:
            st.error("β οΈ OpenAI API key not found!")
            st.markdown("""
**For Hugging Face Spaces:**
1. Go to your Space settings
2. Click on "Repository secrets"
3. Add a new secret with name: `OPENAI_API_KEY`
4. Restart your Space

**For local development:**
Create `.streamlit/secrets.toml` with:
```
OPENAI_API_KEY = "your-key-here"
```
""")
            st.stop()

        return OpenAI(api_key=api_key)

    except Exception as e:
        st.error(f"Error initializing OpenAI client: {str(e)}")
        st.stop()
|
|
|
|
|
# Module-level client shared by all functions below; the @st.cache_resource
# decorator makes this a process-wide singleton across reruns.
client = init_openai_client()
|
|
|
|
|
|
|
|
def init_session_state():
    """Seed every st.session_state key this app relies on, exactly once.

    Keys already present are left untouched, so values survive Streamlit
    reruns. ``context`` is handled separately because its default comes
    from load_context() and should only be computed when actually missing.
    """
    simple_defaults = {
        'conversation_history': [],
        'processing': False,
        'last_audio_hash': None,
    }
    for key, default in simple_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

    # Lazy default: only hit the filesystem when no context is stored yet.
    if 'context' not in st.session_state:
        st.session_state.context = load_context()
|
|
|
|
|
def load_context():
    """Read the persona context from ``context.txt`` beside this script.

    Returns the stripped file contents when the file exists, a built-in
    default persona when it does not, and a minimal one-line fallback if
    reading fails (the error is surfaced via st.error).
    """
    default_persona = """I am Prakhar, an AI assistant. I can help you with general questions and conversations.

I aim to be helpful, harmless, and honest in all my interactions."""
    try:
        context_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), 'context.txt'
        )
        if not os.path.exists(context_path):
            return default_persona
        with open(context_path, "r", encoding='utf-8') as f:
            return f.read().strip()
    except Exception as e:
        st.error(f"Error loading context: {str(e)}")
        return "I am Prakhar, an AI assistant."
|
|
|
|
|
def save_context(context_text):
    """Persist *context_text* to ``context.txt`` beside this script.

    Returns:
        bool: True on a successful write; False after reporting the
        failure through st.error.
    """
    try:
        target = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), 'context.txt'
        )
        with open(target, "w", encoding='utf-8') as f:
            f.write(context_text)
    except Exception as e:
        st.error(f"Error saving context: {str(e)}")
        return False
    return True
|
|
|
|
|
def transcribe_audio(audio_bytes):
    """Transcribe recorded audio bytes to English text via the Whisper API.

    The bytes are spilled to a temporary .wav file because the API client
    expects a file object with a filename/extension.

    Args:
        audio_bytes: raw WAV data from the recorder widget.

    Returns:
        str | None: the stripped transcript, or None on failure (the error
        is reported via st.error).
    """
    tmp_file_path = None
    try:
        # delete=False so the file can be reopened by name after the
        # writing handle is closed (required on Windows).
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        with open(tmp_file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en"
            )

        return transcript.text.strip()

    except Exception as e:
        st.error(f"Error transcribing audio: {str(e)}")
        return None

    finally:
        # Fix: previously the temp file was only unlinked on the success
        # path, leaking a file per failed transcription.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
|
|
|
|
|
def get_ai_response(user_text, context):
    """Get AI response using GPT-4.

    Builds a system prompt that injects *context* (the persona text) and
    sends it with the user's transcribed message.

    Args:
        user_text: the user's message (transcribed speech).
        context: persona/background text interpolated into the system prompt.

    Returns:
        str: the model's reply, or a fixed apology string on any failure
        (the error is also surfaced via st.error).
    """
    try:
        # The persona prompt is runtime behavior — wording kept intact.
        system_prompt = f"""You are Prakhar. You should respond naturally and helpfully.

Context about you:
{context}

Instructions:
- Use the context above to inform your responses
- If asked about something not covered in the context, you can use your general knowledge
- If you're not sure about something specific to your context, say "I'm not sure about that based on what I know about myself"
- Keep responses conversational and natural
- Be helpful and engaging"""

        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text}
            ],
            max_tokens=500,
            temperature=0.7
        )

        return response.choices[0].message.content.strip()

    except Exception as e:
        st.error(f"Error getting AI response: {str(e)}")
        return "I'm sorry, I encountered an error while processing your request."
|
|
|
|
|
def text_to_speech(text):
    """Synthesize *text* into audio bytes with OpenAI's TTS endpoint.

    Uses the "onyx" voice at normal speed. Returns the raw audio bytes
    (MP3 by API default), or None after reporting a failure via st.error.
    """
    try:
        speech = client.audio.speech.create(
            model="tts-1",
            voice="onyx",
            input=text,
            speed=1.0,
        )
        return speech.content
    except Exception as e:
        st.error(f"Error generating speech: {str(e)}")
        return None
|
|
|
|
|
def process_audio(audio_bytes):
    """Drive one recording through the full pipeline: STT -> chat -> TTS.

    Each stage runs under its own st.spinner. Short-circuits with
    (None, None, None) for empty input or a failed transcription.

    Returns:
        tuple: (user_text, ai_response, speech_audio); speech_audio may be
        None if only the TTS stage failed.
    """
    if not audio_bytes:
        return None, None, None

    with st.spinner("π― Transcribing audio..."):
        transcript = transcribe_audio(audio_bytes)

    if not transcript:
        return None, None, None

    with st.spinner("π€ Generating response..."):
        reply = get_ai_response(transcript, st.session_state.context)

    with st.spinner("π Converting to speech..."):
        spoken = text_to_speech(reply)

    return transcript, reply, spoken
|
|
|
|
|
def main():
    """Render the page: a recorder column, a conversation column, and footer
    controls (context viewer, clear button, processing indicator)."""
    st.title("ποΈ Voice Bot")
    st.markdown("*Talk to Prakhar using your voice!*")

    init_session_state()

    col1, col2 = st.columns([1, 1], gap="large")

    with col1:
        st.subheader("π€ Voice Input")

        # Recorder widget returns the captured WAV bytes (or None/empty
        # when nothing has been recorded on this rerun).
        audio_bytes = audio_recorder(
            text="Click to record",
            recording_color="#e74c3c",
            neutral_color="#34495e",
            icon_name="microphone",
            icon_size="2x",
            pause_threshold=2.0,
            sample_rate=44100
        )

        # Let the user play back what they just recorded.
        if audio_bytes:
            st.audio(audio_bytes, format="audio/wav")

        # Guard on the processing flag so a rerun mid-pipeline doesn't
        # start a second pipeline for the same clip.
        if audio_bytes and not st.session_state.processing:

            # NOTE(review): function-local import; conventionally this
            # belongs at the top of the file.
            import hashlib
            # MD5 is used only as a cheap dedup fingerprint here, not for
            # security.
            audio_hash = hashlib.md5(audio_bytes).hexdigest()

            # Streamlit reruns the script on every interaction; the hash
            # check prevents reprocessing the same recording each rerun.
            if audio_hash != st.session_state.last_audio_hash:
                st.session_state.processing = True
                st.session_state.last_audio_hash = audio_hash

                user_text, ai_response, speech_audio = process_audio(audio_bytes)

                if user_text and ai_response:
                    st.session_state.conversation_history.append({
                        "user": user_text,
                        "ai": ai_response,
                        "speech": speech_audio
                    })

                st.session_state.processing = False

                # Rerun so col2 immediately renders the new exchange.
                if user_text and ai_response:
                    st.rerun()

    with col2:
        st.subheader("π¬ Conversation")

        if st.session_state.conversation_history:

            # Show the most recent exchange prominently.
            latest = st.session_state.conversation_history[-1]

            st.markdown("**You said:**")
            st.info(latest["user"])

            st.markdown("**Prakhar replied:**")
            st.success(latest["ai"])
            # NOTE(review): processing was already reset after the pipeline
            # in col1 — this extra reset looks redundant; confirm before
            # removing.
            st.session_state.processing = False

            # speech may be None if TTS failed; only render when present.
            if latest["speech"]:
                st.audio(latest["speech"], format="audio/mp3")

            # Older exchanges, newest first, tucked into an expander.
            if len(st.session_state.conversation_history) > 1:
                with st.expander("π Previous conversations"):
                    for i, conv in enumerate(reversed(st.session_state.conversation_history[:-1])):
                        st.markdown(f"**Conversation {len(st.session_state.conversation_history) - i - 1}:**")
                        st.markdown(f"π€ You: {conv['user']}")
                        st.markdown(f"π€ Prakhar: {conv['ai']}")
                        if conv["speech"]:
                            st.audio(conv["speech"], format="audio/mp3")
                        st.divider()
        else:
            st.info("π Start by recording your voice message above!")

    st.divider()

    # Read-only view of the persona context currently in use.
    with st.expander("βΉοΈ Context", expanded=False):
        st.info(st.session_state.context)

    if st.button("ποΈ Clear Conversation"):
        st.session_state.conversation_history = []
        st.rerun()

    # Fallback indicator in case a rerun happens while the flag is set.
    if st.session_state.processing:
        st.info("π Processing your request...")
|
|
|
|
|
# Script entry point; Streamlit executes this module top to bottom on
# every rerun, so main() re-renders the whole page each time.
if __name__ == "__main__":
    main()