| |
| import os |
| import re |
| import datetime |
| import traceback |
| import streamlit as st |
|
|
| |
# Import the project's API wrapper. If the ``utils`` package is not importable
# from the current ``sys.path``, add the directory two levels above this
# file's own folder and retry the import.
try:
    from utils import api as backend
except ModuleNotFoundError:
    import pathlib
    import sys

    ROOT = pathlib.Path(__file__).resolve().parents[2]
    if str(ROOT) not in sys.path:
        sys.path.insert(0, str(ROOT))
    from utils import api as backend


# Greeting used as the first assistant message of a new conversation.
TUTOR_WELCOME = "Hi! I'm your AI Financial Tutor. What would you like to learn today?"
|
|
def _clean_bot_text(t: str) -> str:
    """Strip chat-role markup from a model reply and tidy its whitespace.

    Three passes run in order:

    1. remove XML-like role tags such as ``<assistant>`` or ``</user>``;
    2. remove ``user:`` / ``assistant:`` / ``system:`` prefixes that start a
       line (case-insensitive);
    3. squeeze runs of three or more newlines down to a single blank line.

    Returns the cleaned text with leading and trailing whitespace removed.
    """
    role_tag = re.compile(r"</?(user|assistant|system)\b[^>]*>", flags=re.I)
    role_prefix = re.compile(r"(?im)^(user|assistant|system)\s*:\s*")
    newline_run = re.compile(r"\n{3,}")

    without_tags = role_tag.sub("", t)
    without_prefixes = role_prefix.sub("", without_tags)
    return newline_run.sub("\n\n", without_prefixes).strip()
|
|
| |
| |
| |
def add_message(text: str, sender: str):
    """Append one chat message to ``st.session_state.messages``.

    The list is created first if the session does not have one yet.  The
    stored entry carries a string id derived from the current epoch time,
    the stripped text (a falsy ``text`` becomes ``""``), the sender label,
    and the current time as a ``datetime``.
    """
    if "messages" not in st.session_state:
        st.session_state.messages = []

    clock = datetime.datetime.now
    entry = {
        "id": str(clock().timestamp()),
        "text": text.strip() if text else "",
        "sender": sender,
        "timestamp": clock(),
    }
    st.session_state.messages.append(entry)
|
|
def _coerce_ts(ts):
    """Convert a stored timestamp value to ``datetime.datetime``, or ``None``.

    Accepted inputs:

    * ``datetime.datetime`` -- returned unchanged;
    * ``int`` / ``float`` -- passed to ``datetime.fromtimestamp`` (seconds
      since the epoch, local time);
    * ``str`` -- parsed with ``datetime.fromisoformat`` first; if that fails,
      parsed as a float and treated like a numeric timestamp.

    Returns ``None`` for any other type (including ``bool`` and ``None``) and
    for values that cannot be parsed or are out of range.
    """
    if isinstance(ts, datetime.datetime):
        return ts

    # bool is a subclass of int; a True/False "timestamp" is junk, not epoch 1/0.
    if isinstance(ts, bool):
        return None

    if isinstance(ts, str):
        try:
            return datetime.datetime.fromisoformat(ts)
        except ValueError:
            pass
        # Not ISO formatted: fall back to "epoch seconds written as text".
        try:
            ts = float(ts)
        except ValueError:
            return None

    if isinstance(ts, (int, float)):
        try:
            return datetime.datetime.fromtimestamp(ts)
        except (ValueError, OverflowError, OSError):
            # NaN, infinity, or a value outside the platform's supported range.
            return None

    return None
|
|
def _normalize_messages():
    """Rewrite ``st.session_state.messages`` into a uniform shape.

    Each stored message keeps its existing keys, but afterwards:

    * ``text`` is a stripped string (missing or falsy becomes ``""``);
    * ``sender`` defaults to ``"user"`` when missing or falsy;
    * ``timestamp`` is whatever ``_coerce_ts`` returns for the stored value,
      or the time of this call when that result is falsy.
    """
    fallback_time = datetime.datetime.now()

    def _normalized(raw):
        # Later keys override the originals copied in by ``**raw``.
        return {
            **raw,
            "text": (raw.get("text") or "").strip(),
            "sender": raw.get("sender") or "user",
            "timestamp": _coerce_ts(raw.get("timestamp")) or fallback_time,
        }

    st.session_state.messages = [
        _normalized(entry) for entry in st.session_state.get("messages", [])
    ]
|
|
def _history_for_backend():
    """Build the recent chat history as a list of role/content dicts.

    Messages whose stripped text is empty or equal to ``TUTOR_WELCOME`` are
    skipped.  A sender of ``"assistant"`` maps to role ``"assistant"``; every
    other sender maps to ``"user"``.  Only the last four remaining entries
    are returned, oldest first.
    """
    stored = st.session_state.get("messages", [])
    pairs = (((m.get("text") or "").strip(), m.get("sender")) for m in stored)
    turns = [
        {"role": "assistant" if who == "assistant" else "user", "content": body}
        for body, who in pairs
        if body and body != TUTOR_WELCOME
    ]
    return turns[-4:]
