# phase/Student_view/chatbot.py
"""Streamlit page for the student-facing "AI Financial Tutor" chat.

The conversation lives in ``st.session_state.messages`` as a list of dicts
with the keys ``id``, ``text``, ``sender`` and ``timestamp``.  Replies are
fetched through ``backend.chat_ai`` (``utils/api.py``).
"""
import datetime
import html
import os
import re
import traceback

import streamlit as st

# --- use our backend client (utils/api.py) ---
try:
    from utils import api as backend
except ModuleNotFoundError:
    # Fallback if running from a different CWD: add ``parents[2]`` of this
    # file to ``sys.path`` and retry the import.
    import pathlib
    import sys

    ROOT = pathlib.Path(__file__).resolve().parents[2]
    if str(ROOT) not in sys.path:
        sys.path.insert(0, str(ROOT))
    from utils import api as backend

TUTOR_WELCOME = "Hi! I'm your AI Financial Tutor. What would you like to learn today?"

# Maximum number of prior turns forwarded to the backend ("keep it tiny").
_MAX_HISTORY_TURNS = 4

# NOTE(review): the original tag pattern was damaged in this copy of the file
# (only ``]*>`` survived, which deleted every ``>`` character).  It is
# reconstructed here as "opening/closing speaker-role tags"; confirm the exact
# tag names against the upstream source.
_ROLE_TAG_RE = re.compile(r"</?\s*(?:user|assistant|system)\b[^>]*>", re.I)
_SPEAKER_LABEL_RE = re.compile(r"(?im)^(user|assistant|system)\s*:\s*")
_EXTRA_NEWLINES_RE = re.compile(r"\n{3,}")

# NOTE(review): the original bubble markup was lost in this copy of the file;
# these inline styles are a placeholder reconstruction - confirm with design.
_BUBBLE_STYLES = {
    "assistant": (
        "background:#f1f3f5;color:#111;padding:10px 14px;"
        "border-radius:12px;margin:6px 20% 6px 0;"
    ),
    "user": (
        "background:#0d6efd;color:#fff;padding:10px 14px;"
        "border-radius:12px;margin:6px 0 6px 20%;"
    ),
}


def _clean_bot_text(t: str) -> str:
    """Tidy a raw model reply before it is stored and displayed.

    Removes xml-ish role tags such as ``<user>`` / ``</assistant>``, strips
    leading speaker labels (``User:``, ``Assistant:``, ``System:``) at the
    start of any line, collapses runs of three or more newlines to a single
    blank line, and trims surrounding whitespace.

    Args:
        t: Reply text; ``None`` or empty is treated as ``""``.

    Returns:
        The cleaned text.
    """
    t = t or ""
    t = _ROLE_TAG_RE.sub("", t)
    t = _SPEAKER_LABEL_RE.sub("", t)
    t = _EXTRA_NEWLINES_RE.sub("\n\n", t)
    return t.strip()


# -------------------------------
# History helpers
# -------------------------------
def add_message(text: str, sender: str):
    """Append one chat message to ``st.session_state.messages``.

    Args:
        text: Message body; ``None`` becomes ``""`` and whitespace is trimmed.
        sender: Stored verbatim; the page treats ``"assistant"`` as the bot
            and anything else as the user.
    """
    if "messages" not in st.session_state:
        st.session_state.messages = []
    # One clock read so the id and the timestamp describe the same instant.
    now = datetime.datetime.now()
    st.session_state.messages.append(
        {
            "id": str(now.timestamp()),
            "text": (text or "").strip(),
            "sender": sender,
            "timestamp": now,
        }
    )


def _coerce_ts(ts):
    """Best-effort conversion of a stored timestamp to ``datetime``.

    Accepts a ``datetime`` (returned unchanged), an ``int``/``float`` epoch
    value, or a string holding either an ISO-8601 date-time or an epoch
    number.

    Returns:
        A ``datetime.datetime``, or ``None`` when the value cannot be parsed.
    """
    if isinstance(ts, datetime.datetime):
        return ts
    if isinstance(ts, str):
        try:
            return datetime.datetime.fromisoformat(ts)
        except ValueError:
            pass
        # Not ISO formatted: fall back to reading it as an epoch number.
        try:
            ts = float(ts)
        except ValueError:
            return None
    if isinstance(ts, (int, float)):
        try:
            return datetime.datetime.fromtimestamp(ts)
        except (OverflowError, OSError, ValueError):
            # Out-of-range, NaN or infinite epoch values.
            return None
    return None


def _normalize_messages():
    """Rewrite ``st.session_state.messages`` into a uniform shape.

    For every stored message: ``text`` is stripped (``None`` -> ``""``),
    a missing ``sender`` defaults to ``"user"``, and ``timestamp`` is coerced
    to a ``datetime`` (falling back to the current time when it cannot be
    parsed).  Any other keys are carried over unchanged.
    """
    now = datetime.datetime.now()
    normed = []
    for m in st.session_state.get("messages", []):
        normed.append(
            {
                **m,
                "text": (m.get("text") or "").strip(),
                "sender": m.get("sender") or "user",
                "timestamp": _coerce_ts(m.get("timestamp")) or now,
            }
        )
    st.session_state.messages = normed


def _history_for_backend(max_turns: int = _MAX_HISTORY_TURNS):
    """Build the ``[{"role", "content"}, ...]`` history sent to the backend.

    Empty messages and the canned welcome message are skipped.  Any sender
    other than ``"assistant"`` is mapped to the ``"user"`` role.

    Args:
        max_turns: Keep only this many of the most recent entries.

    Returns:
        A list of at most ``max_turns`` role/content dicts, oldest first.
    """
    hist = []
    for m in st.session_state.get("messages", []):
        text = (m.get("text") or "").strip()
        if not text or text == TUTOR_WELCOME:
            continue
        role = "assistant" if m.get("sender") == "assistant" else "user"
        hist.append({"role": role, "content": text})
    # ``hist[-0:]`` would return everything, so guard non-positive limits.
    return hist[-max_turns:] if max_turns > 0 else []


