# kia/app.py β€” Streamlit front-end for the Kia chatbot
# Author: Nam Fam Β· commit: "update app.py" (c3f0b56)
import streamlit as st
import logging
# Configure the root logger once at import time so module-level logging
# calls below (e.g. the orchestrator-fallback error paths) emit at INFO+.
logging.basicConfig(level=logging.INFO)
from agents.agent_graph import build_agent_graph
from agents.websearch import WebSearchAgent
from agents.chat import ChatAgent
from datetime import datetime
import pytz
import time
def generate_response_message(response):
    """Render *response* into a fresh placeholder word by word, simulating a
    typing/streaming effect, then return the accumulated text.

    The trailing "β–Œ" acts as a fake cursor while words are still arriving;
    the final render drops it.
    """
    placeholder = st.empty()
    shown = ""
    for token in response.split():
        shown += token + " "
        placeholder.markdown(shown + "β–Œ")
        time.sleep(0.05)  # pacing only β€” this is a purely cosmetic delay
    placeholder.markdown(shown)
    return shown
def generate_initial_message(now=None):
    """Return Kia's opening greeting, prefixed by a time-of-day salutation.

    Parameters:
        now: optional ``datetime``/``time``-like object exposing ``.hour``.
             Defaults to the current time in Asia/Ho_Chi_Minh, preserving the
             original behavior; the parameter exists so the greeting logic is
             deterministic and testable.

    Hour buckets: 5-11 morning, 12-17 afternoon, 18-20 evening, otherwise a
    generic late-night greeting.
    """
    if now is None:
        vietnam_tz = pytz.timezone("Asia/Ho_Chi_Minh")
        now = datetime.now(vietnam_tz).time()
    hour = now.hour
    if 5 <= hour < 12:
        greeting = "Good morning"
    elif 12 <= hour < 18:
        greeting = "Good afternoon"
    elif 18 <= hour < 21:
        greeting = "Good evening"
    else:
        greeting = "πŸ‘‹ Hey there"
    return f"{greeting}! I'm Kia β€” your Know-It-All assistant. Whether it's trivia, tech, or random tidbits, I'm always ready to drop some knowledge. What can I enlighten you about today?"
# Basic page chrome: browser-tab title/icon and an initially-open sidebar.
st.set_page_config(
    page_title="Kia β€” Your Know-It-All Assistant",
    page_icon="πŸ€–",
    initial_sidebar_state="expanded"
)
# Raw HTML heading so the long title is forced onto a single line.
st.markdown(
    "<h1 style='white-space:nowrap;'>πŸ€– Kia β€” Your Know-It-All Assistant</h1>",
    unsafe_allow_html=True
)
# Sidebar configuration: agent toggles, document-index inputs, chat controls,
# display settings, and an about box.
with st.sidebar:
    st.title("βš™οΈ Settings")
    # Seed the toggle defaults exactly once; afterwards the checkbox widgets
    # below own these session-state keys.
    if 'enable_docsearch' not in st.session_state:
        st.session_state['enable_docsearch'] = True
    if 'enable_websearch' not in st.session_state:
        st.session_state['enable_websearch'] = True
    with st.expander("Agent Settings", expanded=True):
        # FIX: do not pass value= when the key is already present in
        # st.session_state β€” Streamlit warns that the widget "was created
        # with a default value but also had its value set via the Session
        # State API". The seeded key alone drives the widget state.
        st.checkbox("Enable Web Search", key='enable_websearch')
        st.checkbox("Enable Document Search", key='enable_docsearch')
        if st.session_state['enable_docsearch']:
            st.write("Upload PDF(s) or enter URL(s) to index for document search")
            st.file_uploader("Upload PDF document(s)", type=["pdf"], accept_multiple_files=True, key="pdf_files")
            st.text_input("Enter URL(s), comma-separated", key="doc_urls")
            # Rerun so the agent graph picks up the new documents/URLs.
            if st.button("Rebuild Index"):
                st.rerun()
    # Chat Controls
    with st.expander("πŸ’¬ Chat Controls", expanded=True):
        if st.button("Clear Conversation"):
            st.session_state['chat_history'] = []
            st.rerun()
    # Display Settings β€” these two flags are read by the render loop below.
    with st.expander("πŸ› οΈ Display Settings", expanded=True):
        DEV_MODE = st.checkbox("Enable Dev Mode", value=False)
        STREAMING_ENABLED = st.checkbox("Enable response streaming (just the effect πŸ˜‚)", value=True)
    # Information
    with st.expander("ℹ️ About", expanded=True):
        st.markdown("""### Kia Chatbot
An intelligent assistant powered by LangGraph.
**Quick Tips:**
- Use clear, specific questions
- Enable Dev Mode for detailed responses
- Clear conversation for fresh context
""")
# --- Session-state bootstrap (runs on every rerun; guards keep it idempotent) ---
if 'chat_history' not in st.session_state:
    # One dict per turn: user/bot text plus agent metadata and trace.
    st.session_state['chat_history'] = []
if 'waiting_for_response' not in st.session_state:
    # True between a user submit and the agent graph's reply.
    st.session_state['waiting_for_response'] = False
