FInFront / phase /Student_view /chatbot.py
Kerikim's picture
elkay: api.py, chatbot, welcome
702933a
# phase/Student_view/chatbot.py
import datetime
import html
import os
import re
import traceback

import streamlit as st
# --- use our backend client (utils/api.py) ---
try:
from utils import api as backend
except ModuleNotFoundError:
# fallback if running from a different CWD
import sys, pathlib
ROOT = pathlib.Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from utils import api as backend
# Greeting used as the first assistant message of a fresh chat (see show_page).
# It is UI-only: _history_for_backend skips any message whose text equals it.
TUTOR_WELCOME = "Hi! I'm your AI Financial Tutor. What would you like to learn today?"
def _clean_bot_text(t: str) -> str:
# strip xml-ish tags like <user>...</user>, <assistant>...</assistant>
t = re.sub(r"</?(user|assistant|system)\b[^>]*>", "", t, flags=re.I)
# strip leading speaker labels (User:, Assistant:, System:)
t = re.sub(r"(?im)^(user|assistant|system)\s*:\s*", "", t)
# collapse extra newlines
t = re.sub(r"\n{3,}", "\n\n", t)
return t.strip()
# -------------------------------
# History helpers
# -------------------------------
def add_message(text: str, sender: str):
    """Append one chat message to ``st.session_state.messages``.

    The list is created on first use. The stored entry has four keys:
    ``id`` (the current POSIX timestamp rendered as a string), ``text``
    (stripped; a falsy ``text`` becomes ``""``), ``sender`` (stored as given)
    and ``timestamp`` (a naive ``datetime`` taken at call time).
    """
    state = st.session_state
    if "messages" not in state:
        state.messages = []

    entry = {
        "id": str(datetime.datetime.now().timestamp()),
        "text": (text or "").strip(),
        "sender": sender,
        "timestamp": datetime.datetime.now(),
    }
    state.messages.append(entry)
def _coerce_ts(ts):
if isinstance(ts, datetime.datetime):
return ts
if isinstance(ts, (int, float)):
try:
return datetime.datetime.fromtimestamp(ts)
except Exception:
return None
if isinstance(ts, str):
for parser in (datetime.datetime.fromisoformat, lambda s: datetime.datetime.fromtimestamp(float(s))):
try:
return parser(ts)
except Exception:
pass
return None
def _normalize_messages():
    """Rewrite ``st.session_state.messages`` so every entry is render-ready.

    For each stored message (extra keys are preserved):
    - ``text`` becomes a stripped string (missing/falsy -> ``""``);
    - ``sender`` defaults to ``"user"`` when missing/falsy;
    - ``timestamp`` is passed through ``_coerce_ts``; if that yields nothing,
      a single "now" captured for this call is used instead.

    A missing ``messages`` key results in an empty list being stored.
    """
    fallback_ts = datetime.datetime.now()
    st.session_state.messages = [
        {
            **raw,
            "text": (raw.get("text") or "").strip(),
            "sender": raw.get("sender") or "user",
            "timestamp": _coerce_ts(raw.get("timestamp")) or fallback_ts,
        }
        for raw in st.session_state.get("messages", [])
    ]
def _history_for_backend():
    """Return the most recent chat turns as ``{"role", "content"}`` dicts.

    Messages with empty text, and the ``TUTOR_WELCOME`` greeting, are left
    out. A sender of ``"assistant"`` maps to role ``"assistant"``; every other
    sender maps to ``"user"``. At most the last four turns are returned.
    """
    stored = st.session_state.get("messages", [])
    # Pair each message's stripped text with its sender before filtering.
    cleaned = (((m.get("text") or "").strip(), m.get("sender")) for m in stored)
    turns = [
        {"role": "assistant" if sender == "assistant" else "user", "content": content}
        for content, sender in cleaned
        if content and content != TUTOR_WELCOME
    ]
    return turns[-4:]  # <= keep it tiny
# -------------------------------
# Reply via backend (/chat)
# -------------------------------
def _reply_via_backend(user_text: str) -> str:
    """Ask the backend for a reply to ``user_text`` and return displayable text.

    Reads ``current_lesson_id`` (default ``0``) and the user's ``level``
    (default ``"beginner"``) from ``st.session_state``, attaches the recent
    history from ``_history_for_backend`` and calls ``backend.chat_ai``
    (defined in utils/api.py, not shown here).

    Args:
        user_text: The message to send as the ``query``.

    Returns:
        The reply passed through ``_clean_bot_text``. If the backend call or
        the cleaning raises, a ``"⚠️ Chat failed: ..."`` string describing the
        exception is returned instead of propagating it.
    """
    lesson_id = st.session_state.get("current_lesson_id") or 0

    # The "user" key may exist but hold None, and "level" may be missing,
    # blank, or not a string. None of that should crash the page, so fall
    # back to the default level.
    user = st.session_state.get("user") or {}
    level_slug = str(user.get("level") or "").strip().lower() or "beginner"

    # Build history and remove duplicate of the message we are sending as `query`
    hist = _history_for_backend()
    if hist and hist[-1].get("role") == "user" and hist[-1].get("content", "").strip() == (user_text or "").strip():
        hist = hist[:-1]
    # Defensive cap; _history_for_backend already returns at most four turns.
    hist = hist[-4:]

    try:
        answer = backend.chat_ai(
            query=user_text,
            lesson_id=lesson_id,
            level_slug=level_slug,
            history=hist,
        )
        return _clean_bot_text((answer or "").strip())
    except Exception as e:
        # UI boundary: show the failure in the chat instead of raising.
        err_text = "".join(traceback.format_exception_only(type(e), e)).strip()
        return f"⚠️ Chat failed: {err_text}"
