# app.py β€” Student Skill Radar (MongoDB, secrets-based) import os from datetime import date from typing import Dict, List import numpy as np import pandas as pd import plotly.graph_objects as go import streamlit as st from pymongo import MongoClient from urllib.parse import quote_plus st.set_page_config(page_title="Student Skill Radar", layout="wide") # ------------------- Constants ------------------- SKILLS = [ "Problem-Solving", "Critical Thinking", "Analytical Reasoning", "Adaptability", "Continuous Learning", "Creativity", "Communication", "Collaboration", "Community Engagement", "Emotional Intelligence", "Ethical Decision-Making", "Time Management", "Tech Aptitude" ] SKILL_GROUPS = { "Problem-Solving, Critical Thinking, Analytical Reasoning": [ "Problem-Solving", "Critical Thinking", "Analytical Reasoning" ], "Adaptability, Continuous Learning, Creativity": [ "Adaptability", "Continuous Learning", "Creativity" ], "Time Management": ["Time Management"], "Communication, Teamwork, Collaboration, Community Engagement": [ "Communication", "Collaboration", "Community Engagement" ], "Emotional Intelligence, Ethical Decision Making": [ "Emotional Intelligence", "Ethical Decision-Making" ], "Tech Aptitude": ["Tech Aptitude"] } SOURCE_TO_STAGE = { "onboarding_responses": "onboarding", "closing_responses": "closing", } # ------------------- Helpers ------------------- def safe_mean(vals): clean = [v for v in vals if v is not None and not pd.isna(v)] return float(np.mean(clean)) if clean else np.nan def to_01_or_nan(x): try: v = float(x) except Exception: return np.nan if pd.isna(v): return np.nan return max(0.0, min(1.0, v)) def aggregate_groups_row(row: pd.Series) -> Dict[str, float]: return { g: safe_mean([row.get(s, np.nan) for s in members]) for g, members in SKILL_GROUPS.items() } def df_to_grouped(df_in: pd.DataFrame) -> pd.DataFrame: if df_in.empty: return df_in rows = [] for _, r in df_in.iterrows(): grp = aggregate_groups_row(r) out = {"label": r["label"]} 
for glabel in SKILL_GROUPS.keys(): v = grp.get(glabel) out[glabel] = 0.0 if pd.isna(v) else float(v) rows.append(out) return pd.DataFrame(rows, columns=["label"] + list(SKILL_GROUPS.keys())) def plot_radar(df: pd.DataFrame, grouped: bool, title: str, avg_label: str = None): if df.empty: return go.Figure() traces = [] labels = list(SKILL_GROUPS.keys()) if grouped else SKILLS for _, r in df.iterrows(): values = [0.0 if pd.isna(r.get(k)) else float(r.get(k)) for k in labels] is_avg = avg_label and (str(r["label"]) == avg_label) traces.append(go.Scatterpolar( r=values + [values[0]], theta=labels + [labels[0]], name=r["label"], fill="toself", line=dict( width=4 if is_avg else 2, dash="dash" if is_avg else "solid", color="red" if is_avg else None ), opacity=0.7 if is_avg else 0.5 )) fig = go.Figure(traces) fig.update_layout( title=title or "Skill Radar", showlegend=True, polar=dict( radialaxis=dict( autorange=False, range=[0, 1], tick0=0, dtick=0.2, ticks="outside", showline=True, showgrid=True, visible=True ) ), margin=dict(l=30, r=30, t=60, b=30), ) return fig def _vector_from_row(row: pd.Series, cols: list[str]) -> dict: return {k: (None if pd.isna(row.get(k)) else float(row.get(k))) for k in cols} def _percent_change(new: float | None, old: float | None) -> float | None: if new is None or old is None: return None if old == 0: return None # avoid div-by-zero; you can choose to show 100% if new>0 return (new - old) / old * 100.0 def _merge_resp_and_likert_vector(resp_vec: dict, likert_grouped_vec: dict | None, grouped: bool, SKILL_TO_GROUPS: dict[str, list[str]], SKILL_GROUPS: dict[str, list[str]]) -> dict: """ Returns a merged vector: - If grouped: keys are group labels - If ungrouped: keys are per-skill; Likert (group) is projected to skills by averaging groups a skill belongs to """ if likert_grouped_vec is None: return resp_vec if grouped: out = {} for g in SKILL_GROUPS.keys(): rv = resp_vec.get(g, None) lv = likert_grouped_vec.get(g, None) if rv is not None and lv 
is not None: out[g] = (rv + lv) / 2.0 elif rv is not None: out[g] = rv else: out[g] = lv return out else: # project group likert to each skill out = {} for s in resp_vec.keys(): rv = resp_vec.get(s, None) groups = SKILL_TO_GROUPS.get(s, []) lik_vals = [likert_grouped_vec.get(g) for g in groups if likert_grouped_vec.get(g) is not None] lv = float(np.mean(lik_vals)) if lik_vals else None if rv is not None and lv is not None: out[s] = (rv + lv) / 2.0 elif rv is not None: out[s] = rv else: out[s] = lv return out # ------------------- Mongo ------------------- def _get_secret(name: str) -> str | None: try: val = st.secrets.get(name) if val is not None: return str(val) except Exception: pass return os.getenv(name) def _build_uri(db_name: str | None) -> str | None: user = _get_secret("MONGO_USER") pw = _get_secret("MONGO_PASS") cluster = _get_secret("MONGO_CLUSTER") if not (user and pw and cluster): return None return f"mongodb+srv://{quote_plus(user)}:{quote_plus(pw)}@{cluster}/{db_name}?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true" @st.cache_resource(show_spinner=False) def _client(uri: str): return MongoClient(uri, serverSelectionTimeoutMS=10000) def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]: if not uri: return [] try: return sorted([v for v in _client(uri)[db][coll].distinct(field) if isinstance(v, str) and v.strip()]) except Exception: return [] def mongo_records(uri: str, db: str, coll: str, student: str | None, source: str | None) -> List[dict]: if not uri: return [] q = {} if student and student != "(All)": q["student"] = student if source and source != "(All)": q["source"] = source try: docs = list(_client(uri)[db][coll].find(q, {"_id": 0, "student": 1, "source": 1, "skills": 1})) rows = [] for d in docs: base = {"student": str(d.get("student", "")), "source": str(d.get("source", ""))} for k in SKILLS: base[k] = to_01_or_nan((d.get("skills") or {}).get(k, np.nan)) rows.append(base) return rows except Exception: 
return [] # ---------- Likert helpers ---------- def _norm_01(v): try: return max(0.0, min(1.0, float(v) / 5.0 if float(v) > 1 else float(v))) except Exception: return None def mongo_get_likert_grouped(uri: str, db: str, coll: str, student: str, stage: str) -> dict: if not (uri and student and stage): return {} try: doc = _client(uri)[db][coll].find_one({"student_name": student, "stage": stage}, {"_id": 0, "average_skill_scores": 1}) avg = (doc or {}).get("average_skill_scores") or {} return {g: _norm_01(avg.get(g)) for g in SKILL_GROUPS.keys()} except Exception: return {} # ---- Analyses (Markdown) helpers ---- ANALYSES_DIR = os.getenv("ANALYSES_DIR", "student_analyses") # folder in your HF Space def _normalize_name(s: str) -> str: # Lower, remove non-alphanumerics, collapse spaces/underscores import re, unicodedata s = unicodedata.normalize("NFKC", s or "").strip().lower() s = re.sub(r"[^\w\s]", "", s) s = re.sub(r"[\s_]+", " ", s).strip() return s @st.cache_data(show_spinner=False) def _build_analysis_index(analyses_dir: str) -> dict: """Return dict: normalized_name -> file_path for *.md under analyses_dir.""" import os, glob index = {} if not os.path.isdir(analyses_dir): return index for path in glob.glob(os.path.join(analyses_dir, "*.md")): base = os.path.splitext(os.path.basename(path))[0] # "Student_Name" # accept both "Student Name" and "Student_Name" as same norm = _normalize_name(base.replace("_", " ")) index[norm] = path return index @st.cache_data(show_spinner=False) def _load_markdown(path: str) -> str: try: with open(path, "r", encoding="utf-8") as f: return f.read() except Exception: return "" # ------------------- UI ------------------- st.title("πŸ“Š Student Skill Radar") with st.sidebar: db_name = st.text_input("Database name", value="student_skills") coll_name = st.text_input("Collection name", value="responses_IFE_2025") summaries_coll = st.text_input("Likert summaries collection", value="likert_summaries_IFE_2025") mongo_uri = 
_build_uri(db_name) students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else []) sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else []) student_choice = st.selectbox("Select student", students) source_choice = st.selectbox("Select source/week", sources) # start_dt = st.date_input("Start date", value=None) # end_dt = st.date_input("End date", value=None) grouped = st.toggle("Grouped skills", value=True) overlay_sources = st.toggle("Overlay all sources when '(All)' selected", value=False) chart_title = st.text_input("Chart title", value="") # start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None # end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None # ------------------- Fetch + merge ------------------- records = mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice) if mongo_uri else [] df_raw = pd.DataFrame(records) if records else pd.DataFrame() if not df_raw.empty: df_raw["label"] = df_raw["student"].astype(str) + " β€” " + df_raw["source"].astype(str) df_resp = df_raw.groupby("label", dropna=False)[SKILLS].mean().reset_index() if grouped: df_resp = df_to_grouped(df_resp) else: df_resp = pd.DataFrame() # ---------- Merge Likert scores (works for grouped and ungrouped) ---------- from statistics import mean # map each skill to the group(s) it belongs to (almost always one) SKILL_TO_GROUPS = {s: [g for g, members in SKILL_GROUPS.items() if s in members] for s in SKILLS} def _likert_for_skill(likert_grouped: dict, skill: str) -> float | None: groups = SKILL_TO_GROUPS.get(skill, []) vals = [likert_grouped.get(g) for g in groups if likert_grouped.get(g) is not None] return mean(vals) if vals else None if not df_resp.empty and summaries_coll: merged_rows = [] # choose which columns we're merging cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS for _, r in df_resp.iterrows(): label = str(r["label"]) 
student, stage = label.split(" β€” ", 1) if " β€” " in label else (label, None) stage = SOURCE_TO_STAGE.get(stage.strip()) if stage else None # only onboarding/closing have Likert summaries likert_g = ( mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student.strip(), stage) if stage in ("onboarding", "closing") else {} ) out = {"label": label} if grouped: # combine grouped columns directly for g in SKILL_GROUPS.keys(): resp_val = None if pd.isna(r.get(g)) else float(r.get(g)) likert_val = likert_g.get(g, None) if resp_val is not None and likert_val is not None: out[g] = (resp_val + likert_val) / 2.0 elif resp_val is not None: out[g] = resp_val elif likert_val is not None: out[g] = likert_val else: out[g] = np.nan else: # map group Likert down to each skill, then combine for s in SKILLS: resp_val = None if pd.isna(r.get(s)) else float(r.get(s)) likert_val = _likert_for_skill(likert_g, s) if resp_val is not None and likert_val is not None: out[s] = (resp_val + likert_val) / 2.0 elif resp_val is not None: out[s] = resp_val elif likert_val is not None: out[s] = likert_val else: out[s] = np.nan merged_rows.append(out) df_final = pd.DataFrame(merged_rows, columns=["label"] + cols) else: df_final = df_resp # Overlay mode # if grouped and not df_final.empty and source_choice == "(All)" and not overlay_sources: # df_final["_student"] = df_final["label"].apply(lambda s: s.split(" β€” ", 1)[0]) # df_final = df_final.groupby("_student", dropna=False)[list(SKILL_GROUPS.keys())].mean().reset_index() # df_final = df_final.rename(columns={"_student": "label"}) # ---------------- Overlay vs Combine ---------------- if not df_final.empty and source_choice == "(All)": if overlay_sources: # Overlay ON β†’ keep one line per source (do nothing) pass else: # Overlay OFF β†’ combine all sources into one line per student df_final["_student"] = df_final["label"].apply(lambda s: s.split(" β€” ", 1)[0]) if grouped: cols = list(SKILL_GROUPS.keys()) else: cols = SKILLS df_final = ( 
df_final .groupby("_student", dropna=False)[cols] .mean() .reset_index() .rename(columns={"_student": "label"}) ) # ------------------- Output ------------------- # fig = plot_radar(df_final, grouped, chart_title) # st.plotly_chart(fig, use_container_width=True) # ============== Build per-stage vectors for comparisons (LIKERT-AWARE) ============== # Columns to use based on mode COLS = list(SKILL_GROUPS.keys()) if grouped else SKILLS # Map each skill to its group(s) once (used to project group Likert down to skills) SKILL_TO_GROUPS = {s: [g for g, members in SKILL_GROUPS.items() if s in members] for s in SKILLS} def _project_likert_to_cols(likert_grouped: dict | None, cols: list[str], grouped_flag: bool) -> dict: """Return a vector aligned to COLS from group-level Likert. If ungrouped, project to skills.""" if not likert_grouped: return {k: None for k in cols} if grouped_flag: return {k: (likert_grouped.get(k) if k in likert_grouped else None) for k in cols} # ungrouped β†’ average the groups a skill belongs to out = {} for s in cols: gs = SKILL_TO_GROUPS.get(s, []) vals = [likert_grouped.get(g) for g in gs if likert_grouped.get(g) is not None] out[s] = float(np.mean(vals)) if vals else None return out def _merge_resp_and_likert(resp_vec: dict, likert_vec: dict) -> dict: """Average where both exist; else take whichever exists.""" out = {} for k in resp_vec.keys(): rv = resp_vec.get(k, None) lv = likert_vec.get(k, None) if rv is not None and lv is not None: out[k] = (rv + lv) / 2.0 elif rv is not None: out[k] = rv else: out[k] = lv return out def _mean_vectors(vecs: list[dict]) -> dict: """Element-wise mean ignoring None; returns None if all Nones for a key.""" if not vecs: return {} keys = list(vecs[0].keys()) out = {} for k in keys: vals = [v.get(k) for v in vecs if v.get(k) is not None] out[k] = (float(np.mean(vals)) if vals else None) return out def _resp_mean_for_sources(df_src: pd.DataFrame, student: str | None, sources: list[str], cols: list[str]) -> dict: 
"""Mean of response scores across docs for (student,sources). If student None β†’ cohort.""" if df_src.empty: return {k: None for k in cols} sub = df_src.copy() if student: sub = sub[sub["student"] == student] sub = sub[sub["source"].isin(sources)] if sub.empty: return {k: None for k in cols} m = sub[cols].mean(numeric_only=True) return {k: (None if pd.isna(m.get(k)) else float(m.get(k))) for k in cols} def _likert_grouped_for_student_stage(student: str, stage: str) -> dict | None: """Get normalized (0–1) group-level Likert for onboarding/closing only.""" if stage not in ("onboarding", "closing"): return None lg = mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student, stage) return lg if lg else None def _student_stage_vectors(df_src: pd.DataFrame, stu: str, cols: list[str], grouped_flag: bool) -> dict: """Per-student vectors with Likert merged for onboarding/closing; combined includes closing(merged).""" # Onboarding = RESP(onboarding) βŠ• Likert(onboarding) onb_resp = _resp_mean_for_sources(df_src, stu, ["onboarding_responses"], cols) onb_lik = _project_likert_to_cols(_likert_grouped_for_student_stage(stu, "onboarding"), cols, grouped_flag) onb = _merge_resp_and_likert(onb_resp, onb_lik) # Closing = RESP(closing) βŠ• Likert(closing) cls_resp = _resp_mean_for_sources(df_src, stu, ["closing_responses"], cols) cls_lik = _project_likert_to_cols(_likert_grouped_for_student_stage(stu, "closing"), cols, grouped_flag) cls = _merge_resp_and_likert(cls_resp, cls_lik) # Combined = mean( RESP(week2), RESP(week3), CLOSING(merged) ) w2 = _resp_mean_for_sources(df_src, stu, ["week_2_responses"], cols) w3 = _resp_mean_for_sources(df_src, stu, ["week_3_responses"], cols) combo = _mean_vectors([w2, w3, cls]) # <- note: closing already merged with Likert return {"onboarding": onb, "closing": cls, "combined": combo} def _stage_vectors_for_current_selection(df_src: pd.DataFrame, student_choice: str | None, cols: list[str], grouped_flag: bool) -> dict: """ If a student 
is selected β†’ return their vectors. If cohort (β€œ(All)”) β†’ average per-student vectors (Likert included where available). """ if student_choice and student_choice != "(All)": return _student_stage_vectors(df_src, student_choice, cols, grouped_flag) # Cohort: compute for each student then average if df_src.empty: empty_vec = {k: None for k in cols} return {"onboarding": empty_vec, "closing": empty_vec, "combined": empty_vec} students = sorted(set(str(x) for x in df_src["student"].dropna().unique())) per_student = [_student_stage_vectors(df_src, s, cols, grouped_flag) for s in students] return { "onboarding": _mean_vectors([p["onboarding"] for p in per_student]), "closing": _mean_vectors([p["closing"] for p in per_student]), "combined": _mean_vectors([p["combined"] for p in per_student]), } def _percent_change(new: float | None, old: float | None) -> float | None: if new is None or old is None: return None if old == 0: return None # or return 100.0 if you prefer return (new - old) / old * 100.0 # Use df_raw (one row per doc) so overlay/aggregation doesn’t hide sources # Ensure df_raw has the per-skill or per-group columns we need: if grouped and not df_raw.empty: # build grouped view just for comparisons df_grouped_for_comp = df_raw.copy() # aggregate per-doc row to grouped columns df_grouped_for_comp = ( df_grouped_for_comp .assign(**{ g: df_grouped_for_comp.apply(lambda r: safe_mean([r.get(s, np.nan) for s in SKILL_GROUPS[g]]), axis=1) for g in SKILL_GROUPS.keys() }) ) df_src_for_comp = df_grouped_for_comp[["student", "source"] + list(SKILL_GROUPS.keys())] else: df_src_for_comp = df_raw # already per-skill stage_vecs = _stage_vectors_for_current_selection(df_src_for_comp, student_choice, COLS, grouped) vec_onb = stage_vecs["onboarding"] vec_cls = stage_vecs["closing"] vec_combo = stage_vecs["combined"] pct_onb_to_cls = {k: _percent_change(vec_cls.get(k), vec_onb.get(k)) for k in COLS} pct_onb_to_combo = {k: _percent_change(vec_combo.get(k), vec_onb.get(k)) for 
k in COLS} # ------------------- Plot + table above stays the same ------------------- df_plot = df_final.copy() avg_label = None if not df_plot.empty: cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS show_cohort_avg = st.toggle("Show cohort average (all students)", value=True) if show_cohort_avg: avg_vals = df_plot[cols].mean() avg_row = {"label": "Average (All Students)"} avg_row.update({k: float(avg_vals[k]) for k in cols}) df_plot = pd.concat([df_plot, pd.DataFrame([avg_row])], ignore_index=True) avg_label = "Average (All Students)" fig = plot_radar(df_plot, grouped, chart_title, avg_label=avg_label) st.plotly_chart(fig, use_container_width=True) st.caption(f"{len(df_final)} line(s) aggregated." if not df_final.empty else "No data.") # ================== Dynamic Stage Summaries (only if student answered that week) ================== import re import unicodedata from collections import Counter from difflib import SequenceMatcher import math # Stage <-> Source mapping STAGE_TO_SOURCE = { "onboarding": "onboarding_responses", "week_2": "week_2_responses", "week_3": "week_3_responses", "closing": "closing_responses", # future-proof } SOURCE_TO_STAGE = {v: k for k, v in STAGE_TO_SOURCE.items()} def _answer_total_score(resp: dict) -> float: skills = resp.get("skills") or {} total = 0.0 for v in skills.values(): try: total += float(v) except Exception: pass return total def _responses_for_student_stage(uri, db, responses_coll, student: str, stage: str) -> list[dict]: """Return responses for a student at a stage (mapped to source) with non-empty answers.""" if not (uri and student and stage): return [] src = STAGE_TO_SOURCE.get(stage) if not src: return [] try: c = _client(uri) docs = list(c[db][responses_coll].find( {"student": student, "source": src}, {"_id": 0, "answer": 1, "skills": 1} )) # keep only responses with a non-empty answer return [d for d in docs if (d.get("answer") or "").strip()] except Exception: return [] def _normalize_quotes_spaces(s: str) 
-> str: if not s: return "" s = unicodedata.normalize("NFKC", s) s = s.replace("…", "...") s = re.sub(r"\s+", " ", s).strip() return s def _clean_tokens(s: str) -> list[str]: s = _normalize_quotes_spaces(s).lower() # keep letters/digits/spaces; drop punctuation s = re.sub(r"[^\w\s]", " ", s) s = re.sub(r"\s+", " ", s).strip() return s.split() def _vectorize(tokens: list[str]) -> Counter: return Counter(tokens) def _cosine_sim(a: Counter, b: Counter) -> float: if not a or not b: return 0.0 # dot dot = sum(a[k] * b.get(k, 0) for k in a) # norms na = math.sqrt(sum(v*v for v in a.values())) nb = math.sqrt(sum(v*v for v in b.values())) if na == 0.0 or nb == 0.0: return 0.0 return dot / (na * nb) def _seq_ratio(a: str, b: str) -> float: # SequenceMatcher returns 0..1 return SequenceMatcher(None, a, b).ratio() def _best_full_answer_for_quote(q: str, responses: list[dict]) -> str | None: """ Return the best-matching full answer for a (possibly truncated/middle) quote. Uses semantic similarity: 0.6*cosine(token) + 0.4*SequenceMatcher. If multiple tie, picks the one with HIGHEST total skill score. 
""" q_norm = _normalize_quotes_spaces(q) q_clean = _normalize_quotes_spaces(q).lower() q_tokens = _clean_tokens(q_norm) q_vec = _vectorize(q_tokens) best = None # (combined_score, skill_total, full_answer) for r in responses: full = (r.get("answer") or "").strip() if not full: continue full_norm = _normalize_quotes_spaces(full) full_clean = full_norm.lower() full_tokens = _clean_tokens(full_norm) full_vec = _vectorize(full_tokens) cos = _cosine_sim(q_vec, full_vec) seq = _seq_ratio(q_clean, full_clean) combined = 0.6 * cos + 0.4 * seq # small boost if the normalized quote substring appears (cheap heuristic) if q_clean and q_clean in full_clean: combined += 0.05 # compute skill total for tie-break skills = r.get("skills") or {} skill_total = 0.0 for v in skills.values(): try: skill_total += float(v) except Exception: pass cand = (combined, skill_total, full) if (best is None) or (cand[0] > best[0]) or (cand[0] == best[0] and cand[1] > best[1]): best = cand # Threshold so we don't replace with a bad match; tweak 0.45–0.65 as needed if best and best[0] >= 0.5: return best[2] return None def _fix_cutoff_quotes(quotes: list[str], responses: list[dict]) -> list[str]: """ Replace truncated/middle quotes with the best-matching full answer from `responses` (already filtered to student+stage). If no decent semantic match, keep original. """ if not quotes: return [] out = [] for q in quotes: q_raw = (q or "").strip() if not q_raw: continue # If it looks truncated (ellipsis) OR is short, try semantic match looks_truncated = ("..." 
in q_raw) or (len(q_raw) < 100) if looks_truncated: full = _best_full_answer_for_quote(q_raw, responses) out.append(full if full else q_raw) else: out.append(q_raw) return out def _top3_answers_by_skill_sum(responses: list[dict]) -> list[str]: """Pick up to 3 answers with the highest total skill score.""" scored = [] for r in responses: ans = (r.get("answer") or "").strip() if not ans: continue total = _answer_total_score(r) scored.append((total, ans)) scored.sort(key=lambda x: x[0], reverse=True) return [ans for _, ans in scored[:3]] def fetch_student_stage_summary( uri: str, db: str, summaries_coll: str, responses_coll: str, student: str, stage: str ): """ Return summary dict for a student+stage ONLY if the student has responses for that week. Otherwise, return None (so we don't render the panel). """ # 1) Require that the student answered that week (source derived from stage) responses = _responses_for_student_stage(uri, db, responses_coll, student, stage) if not responses: return None # 2) Pull summary doc (patterns nested) patterns = {} top_strengths = [] notable_quotes = [] try: c = _client(uri) doc = c[db][summaries_coll].find_one( {"student_name": student, "stage": stage}, {"_id": 0, "patterns": 1, "top_strengths": 1, "notable_quotes": 1} ) or {} patterns = doc.get("patterns") or {} top_strengths = doc.get("top_strengths") or [] notable_quotes = doc.get("notable_quotes") or [] except Exception: pass most_consistent = patterns.get("most_consistent") most_developed = patterns.get("most_developed") # 3) Repair cut-off quotes; if none after fixing, fallback to top 3 highest-scoring answers notable_quotes = _fix_cutoff_quotes(notable_quotes, responses) if not notable_quotes: notable_quotes = _top3_answers_by_skill_sum(responses) return { "most_consistent": most_consistent, "most_developed": most_developed, "top_strengths": top_strengths, "notable_quotes": notable_quotes, } # # ------------------- Output (Tabs) ------------------- # tab_summary, tab_analyses, 
tab_compare = st.tabs(["πŸ“ˆ Summary", "πŸ“ Analyses","πŸ“Š Comparisons"]) tabs = st.tabs(["πŸ“ˆ Summary", "πŸ“ Analyses", "πŸ“Š Comparisons"]) with tabs[0]: # ---------- Render the summary panel dynamically ---------- if mongo_uri and student_choice != "(All)" and source_choice != "(All)": stage = SOURCE_TO_STAGE.get(source_choice.strip()) if stage: # set to your actual summaries collection name summaries_coll_name = "summaries_IFE_2025" summary = fetch_student_stage_summary( mongo_uri, db_name, summaries_coll_name, coll_name, student=student_choice, stage=stage ) if summary: st.markdown("---") st.subheader(f"Summary β€” {student_choice} ({stage.replace('_', ' ').title()})") c1, c2 = st.columns(2) with c1: st.markdown(f"**Most Consistent:** {summary.get('most_consistent') or 'β€”'}") st.markdown(f"**Most Developed:** {summary.get('most_developed') or 'β€”'}") with c2: strengths = summary.get("top_strengths") or [] st.markdown("**Top Strengths:** " + (", ".join(strengths) if strengths else "β€”")) st.markdown("**Notable Quotes:**") for q in (summary.get("notable_quotes") or [])[:3]: st.markdown(f"> {q}") with tabs[1]: st.subheader("Student Analysis") # Use the folder you defined at top (ANALYSES_DIR), or expose it in the sidebar if you prefer. 
idx = _build_analysis_index(ANALYSES_DIR) if student_choice == "(All)": st.info("Pick a specific student on the left to view their analysis.") # (Optional) show what's available so you can browse: if idx: st.caption("Available analyses:") st.write(", ".join(sorted({name.title() for name in idx.keys()}))) file_path="full_class_summary.md" full_summary=_load_markdown(file_path) if full_summary.strip(): st.markdown(full_summary, unsafe_allow_html=False) # Optional download button with open(file_path, "rb") as f: st.download_button( "Download analysis (.md)", f, file_name=os.path.basename(file_path), mime="text/markdown" ) else: st.warning("Analysis file found but empty.") else: # Normalize the selected student name to match filenames norm = _normalize_name(student_choice) path = idx.get(norm) # If exact match not found, try simple underscore variant if not path: alt = student_choice.replace(" ", "_") path = idx.get(_normalize_name(alt)) if path: md = _load_markdown(path) if md.strip(): st.markdown(md, unsafe_allow_html=False) system = '''### πŸ”΅πŸ”΅ Skill Indicator System | Symbol | Meaning | |---------|----------------------------------------------| | πŸ”΅ | Clear evidence of the skill that week | | πŸ”΅πŸ”΅ | Strong or standout performance that week | | βšͺβšͺ | Little to no evidence for that skill that week| ''' st.markdown(system) # Optional download button with open(path, "rb") as f: st.download_button( "Download analysis (.md)", f, file_name=os.path.basename(path), mime="text/markdown" ) else: st.warning("Analysis file found but empty.") else: st.warning(f"No analysis found for **{student_choice}** in `{ANALYSES_DIR}` yet.") if idx: st.caption("Available analyses:") st.write(", ".join(sorted({name.title() for name in idx.keys()}))) with tabs[2]: st.subheader("Onboarding vs Closing β€” % Change") df1 = pd.DataFrame({ "Dimension": COLS, "Onboarding": [vec_onb.get(k) for k in COLS], "Closing": [vec_cls.get(k) for k in COLS], "% Change": [pct_onb_to_cls.get(k) for 
k in COLS], }) st.dataframe(df1.style.format({"Onboarding": "{:.2f}", "Closing": "{:.2f}", "% Change": "{:+.1f}%"}), use_container_width=True) st.subheader("Onboarding vs (Week2+Week3+Closing) β€” % Change") df2 = pd.DataFrame({ "Dimension": COLS, "Onboarding": [vec_onb.get(k) for k in COLS], "Weeks 2+3+Closing (combined)": [vec_combo.get(k) for k in COLS], "% Change": [pct_onb_to_combo.get(k) for k in COLS], }) st.dataframe(df2.style.format({"Onboarding": "{:.2f}", "Weeks 2+3+Closing (combined)": "{:.2f}", "% Change": "{:+.1f}%"}), use_container_width=True) # Optional bar chart: % change Onboarding -> Closing try: fig_delta = go.Figure() fig_delta.add_bar(x=COLS, y=[pct_onb_to_cls.get(k) if pct_onb_to_cls.get(k) is not None else 0 for k in COLS], name="%Ξ” Onbβ†’Closing") fig_delta.update_layout(title="% Change: Onboarding β†’ Closing", xaxis_title="Dimension", yaxis_title="% change", margin=dict(l=20, r=20, t=50, b=20)) st.plotly_chart(fig_delta, use_container_width=True) except Exception: pass # # app.py β€” Student Skill Radar (MongoDB, secrets-based, no CSV) # import os # from datetime import date # from typing import Dict, List # import numpy as np # import pandas as pd # import plotly.graph_objects as go # import streamlit as st # from pymongo import MongoClient # from urllib.parse import quote_plus # st.set_page_config(page_title="Student Skill Radar", layout="wide") # # ------------------- Constants ------------------- # SKILLS = [ # "Problem-Solving", # "Critical Thinking", # "Analytical Reasoning", # "Adaptability", # "Continuous Learning", # "Creativity", # "Communication", # "Collaboration", # "Community Engagement", # "Emotional Intelligence", # "Ethical Decision-Making", # "Time Management", # "Tech Aptitude", # ] # SKILL_GROUPS = { # "Problem-Solving, Critical Thinking, Analytical Reasoning": [ # "Problem-Solving", "Critical Thinking", "Analytical Reasoning" # ], # "Adaptability, Continuous Learning, Creativity": [ # "Adaptability", "Continuous 
Learning", "Creativity" # ], # "Time Management": ["Time Management"], # "Communication, Teamwork, Collaboration, Community Engagement": [ # "Communication", "Collaboration", "Community Engagement" # ], # "Emotional Intelligence, Ethical Decision Making": [ # "Emotional Intelligence", "Ethical Decision-Making" # ], # "Tech Aptitude": ["Tech Aptitude"], # } # # ------------------- Helpers ------------------- # def safe_mean(vals): # clean = [v for v in vals if v is not None and not pd.isna(v)] # return float(np.mean(clean)) if clean else np.nan # def to_01_or_nan(x): # try: # v = float(x) # except Exception: # return np.nan # if pd.isna(v): # return np.nan # return max(0.0, min(1.0, v)) # def aggregate_groups_row(row: pd.Series) -> Dict[str, float]: # return { # g: safe_mean([row.get(s, np.nan) for s in members]) # for g, members in SKILL_GROUPS.items() # } # def summarize(records: List[dict], level: str = "student") -> pd.DataFrame: # df = pd.DataFrame(records) if records else pd.DataFrame() # if df.empty: # return df # if level == "student+source": # df["label"] = df["student"].astype(str) + " β€” " + df["source"].astype(str) # else: # df["label"] = df["student"].astype(str) # # groupby mean skips NaNs by default # return df.groupby("label", dropna=False)[SKILLS].mean().reset_index() # def plot_radar(df: pd.DataFrame, grouped: bool, title: str): # if df.empty: # return go.Figure() # traces = [] # if grouped: # labels = list(SKILL_GROUPS.keys()) # for _, r in df.iterrows(): # grp = aggregate_groups_row(r) # values = [0.0 if pd.isna(grp[k]) else float(grp[k]) for k in labels] # traces.append(go.Scatterpolar( # r=values + [values[0]], # theta=labels + [labels[0]], # name=r["label"], # fill="toself", # )) # else: # labels = SKILLS # for _, r in df.iterrows(): # values = [] # for k in SKILLS: # v = r.get(k, np.nan) # values.append(0.0 if pd.isna(v) else float(v)) # traces.append(go.Scatterpolar( # r=values + [values[0]], # theta=labels + [labels[0]], # name=r["label"], 
# fill="toself", # )) # fig = go.Figure(traces) # fig.update_layout( # title=title or "Skill Radar", # showlegend=True, # polar=dict( # radialaxis=dict( # autorange=False, # range=[0, 1], # tick0=0, # dtick=0.2, # ticks="outside", # showline=True, # showgrid=True, # visible=True, # ) # ), # margin=dict(l=30, r=30, t=60, b=30), # ) # return fig # # ------------------- Mongo Access (secrets-only) ------------------- # def _get_secret(name: str) -> str | None: # try: # val = st.secrets.get(name) # if val is not None: # return str(val) # except Exception: # pass # return os.getenv(name) # def _build_uri(db_name: str | None) -> str | None: # user = _get_secret("MONGO_USER") # pw = _get_secret("MONGO_PASS") # cluster = _get_secret("MONGO_CLUSTER") # if not (user and pw and cluster): # return None # user_q = quote_plus(user) # pw_q = quote_plus(pw) # db_path = f"/{db_name}" if db_name else "" # return ( # f"mongodb+srv://{user_q}:{pw_q}@{cluster}{db_path}" # f"?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true" # ) # @st.cache_resource(show_spinner=False) # def _client(uri: str): # return MongoClient(uri, serverSelectionTimeoutMS=10000) # # @st.cache_data(show_spinner=False) # def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]: # if not uri: # return [] # try: # c = _client(uri) # vals = c[db][coll].distinct(field) # return sorted([v for v in vals if isinstance(v, str) and v.strip()]) # except Exception: # return [] # # @st.cache_data(show_spinner=False) # def mongo_records( # uri: str, # db: str, # coll: str, # student: str | None, # source: str | None, # start: str | None, # end: str | None, # ) -> List[dict]: # """Return flat rows with one column per skill; missing skills -> NaN (ignored in means).""" # if not uri: # return [] # q = {} # if student and student != "(All)": # q["student"] = student # if source and source != "(All)": # q["source"] = source # if start or end: # q["date"] = {} # if start: # q["date"]["$gte"] = 
start # if end: # q["date"]["$lte"] = end # try: # c = _client(uri) # proj = {"_id": 0, "student": 1, "source": 1, "date": 1, "skills": 1} # docs = list(c[db][coll].find(q, proj)) # rows = [] # for d in docs: # base = { # "student": str(d.get("student", "")), # "source": str(d.get("source", "")), # "date": str(d.get("date", "")), # } # sd = d.get("skills") or {} # for k in SKILLS: # base[k] = to_01_or_nan(sd.get(k, np.nan)) # rows.append(base) # return rows # except Exception: # return [] # # ------------------- UI ------------------- # st.title("📊 Student Skill Radar") # with st.sidebar: # st.subheader("MongoDB Settings") # db_name = st.text_input("Database name", value="student_skills") # coll_name = st.text_input("Collection name", value="responses_IFE_2025") # mongo_uri = _build_uri(db_name) # if not mongo_uri: # st.warning("Missing MONGO_USER, MONGO_PASS, or MONGO_CLUSTER in secrets/env.") # else: # try: # _client(mongo_uri).admin.command("ping") # st.success("Connected via secrets ✅") # except Exception as e: # st.error(f"Mongo connection failed: {e}") # # Filters # students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else []) # sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else []) # student_choice = st.selectbox("Select student", students) # source_choice = st.selectbox("Select source/week", sources) # c1, c2 = st.columns(2) # start_dt = c1.date_input("Start date", value=None) # end_dt = c2.date_input("End date", value=None) # agg_level = st.selectbox("Aggregation level", ["student", "student+source"], index=0) # grouped = st.toggle("Grouped skills (skill clusters)", value=True) # chart_title = st.text_input("Chart title", value="") # # Convert dates to strings (YYYY-MM-DD) # start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None # end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None # # Fetch + aggregate # records =
mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice, start_str, end_str) if mongo_uri else [] # df = summarize(records, level=agg_level) if records else pd.DataFrame() # # ------------------- Output ------------------- # fig = plot_radar(df, grouped, chart_title) # st.plotly_chart(fig, use_container_width=True) # st.caption(f"{len(df)} line(s) aggregated." if not df.empty else "No data. Adjust filters or check Mongo connection.")