# Tesneem's picture
# Update app.py
# 5d89060 verified
# app.py — Student Skill Radar (MongoDB, secrets-based)
import os
from datetime import date
from typing import Dict, List
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from pymongo import MongoClient
from urllib.parse import quote_plus
# Configure the Streamlit page; must run before other st.* calls.
st.set_page_config(page_title="Student Skill Radar", layout="wide")
# ------------------- Constants -------------------
# The 13 individual skills scored on each response document (normalized to 0-1).
SKILLS = [
    "Problem-Solving", "Critical Thinking", "Analytical Reasoning",
    "Adaptability", "Continuous Learning", "Creativity",
    "Communication", "Collaboration", "Community Engagement",
    "Emotional Intelligence", "Ethical Decision-Making",
    "Time Management", "Tech Aptitude"
]
# Radar-axis groupings: display label -> member skills averaged into one axis.
SKILL_GROUPS = {
    "Problem-Solving, Critical Thinking, Analytical Reasoning": [
        "Problem-Solving", "Critical Thinking", "Analytical Reasoning"
    ],
    "Adaptability, Continuous Learning, Creativity": [
        "Adaptability", "Continuous Learning", "Creativity"
    ],
    "Time Management": ["Time Management"],
    "Communication, Teamwork, Collaboration, Community Engagement": [
        "Communication", "Collaboration", "Community Engagement"
    ],
    "Emotional Intelligence, Ethical Decision Making": [
        "Emotional Intelligence", "Ethical Decision-Making"
    ],
    "Tech Aptitude": ["Tech Aptitude"]
}
# Maps a response "source" tag to its program stage.
# NOTE(review): this name is rebound later in the file with week_2/week_3 added.
SOURCE_TO_STAGE = {
    "onboarding_responses": "onboarding",
    "closing_responses": "closing",
}
# ------------------- Helpers -------------------
def safe_mean(vals):
    """Mean of the non-null entries of *vals*; NaN when nothing usable remains."""
    usable = [x for x in vals if x is not None and not pd.isna(x)]
    if not usable:
        return np.nan
    return float(np.mean(usable))
def to_01_or_nan(x):
    """Coerce *x* to a float clamped into [0, 1]; NaN for non-numeric or missing input."""
    try:
        value = float(x)
    except Exception:
        return np.nan
    if pd.isna(value):
        return np.nan
    return min(1.0, max(0.0, value))
def aggregate_groups_row(row: pd.Series) -> Dict[str, float]:
    """Collapse a per-skill row into per-group means (NaN-aware via safe_mean)."""
    out: Dict[str, float] = {}
    for group, members in SKILL_GROUPS.items():
        out[group] = safe_mean([row.get(skill, np.nan) for skill in members])
    return out
def df_to_grouped(df_in: pd.DataFrame) -> pd.DataFrame:
    """Convert a per-skill frame (with a "label" column) into a per-group frame.

    Group means that come back NaN are replaced with 0.0 so every row has a
    drawable value for each radar axis.
    """
    if df_in.empty:
        return df_in
    group_labels = list(SKILL_GROUPS.keys())
    records = []
    for _, row in df_in.iterrows():
        means = aggregate_groups_row(row)
        record = {"label": row["label"]}
        for glabel in group_labels:
            val = means.get(glabel)
            record[glabel] = float(val) if not pd.isna(val) else 0.0
        records.append(record)
    return pd.DataFrame(records, columns=["label"] + group_labels)
def plot_radar(df: pd.DataFrame, grouped: bool, title: str, avg_label: str = None):
    """Build a Plotly radar (Scatterpolar) figure with one closed trace per row of *df*.

    df: one row per line; needs a "label" column plus one column per axis
        (group labels when grouped, individual skills otherwise).
    grouped: chooses group labels vs per-skill labels as the radar axes.
    title: chart title; falls back to "Skill Radar" when empty.
    avg_label: label of a row to emphasize (thick dashed red trace).
    """
    if df.empty:
        return go.Figure()
    traces = []
    labels = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    for _, r in df.iterrows():
        # Missing axis values are drawn as 0.0 so the polygon stays closed.
        values = [0.0 if pd.isna(r.get(k)) else float(r.get(k)) for k in labels]
        is_avg = avg_label and (str(r["label"]) == avg_label)
        traces.append(go.Scatterpolar(
            r=values + [values[0]],  # repeat first point to close the loop
            theta=labels + [labels[0]],
            name=r["label"],
            fill="toself",
            line=dict(
                width=4 if is_avg else 2,
                dash="dash" if is_avg else "solid",
                color="red" if is_avg else None
            ),
            opacity=0.7 if is_avg else 0.5
        ))
    fig = go.Figure(traces)
    fig.update_layout(
        title=title or "Skill Radar",
        showlegend=True,
        polar=dict(
            radialaxis=dict(
                # Fixed 0-1 radial scale keeps charts comparable across students.
                autorange=False, range=[0, 1], tick0=0, dtick=0.2,
                ticks="outside", showline=True, showgrid=True, visible=True
            )
        ),
        margin=dict(l=30, r=30, t=60, b=30),
    )
    return fig
