"""Streamlit chat UI that routes user messages through an OrchestratorAgent.

Layout: sidebar + mobile quick prompts, a hero banner, a message form, and a
newest-first chat transcript rendered as HTML bubbles.
"""

import asyncio
import glob
import html
import os
import sys
import uuid

import streamlit as st

# Make the project root importable so `appagents` / `agents` resolve
# when this page is run from its subfolder.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from appagents.OrchestratorAgent import OrchestratorAgent
from agents import Runner, trace, SQLiteSession
from agents.exceptions import InputGuardrailTripwireTriggered


# -----------------------------
# Load predefined prompts
# -----------------------------
def load_prompts(folder="prompts"):
    """Load quick-prompt texts from ``folder``/*.txt.

    Returns:
        (prompts, prompt_labels): parallel lists of non-empty file contents
        and a title-cased label derived from each file name
        (underscores become spaces, ``.txt`` is dropped).
    """
    prompts = []
    prompt_labels = []
    for file_path in glob.glob(os.path.join(folder, "*.txt")):
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read().strip()
        if content:  # skip empty/whitespace-only files
            prompts.append(content)
            prompt_labels.append(
                os.path.basename(file_path)
                .replace("_", " ")
                .replace(".txt", "")
                .title()
            )
    return prompts, prompt_labels


prompts, prompt_labels = load_prompts()

# -----------------------------
# Streamlit page config
# -----------------------------
st.set_page_config(page_title="AI Chat", layout="wide")

# -----------------------------
# Custom CSS (chat, hero banner, input)
# NOTE(review): the original stylesheet body was lost in this copy of the
# file — restore the real CSS rules here.
# -----------------------------
st.markdown(
    """
    <style>
    /* chat bubble / hero banner / input styling goes here */
    </style>
    """,
    unsafe_allow_html=True,
)

# -----------------------------
# Session state defaults
# -----------------------------
_DEFAULTS = {
    "chat_history": [],        # newest-first list of {"role", "message"} dicts
    "input_value": "",         # text pre-filled into the chat input
    "pending_response": False,  # True while an AI reply is owed
    "pending_message": None,   # the user message awaiting an AI reply
    "auto_send_prompt": None,  # quick-prompt text queued for auto-send
}
for _key, _default in _DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# One persistent SQLite conversation per browser session.
if "ai_session_id" not in st.session_state:
    st.session_state.ai_session_id = str(uuid.uuid4())
session_id = st.session_state.ai_session_id

if "ai_session" not in st.session_state:
    st.session_state.ai_session = SQLiteSession(f"conversation_{session_id}.db")
session = st.session_state.ai_session


# -----------------------------
# Async AI response
# -----------------------------
async def get_ai_response(prompt: str) -> str:
    """Run the orchestrator agent on ``prompt`` and return its final output.

    If an input guardrail trips, return a user-facing warning string with
    whatever reasoning the exception exposes (the attribute location varies
    between SDK versions, hence the chained ``getattr`` probes).
    """
    try:
        agent = OrchestratorAgent.create()
        with trace("Chatbot Search Agent Run"):
            result = await Runner.run(agent, prompt, session=session)
        return result.final_output
    except InputGuardrailTripwireTriggered as e:
        reasoning = (
            getattr(e, "reasoning", None)
            or getattr(getattr(e, "output", None), "reasoning", None)
            or getattr(getattr(e, "guardrail_output", None), "reasoning", None)
            or "Guardrail triggered, but no reasoning provided."
        )
        return f"⚠️ Guardrail Blocked Input:\n\n**Reason:** {reasoning}"


# -----------------------------
# Desktop Sidebar Quick Prompts
# -----------------------------
# Real Streamlit buttons instead of raw-HTML <button> markup: HTML buttons
# rendered through st.markdown cannot call back into Python, so the previous
# approach could never set `auto_send_prompt`.
st.sidebar.title("💡 Quick Prompts")
for idx, prompt_text in enumerate(prompts):
    label = prompt_labels[idx] if idx < len(prompt_labels) else f"Prompt {idx + 1}"
    if st.sidebar.button(label, key=f"sidebar_prompt_{idx}"):
        st.session_state.auto_send_prompt = prompt_text

# -----------------------------
# Mobile Quick Prompts
# -----------------------------
with st.container():
    with st.expander("💡 Quick Prompts"):
        for idx, prompt_text in enumerate(prompts):
            label = prompt_labels[idx] if idx < len(prompt_labels) else f"Prompt {idx + 1}"
            if st.button(label, key=f"mobile_prompt_{idx}"):
                st.session_state.auto_send_prompt = prompt_text

# -----------------------------
# Hero banner
# NOTE(review): original banner markup partially lost — class names below are
# reconstructions; confirm against the real stylesheet.
# -----------------------------
st.markdown(
    """
    <div class="hero-banner">
        <div class="hero-title">🤖 AI Chatbot</div>
        <div class="hero-subtitle">Your intelligent assistant for insights, trends, and strategy exploration.</div>
    </div>
    """,
    unsafe_allow_html=True,
)

# -----------------------------
# Chat input area
# -----------------------------
with st.form(key="chat_form", clear_on_submit=False):
    user_input = st.text_input(
        "Type your message here:",
        value=st.session_state.input_value,
        placeholder="Send a message...",
        key="chat_input",
    )
    send_button = st.form_submit_button("Send")


# -----------------------------
# Helper to insert user message immediately
# -----------------------------
def send_user_message(msg):
    """Prepend ``msg`` to the transcript and flag that an AI reply is owed."""
    st.session_state.chat_history.insert(0, {"role": "user", "message": msg})
    st.session_state.pending_message = msg
    st.session_state.pending_response = True
    st.session_state.input_value = ""


# -----------------------------
# Handle normal send
# -----------------------------
if send_button and user_input.strip():
    send_user_message(user_input.strip())

# Handle sidebar/mobile prompt auto-send (one-shot: cleared after queueing).
if st.session_state.auto_send_prompt:
    send_user_message(st.session_state.auto_send_prompt)
    st.session_state.auto_send_prompt = None

# -----------------------------
# Handle AI response asynchronously
# -----------------------------
if st.session_state.pending_response and st.session_state.pending_message:
    with st.spinner("🤖 Thinking..."):
        try:
            # asyncio.run is safe here: Streamlit script threads have no
            # running event loop of their own.
            ai_response = asyncio.run(
                get_ai_response(st.session_state.pending_message)
            )
        except Exception as e:
            ai_response = f"[Error generating response: {e}]"
    st.session_state.chat_history.insert(
        0, {"role": "assistant", "message": ai_response}
    )
    st.session_state.pending_response = False
    st.session_state.pending_message = None

# -----------------------------
# Display chat history with Markdown in AI bubbles
# NOTE(review): bubble markup/class names reconstructed — align with the CSS.
# -----------------------------
for chat in st.session_state.chat_history:
    if chat["role"] == "user":
        # Escape user-supplied text BEFORE embedding it in raw HTML
        # (unsafe_allow_html=True would otherwise allow HTML/JS injection),
        # then preserve line breaks.
        msg_html = html.escape(chat["message"]).replace("\n", "<br>")
        st.markdown(
            f'<div class="chat-row user">'
            f'<div class="bubble user-bubble">{msg_html}</div>'
            f'<span class="avatar">👤</span>'
            f"</div>",
            unsafe_allow_html=True,
        )
    else:
        st.markdown(
            f"""<div class="chat-row assistant">
<span class="avatar">🤖</span>
<div class="bubble assistant-bubble">{chat['message']}</div>
</div>""",
            unsafe_allow_html=True,
        )