# -------------------------------
# Streamlit page
# -------------------------------
def _bubble_html(msg) -> str:
    """Return the HTML for one chat bubble with the message text escaped."""
    if msg.get("sender") == "assistant":
        style = (
            "background:#e0e0e0;color:#000;padding:10px;border-radius:12px;"
            "max-width:70%;margin-bottom:6px;"
        )
    else:
        style = (
            "background:#4CAF50;color:#fff;padding:10px;border-radius:12px;"
            "max-width:70%;margin-left:auto;margin-bottom:6px;"
        )
    # The text comes from the user or the model and is rendered with
    # unsafe_allow_html=True, so it must be escaped. Line breaks become <br>
    # so the whole bubble stays a single HTML block.
    escaped = html.escape(str(msg.get("text") or ""))
    safe_text = "<br>".join(escaped.splitlines())
    stamp = msg["timestamp"].strftime("%H:%M")
    return f"<div style='{style}'>{safe_text}<br><sub>{stamp}</sub></div>"


def _queue_user_message(text: str) -> None:
    """Store a user message, flag a pending bot reply, and rerun the script."""
    add_message(text, "user")
    st.session_state.is_typing = True
    st.rerun()


def show_page():
    """Render the AI Financial Tutor chat page.

    Flow of a single script run:
    1. initialise ``messages``, ``is_typing`` and ``chatbot_prefill_sent`` in
       ``st.session_state`` and normalise stored messages;
    2. draw every message as a chat bubble (message text is HTML-escaped);
    3. if ``chatbot_prefill`` is set and has not been sent yet, submit it as a
       user message;
    4. while only one message exists, offer quick-suggestion buttons;
    5. accept free-text input;
    6. when ``is_typing`` is set, fetch the reply for the latest user message
       via ``_reply_via_backend`` and store it;
    7. offer a button that switches ``current_page`` to "Student Dashboard".

    Every state change is followed by ``st.rerun()`` so the page redraws.
    """
    st.title("🤖 AI Financial Tutor")
    st.caption("Get personalized help with your financial questions")

    # --- session state init ---
    if "messages" not in st.session_state:
        st.session_state.messages = [{
            "id": "1",
            "text": TUTOR_WELCOME,
            "sender": "assistant",
            "timestamp": datetime.datetime.now(),
        }]
    if "is_typing" not in st.session_state:
        st.session_state.is_typing = False
    if "chatbot_prefill_sent" not in st.session_state:
        st.session_state.chatbot_prefill_sent = False

    _normalize_messages()

    # --- render chat bubbles ---
    chat_container = st.container()
    with chat_container:
        for msg in st.session_state.messages:
            st.markdown(_bubble_html(msg), unsafe_allow_html=True)
        if st.session_state.is_typing:
            st.markdown("🤖 _FinanceBot is typing..._")

    # --- quiz handoff auto-prompt (only once) ---
    prefill = st.session_state.get("chatbot_prefill")
    if prefill and not st.session_state.chatbot_prefill_sent:
        st.session_state.chatbot_prefill_sent = True
        st.session_state.chatbot_prefill = None
        _queue_user_message(prefill)

    # --- quick suggestions when fresh ---
    if len(st.session_state.messages) == 1:
        st.markdown("Try asking about:")
        cols = st.columns(2)
        quick = [
            "How does compound interest work?",
            "How much should I save for emergencies?",
            "What's a good budgeting strategy?",
            "How do I start investing?",
        ]
        for i, q in enumerate(quick):
            if cols[i % 2].button(q, key=f"suggest_{i}"):
                _queue_user_message(q)

    # --- user input ---
    user_input = st.chat_input("Ask me anything about personal finance...")
    if user_input:
        _queue_user_message(user_input)

    # --- handle pending bot reply ---
    if st.session_state.is_typing:
        with st.spinner("FinanceBot is thinking..."):
            last_user_msg = next(
                (m["text"] for m in reversed(st.session_state.messages) if m["sender"] == "user"),
                "",
            )
            bot_reply = _reply_via_backend(last_user_msg)
        add_message(bot_reply, "assistant")
        st.session_state.is_typing = False
        st.rerun()

    if st.button("Back to Dashboard", key="ai_tutor_back_btn"):
        st.session_state.current_page = "Student Dashboard"
        st.rerun()