def _vector_from_row(row: pd.Series, cols: list[str]) -> dict:
return {k: (None if pd.isna(row.get(k)) else float(row.get(k))) for k in cols}
def _percent_change(new: float | None, old: float | None) -> float | None:
if new is None or old is None:
return None
if old == 0:
return None # avoid div-by-zero; you can choose to show 100% if new>0
return (new - old) / old * 100.0
def _merge_resp_and_likert_vector(resp_vec: dict, likert_grouped_vec: dict | None, grouped: bool, SKILL_TO_GROUPS: dict[str, list[str]], SKILL_GROUPS: dict[str, list[str]]) -> dict:
"""
Returns a merged vector:
- If grouped: keys are group labels
- If ungrouped: keys are per-skill; Likert (group) is projected to skills by averaging groups a skill belongs to
"""
if likert_grouped_vec is None:
return resp_vec
if grouped:
out = {}
for g in SKILL_GROUPS.keys():
rv = resp_vec.get(g, None)
lv = likert_grouped_vec.get(g, None)
if rv is not None and lv is not None:
out[g] = (rv + lv) / 2.0
elif rv is not None:
out[g] = rv
else:
out[g] = lv
return out
else:
# project group likert to each skill
out = {}
for s in resp_vec.keys():
rv = resp_vec.get(s, None)
groups = SKILL_TO_GROUPS.get(s, [])
lik_vals = [likert_grouped_vec.get(g) for g in groups if likert_grouped_vec.get(g) is not None]
lv = float(np.mean(lik_vals)) if lik_vals else None
if rv is not None and lv is not None:
out[s] = (rv + lv) / 2.0
elif rv is not None:
out[s] = rv
else:
out[s] = lv
return out
# ------------------- Mongo -------------------
def _get_secret(name: str) -> str | None:
    """Look up *name* in Streamlit secrets first, then fall back to the environment."""
    try:
        secret = st.secrets.get(name)
    except Exception:
        # st.secrets can raise when no secrets file exists; treat as absent.
        secret = None
    if secret is not None:
        return str(secret)
    return os.getenv(name)
def _build_uri(db_name: str | None) -> str | None:
    """Assemble a mongodb+srv URI from MONGO_USER/MONGO_PASS/MONGO_CLUSTER secrets.

    Returns None when any credential is missing (the app then runs with no data).
    """
    user = _get_secret("MONGO_USER")
    pw = _get_secret("MONGO_PASS")
    cluster = _get_secret("MONGO_CLUSTER")
    if not (user and pw and cluster):
        return None
    # SECURITY NOTE(review): tlsAllowInvalidCertificates=true disables TLS
    # certificate verification — confirm this is intentional before shipping.
    return f"mongodb+srv://{quote_plus(user)}:{quote_plus(pw)}@{cluster}/{db_name}?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true"
@st.cache_resource(show_spinner=False)
def _client(uri: str):
    """One cached MongoClient per URI (10 s server-selection timeout)."""
    return MongoClient(uri, serverSelectionTimeoutMS=10000)
def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]:
    """Sorted distinct non-empty string values of *field*; [] on missing URI or any error."""
    if not uri:
        return []
    try:
        values = _client(uri)[db][coll].distinct(field)
        return sorted(v for v in values if isinstance(v, str) and v.strip())
    except Exception:
        # Connection/auth problems degrade to an empty picker instead of crashing.
        return []
def mongo_records(uri: str, db: str, coll: str, student: str | None, source: str | None) -> List[dict]:
    """Fetch response docs as flat rows: student, source, plus one 0-1 column per skill.

    "(All)" (or None) for student/source means no filter. Skill values are
    clamped to [0, 1]; missing/non-numeric become NaN. Returns [] when the URI
    is absent or on any Mongo error (errors are swallowed deliberately).
    """
    if not uri:
        return []
    q = {}
    if student and student != "(All)":
        q["student"] = student
    if source and source != "(All)":
        q["source"] = source
    try:
        docs = list(_client(uri)[db][coll].find(q, {"_id": 0, "student": 1, "source": 1, "skills": 1}))
        rows = []
        for d in docs:
            base = {"student": str(d.get("student", "")), "source": str(d.get("source", ""))}
            for k in SKILLS:
                # `skills` may be missing/None on a doc; absent skills become NaN.
                base[k] = to_01_or_nan((d.get("skills") or {}).get(k, np.nan))
            rows.append(base)
        return rows
    except Exception:
        return []
# ---------- Likert helpers ----------
def _norm_01(v):
try:
return max(0.0, min(1.0, float(v) / 5.0 if float(v) > 1 else float(v)))
except Exception:
return None
def mongo_get_likert_grouped(uri: str, db: str, coll: str, student: str, stage: str) -> dict:
    """Fetch a student's stage-level `average_skill_scores` and normalize to 0-1.

    Returns a dict keyed by every group label (missing groups map to None),
    or {} on missing arguments or any Mongo error.
    """
    if not (uri and student and stage):
        return {}
    try:
        doc = _client(uri)[db][coll].find_one({"student_name": student, "stage": stage}, {"_id": 0, "average_skill_scores": 1})
        avg = (doc or {}).get("average_skill_scores") or {}
        # Every group key is present in the result; absent scores become None.
        return {g: _norm_01(avg.get(g)) for g in SKILL_GROUPS.keys()}
    except Exception:
        return {}
# ---- Analyses (Markdown) helpers ----
ANALYSES_DIR = os.getenv("ANALYSES_DIR", "student_analyses") # folder in your HF Space
def _normalize_name(s: str) -> str:
# Lower, remove non-alphanumerics, collapse spaces/underscores
import re, unicodedata
s = unicodedata.normalize("NFKC", s or "").strip().lower()
s = re.sub(r"[^\w\s]", "", s)
s = re.sub(r"[\s_]+", " ", s).strip()
return s
@st.cache_data(show_spinner=False)
def _build_analysis_index(analyses_dir: str) -> dict:
    """Return dict: normalized_name -> file_path for *.md under analyses_dir."""
    import os, glob
    index = {}
    if not os.path.isdir(analyses_dir):
        # Missing folder (e.g. fresh deployment) → empty index, no error.
        return index
    for path in glob.glob(os.path.join(analyses_dir, "*.md")):
        base = os.path.splitext(os.path.basename(path))[0]  # e.g. "Student_Name"
        # accept both "Student Name" and "Student_Name" as same
        norm = _normalize_name(base.replace("_", " "))
        index[norm] = path  # duplicate names: last file seen wins
    return index