if 'should_stream' not in st.session_state:
    # Stream (type out) the newest bot message exactly once.
    st.session_state['should_stream'] = False
if 'initial_greeting_added' not in st.session_state:
    # Add initial greeting as persistent assistant message.  The empty
    # "user" field marks it so the render loop skips the user bubble.
    st.session_state['chat_history'].append({
        "user": "",
        "bot": generate_initial_message(),
        "agent": "chat",
        "rephrased_query": None,
        "trace": None,
        "websearch_results": None
    })
    st.session_state['initial_greeting_added'] = True
    st.session_state['should_stream'] = True
# Callback for st.chat_input: records the submitted message and flags the
# rerun to produce an assistant response.
def set_user_input():
    """Queue the current chat-input text as a new, unanswered turn."""
    message = st.session_state.user_message
    if not message or not message.strip():
        return  # ignore empty / whitespace-only submissions
    # Signal the processing section to run on the submit-triggered rerun.
    st.session_state['waiting_for_response'] = True
    # Record the user half of the turn immediately; "bot" and the agent
    # metadata are filled in once the agent graph responds.
    st.session_state['chat_history'].append({
        "user": st.session_state.user_message,
        "bot": None,
        "agent": None,
        "rephrased_query": None,
        "trace": None
    })
# Force a rerun to show the user message immediately
# ---------------------------------------------------------------------------
# Chat-history rendering
# ---------------------------------------------------------------------------

def _render_websearch_sources(trace):
    """Render a numbered "Sources" list from the first websearch 'search' step.

    ``raw_results`` is usually a dict with an 'organic' list of result dicts,
    but a plain list of result dicts/links is also tolerated.  FIX: the
    original called ``raw_results.get('organic', [])`` *before* the
    isinstance checks, which raised AttributeError for list payloads and made
    the list fallback unreachable β€” guard with isinstance first.
    """
    for step in trace:
        if step.get('agent') != 'websearch' or step.get('step') != 'search':
            continue
        raw_results = step.get('raw_results')
        if isinstance(raw_results, dict) and raw_results.get('organic'):
            st.markdown("**Sources:**")
            for idx, item in enumerate(raw_results['organic'], 1):
                link = item.get('link')
                title = item.get('title', '')
                if link:
                    st.markdown(f"{idx}. [{title}]({link})")
        elif isinstance(raw_results, list) and raw_results:
            st.markdown("**Sources:**")
            for idx, item in enumerate(raw_results, 1):
                link = item.get('link') if isinstance(item, dict) else item
                title = item.get('title', '') if isinstance(item, dict) else ''
                if link:
                    st.markdown(f"{idx}. [{title}]({link})")
        break  # only the first matching search step is rendered


def _render_dev_trace(trace):
    """Dev Mode: group trace steps by agent and show one expander per agent."""
    agent_groups = {}
    for step_idx, step in enumerate(trace):
        agent_groups.setdefault(step.get('agent', 'unknown'), []).append((step_idx, step))
    # Fixed display order; agents with no steps still get an (empty) expander.
    agent_order = [
        ('orchestrator', 'OrchestratorAgent'),
        ('documentsearch', 'DocumentSearchAgent'),
        ('websearch', 'WebSearchAgent'),
        ('chat', 'ChatAgent'),
    ]
    for ag_key, display_name in agent_order:
        steps = agent_groups.get(ag_key, [])
        with st.expander(f"{display_name} Steps", expanded=False):
            if not steps:
                st.write("No steps executed.")
            for step_idx, step in steps:
                st.markdown(f"**Step {step_idx+1}:**")
                if not isinstance(step, dict):
                    st.code(str(step))
                    continue
                for k, v in step.items():
                    st.markdown(f"**{k}:**")
                    if k == 'context':
                        # Tabulate retrieved context: one row per document,
                        # or per non-empty paragraph for plain strings.
                        if isinstance(v, list):
                            table_data = [[j + 1, getattr(doc, 'page_content', str(doc)).strip()]
                                          for j, doc in enumerate(v)]
                        else:
                            paragraphs = str(v).split('\n')
                            table_data = [[j + 1, p.strip()] for j, p in enumerate(paragraphs) if p.strip()]
                        if table_data:
                            import pandas as pd  # lazy: only needed in Dev Mode
                            df = pd.DataFrame(table_data, columns=["#", "Content"])
                            st.dataframe(df, use_container_width=True,
                                         column_config={"#": st.column_config.NumberColumn("#", width=50),
                                                        "Content": st.column_config.TextColumn("Content")})
                    else:
                        # 'prompt'/'response'/'classification' and the default
                        # case rendered identically β€” the branches are merged.
                        st.code(str(v))