# -------------------------------
# Reply via backend (/chat)
# -------------------------------
def _reply_via_backend(user_text: str) -> str:
    """Ask the backend for a reply to ``user_text``.

    Reads ``current_lesson_id`` (default ``0``) and ``user["level"]``
    (default ``"beginner"``) from the session state and forwards them with
    the recent history to ``backend.chat_ai``.

    Args:
        user_text: The user's latest message, sent as ``query``.

    Returns:
        The cleaned reply, or a ``"⚠️ Chat failed: ..."`` string when the
        backend call raises.
    """
    lesson_id = st.session_state.get("current_lesson_id") or 0
    # ``user`` may be stored as None, so do not rely on .get()'s default.
    user = st.session_state.get("user") or {}
    level_slug = (user.get("level") or "beginner").strip().lower()

    # The message being answered is normally already in the history.  Fetch
    # one extra turn so that dropping that duplicate still leaves the full
    # _MAX_HISTORY_TURNS of real context.
    hist = _history_for_backend(_MAX_HISTORY_TURNS + 1)
    if (
        hist
        and hist[-1].get("role") == "user"
        and hist[-1].get("content", "").strip() == (user_text or "").strip()
    ):
        hist = hist[:-1]
    hist = hist[-_MAX_HISTORY_TURNS:]

    try:
        # presumably chat_ai returns the reply as a string - verify in utils/api.py
        answer = backend.chat_ai(
            query=user_text,
            lesson_id=lesson_id,
            level_slug=level_slug,
            history=hist,
        )
        return _clean_bot_text((answer or "").strip())
    except Exception as e:
        # UI boundary: show the failure in the chat instead of crashing the page.
        err_text = "".join(traceback.format_exception_only(type(e), e)).strip()
        return f"⚠️ Chat failed: {err_text}"


# -------------------------------
# Streamlit page
# -------------------------------
def _bubble_html(text: str, sender: str, time_label: str) -> str:
    """Return the HTML for one chat bubble.

    The text is HTML-escaped because the result is rendered with
    ``unsafe_allow_html=True``; newlines become ``<br>`` so the bubble stays
    a single HTML block.
    """
    style = _BUBBLE_STYLES["assistant" if sender == "assistant" else "user"]
    body = html.escape(text or "").replace("\n", "<br>")
    return (
        f"<div style='{style}'>"
        f"{body}<br><sub>{html.escape(time_label)}</sub>"
        "</div>"
    )


def show_page():
    """Render the AI Financial Tutor page and advance the chat state.

    Each run: initialise session state, draw the existing messages, then
    handle (in order) a one-time quiz hand-off prompt, the quick-suggestion
    buttons, free-text input, and a pending bot reply.  Every state change
    ends in ``st.rerun()`` so the new message is drawn on the next run.
    """
    st.title("🤖 AI Financial Tutor")
    st.caption("Get personalized help with your financial questions")

    # --- session state init ---
    if "messages" not in st.session_state:
        st.session_state.messages = [
            {
                "id": "1",
                "text": TUTOR_WELCOME,
                "sender": "assistant",
                "timestamp": datetime.datetime.now(),
            }
        ]
    if "is_typing" not in st.session_state:
        st.session_state.is_typing = False
    if "chatbot_prefill_sent" not in st.session_state:
        st.session_state.chatbot_prefill_sent = False

    _normalize_messages()

    # --- render chat bubbles ---
    chat_container = st.container()
    with chat_container:
        for msg in st.session_state.messages:
            t = msg["timestamp"].strftime("%H:%M")
            bubble = _bubble_html(msg.get("text", ""), msg.get("sender"), t)
            st.markdown(bubble, unsafe_allow_html=True)
        if st.session_state.is_typing:
            st.markdown("🤖 _FinanceBot is typing..._")

    # --- quiz handoff auto-prompt (only once) ---
    prefill = st.session_state.get("chatbot_prefill")
    if prefill and not st.session_state.chatbot_prefill_sent:
        add_message(prefill, "user")
        st.session_state.is_typing = True
        st.session_state.chatbot_prefill_sent = True
        st.session_state.chatbot_prefill = None
        st.rerun()

    # --- quick suggestions when fresh ---
    if len(st.session_state.messages) == 1:
        st.markdown("Try asking about:")
        cols = st.columns(2)
        quick = [
            "How does compound interest work?",
            "How much should I save for emergencies?",
            "What's a good budgeting strategy?",
            "How do I start investing?",
        ]
        for i, q in enumerate(quick):
            if cols[i % 2].button(q, key=f"suggest_{i}"):
                add_message(q, "user")
                st.session_state.is_typing = True
                st.rerun()

    # --- user input ---
    user_input = st.chat_input("Ask me anything about personal finance...")
    if user_input:
        add_message(user_input, "user")
        st.session_state.is_typing = True
        st.rerun()

    # --- handle pending bot reply ---
    if st.session_state.is_typing:
        with st.spinner("FinanceBot is thinking..."):
            # Answer the most recent user message.
            last_user_msg = next(
                (
                    m["text"]
                    for m in reversed(st.session_state.messages)
                    if m["sender"] == "user"
                ),
                "",
            )
            bot_reply = _reply_via_backend(last_user_msg)
            add_message(bot_reply, "assistant")
        st.session_state.is_typing = False
        st.rerun()

    if st.button("Back to Dashboard", key="ai_tutor_back_btn"):
        st.session_state.current_page = "Student Dashboard"
        st.rerun()