@st.cache_data(show_spinner=False)
def _load_markdown(path: str) -> str:
    """Read a UTF-8 text file; returns "" on any I/O error (missing file, bad encoding)."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception:
        return ""
# ------------------- UI -------------------
st.title("📊 Student Skill Radar")
with st.sidebar:
    # Collection names are editable so the same app can point at other cohorts.
    db_name = st.text_input("Database name", value="student_skills")
    coll_name = st.text_input("Collection name", value="responses_IFE_2025")
    summaries_coll = st.text_input("Likert summaries collection", value="likert_summaries_IFE_2025")
    mongo_uri = _build_uri(db_name)
    # Pickers are populated from live distinct values; "(All)" disables the filter.
    students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else [])
    sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else [])
    student_choice = st.selectbox("Select student", students)
    source_choice = st.selectbox("Select source/week", sources)
    # start_dt = st.date_input("Start date", value=None)
    # end_dt = st.date_input("End date", value=None)
    grouped = st.toggle("Grouped skills", value=True)
    overlay_sources = st.toggle("Overlay all sources when '(All)' selected", value=False)
    chart_title = st.text_input("Chart title", value="")
# start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None
# end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None
# ------------------- Fetch + merge -------------------
records = mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice) if mongo_uri else []
df_raw = pd.DataFrame(records) if records else pd.DataFrame()
if not df_raw.empty:
    # One radar line per (student, source) pair; multiple docs are averaged.
    df_raw["label"] = df_raw["student"].astype(str) + " — " + df_raw["source"].astype(str)
    df_resp = df_raw.groupby("label", dropna=False)[SKILLS].mean().reset_index()
    if grouped:
        df_resp = df_to_grouped(df_resp)
else:
    df_resp = pd.DataFrame()
# ---------- Merge Likert scores (works for grouped and ungrouped) ----------
from statistics import mean
# map each skill to the group(s) it belongs to (almost always one)
SKILL_TO_GROUPS = {s: [g for g, members in SKILL_GROUPS.items() if s in members] for s in SKILLS}
def _likert_for_skill(likert_grouped: dict, skill: str) -> float | None:
    """Project group-level Likert scores onto one skill by averaging its groups."""
    groups = SKILL_TO_GROUPS.get(skill, [])
    vals = [likert_grouped.get(g) for g in groups if likert_grouped.get(g) is not None]
    return mean(vals) if vals else None
# Merge Likert summary scores into the response means, one row (label) at a time.
if not df_resp.empty and summaries_coll:
    merged_rows = []
    # choose which columns we're merging
    cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    for _, r in df_resp.iterrows():
        label = str(r["label"])
        # Label format is "student — source" (built above at fetch time).
        student, stage = label.split(" — ", 1) if " — " in label else (label, None)
        stage = SOURCE_TO_STAGE.get(stage.strip()) if stage else None
        # only onboarding/closing have Likert summaries
        likert_g = (
            mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student.strip(), stage)
            if stage in ("onboarding", "closing") else {}
        )
        out = {"label": label}
        if grouped:
            # combine grouped columns directly: average when both exist, else fallback
            for g in SKILL_GROUPS.keys():
                resp_val = None if pd.isna(r.get(g)) else float(r.get(g))
                likert_val = likert_g.get(g, None)
                if resp_val is not None and likert_val is not None:
                    out[g] = (resp_val + likert_val) / 2.0
                elif resp_val is not None:
                    out[g] = resp_val
                elif likert_val is not None:
                    out[g] = likert_val
                else:
                    out[g] = np.nan
        else:
            # map group Likert down to each skill, then combine
            for s in SKILLS:
                resp_val = None if pd.isna(r.get(s)) else float(r.get(s))
                likert_val = _likert_for_skill(likert_g, s)
                if resp_val is not None and likert_val is not None:
                    out[s] = (resp_val + likert_val) / 2.0
                elif resp_val is not None:
                    out[s] = resp_val
                elif likert_val is not None:
                    out[s] = likert_val
                else:
                    out[s] = np.nan
        merged_rows.append(out)
    df_final = pd.DataFrame(merged_rows, columns=["label"] + cols)
else:
    df_final = df_resp
# Overlay mode
# if grouped and not df_final.empty and source_choice == "(All)" and not overlay_sources:
#     df_final["_student"] = df_final["label"].apply(lambda s: s.split(" — ", 1)[0])
#     df_final = df_final.groupby("_student", dropna=False)[list(SKILL_GROUPS.keys())].mean().reset_index()
#     df_final = df_final.rename(columns={"_student": "label"})
# ---------------- Overlay vs Combine ----------------
if not df_final.empty and source_choice == "(All)":
    if overlay_sources:
        # Overlay ON → keep one line per source (do nothing)
        pass
    else:
        # Overlay OFF → combine all sources into one line per student
        df_final["_student"] = df_final["label"].apply(lambda s: s.split(" — ", 1)[0])
        if grouped:
            cols = list(SKILL_GROUPS.keys())
        else:
            cols = SKILLS
        df_final = (
            df_final
            .groupby("_student", dropna=False)[cols]
            .mean()
            .reset_index()
            .rename(columns={"_student": "label"})
        )
# ------------------- Output -------------------
# fig = plot_radar(df_final, grouped, chart_title)
# st.plotly_chart(fig, use_container_width=True)
# ============== Build per-stage vectors for comparisons (LIKERT-AWARE) ==============
# Columns to use based on mode
COLS = list(SKILL_GROUPS.keys()) if grouped else SKILLS
# Map each skill to its group(s) once (used to project group Likert down to skills)
# NOTE(review): identical recomputation of the SKILL_TO_GROUPS built earlier.
SKILL_TO_GROUPS = {s: [g for g, members in SKILL_GROUPS.items() if s in members] for s in SKILLS}
def _project_likert_to_cols(likert_grouped: dict | None, cols: list[str], grouped_flag: bool) -> dict:
"""Return a vector aligned to COLS from group-level Likert. If ungrouped, project to skills."""
if not likert_grouped:
return {k: None for k in cols}
if grouped_flag:
return {k: (likert_grouped.get(k) if k in likert_grouped else None) for k in cols}
# ungrouped → average the groups a skill belongs to
out = {}
for s in cols:
gs = SKILL_TO_GROUPS.get(s, [])
vals = [likert_grouped.get(g) for g in gs if likert_grouped.get(g) is not None]
out[s] = float(np.mean(vals)) if vals else None
return out
def _merge_resp_and_likert(resp_vec: dict, likert_vec: dict) -> dict:
"""Average where both exist; else take whichever exists."""
out = {}
for k in resp_vec.keys():
rv = resp_vec.get(k, None)
lv = likert_vec.get(k, None)
if rv is not None and lv is not None:
out[k] = (rv + lv) / 2.0
elif rv is not None:
out[k] = rv
else:
out[k] = lv
return out
def _mean_vectors(vecs: list[dict]) -> dict:
"""Element-wise mean ignoring None; returns None if all Nones for a key."""
if not vecs:
return {}
keys = list(vecs[0].keys())
out = {}
for k in keys:
vals = [v.get(k) for v in vecs if v.get(k) is not None]
out[k] = (float(np.mean(vals)) if vals else None)
return out
def _resp_mean_for_sources(df_src: pd.DataFrame, student: str | None, sources: list[str], cols: list[str]) -> dict:
"""Mean of response scores across docs for (student,sources). If student None → cohort."""
if df_src.empty:
return {k: None for k in cols}
sub = df_src.copy()
if student:
sub = sub[sub["student"] == student]
sub = sub[sub["source"].isin(sources)]
if sub.empty:
return {k: None for k in cols}
m = sub[cols].mean(numeric_only=True)
return {k: (None if pd.isna(m.get(k)) else float(m.get(k))) for k in cols}
def _likert_grouped_for_student_stage(student: str, stage: str) -> dict | None:
    """Get normalized (0–1) group-level Likert for onboarding/closing only.

    Reads the module-level mongo_uri / db_name / summaries_coll bindings.
    Returns None for other stages or when no summary data exists.
    """
    if stage not in ("onboarding", "closing"):
        return None
    lg = mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student, stage)
    return lg if lg else None
def _student_stage_vectors(df_src: pd.DataFrame, stu: str, cols: list[str], grouped_flag: bool) -> dict:
    """Per-student vectors with Likert merged for onboarding/closing; combined includes closing(merged).

    Returns {"onboarding": vec, "closing": vec, "combined": vec}, each keyed by *cols*.
    """
    # Onboarding = RESP(onboarding) ⊕ Likert(onboarding)
    onb_resp = _resp_mean_for_sources(df_src, stu, ["onboarding_responses"], cols)
    onb_lik = _project_likert_to_cols(_likert_grouped_for_student_stage(stu, "onboarding"), cols, grouped_flag)
    onb = _merge_resp_and_likert(onb_resp, onb_lik)
    # Closing = RESP(closing) ⊕ Likert(closing)
    cls_resp = _resp_mean_for_sources(df_src, stu, ["closing_responses"], cols)
    cls_lik = _project_likert_to_cols(_likert_grouped_for_student_stage(stu, "closing"), cols, grouped_flag)
    cls = _merge_resp_and_likert(cls_resp, cls_lik)
    # Combined = mean( RESP(week2), RESP(week3), CLOSING(merged) )
    w2 = _resp_mean_for_sources(df_src, stu, ["week_2_responses"], cols)
    w3 = _resp_mean_for_sources(df_src, stu, ["week_3_responses"], cols)
    combo = _mean_vectors([w2, w3, cls])  # <- note: closing already merged with Likert
    return {"onboarding": onb, "closing": cls, "combined": combo}
def _stage_vectors_for_current_selection(df_src: pd.DataFrame, student_choice: str | None, cols: list[str], grouped_flag: bool) -> dict:
    """
    If a student is selected → return their vectors.
    If cohort (“(All)”) → average per-student vectors (Likert included where available).
    """
    if student_choice and student_choice != "(All)":
        return _student_stage_vectors(df_src, student_choice, cols, grouped_flag)
    # Cohort: compute for each student then average
    if df_src.empty:
        empty_vec = {k: None for k in cols}
        return {"onboarding": empty_vec, "closing": empty_vec, "combined": empty_vec}
    students = sorted(set(str(x) for x in df_src["student"].dropna().unique()))
    per_student = [_student_stage_vectors(df_src, s, cols, grouped_flag) for s in students]
    return {
        "onboarding": _mean_vectors([p["onboarding"] for p in per_student]),
        "closing": _mean_vectors([p["closing"] for p in per_student]),
        "combined": _mean_vectors([p["combined"] for p in per_student]),
    }
# NOTE(review): exact duplicate of the _percent_change defined earlier in the
# file; this redefinition shadows it. Consider deleting one copy.
def _percent_change(new: float | None, old: float | None) -> float | None:
    """Percent change from *old* to *new*; None when either is missing or old == 0."""
    if new is None or old is None:
        return None
    if old == 0:
        return None  # or return 100.0 if you prefer
    return (new - old) / old * 100.0
# Use df_raw (one row per doc) so overlay/aggregation doesn’t hide sources
# Ensure df_raw has the per-skill or per-group columns we need:
if grouped and not df_raw.empty:
    # build grouped view just for comparisons
    df_grouped_for_comp = df_raw.copy()
    # aggregate per-doc row to grouped columns
    df_grouped_for_comp = (
        df_grouped_for_comp
        .assign(**{
            g: df_grouped_for_comp.apply(lambda r: safe_mean([r.get(s, np.nan) for s in SKILL_GROUPS[g]]), axis=1)
            for g in SKILL_GROUPS.keys()
        })
    )
    df_src_for_comp = df_grouped_for_comp[["student", "source"] + list(SKILL_GROUPS.keys())]
else:
    df_src_for_comp = df_raw  # already per-skill
stage_vecs = _stage_vectors_for_current_selection(df_src_for_comp, student_choice, COLS, grouped)
vec_onb = stage_vecs["onboarding"]
vec_cls = stage_vecs["closing"]
vec_combo = stage_vecs["combined"]
# Percent deltas rendered in the Comparisons tab.
pct_onb_to_cls = {k: _percent_change(vec_cls.get(k), vec_onb.get(k)) for k in COLS}
pct_onb_to_combo = {k: _percent_change(vec_combo.get(k), vec_onb.get(k)) for k in COLS}
# ------------------- Plot + table above stays the same -------------------
df_plot = df_final.copy()
avg_label = None
if not df_plot.empty:
    cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    show_cohort_avg = st.toggle("Show cohort average (all students)", value=True)
    if show_cohort_avg:
        # Append one synthetic row averaging every plotted line; plot_radar
        # highlights it via avg_label.
        avg_vals = df_plot[cols].mean()
        avg_row = {"label": "Average (All Students)"}
        avg_row.update({k: float(avg_vals[k]) for k in cols})
        df_plot = pd.concat([df_plot, pd.DataFrame([avg_row])], ignore_index=True)
        avg_label = "Average (All Students)"
fig = plot_radar(df_plot, grouped, chart_title, avg_label=avg_label)
st.plotly_chart(fig, use_container_width=True)
st.caption(f"{len(df_final)} line(s) aggregated." if not df_final.empty else "No data.")
# ================== Dynamic Stage Summaries (only if student answered that week) ==================
import re
import unicodedata
from collections import Counter
from difflib import SequenceMatcher
import math
# Stage <-> Source mapping
STAGE_TO_SOURCE = {
    "onboarding": "onboarding_responses",
    "week_2": "week_2_responses",
    "week_3": "week_3_responses",
    "closing": "closing_responses",  # future-proof
}
# NOTE(review): rebinds the module-level SOURCE_TO_STAGE defined near the top,
# now including week_2/week_3; all later uses see this fuller mapping.
SOURCE_TO_STAGE = {v: k for k, v in STAGE_TO_SOURCE.items()}
def _answer_total_score(resp: dict) -> float:
skills = resp.get("skills") or {}
total = 0.0
for v in skills.values():
try:
total += float(v)
except Exception:
pass
return total
def _responses_for_student_stage(uri, db, responses_coll, student: str, stage: str) -> list[dict]:
    """Return responses for a student at a stage (mapped to source) with non-empty answers.

    Returns [] when any argument is missing, the stage is unknown, or Mongo errors.
    """
    if not (uri and student and stage):
        return []
    src = STAGE_TO_SOURCE.get(stage)
    if not src:
        return []
    try:
        c = _client(uri)
        docs = list(c[db][responses_coll].find(
            {"student": student, "source": src},
            {"_id": 0, "answer": 1, "skills": 1}
        ))
        # keep only responses with a non-empty answer
        return [d for d in docs if (d.get("answer") or "").strip()]
    except Exception:
        return []
def _normalize_quotes_spaces(s: str) -> str:
if not s:
return ""
s = unicodedata.normalize("NFKC", s)
s = s.replace("…", "...")
s = re.sub(r"\s+", " ", s).strip()
return s
def _clean_tokens(s: str) -> list[str]:
    """Lowercased word tokens of *s* with punctuation stripped."""
    text = _normalize_quotes_spaces(s).lower()
    # keep letters/digits/underscore/spaces; everything else becomes a space
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip().split()
def _vectorize(tokens: list[str]) -> Counter:
    # Bag-of-words term-frequency vector consumed by _cosine_sim.
    return Counter(tokens)
def _cosine_sim(a: Counter, b: Counter) -> float:
if not a or not b:
return 0.0
# dot
dot = sum(a[k] * b.get(k, 0) for k in a)
# norms
na = math.sqrt(sum(v*v for v in a.values()))
nb = math.sqrt(sum(v*v for v in b.values()))
if na == 0.0 or nb == 0.0:
return 0.0
return dot / (na * nb)
def _seq_ratio(a: str, b: str) -> float:
    # SequenceMatcher returns 0..1 (character-level similarity, order-sensitive)
    return SequenceMatcher(None, a, b).ratio()
def _best_full_answer_for_quote(q: str, responses: list[dict]) -> str | None:
    """
    Return the best-matching full answer for a (possibly truncated/middle) quote.
    Uses semantic similarity: 0.6*cosine(token) + 0.4*SequenceMatcher.
    If multiple tie, picks the one with HIGHEST total skill score.
    Returns None when no candidate reaches the 0.5 threshold.
    """
    q_norm = _normalize_quotes_spaces(q)
    q_clean = _normalize_quotes_spaces(q).lower()
    q_tokens = _clean_tokens(q_norm)
    q_vec = _vectorize(q_tokens)
    best = None  # (combined_score, skill_total, full_answer)
    for r in responses:
        full = (r.get("answer") or "").strip()
        if not full:
            continue
        full_norm = _normalize_quotes_spaces(full)
        full_clean = full_norm.lower()
        full_tokens = _clean_tokens(full_norm)
        full_vec = _vectorize(full_tokens)
        cos = _cosine_sim(q_vec, full_vec)
        seq = _seq_ratio(q_clean, full_clean)
        combined = 0.6 * cos + 0.4 * seq
        # small boost if the normalized quote substring appears (cheap heuristic)
        if q_clean and q_clean in full_clean:
            combined += 0.05
        # compute skill total for tie-break
        skills = r.get("skills") or {}
        skill_total = 0.0
        for v in skills.values():
            try:
                skill_total += float(v)
            except Exception:
                pass
        cand = (combined, skill_total, full)
        # keep the candidate with the best score; ties broken by skill total
        if (best is None) or (cand[0] > best[0]) or (cand[0] == best[0] and cand[1] > best[1]):
            best = cand
    # Threshold so we don't replace with a bad match; tweak 0.45–0.65 as needed
    if best and best[0] >= 0.5:
        return best[2]
    return None
def _fix_cutoff_quotes(quotes: list[str], responses: list[dict]) -> list[str]:
    """Swap truncated-looking quotes for their best-matching full answers.

    *responses* is assumed already filtered to the student+stage. Quotes with
    no decent semantic match are kept as-is; blank quotes are dropped.
    """
    repaired = []
    for quote in quotes or []:
        text = (quote or "").strip()
        if not text:
            continue
        # An ellipsis or a short quote suggests truncation → try to recover the full answer.
        if "..." in text or len(text) < 100:
            match = _best_full_answer_for_quote(text, responses)
            repaired.append(match or text)
        else:
            repaired.append(text)
    return repaired
def _top3_answers_by_skill_sum(responses: list[dict]) -> list[str]:
    """Up to three non-empty answers ranked by summed skill score, highest first."""
    candidates = [
        (_answer_total_score(r), (r.get("answer") or "").strip())
        for r in responses
        if (r.get("answer") or "").strip()
    ]
    # Stable sort: ties keep their original document order.
    candidates.sort(key=lambda pair: pair[0], reverse=True)
    return [answer for _, answer in candidates[:3]]
def fetch_student_stage_summary(
    uri: str,
    db: str,
    summaries_coll: str,
    responses_coll: str,
    student: str,
    stage: str
):
    """
    Return summary dict for a student+stage ONLY if the student has responses for that week.
    Otherwise, return None (so we don't render the panel).
    """
    # 1) Require that the student answered that week (source derived from stage)
    responses = _responses_for_student_stage(uri, db, responses_coll, student, stage)
    if not responses:
        return None
    # 2) Pull summary doc (patterns nested); failures leave the defaults below
    patterns = {}
    top_strengths = []
    notable_quotes = []
    try:
        c = _client(uri)
        doc = c[db][summaries_coll].find_one(
            {"student_name": student, "stage": stage},
            {"_id": 0, "patterns": 1, "top_strengths": 1, "notable_quotes": 1}
        ) or {}
        patterns = doc.get("patterns") or {}
        top_strengths = doc.get("top_strengths") or []
        notable_quotes = doc.get("notable_quotes") or []
    except Exception:
        pass
    most_consistent = patterns.get("most_consistent")
    most_developed = patterns.get("most_developed")
    # 3) Repair cut-off quotes; if none after fixing, fallback to top 3 highest-scoring answers
    notable_quotes = _fix_cutoff_quotes(notable_quotes, responses)
    if not notable_quotes:
        notable_quotes = _top3_answers_by_skill_sum(responses)
    return {
        "most_consistent": most_consistent,
        "most_developed": most_developed,
        "top_strengths": top_strengths,
        "notable_quotes": notable_quotes,
    }
# # ------------------- Output (Tabs) -------------------
# tab_summary, tab_analyses, tab_compare = st.tabs(["📈 Summary", "📝 Analyses","📊 Comparisons"])
tabs = st.tabs(["📈 Summary", "📝 Analyses", "📊 Comparisons"])
with tabs[0]:
    # ---------- Render the summary panel dynamically ----------
    # Only shown when a specific student AND a specific week are selected.
    if mongo_uri and student_choice != "(All)" and source_choice != "(All)":
        stage = SOURCE_TO_STAGE.get(source_choice.strip())
        if stage:
            # set to your actual summaries collection name
            # NOTE(review): hard-coded; ignores the sidebar's collection inputs.
            summaries_coll_name = "summaries_IFE_2025"
            summary = fetch_student_stage_summary(
                mongo_uri, db_name, summaries_coll_name, coll_name,
                student=student_choice, stage=stage
            )
            if summary:
                st.markdown("---")
                st.subheader(f"Summary — {student_choice} ({stage.replace('_', ' ').title()})")
                c1, c2 = st.columns(2)
                with c1:
                    st.markdown(f"**Most Consistent:** {summary.get('most_consistent') or '—'}")
                    st.markdown(f"**Most Developed:** {summary.get('most_developed') or '—'}")
                with c2:
                    strengths = summary.get("top_strengths") or []
                    st.markdown("**Top Strengths:** " + (", ".join(strengths) if strengths else "—"))
                    st.markdown("**Notable Quotes:**")
                    for q in (summary.get("notable_quotes") or [])[:3]:
                        st.markdown(f"> {q}")
with tabs[1]:
    st.subheader("Student Analysis")
    # Use the folder you defined at top (ANALYSES_DIR), or expose it in the sidebar if you prefer.
    idx = _build_analysis_index(ANALYSES_DIR)
    if student_choice == "(All)":
        st.info("Pick a specific student on the left to view their analysis.")
        # (Optional) show what's available so you can browse:
        if idx:
            st.caption("Available analyses:")
            st.write(", ".join(sorted({name.title() for name in idx.keys()})))
        # Cohort view: render the class-wide summary file from the working directory.
        file_path = "full_class_summary.md"
        full_summary = _load_markdown(file_path)
        if full_summary.strip():
            st.markdown(full_summary, unsafe_allow_html=False)
            # Optional download button
            # NOTE(review): open() here is unguarded; _load_markdown succeeded,
            # but a racing file deletion would raise.
            with open(file_path, "rb") as f:
                st.download_button(
                    "Download analysis (.md)", f,
                    file_name=os.path.basename(file_path), mime="text/markdown"
                )
        else:
            st.warning("Analysis file found but empty.")
    else:
        # Normalize the selected student name to match filenames
        norm = _normalize_name(student_choice)
        path = idx.get(norm)
        # If exact match not found, try simple underscore variant
        if not path:
            alt = student_choice.replace(" ", "_")
            path = idx.get(_normalize_name(alt))
        if path:
            md = _load_markdown(path)
            if md.strip():
                st.markdown(md, unsafe_allow_html=False)
                # Legend explaining the per-week skill markers used in the analyses.
                system = '''### 🔵🔵 Skill Indicator System
| Symbol | Meaning |
|---------|----------------------------------------------|
| 🔵 | Clear evidence of the skill that week |
| 🔵🔵 | Strong or standout performance that week |
| ⚪⚪ | Little to no evidence for that skill that week|
'''
                st.markdown(system)
                # Optional download button
                with open(path, "rb") as f:
                    st.download_button(
                        "Download analysis (.md)", f,
                        file_name=os.path.basename(path), mime="text/markdown"
                    )
            else:
                st.warning("Analysis file found but empty.")
        else:
            st.warning(f"No analysis found for **{student_choice}** in `{ANALYSES_DIR}` yet.")
            if idx:
                st.caption("Available analyses:")
                st.write(", ".join(sorted({name.title() for name in idx.keys()})))
with tabs[2]:
    # Two comparison tables driven by the stage vectors computed above.
    st.subheader("Onboarding vs Closing — % Change")
    df1 = pd.DataFrame({
        "Dimension": COLS,
        "Onboarding": [vec_onb.get(k) for k in COLS],
        "Closing": [vec_cls.get(k) for k in COLS],
        "% Change": [pct_onb_to_cls.get(k) for k in COLS],
    })
    st.dataframe(df1.style.format({"Onboarding": "{:.2f}", "Closing": "{:.2f}", "% Change": "{:+.1f}%"}), use_container_width=True)
    st.subheader("Onboarding vs (Week2+Week3+Closing) — % Change")
    df2 = pd.DataFrame({
        "Dimension": COLS,
        "Onboarding": [vec_onb.get(k) for k in COLS],
        "Weeks 2+3+Closing (combined)": [vec_combo.get(k) for k in COLS],
        "% Change": [pct_onb_to_combo.get(k) for k in COLS],
    })
    st.dataframe(df2.style.format({"Onboarding": "{:.2f}", "Weeks 2+3+Closing (combined)": "{:.2f}", "% Change": "{:+.1f}%"}), use_container_width=True)
    # Optional bar chart: % change Onboarding -> Closing
    try:
        fig_delta = go.Figure()
        # None deltas (no data / zero baseline) are drawn as 0 bars.
        fig_delta.add_bar(x=COLS, y=[pct_onb_to_cls.get(k) if pct_onb_to_cls.get(k) is not None else 0 for k in COLS], name="%Δ Onb→Closing")
        fig_delta.update_layout(title="% Change: Onboarding → Closing", xaxis_title="Dimension", yaxis_title="% change", margin=dict(l=20, r=20, t=50, b=20))
        st.plotly_chart(fig_delta, use_container_width=True)
    except Exception:
        # Chart is best-effort; swallow plotting errors rather than break the tab.
        pass
# # app.py — Student Skill Radar (MongoDB, secrets-based, no CSV)
# import os
# from datetime import date
# from typing import Dict, List
# import numpy as np
# import pandas as pd
# import plotly.graph_objects as go
# import streamlit as st
# from pymongo import MongoClient
# from urllib.parse import quote_plus
# st.set_page_config(page_title="Student Skill Radar", layout="wide")
# # ------------------- Constants -------------------
# SKILLS = [
# "Problem-Solving",
# "Critical Thinking",
# "Analytical Reasoning",
# "Adaptability",
# "Continuous Learning",
# "Creativity",
# "Communication",
# "Collaboration",
# "Community Engagement",
# "Emotional Intelligence",
# "Ethical Decision-Making",
# "Time Management",
# "Tech Aptitude",
# ]
# SKILL_GROUPS = {
# "Problem-Solving, Critical Thinking, Analytical Reasoning": [
# "Problem-Solving", "Critical Thinking", "Analytical Reasoning"
# ],
# "Adaptability, Continuous Learning, Creativity": [
# "Adaptability", "Continuous Learning", "Creativity"
# ],
# "Time Management": ["Time Management"],
# "Communication, Teamwork, Collaboration, Community Engagement": [
# "Communication", "Collaboration", "Community Engagement"
# ],
# "Emotional Intelligence, Ethical Decision Making": [
# "Emotional Intelligence", "Ethical Decision-Making"
# ],
# "Tech Aptitude": ["Tech Aptitude"],
# }
# # ------------------- Helpers -------------------
# def safe_mean(vals):
# clean = [v for v in vals if v is not None and not pd.isna(v)]
# return float(np.mean(clean)) if clean else np.nan
# def to_01_or_nan(x):
# try:
# v = float(x)
# except Exception:
# return np.nan
# if pd.isna(v):
# return np.nan
# return max(0.0, min(1.0, v))
# def aggregate_groups_row(row: pd.Series) -> Dict[str, float]:
# return {
# g: safe_mean([row.get(s, np.nan) for s in members])
# for g, members in SKILL_GROUPS.items()
# }
# def summarize(records: List[dict], level: str = "student") -> pd.DataFrame:
# df = pd.DataFrame(records) if records else pd.DataFrame()
# if df.empty:
# return df
# if level == "student+source":
# df["label"] = df["student"].astype(str) + " — " + df["source"].astype(str)
# else:
# df["label"] = df["student"].astype(str)
# # groupby mean skips NaNs by default
# return df.groupby("label", dropna=False)[SKILLS].mean().reset_index()
# def plot_radar(df: pd.DataFrame, grouped: bool, title: str):
# if df.empty:
# return go.Figure()
# traces = []
# if grouped:
# labels = list(SKILL_GROUPS.keys())
# for _, r in df.iterrows():
# grp = aggregate_groups_row(r)
# values = [0.0 if pd.isna(grp[k]) else float(grp[k]) for k in labels]
# traces.append(go.Scatterpolar(
# r=values + [values[0]],
# theta=labels + [labels[0]],
# name=r["label"],
# fill="toself",
# ))
# else:
# labels = SKILLS
# for _, r in df.iterrows():
# values = []
# for k in SKILLS:
# v = r.get(k, np.nan)
# values.append(0.0 if pd.isna(v) else float(v))
# traces.append(go.Scatterpolar(
# r=values + [values[0]],
# theta=labels + [labels[0]],
# name=r["label"],
# fill="toself",
# ))
# fig = go.Figure(traces)
# fig.update_layout(
# title=title or "Skill Radar",
# showlegend=True,
# polar=dict(
# radialaxis=dict(
# autorange=False,
# range=[0, 1],
# tick0=0,
# dtick=0.2,
# ticks="outside",
# showline=True,
# showgrid=True,
# visible=True,
# )
# ),
# margin=dict(l=30, r=30, t=60, b=30),
# )
# return fig
# # ------------------- Mongo Access (secrets-only) -------------------
# def _get_secret(name: str) -> str | None:
# try:
# val = st.secrets.get(name)
# if val is not None:
# return str(val)
# except Exception:
# pass
# return os.getenv(name)
# def _build_uri(db_name: str | None) -> str | None:
# user = _get_secret("MONGO_USER")
# pw = _get_secret("MONGO_PASS")
# cluster = _get_secret("MONGO_CLUSTER")
# if not (user and pw and cluster):
# return None
# user_q = quote_plus(user)
# pw_q = quote_plus(pw)
# db_path = f"/{db_name}" if db_name else ""
# return (
# f"mongodb+srv://{user_q}:{pw_q}@{cluster}{db_path}"
# f"?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true"
# )
# @st.cache_resource(show_spinner=False)
# def _client(uri: str):
# return MongoClient(uri, serverSelectionTimeoutMS=10000)
# # @st.cache_data(show_spinner=False)
# def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]:
# if not uri:
# return []
# try:
# c = _client(uri)
# vals = c[db][coll].distinct(field)
# return sorted([v for v in vals if isinstance(v, str) and v.strip()])
# except Exception:
# return []
# # @st.cache_data(show_spinner=False)
# def mongo_records(
# uri: str,
# db: str,
# coll: str,
# student: str | None,
# source: str | None,
# start: str | None,
# end: str | None,
# ) -> List[dict]:
# """Return flat rows with one column per skill; missing skills -> NaN (ignored in means)."""
# if not uri:
# return []
# q = {}
# if student and student != "(All)":
# q["student"] = student
# if source and source != "(All)":
# q["source"] = source
# if start or end:
# q["date"] = {}
# if start:
# q["date"]["$gte"] = start
# if end:
# q["date"]["$lte"] = end
# try:
# c = _client(uri)
# proj = {"_id": 0, "student": 1, "source": 1, "date": 1, "skills": 1}
# docs = list(c[db][coll].find(q, proj))
# rows = []
# for d in docs:
# base = {
# "student": str(d.get("student", "")),
# "source": str(d.get("source", "")),
# "date": str(d.get("date", "")),
# }
# sd = d.get("skills") or {}
# for k in SKILLS:
# base[k] = to_01_or_nan(sd.get(k, np.nan))
# rows.append(base)
# return rows
# except Exception:
# return []
# # ------------------- UI -------------------
# st.title("📊 Student Skill Radar")
# with st.sidebar:
# st.subheader("MongoDB Settings")
# db_name = st.text_input("Database name", value="student_skills")
# coll_name = st.text_input("Collection name", value="responses_IFE_2025")
# mongo_uri = _build_uri(db_name)
# if not mongo_uri:
# st.warning("Missing MONGO_USER, MONGO_PASS, or MONGO_CLUSTER in secrets/env.")
# else:
# try:
# _client(mongo_uri).admin.command("ping")
# st.success("Connected via secrets ✅")
# except Exception as e:
# st.error(f"Mongo connection failed: {e}")
# # Filters
# students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else [])
# sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else [])
# student_choice = st.selectbox("Select student", students)
# source_choice = st.selectbox("Select source/week", sources)
# c1, c2 = st.columns(2)
# start_dt = c1.date_input("Start date", value=None)
# end_dt = c2.date_input("End date", value=None)
# agg_level = st.selectbox("Aggregation level", ["student", "student+source"], index=0)
# grouped = st.toggle("Grouped skills (skill clusters)", value=True)
# chart_title = st.text_input("Chart title", value="")
# # Convert dates to strings (YYYY-MM-DD)
# start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None
# end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None
# # Fetch + aggregate
# records = mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice, start_str, end_str) if mongo_uri else []
# df = summarize(records, level=agg_level) if records else pd.DataFrame()
# # ------------------- Output -------------------
# fig = plot_radar(df, grouped, chart_title)
# st.plotly_chart(fig, use_container_width=True)
# st.caption(f"{len(df)} line(s) aggregated." if not df.empty else "No data. Adjust filters or check Mongo connection.")