# Display chat history in modern chat format.  The last entry may still be
# awaiting its bot response, in which case it gets a status placeholder.
for i, turn in enumerate(st.session_state['chat_history']):
    # User bubble (the initial greeting has an empty "user" field β€” skip it).
    if turn.get('user'):
        with st.chat_message("user", avatar="πŸ‘€"):
            st.markdown(turn['user'])
    # The final history entry is a placeholder while a response is pending.
    is_waiting = st.session_state['waiting_for_response'] and i == len(st.session_state['chat_history']) - 1
    if is_waiting:
        # Pre-classify the pending input so the status line matches the
        # agent that is about to run; fall back to generic "thinking".
        from agents.orchestrator import OrchestratorAgent
        user_input_wait = st.session_state['chat_history'][-1]['user']
        chat_hist_wait = st.session_state['chat_history'][:-1]
        try:
            next_agent = OrchestratorAgent()._decide_agent(user_input_wait, chat_hist_wait)
        except Exception as e:
            logging.error("Error deciding next agent: %s", e)
            next_agent = 'chat'
        with st.chat_message("Kia", avatar="πŸ€–"):
            if next_agent == 'documentsearch' and st.session_state.get('enable_docsearch', False):
                st.markdown("πŸ” Kia is searching for documents...")
            elif next_agent == 'websearch' and st.session_state.get('enable_websearch', False):
                st.markdown("🌐 Kia is searching the web...")
            else:
                st.markdown("πŸ’­ Kia is thinking...")
        continue
    # Nothing to render until the bot reply exists.
    if turn.get('bot') is None:
        continue
    # Assistant bubble: stream only the newest reply, and only once.
    is_latest = i == len(st.session_state['chat_history']) - 1
    with st.chat_message("Kia", avatar="πŸ€–"):
        if is_latest and STREAMING_ENABLED and st.session_state['should_stream']:
            generate_response_message(turn['bot'])
            if turn.get('trace') is not None:
                _render_websearch_sources(turn['trace'])
            st.session_state['should_stream'] = False
        else:
            st.markdown(turn['bot'])
            if turn['trace'] is not None:
                _render_websearch_sources(turn['trace'])
    # DEV_MODE trace expanders (all completed turns).
    if DEV_MODE and turn.get('trace'):
        _render_dev_trace(turn.get('trace', []))
# Process the pending message AFTER rendering history, so the user's bubble
# (and the status placeholder) are already on screen during the slow call.
if st.session_state['waiting_for_response']:
    user_input = st.session_state['chat_history'][-1]['user']
    agent_graph = build_agent_graph()
    chat_history = st.session_state['chat_history'][:-1]  # Exclude the waiting message
    doc_urls = None
    if st.session_state.get('doc_urls'):
        # Split the comma-separated URL field, dropping empties/whitespace.
        doc_urls = [u.strip() for u in st.session_state['doc_urls'].split(",") if u.strip()]
    pdf_files = st.session_state.get('pdf_files', [])
    # Input state handed to the LangGraph agent graph.
    state = {
        "user_input": user_input,
        "chat_history": chat_history,
        "enable_docsearch": st.session_state['enable_docsearch'],
        "enable_websearch": st.session_state['enable_websearch'],
        "doc_urls": doc_urls,
        "pdf_files": pdf_files,
    }
    # --- Invoke agent graph (may take time) with error handling ---
    try:
        result_state = agent_graph.invoke(state)
    except Exception as e:
        # Use orchestrator fallback to produce a graceful error reply.
        from agents.orchestrator import OrchestratorAgent
        error_result = OrchestratorAgent().handle_system_error(e)
        result_state = {
            'response': error_result['response'],
            'agent': error_result['agent'],
            'trace': error_result['trace'],
            'websearch_results': error_result['websearch_results']
        }
    response = result_state.get('response', '')
    agent_name = result_state.get('agent', result_state.get('next_agent', 'chat'))
    rephrased_query = None
    # Try to extract rephrased_query if available (for websearch agent).
    # NOTE(review): this issues an extra LLM call solely to record the
    # rephrased query β€” presumably duplicating work the websearch agent
    # already did; confirm whether it could be surfaced via the trace.
    if agent_name == 'websearch':
        history_str = ""
        for turn in chat_history:
            if turn.get('bot'):  # Make sure there's a bot response in history
                history_str += f"User: {turn['user']}\nBot: {turn['bot']}\n"
        from agents.websearch import WebSearchAgent
        rephrase_prompt = (
            f"Given the following conversation history:\n"
            f"{history_str}"
            f"User: {user_input}\n"
            f"Rephrase the user's latest question so it is clear and complete for a web search. "
            f"Only output the rephrased question."
        )
        rephrased_query = WebSearchAgent().llm.generate(rephrase_prompt).strip()
    # Update the last message with the bot's response
    st.session_state['chat_history'][-1].update({
        "bot": response,
        "agent": agent_name,
        "rephrased_query": rephrased_query,
        "trace": result_state.get('trace', []),
        "websearch_results": result_state.get('websearch_results', []) if agent_name == 'websearch' else None
    })
    # Mark next run to stream this new response
    st.session_state['should_stream'] = True
    # Reset waiting state
    st.session_state['waiting_for_response'] = False
    # Force rerun to show the completed response
    st.rerun()
# Modern chat UI via st.chat_input β€” placed last so it docks to the bottom;
# on_submit fires before the rerun, queuing the message via set_user_input().
user_input = st.chat_input("Enter your message and press Enter...", key="user_message", on_submit=set_user_input)