|
|
| |
| |
| |
def _reply_via_backend(user_text: str) -> str:
    """Ask the backend for a reply to ``user_text`` and return display text.

    Reads ``current_lesson_id`` (default ``0``) and the user's ``level``
    (default ``"beginner"``) from ``st.session_state`` and passes them, with
    the recent chat history, to ``backend.chat_ai``.

    Returns the backend answer after ``_clean_bot_text``.  If the backend
    call raises, the exception is not propagated; a ``"⚠️ Chat failed: ..."``
    string containing the exception summary is returned instead.
    """
    lesson_id = st.session_state.get("current_lesson_id") or 0

    # ``user`` may be present but None (e.g. before login); treat that like a
    # missing entry instead of raising AttributeError outside the try below.
    user = st.session_state.get("user") or {}
    # A missing, None, or whitespace-only level falls back to "beginner".
    level_slug = (user.get("level") or "").strip().lower() or "beginner"

    hist = _history_for_backend()
    current = (user_text or "").strip()
    # The message being answered is usually already stored as the last
    # history entry; drop it so it is not sent both as query and as history.
    if hist and hist[-1].get("role") == "user" and hist[-1].get("content", "").strip() == current:
        hist = hist[:-1]
    # Cap the context at four entries.
    hist = hist[-4:]

    try:
        answer = backend.chat_ai(
            query=user_text,
            lesson_id=lesson_id,
            level_slug=level_slug,
            history=hist,
        )
        return _clean_bot_text((answer or "").strip())
    except Exception as e:
        # UI boundary: surface the failure as a chat message rather than
        # crashing the page.
        err_text = "".join(traceback.format_exception_only(type(e), e)).strip()
        return f"⚠️ Chat failed: {err_text}"
|
|
| |
| |
| |
def _bubble_html(msg) -> str:
    """Return the HTML for one chat bubble, with the message text escaped."""
    import html  # local import keeps this block self-contained

    stamp = msg["timestamp"].strftime("%H:%M")
    # Escape the text so user input or model output cannot inject markup,
    # and turn line breaks into <br> so a blank line inside the message does
    # not end the HTML block and push the rest of the text out of the bubble.
    body = "<br>".join(html.escape(msg.get("text", "") or "").splitlines())

    if msg.get("sender") == "assistant":
        style = (
            "background:#e0e0e0;color:#000;padding:10px;border-radius:12px;"
            "max-width:70%;margin-bottom:6px;"
        )
    else:
        style = (
            "background:#4CAF50;color:#fff;padding:10px;border-radius:12px;"
            "max-width:70%;margin-left:auto;margin-bottom:6px;"
        )
    return f"<div style='{style}'>{body}<br><sub>{stamp}</sub></div>"


def _submit_user_message(text) -> None:
    """Store ``text`` as a user message, flag a pending reply, and rerun."""
    add_message(text, "user")
    st.session_state.is_typing = True
    st.rerun()


def show_page():
    """Render the AI tutor chat page.

    Each run of the script:

    1. initialises ``messages``, ``is_typing`` and ``chatbot_prefill_sent``
       in ``st.session_state`` when absent, then normalises stored messages;
    2. draws the conversation as HTML bubbles;
    3. submits a pending ``chatbot_prefill`` question, a clicked suggestion,
       or typed chat input as a new user message and reruns;
    4. when ``is_typing`` is set, asks the backend for a reply to the latest
       user message, stores it, and reruns;
    5. offers a button that switches ``current_page`` to the dashboard.
    """
    st.title("🤖 AI Financial Tutor")
    st.caption("Get personalized help with your financial questions")

    # --- session defaults ---
    if "messages" not in st.session_state:
        st.session_state.messages = [{
            "id": "1",
            "text": TUTOR_WELCOME,
            "sender": "assistant",
            "timestamp": datetime.datetime.now(),
        }]
    if "is_typing" not in st.session_state:
        st.session_state.is_typing = False
    if "chatbot_prefill_sent" not in st.session_state:
        st.session_state.chatbot_prefill_sent = False

    _normalize_messages()

    # --- conversation ---
    chat_container = st.container()
    with chat_container:
        for msg in st.session_state.messages:
            st.markdown(_bubble_html(msg), unsafe_allow_html=True)

    if st.session_state.is_typing:
        st.markdown("🤖 _FinanceBot is typing..._")

    # --- question handed over via session state ---
    # NOTE(review): ``chatbot_prefill_sent`` is never reset in this function,
    # so only one prefill is consumed unless whoever sets ``chatbot_prefill``
    # also clears the flag -- confirm against the caller.
    prefill = st.session_state.get("chatbot_prefill")
    if prefill and not st.session_state.chatbot_prefill_sent:
        st.session_state.chatbot_prefill_sent = True
        st.session_state.chatbot_prefill = None
        _submit_user_message(prefill)

    # --- starter suggestions, shown while only one message exists ---
    if len(st.session_state.messages) == 1:
        st.markdown("Try asking about:")
        cols = st.columns(2)
        quick = [
            "How does compound interest work?",
            "How much should I save for emergencies?",
            "What's a good budgeting strategy?",
            "How do I start investing?",
        ]
        for i, q in enumerate(quick):
            if cols[i % 2].button(q, key=f"suggest_{i}"):
                _submit_user_message(q)

    # --- free-form input ---
    user_input = st.chat_input("Ask me anything about personal finance...")
    if user_input:
        _submit_user_message(user_input)

    # --- pending reply ---
    if st.session_state.is_typing:
        try:
            with st.spinner("FinanceBot is thinking..."):
                last_user_msg = next(
                    (
                        m.get("text", "")
                        for m in reversed(st.session_state.messages)
                        if m.get("sender") == "user"
                    ),
                    "",
                )
                bot_reply = _reply_via_backend(last_user_msg)
        finally:
            # Clear the flag even on an unexpected error; otherwise every
            # later rerun would retry the same failing request.
            st.session_state.is_typing = False
        add_message(bot_reply, "assistant")
        st.rerun()

    if st.button("Back to Dashboard", key="ai_tutor_back_btn"):
        st.session_state.current_page = "Student Dashboard"
        st.rerun()