File size: 7,053 Bytes
0aa6283 5f63087 702933a 1f8a76a 5f63087 0aa6283 5f63087 0aa6283 1f8a76a 702933a 1f8a76a 5f63087 1f8a76a 5f63087 0aa6283 5f63087 0aa6283 5f63087 0aa6283 5f63087 0aa6283 5f63087 0aa6283 5f63087 0aa6283 1f8a76a 5f63087 0617fab 5f63087 7979c8a 88fdb8a 5f63087 702933a 5f63087 702933a 5f63087 702933a 5f63087 0aa6283 1f8a76a 0aa6283 5f63087 0aa6283 0617fab 0aa6283 5f63087 1f8a76a 0aa6283 5f63087 0aa6283 0617fab 0aa6283 88fdb8a 0617fab 0aa6283 5f63087 1f8a76a 5f63087 1f8a76a 5f63087 1f8a76a 0aa6283 5f63087 0aa6283 0617fab 0aa6283 5f63087 0aa6283 5f63087 0aa6283 0617fab 5f63087 0aa6283 0617fab 0aa6283 5f63087 0617fab 5f63087 0aa6283 0617fab | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | # phase/Student_view/chatbot.py
import os
import re
import datetime
import traceback
import streamlit as st
# --- use our backend client (utils/api.py) ---
try:
from utils import api as backend
except ModuleNotFoundError:
# fallback if running from a different CWD
import sys, pathlib
ROOT = pathlib.Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from utils import api as backend
TUTOR_WELCOME = "Hi! I'm your AI Financial Tutor. What would you like to learn today?"
def _clean_bot_text(t: str) -> str:
# strip xml-ish tags like <user>...</user>, <assistant>...</assistant>
t = re.sub(r"</?(user|assistant|system)\b[^>]*>", "", t, flags=re.I)
# strip leading speaker labels (User:, Assistant:, System:)
t = re.sub(r"(?im)^(user|assistant|system)\s*:\s*", "", t)
# collapse extra newlines
t = re.sub(r"\n{3,}", "\n\n", t)
return t.strip()
# -------------------------------
# History helpers
# -------------------------------
def add_message(text: str, sender: str):
if "messages" not in st.session_state:
st.session_state.messages = []
st.session_state.messages.append(
{
"id": str(datetime.datetime.now().timestamp()),
"text": (text or "").strip(),
"sender": sender,
"timestamp": datetime.datetime.now(),
}
)
def _coerce_ts(ts):
if isinstance(ts, datetime.datetime):
return ts
if isinstance(ts, (int, float)):
try:
return datetime.datetime.fromtimestamp(ts)
except Exception:
return None
if isinstance(ts, str):
for parser in (datetime.datetime.fromisoformat, lambda s: datetime.datetime.fromtimestamp(float(s))):
try:
return parser(ts)
except Exception:
pass
return None
def _normalize_messages():
msgs = st.session_state.get("messages", [])
normed = []
now = datetime.datetime.now()
for m in msgs:
text = (m.get("text") or "").strip()
sender = m.get("sender") or "user"
ts = _coerce_ts(m.get("timestamp")) or now
normed.append({**m, "text": text, "sender": sender, "timestamp": ts})
st.session_state.messages = normed
def _history_for_backend():
hist = []
for m in st.session_state.get("messages", []):
text = (m.get("text") or "").strip()
if not text or text == TUTOR_WELCOME:
continue
role = "assistant" if (m.get("sender") == "assistant") else "user"
hist.append({"role": role, "content": text})
return hist[-4:] # <= keep it tiny
# -------------------------------
# Reply via backend (/chat)
# -------------------------------
def _reply_via_backend(user_text: str) -> str:
lesson_id = st.session_state.get("current_lesson_id") or 0
level_slug = (st.session_state.get("user", {}).get("level") or "beginner").strip().lower()
# Build history and remove duplicate of the message we are sending as `query`
hist = _history_for_backend()
if hist and hist[-1].get("role") == "user" and hist[-1].get("content", "").strip() == (user_text or "").strip():
hist = hist[:-1]
hist = hist[-4:]
try:
answer = backend.chat_ai(
query=user_text,
lesson_id=lesson_id,
level_slug=level_slug,
history=hist,
)
return _clean_bot_text((answer or "").strip())
except Exception as e:
err_text = "".join(traceback.format_exception_only(type(e), e)).strip()
return f"⚠️ Chat failed: {err_text}"
# -------------------------------
# Streamlit page
# -------------------------------
def show_page():
st.title("🤖 AI Financial Tutor")
st.caption("Get personalized help with your financial questions")
# --- session state init ---
if "messages" not in st.session_state:
st.session_state.messages = [{
"id": "1",
"text": TUTOR_WELCOME,
"sender": "assistant",
"timestamp": datetime.datetime.now()
}]
if "is_typing" not in st.session_state:
st.session_state.is_typing = False
if "chatbot_prefill_sent" not in st.session_state:
st.session_state.chatbot_prefill_sent = False
_normalize_messages()
# --- render chat bubbles ---
chat_container = st.container()
with chat_container:
for msg in st.session_state.messages:
t = msg["timestamp"].strftime("%H:%M")
if msg.get("sender") == "assistant":
bubble = (
"<div style='background:#e0e0e0;color:#000;padding:10px;border-radius:12px;"
"max-width:70%;margin-bottom:6px;'>"
f"{msg.get('text','')}<br><sub>{t}</sub></div>"
)
else:
bubble = (
"<div style='background:#4CAF50;color:#fff;padding:10px;border-radius:12px;"
"max-width:70%;margin-left:auto;margin-bottom:6px;'>"
f"{msg.get('text','')}<br><sub>{t}</sub></div>"
)
st.markdown(bubble, unsafe_allow_html=True)
if st.session_state.is_typing:
st.markdown("🤖 _FinanceBot is typing..._")
# --- quiz handoff auto-prompt (only once) ---
prefill = st.session_state.get("chatbot_prefill")
if prefill and not st.session_state.chatbot_prefill_sent:
add_message(prefill, "user")
st.session_state.is_typing = True
st.session_state.chatbot_prefill_sent = True
st.session_state.chatbot_prefill = None
st.rerun()
# --- quick suggestions when fresh ---
if len(st.session_state.messages) == 1:
st.markdown("Try asking about:")
cols = st.columns(2)
quick = [
"How does compound interest work?",
"How much should I save for emergencies?",
"What's a good budgeting strategy?",
"How do I start investing?",
]
for i, q in enumerate(quick):
if cols[i % 2].button(q, key=f"suggest_{i}"):
add_message(q, "user")
st.session_state.is_typing = True
st.rerun()
# --- user input ---
user_input = st.chat_input("Ask me anything about personal finance...")
if user_input:
add_message(user_input, "user")
st.session_state.is_typing = True
st.rerun()
# --- handle pending bot reply ---
if st.session_state.is_typing:
with st.spinner("FinanceBot is thinking..."):
last_user_msg = next((m["text"] for m in reversed(st.session_state.messages) if m["sender"] == "user"), "")
bot_reply = _reply_via_backend(last_user_msg)
add_message(bot_reply, "assistant")
st.session_state.is_typing = False
st.rerun()
if st.button("Back to Dashboard", key="ai_tutor_back_btn"):
st.session_state.current_page = "Student Dashboard"
st.rerun() |