Spaces:
Sleeping
Sleeping
| # app.py — Student Skill Radar (MongoDB, secrets-based) | |
| import os | |
| from datetime import date | |
| from typing import Dict, List | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import streamlit as st | |
| from pymongo import MongoClient | |
| from urllib.parse import quote_plus | |
| st.set_page_config(page_title="Student Skill Radar", layout="wide") | |
# ------------------- Constants -------------------
# Individual skills scored on every response document. The order here is the
# axis order of the ungrouped radar chart.
SKILLS = [
    "Problem-Solving",
    "Critical Thinking",
    "Analytical Reasoning",
    "Adaptability",
    "Continuous Learning",
    "Creativity",
    "Communication",
    "Collaboration",
    "Community Engagement",
    "Emotional Intelligence",
    "Ethical Decision-Making",
    "Time Management",
    "Tech Aptitude",
]

# Group label -> member skills. The labels are also used verbatim as lookup
# keys into the Likert summary documents, so they must not be "tidied up"
# (e.g. "Teamwork" and the un-hyphenated "Ethical Decision Making" stay as is).
SKILL_GROUPS = {
    "Problem-Solving, Critical Thinking, Analytical Reasoning": [
        "Problem-Solving",
        "Critical Thinking",
        "Analytical Reasoning",
    ],
    "Adaptability, Continuous Learning, Creativity": [
        "Adaptability",
        "Continuous Learning",
        "Creativity",
    ],
    "Time Management": [
        "Time Management",
    ],
    "Communication, Teamwork, Collaboration, Community Engagement": [
        "Communication",
        "Collaboration",
        "Community Engagement",
    ],
    "Emotional Intelligence, Ethical Decision Making": [
        "Emotional Intelligence",
        "Ethical Decision-Making",
    ],
    "Tech Aptitude": [
        "Tech Aptitude",
    ],
}

# Response "source" value -> stage name, for the two stages that are looked up
# in the Likert summaries collection.
SOURCE_TO_STAGE = {
    "onboarding_responses": "onboarding",
    "closing_responses": "closing",
}
| # ------------------- Helpers ------------------- | |
def safe_mean(vals):
    """Average the entries of ``vals`` that are neither ``None`` nor NaN.

    Returns NaN when no usable entry remains.
    """
    usable = []
    for item in vals:
        if item is None or pd.isna(item):
            continue
        usable.append(item)
    if not usable:
        return np.nan
    return float(np.mean(usable))
def to_01_or_nan(x):
    """Coerce ``x`` to a float clamped to [0, 1]; NaN if it is not a usable number."""
    try:
        number = float(x)
    except Exception:
        return np.nan
    if pd.isna(number):
        return np.nan
    capped = min(1.0, number)
    return max(0.0, capped)
def aggregate_groups_row(row: pd.Series) -> Dict[str, float]:
    """Compute one mean per skill group for a single row of per-skill scores.

    Skills missing from ``row`` count as NaN and are ignored by ``safe_mean``.
    """
    group_means = {}
    for group_label, member_skills in SKILL_GROUPS.items():
        scores = [row.get(skill, np.nan) for skill in member_skills]
        group_means[group_label] = safe_mean(scores)
    return group_means
def df_to_grouped(df_in: pd.DataFrame) -> pd.DataFrame:
    """Convert a per-skill frame (with a ``label`` column) into per-group means.

    An empty input is returned unchanged. A group whose mean is NaN is
    written as 0.0 in the output.
    """
    if df_in.empty:
        return df_in

    group_labels = list(SKILL_GROUPS.keys())
    records = []
    for _, row in df_in.iterrows():
        means = aggregate_groups_row(row)
        record = {"label": row["label"]}
        for group_label in group_labels:
            mean_value = means.get(group_label)
            record[group_label] = 0.0 if pd.isna(mean_value) else float(mean_value)
        records.append(record)
    return pd.DataFrame(records, columns=["label"] + group_labels)
def plot_radar(df: pd.DataFrame, grouped: bool, title: str, avg_label: str = None):
    """Build a radar (polar) figure with one closed trace per row of ``df``.

    Args:
        df: Frame with a ``label`` column plus one column per axis.
        grouped: Use the skill-group labels as axes when true, else ``SKILLS``.
        title: Figure title; an empty value falls back to "Skill Radar".
        avg_label: Label of the row to highlight (thick red dashed line).

    Returns:
        A plotly ``Figure``; an empty one when ``df`` has no rows.
    """
    if df.empty:
        return go.Figure()

    axes = list(SKILL_GROUPS.keys()) if grouped else SKILLS

    def _radius(cell):
        # Missing scores are drawn at the centre of the chart.
        return 0.0 if pd.isna(cell) else float(cell)

    traces = []
    for _, row in df.iterrows():
        radii = [_radius(row.get(axis)) for axis in axes]
        highlight = avg_label and (str(row["label"]) == avg_label)
        if highlight:
            line_style = dict(width=4, dash="dash", color="red")
            alpha = 0.7
        else:
            line_style = dict(width=2, dash="solid", color=None)
            alpha = 0.5
        trace = go.Scatterpolar(
            # Repeat the first point so the polygon closes.
            r=radii + [radii[0]],
            theta=axes + [axes[0]],
            name=row["label"],
            fill="toself",
            line=line_style,
            opacity=alpha,
        )
        traces.append(trace)

    radial_axis = dict(
        autorange=False, range=[0, 1], tick0=0, dtick=0.2,
        ticks="outside", showline=True, showgrid=True, visible=True
    )
    fig = go.Figure(traces)
    fig.update_layout(
        title=title or "Skill Radar",
        showlegend=True,
        polar=dict(radialaxis=radial_axis),
        margin=dict(l=30, r=30, t=60, b=30),
    )
    return fig
| def _vector_from_row(row: pd.Series, cols: list[str]) -> dict: | |
| return {k: (None if pd.isna(row.get(k)) else float(row.get(k))) for k in cols} | |
| def _percent_change(new: float | None, old: float | None) -> float | None: | |
| if new is None or old is None: | |
| return None | |
| if old == 0: | |
| return None # avoid div-by-zero; you can choose to show 100% if new>0 | |
| return (new - old) / old * 100.0 | |
| def _merge_resp_and_likert_vector(resp_vec: dict, likert_grouped_vec: dict | None, grouped: bool, SKILL_TO_GROUPS: dict[str, list[str]], SKILL_GROUPS: dict[str, list[str]]) -> dict: | |
| """ | |
| Returns a merged vector: | |
| - If grouped: keys are group labels | |
| - If ungrouped: keys are per-skill; Likert (group) is projected to skills by averaging groups a skill belongs to | |
| """ | |
| if likert_grouped_vec is None: | |
| return resp_vec | |
| if grouped: | |
| out = {} | |
| for g in SKILL_GROUPS.keys(): | |
| rv = resp_vec.get(g, None) | |
| lv = likert_grouped_vec.get(g, None) | |
| if rv is not None and lv is not None: | |
| out[g] = (rv + lv) / 2.0 | |
| elif rv is not None: | |
| out[g] = rv | |
| else: | |
| out[g] = lv | |
| return out | |
| else: | |
| # project group likert to each skill | |
| out = {} | |
| for s in resp_vec.keys(): | |
| rv = resp_vec.get(s, None) | |
| groups = SKILL_TO_GROUPS.get(s, []) | |
| lik_vals = [likert_grouped_vec.get(g) for g in groups if likert_grouped_vec.get(g) is not None] | |
| lv = float(np.mean(lik_vals)) if lik_vals else None | |
| if rv is not None and lv is not None: | |
| out[s] = (rv + lv) / 2.0 | |
| elif rv is not None: | |
| out[s] = rv | |
| else: | |
| out[s] = lv | |
| return out | |
| # ------------------- Mongo ------------------- | |
def _get_secret(name: str) -> str | None:
    """Look up ``name`` in Streamlit secrets, falling back to the environment.

    Any error while reading ``st.secrets`` is treated as "not found there".
    """
    try:
        secret = st.secrets.get(name)
        found = None if secret is None else str(secret)
    except Exception:
        found = None
    if found is not None:
        return found
    return os.getenv(name)
def _build_uri(db_name: str | None) -> str | None:
    """Assemble the ``mongodb+srv://`` connection URI from secrets.

    Reads ``MONGO_USER``, ``MONGO_PASS`` and ``MONGO_CLUSTER`` via
    ``_get_secret``; user and password are URL-escaped.

    Args:
        db_name: Default database for the URI path. ``None`` or an empty
            string leaves the path empty instead of embedding the literal
            text "None".

    Returns:
        The URI, or ``None`` when any of the three secrets is missing.

    Security:
        TLS certificate verification is left ON by default. The previous
        hard-coded ``tlsAllowInvalidCertificates=true`` let anyone on the
        network path impersonate the cluster while credentials are being
        sent. If a deployment genuinely needs it (e.g. a broken CA bundle),
        opt in by setting the secret/env var
        ``MONGO_TLS_ALLOW_INVALID_CERTS`` to ``1``/``true``/``yes``.
    """
    user = _get_secret("MONGO_USER")
    pw = _get_secret("MONGO_PASS")
    cluster = _get_secret("MONGO_CLUSTER")
    if not (user and pw and cluster):
        return None

    options = ["retryWrites=true", "w=majority", "tls=true"]
    insecure_flag = (_get_secret("MONGO_TLS_ALLOW_INVALID_CERTS") or "").strip().lower()
    if insecure_flag in ("1", "true", "yes"):
        options.append("tlsAllowInvalidCertificates=true")

    db_path = db_name or ""
    query = "&".join(options)
    return f"mongodb+srv://{quote_plus(user)}:{quote_plus(pw)}@{cluster}/{db_path}?{query}"
@st.cache_resource(show_spinner=False)
def _client(uri: str):
    """Return a ``MongoClient`` for ``uri``, created once and then reused.

    Every helper in this file calls ``_client(uri)`` per query and never
    closes the result, so without caching each Streamlit rerun opens several
    new clients. ``st.cache_resource`` keeps one client per distinct URI.
    Server selection gives up after 10 seconds.
    """
    return MongoClient(uri, serverSelectionTimeoutMS=10000)
def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]:
    """Return the sorted distinct non-blank string values of ``field``.

    Args:
        uri: Mongo connection URI; a falsy value short-circuits to ``[]``.
        db: Database name.
        coll: Collection name.
        field: Document field to collect distinct values of.

    Returns:
        Sorted list of values; ``[]`` on any error. Errors are logged so a
        failed query can be told apart from an empty collection.
    """
    if not uri:
        return []
    try:
        values = _client(uri)[db][coll].distinct(field)
        return sorted(v for v in values if isinstance(v, str) and v.strip())
    except Exception:
        import logging

        logging.getLogger(__name__).exception(
            "Mongo distinct(%r) failed on %s.%s", field, db, coll
        )
        return []
| def mongo_records(uri: str, db: str, coll: str, student: str | None, source: str | None) -> List[dict]: | |
| if not uri: | |
| return [] | |
| q = {} | |
| if student and student != "(All)": | |
| q["student"] = student | |
| if source and source != "(All)": | |
| q["source"] = source | |
| try: | |
| docs = list(_client(uri)[db][coll].find(q, {"_id": 0, "student": 1, "source": 1, "skills": 1})) | |
| rows = [] | |
| for d in docs: | |
| base = {"student": str(d.get("student", "")), "source": str(d.get("source", ""))} | |
| for k in SKILLS: | |
| base[k] = to_01_or_nan((d.get("skills") or {}).get(k, np.nan)) | |
| rows.append(base) | |
| return rows | |
| except Exception: | |
| return [] | |
| # ---------- Likert helpers ---------- | |
| def _norm_01(v): | |
| try: | |
| return max(0.0, min(1.0, float(v) / 5.0 if float(v) > 1 else float(v))) | |
| except Exception: | |
| return None | |
def mongo_get_likert_grouped(uri: str, db: str, coll: str, student: str, stage: str) -> dict:
    """Load a student's group-level Likert averages for one stage, scaled to 0-1.

    Looks up the document matching ``student_name``/``stage`` in ``coll``
    and reads its ``average_skill_scores`` mapping.

    Returns:
        ``{group_label: score-or-None}`` for every key of ``SKILL_GROUPS``
        (all ``None`` when no document matches). ``{}`` when an argument is
        falsy or the lookup fails; failures are logged instead of being
        silently discarded.
    """
    if not (uri and student and stage):
        return {}
    try:
        doc = _client(uri)[db][coll].find_one(
            {"student_name": student, "stage": stage},
            {"_id": 0, "average_skill_scores": 1},
        )
        averages = (doc or {}).get("average_skill_scores") or {}
        return {group: _norm_01(averages.get(group)) for group in SKILL_GROUPS.keys()}
    except Exception:
        import logging

        logging.getLogger(__name__).exception(
            "Likert lookup failed on %s.%s for student=%r stage=%r", db, coll, student, stage
        )
        return {}
| # ---- Analyses (Markdown) helpers ---- | |
| ANALYSES_DIR = os.getenv("ANALYSES_DIR", "student_analyses") # folder in your HF Space | |
| def _normalize_name(s: str) -> str: | |
| # Lower, remove non-alphanumerics, collapse spaces/underscores | |
| import re, unicodedata | |
| s = unicodedata.normalize("NFKC", s or "").strip().lower() | |
| s = re.sub(r"[^\w\s]", "", s) | |
| s = re.sub(r"[\s_]+", " ", s).strip() | |
| return s | |
| def _build_analysis_index(analyses_dir: str) -> dict: | |
| """Return dict: normalized_name -> file_path for *.md under analyses_dir.""" | |
| import os, glob | |
| index = {} | |
| if not os.path.isdir(analyses_dir): | |
| return index | |
| for path in glob.glob(os.path.join(analyses_dir, "*.md")): | |
| base = os.path.splitext(os.path.basename(path))[0] # "Student_Name" | |
| # accept both "Student Name" and "Student_Name" as same | |
| norm = _normalize_name(base.replace("_", " ")) | |
| index[norm] = path | |
| return index | |
| def _load_markdown(path: str) -> str: | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| return f.read() | |
| except Exception: | |
| return "" | |
# ------------------- UI -------------------
st.title("📊 Student Skill Radar")
# NOTE(review): the original indentation was lost; everything up to the
# "Chart title" input is assumed to live inside the sidebar — confirm.
with st.sidebar:
    # Connection targets: database, responses collection, Likert summaries collection.
    db_name = st.text_input("Database name", value="student_skills")
    coll_name = st.text_input("Collection name", value="responses_IFE_2025")
    summaries_coll = st.text_input("Likert summaries collection", value="likert_summaries_IFE_2025")
    # None when the Mongo secrets are not configured; every fetch below is
    # guarded on it and falls back to an empty result.
    mongo_uri = _build_uri(db_name)
    # "(All)" is the sentinel for "no filter" in both pickers.
    students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else [])
    sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else [])
    student_choice = st.selectbox("Select student", students)
    source_choice = st.selectbox("Select source/week", sources)
    # start_dt = st.date_input("Start date", value=None)
    # end_dt = st.date_input("End date", value=None)
    grouped = st.toggle("Grouped skills", value=True)
    overlay_sources = st.toggle("Overlay all sources when '(All)' selected", value=False)
    chart_title = st.text_input("Chart title", value="")
# start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None
# end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None
# ------------------- Fetch + merge -------------------
# df_raw: one row per response document for the current student/source filter.
records = mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice) if mongo_uri else []
df_raw = pd.DataFrame(records) if records else pd.DataFrame()
if not df_raw.empty:
    # One radar line per "student — source" pair: average that pair's documents.
    df_raw["label"] = df_raw["student"].astype(str) + " — " + df_raw["source"].astype(str)
    df_resp = df_raw.groupby("label", dropna=False)[SKILLS].mean().reset_index()
    if grouped:
        # Collapse the per-skill columns into the skill-group columns.
        df_resp = df_to_grouped(df_resp)
else:
    df_resp = pd.DataFrame()
| # ---------- Merge Likert scores (works for grouped and ungrouped) ---------- | |
| from statistics import mean | |
# Reverse lookup: skill -> label(s) of the group(s) containing it (almost always one).
SKILL_TO_GROUPS = {
    s: [g for g, members in SKILL_GROUPS.items() if s in members]
    for s in SKILLS
}


def _likert_for_skill(likert_grouped: dict, skill: str) -> float | None:
    """Project group-level Likert scores onto one skill.

    Returns the mean of the non-None scores of the groups ``skill`` belongs
    to, or ``None`` when there is no such score.
    """
    scores = []
    for group in SKILL_TO_GROUPS.get(skill, []):
        score = likert_grouped.get(group)
        if score is not None:
            scores.append(score)
    if not scores:
        return None
    return mean(scores)
if not df_resp.empty and summaries_coll:
    # Columns being merged: group labels in grouped mode, raw skills otherwise.
    cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    merged_rows = []
    for _, r in df_resp.iterrows():
        label = str(r["label"])
        # Labels look like "student — source"; a label without the separator
        # is all student and has no stage.
        student, _sep, _raw_source = label.partition(" — ")
        stage = SOURCE_TO_STAGE.get(_raw_source.strip()) if _raw_source else None

        # Only onboarding/closing have Likert summaries.
        if stage in ("onboarding", "closing"):
            likert_g = mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student.strip(), stage)
        else:
            likert_g = {}

        out = {"label": label}
        for col in cols:
            resp_val = None if pd.isna(r.get(col)) else float(r.get(col))
            # Grouped columns read the Likert score directly; skill columns
            # get the score projected down from their group(s).
            likert_val = likert_g.get(col, None) if grouped else _likert_for_skill(likert_g, col)
            if resp_val is None and likert_val is None:
                out[col] = np.nan
            elif likert_val is None:
                out[col] = resp_val
            elif resp_val is None:
                out[col] = likert_val
            else:
                out[col] = (resp_val + likert_val) / 2.0
        merged_rows.append(out)
    df_final = pd.DataFrame(merged_rows, columns=["label"] + cols)
else:
    df_final = df_resp
| # Overlay mode | |
| # if grouped and not df_final.empty and source_choice == "(All)" and not overlay_sources: | |
| # df_final["_student"] = df_final["label"].apply(lambda s: s.split(" — ", 1)[0]) | |
| # df_final = df_final.groupby("_student", dropna=False)[list(SKILL_GROUPS.keys())].mean().reset_index() | |
| # df_final = df_final.rename(columns={"_student": "label"}) | |
# ---------------- Overlay vs Combine ----------------
# With "(All)" sources selected:
#   overlay ON  -> keep one line per source (nothing to do)
#   overlay OFF -> average all sources into a single line per student
if source_choice == "(All)" and not overlay_sources and not df_final.empty:
    cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    # The student is the part of the label before the " — " separator.
    df_final["_student"] = [lbl.split(" — ", 1)[0] for lbl in df_final["label"]]
    df_final = df_final.groupby("_student", dropna=False)[cols].mean().reset_index()
    df_final = df_final.rename(columns={"_student": "label"})
| # ------------------- Output ------------------- | |
| # fig = plot_radar(df_final, grouped, chart_title) | |
| # st.plotly_chart(fig, use_container_width=True) | |
# ============== Build per-stage vectors for comparisons (LIKERT-AWARE) ==============
# Comparison columns: group labels in grouped mode, individual skills otherwise.
if grouped:
    COLS = list(SKILL_GROUPS.keys())
else:
    COLS = SKILLS
# skill -> group label(s) containing it; used to project group-level Likert
# scores down to individual skills.
SKILL_TO_GROUPS = {
    s: [g for g, members in SKILL_GROUPS.items() if s in members]
    for s in SKILLS
}
| def _project_likert_to_cols(likert_grouped: dict | None, cols: list[str], grouped_flag: bool) -> dict: | |
| """Return a vector aligned to COLS from group-level Likert. If ungrouped, project to skills.""" | |
| if not likert_grouped: | |
| return {k: None for k in cols} | |
| if grouped_flag: | |
| return {k: (likert_grouped.get(k) if k in likert_grouped else None) for k in cols} | |
| # ungrouped → average the groups a skill belongs to | |
| out = {} | |
| for s in cols: | |
| gs = SKILL_TO_GROUPS.get(s, []) | |
| vals = [likert_grouped.get(g) for g in gs if likert_grouped.get(g) is not None] | |
| out[s] = float(np.mean(vals)) if vals else None | |
| return out | |
| def _merge_resp_and_likert(resp_vec: dict, likert_vec: dict) -> dict: | |
| """Average where both exist; else take whichever exists.""" | |
| out = {} | |
| for k in resp_vec.keys(): | |
| rv = resp_vec.get(k, None) | |
| lv = likert_vec.get(k, None) | |
| if rv is not None and lv is not None: | |
| out[k] = (rv + lv) / 2.0 | |
| elif rv is not None: | |
| out[k] = rv | |
| else: | |
| out[k] = lv | |
| return out | |
| def _mean_vectors(vecs: list[dict]) -> dict: | |
| """Element-wise mean ignoring None; returns None if all Nones for a key.""" | |
| if not vecs: | |
| return {} | |
| keys = list(vecs[0].keys()) | |
| out = {} | |
| for k in keys: | |
| vals = [v.get(k) for v in vecs if v.get(k) is not None] | |
| out[k] = (float(np.mean(vals)) if vals else None) | |
| return out | |
| def _resp_mean_for_sources(df_src: pd.DataFrame, student: str | None, sources: list[str], cols: list[str]) -> dict: | |
| """Mean of response scores across docs for (student,sources). If student None → cohort.""" | |
| if df_src.empty: | |
| return {k: None for k in cols} | |
| sub = df_src.copy() | |
| if student: | |
| sub = sub[sub["student"] == student] | |
| sub = sub[sub["source"].isin(sources)] | |
| if sub.empty: | |
| return {k: None for k in cols} | |
| m = sub[cols].mean(numeric_only=True) | |
| return {k: (None if pd.isna(m.get(k)) else float(m.get(k))) for k in cols} | |
| def _likert_grouped_for_student_stage(student: str, stage: str) -> dict | None: | |
| """Get normalized (0–1) group-level Likert for onboarding/closing only.""" | |
| if stage not in ("onboarding", "closing"): | |
| return None | |
| lg = mongo_get_likert_grouped(mongo_uri, db_name, summaries_coll, student, stage) | |
| return lg if lg else None | |
def _student_stage_vectors(df_src: pd.DataFrame, stu: str, cols: list[str], grouped_flag: bool) -> dict:
    """Build one student's onboarding, closing and combined score vectors.

    - onboarding / closing: that source's response mean merged with the
      stage's Likert scores.
    - combined: mean of the week-2 responses, the week-3 responses and the
      (already Likert-merged) closing vector.
    """
    def _responses(source):
        return _resp_mean_for_sources(df_src, stu, [source], cols)

    def _with_likert(source, stage):
        resp = _responses(source)
        likert = _project_likert_to_cols(
            _likert_grouped_for_student_stage(stu, stage), cols, grouped_flag
        )
        return _merge_resp_and_likert(resp, likert)

    onboarding = _with_likert("onboarding_responses", "onboarding")
    closing = _with_likert("closing_responses", "closing")
    week_2 = _responses("week_2_responses")
    week_3 = _responses("week_3_responses")
    combined = _mean_vectors([week_2, week_3, closing])
    return {"onboarding": onboarding, "closing": closing, "combined": combined}
| def _stage_vectors_for_current_selection(df_src: pd.DataFrame, student_choice: str | None, cols: list[str], grouped_flag: bool) -> dict: | |
| """ | |
| If a student is selected → return their vectors. | |
| If cohort (“(All)”) → average per-student vectors (Likert included where available). | |
| """ | |
| if student_choice and student_choice != "(All)": | |
| return _student_stage_vectors(df_src, student_choice, cols, grouped_flag) | |
| # Cohort: compute for each student then average | |
| if df_src.empty: | |
| empty_vec = {k: None for k in cols} | |
| return {"onboarding": empty_vec, "closing": empty_vec, "combined": empty_vec} | |
| students = sorted(set(str(x) for x in df_src["student"].dropna().unique())) | |
| per_student = [_student_stage_vectors(df_src, s, cols, grouped_flag) for s in students] | |
| return { | |
| "onboarding": _mean_vectors([p["onboarding"] for p in per_student]), | |
| "closing": _mean_vectors([p["closing"] for p in per_student]), | |
| "combined": _mean_vectors([p["combined"] for p in per_student]), | |
| } | |
| def _percent_change(new: float | None, old: float | None) -> float | None: | |
| if new is None or old is None: | |
| return None | |
| if old == 0: | |
| return None # or return 100.0 if you prefer | |
| return (new - old) / old * 100.0 | |
# Comparisons work from df_raw (one row per document) so that the
# overlay/aggregation applied to the plotted frame cannot hide any source.
if grouped and not df_raw.empty:
    # Add one column per skill group holding the per-document mean of its
    # member skills, then keep only the identifying and group columns.
    df_grouped_for_comp = df_raw.copy()
    for _group_label, _members in SKILL_GROUPS.items():
        df_grouped_for_comp[_group_label] = df_raw.apply(
            lambda r, _members=_members: safe_mean([r.get(s, np.nan) for s in _members]),
            axis=1,
        )
    df_src_for_comp = df_grouped_for_comp[["student", "source"] + list(SKILL_GROUPS.keys())]
else:
    df_src_for_comp = df_raw  # already per-skill

stage_vecs = _stage_vectors_for_current_selection(df_src_for_comp, student_choice, COLS, grouped)
vec_onb = stage_vecs["onboarding"]
vec_cls = stage_vecs["closing"]
vec_combo = stage_vecs["combined"]

# % change relative to onboarding, per column (None where undefined).
pct_onb_to_cls = {
    k: _percent_change(vec_cls.get(k), vec_onb.get(k))
    for k in COLS
}
pct_onb_to_combo = {
    k: _percent_change(vec_combo.get(k), vec_onb.get(k))
    for k in COLS
}
# ------------------- Radar plot -------------------
# Work on a copy so the appended average row never leaks into df_final,
# which the caption below still counts.
df_plot = df_final.copy()
avg_label = None
if not df_plot.empty:
    cols = list(SKILL_GROUPS.keys()) if grouped else SKILLS
    show_cohort_avg = st.toggle("Show cohort average (all students)", value=True)
    if show_cohort_avg:
        # The "average" is the column mean over the lines currently plotted.
        avg_vals = df_plot[cols].mean()
        avg_row = {"label": "Average (All Students)"}
        avg_row.update({k: float(avg_vals[k]) for k in cols})
        df_plot = pd.concat([df_plot, pd.DataFrame([avg_row])], ignore_index=True)
        # plot_radar highlights the trace whose label equals avg_label.
        avg_label = "Average (All Students)"
# NOTE(review): original indentation was lost; the three calls below are
# assumed to run unconditionally (plot_radar returns an empty figure for an
# empty frame and the caption has its own "No data." branch) — confirm.
fig = plot_radar(df_plot, grouped, chart_title, avg_label=avg_label)
st.plotly_chart(fig, use_container_width=True)
st.caption(f"{len(df_final)} line(s) aggregated." if not df_final.empty else "No data.")
| # ================== Dynamic Stage Summaries (only if student answered that week) ================== | |
| import re | |
| import unicodedata | |
| from collections import Counter | |
| from difflib import SequenceMatcher | |
| import math | |
# Stage <-> Source mapping
# Stage name -> value of the "source" field on response documents.
STAGE_TO_SOURCE = dict(
    onboarding="onboarding_responses",
    week_2="week_2_responses",
    week_3="week_3_responses",
    closing="closing_responses",  # future-proof
)
# Inverse lookup. This rebinds SOURCE_TO_STAGE so that, from here on, it
# covers all four stages.
SOURCE_TO_STAGE = {source: stage for stage, source in STAGE_TO_SOURCE.items()}
| def _answer_total_score(resp: dict) -> float: | |
| skills = resp.get("skills") or {} | |
| total = 0.0 | |
| for v in skills.values(): | |
| try: | |
| total += float(v) | |
| except Exception: | |
| pass | |
| return total | |
| def _responses_for_student_stage(uri, db, responses_coll, student: str, stage: str) -> list[dict]: | |
| """Return responses for a student at a stage (mapped to source) with non-empty answers.""" | |
| if not (uri and student and stage): | |
| return [] | |
| src = STAGE_TO_SOURCE.get(stage) | |
| if not src: | |
| return [] | |
| try: | |
| c = _client(uri) | |
| docs = list(c[db][responses_coll].find( | |
| {"student": student, "source": src}, | |
| {"_id": 0, "answer": 1, "skills": 1} | |
| )) | |
| # keep only responses with a non-empty answer | |
| return [d for d in docs if (d.get("answer") or "").strip()] | |
| except Exception: | |
| return [] | |
| def _normalize_quotes_spaces(s: str) -> str: | |
| if not s: | |
| return "" | |
| s = unicodedata.normalize("NFKC", s) | |
| s = s.replace("…", "...") | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
def _clean_tokens(s: str) -> list[str]:
    """Lower-case ``s``, replace punctuation with spaces and split into tokens."""
    lowered = _normalize_quotes_spaces(s).lower()
    # Keep word characters and whitespace; everything else becomes a space.
    no_punct = re.sub(r"[^\w\s]", " ", lowered)
    collapsed = re.sub(r"\s+", " ", no_punct).strip()
    return collapsed.split()
| def _vectorize(tokens: list[str]) -> Counter: | |
| return Counter(tokens) | |
| def _cosine_sim(a: Counter, b: Counter) -> float: | |
| if not a or not b: | |
| return 0.0 | |
| # dot | |
| dot = sum(a[k] * b.get(k, 0) for k in a) | |
| # norms | |
| na = math.sqrt(sum(v*v for v in a.values())) | |
| nb = math.sqrt(sum(v*v for v in b.values())) | |
| if na == 0.0 or nb == 0.0: | |
| return 0.0 | |
| return dot / (na * nb) | |
| def _seq_ratio(a: str, b: str) -> float: | |
| # SequenceMatcher returns 0..1 | |
| return SequenceMatcher(None, a, b).ratio() | |
def _best_full_answer_for_quote(q: str, responses: list[dict]) -> str | None:
    """
    Return the full answer that best matches a (possibly truncated) quote.

    The match score is lexical, not semantic:
    ``0.6 * cosine(token counts) + 0.4 * SequenceMatcher ratio``, plus a
    0.05 bonus when the normalised quote is a substring of the answer.
    Ties are broken in favour of the answer with the higher total skill
    score (``_answer_total_score``); on a full tie the earlier response wins.

    Returns ``None`` when no answer reaches a score of 0.5.
    """
    # Normalise the quote once; every candidate is compared against these.
    q_norm = _normalize_quotes_spaces(q)
    q_clean = q_norm.lower()
    q_vec = _vectorize(_clean_tokens(q_norm))

    best = None  # (combined_score, skill_total, full_answer)
    for r in responses:
        full = (r.get("answer") or "").strip()
        if not full:
            continue
        full_norm = _normalize_quotes_spaces(full)
        full_clean = full_norm.lower()
        full_vec = _vectorize(_clean_tokens(full_norm))

        combined = 0.6 * _cosine_sim(q_vec, full_vec) + 0.4 * _seq_ratio(q_clean, full_clean)
        # Small boost if the normalised quote appears verbatim (cheap heuristic).
        if q_clean and q_clean in full_clean:
            combined += 0.05

        # Shared helper instead of a second copy of the summing loop.
        cand = (combined, _answer_total_score(r), full)
        if (best is None) or (cand[0] > best[0]) or (cand[0] == best[0] and cand[1] > best[1]):
            best = cand

    # Threshold so a poor match never replaces the original quote;
    # tweak within roughly 0.45-0.65 as needed.
    if best and best[0] >= 0.5:
        return best[2]
    return None
| def _fix_cutoff_quotes(quotes: list[str], responses: list[dict]) -> list[str]: | |
| """ | |
| Replace truncated/middle quotes with the best-matching full answer from `responses` | |
| (already filtered to student+stage). If no decent semantic match, keep original. | |
| """ | |
| if not quotes: | |
| return [] | |
| out = [] | |
| for q in quotes: | |
| q_raw = (q or "").strip() | |
| if not q_raw: | |
| continue | |
| # If it looks truncated (ellipsis) OR is short, try semantic match | |
| looks_truncated = ("..." in q_raw) or (len(q_raw) < 100) | |
| if looks_truncated: | |
| full = _best_full_answer_for_quote(q_raw, responses) | |
| out.append(full if full else q_raw) | |
| else: | |
| out.append(q_raw) | |
| return out | |
| def _top3_answers_by_skill_sum(responses: list[dict]) -> list[str]: | |
| """Pick up to 3 answers with the highest total skill score.""" | |
| scored = [] | |
| for r in responses: | |
| ans = (r.get("answer") or "").strip() | |
| if not ans: | |
| continue | |
| total = _answer_total_score(r) | |
| scored.append((total, ans)) | |
| scored.sort(key=lambda x: x[0], reverse=True) | |
| return [ans for _, ans in scored[:3]] | |
def fetch_student_stage_summary(
    uri: str,
    db: str,
    summaries_coll: str,
    responses_coll: str,
    student: str,
    stage: str
):
    """
    Build the summary panel data for one student at one stage.

    Returns ``None`` when the student has no non-blank responses for that
    stage, so the caller can skip rendering the panel. Otherwise returns a
    dict with ``most_consistent``, ``most_developed``, ``top_strengths`` and
    ``notable_quotes``.

    A failed summary lookup is logged and treated as "no summary stored":
    the pattern fields are ``None``, strengths are empty, and the quotes fall
    back to the student's highest-scoring answers.
    """
    # 1) Require that the student answered that week (source derived from stage).
    responses = _responses_for_student_stage(uri, db, responses_coll, student, stage)
    if not responses:
        return None

    # 2) Pull the summary doc (patterns are nested under "patterns").
    doc = {}
    try:
        doc = _client(uri)[db][summaries_coll].find_one(
            {"student_name": student, "stage": stage},
            {"_id": 0, "patterns": 1, "top_strengths": 1, "notable_quotes": 1}
        ) or {}
    except Exception:
        import logging

        logging.getLogger(__name__).exception(
            "Summary lookup failed on %s.%s for student=%r stage=%r",
            db, summaries_coll, student, stage,
        )
    patterns = doc.get("patterns") or {}
    top_strengths = doc.get("top_strengths") or []
    notable_quotes = doc.get("notable_quotes") or []

    # 3) Repair cut-off quotes; if none remain, fall back to the top 3
    #    highest-scoring answers.
    notable_quotes = _fix_cutoff_quotes(notable_quotes, responses)
    if not notable_quotes:
        notable_quotes = _top3_answers_by_skill_sum(responses)

    return {
        "most_consistent": patterns.get("most_consistent"),
        "most_developed": patterns.get("most_developed"),
        "top_strengths": top_strengths,
        "notable_quotes": notable_quotes,
    }
| # # ------------------- Output (Tabs) ------------------- | |
| # tab_summary, tab_analyses, tab_compare = st.tabs(["📈 Summary", "📝 Analyses","📊 Comparisons"]) | |
# Three output tabs: per-student summary, markdown analyses, stage comparisons.
tabs = st.tabs(["📈 Summary", "📝 Analyses", "📊 Comparisons"])
with tabs[0]:
    # ---------- Render the summary panel dynamically ----------
    # Only shown for one specific student AND one specific source.
    if mongo_uri and student_choice != "(All)" and source_choice != "(All)":
        stage = SOURCE_TO_STAGE.get(source_choice.strip())
        if stage:
            # set to your actual summaries collection name
            # NOTE(review): hard-coded, and different from the sidebar's
            # "Likert summaries collection" value — confirm this is intended.
            summaries_coll_name = "summaries_IFE_2025"
            # None when the student has no answers for this stage.
            summary = fetch_student_stage_summary(
                mongo_uri, db_name, summaries_coll_name, coll_name,
                student=student_choice, stage=stage
            )
            if summary:
                st.markdown("---")
                st.subheader(f"Summary — {student_choice} ({stage.replace('_', ' ').title()})")
                c1, c2 = st.columns(2)
                with c1:
                    st.markdown(f"**Most Consistent:** {summary.get('most_consistent') or '—'}")
                    st.markdown(f"**Most Developed:** {summary.get('most_developed') or '—'}")
                with c2:
                    strengths = summary.get("top_strengths") or []
                    st.markdown("**Top Strengths:** " + (", ".join(strengths) if strengths else "—"))
                # NOTE(review): original indentation was lost; the quotes are
                # assumed to render below both columns — confirm.
                st.markdown("**Notable Quotes:**")
                # At most three quotes, each as a markdown blockquote.
                for q in (summary.get("notable_quotes") or [])[:3]:
                    st.markdown(f"> {q}")
with tabs[1]:
    st.subheader("Student Analysis")
    # Use the folder you defined at top (ANALYSES_DIR), or expose it in the sidebar if you prefer.
    # idx maps normalised student name -> markdown file path.
    idx = _build_analysis_index(ANALYSES_DIR)
    if student_choice == "(All)":
        st.info("Pick a specific student on the left to view their analysis.")
        # (Optional) show what's available so you can browse:
        if idx:
            st.caption("Available analyses:")
            st.write(", ".join(sorted({name.title() for name in idx.keys()})))
        # NOTE(review): original indentation was lost; the class-wide summary
        # is assumed to be shown regardless of whether idx is empty — confirm.
        # The path is relative to the process's working directory.
        file_path="full_class_summary.md"
        full_summary=_load_markdown(file_path)
        if full_summary.strip():
            st.markdown(full_summary, unsafe_allow_html=False)
            # Optional download button
            with open(file_path, "rb") as f:
                st.download_button(
                    "Download analysis (.md)", f,
                    file_name=os.path.basename(file_path), mime="text/markdown"
                )
        else:
            # NOTE(review): _load_markdown also returns "" when the file is
            # missing or unreadable, so this message can be misleading.
            st.warning("Analysis file found but empty.")
    else:
        # Normalize the selected student name to match filenames
        norm = _normalize_name(student_choice)
        path = idx.get(norm)
        # If exact match not found, try simple underscore variant
        if not path:
            alt = student_choice.replace(" ", "_")
            path = idx.get(_normalize_name(alt))
        if path:
            md = _load_markdown(path)
            if md.strip():
                st.markdown(md, unsafe_allow_html=False)
                # Legend for the indicator symbols, rendered under the analysis.
                # NOTE(review): spacing inside this literal could not be
                # verified against the original file — confirm.
                system = '''### 🔵🔵 Skill Indicator System
| Symbol | Meaning |
|---------|----------------------------------------------|
| 🔵 | Clear evidence of the skill that week |
| 🔵🔵 | Strong or standout performance that week |
| ⚪⚪ | Little to no evidence for that skill that week|
'''
                st.markdown(system)
                # Optional download button
                with open(path, "rb") as f:
                    st.download_button(
                        "Download analysis (.md)", f,
                        file_name=os.path.basename(path), mime="text/markdown"
                    )
            else:
                # Reached when the file reads as blank (or could not be read).
                st.warning("Analysis file found but empty.")
        else:
            st.warning(f"No analysis found for **{student_choice}** in `{ANALYSES_DIR}` yet.")
            if idx:
                st.caption("Available analyses:")
                st.write(", ".join(sorted({name.title() for name in idx.keys()})))
with tabs[2]:
    # Missing values are shown as a dash. Without na_rep, a column that is
    # entirely None (e.g. a student with no onboarding data, or no Mongo
    # connection) is an object column and "{:.2f}".format(None) raises
    # TypeError when the table is rendered.
    st.subheader("Onboarding vs Closing — % Change")
    df1 = pd.DataFrame({
        "Dimension": COLS,
        "Onboarding": [vec_onb.get(k) for k in COLS],
        "Closing": [vec_cls.get(k) for k in COLS],
        "% Change": [pct_onb_to_cls.get(k) for k in COLS],
    })
    st.dataframe(
        df1.style.format(
            {"Onboarding": "{:.2f}", "Closing": "{:.2f}", "% Change": "{:+.1f}%"},
            na_rep="—",
        ),
        use_container_width=True,
    )

    st.subheader("Onboarding vs (Week2+Week3+Closing) — % Change")
    df2 = pd.DataFrame({
        "Dimension": COLS,
        "Onboarding": [vec_onb.get(k) for k in COLS],
        "Weeks 2+3+Closing (combined)": [vec_combo.get(k) for k in COLS],
        "% Change": [pct_onb_to_combo.get(k) for k in COLS],
    })
    st.dataframe(
        df2.style.format(
            {"Onboarding": "{:.2f}", "Weeks 2+3+Closing (combined)": "{:.2f}", "% Change": "{:+.1f}%"},
            na_rep="—",
        ),
        use_container_width=True,
    )

    # Optional bar chart: % change Onboarding -> Closing.
    # Undefined changes (None) are drawn as zero-height bars.
    try:
        fig_delta = go.Figure()
        fig_delta.add_bar(
            x=COLS,
            y=[pct_onb_to_cls.get(k) if pct_onb_to_cls.get(k) is not None else 0 for k in COLS],
            name="%Δ Onb→Closing",
        )
        fig_delta.update_layout(
            title="% Change: Onboarding → Closing",
            xaxis_title="Dimension",
            yaxis_title="% change",
            margin=dict(l=20, r=20, t=50, b=20),
        )
        st.plotly_chart(fig_delta, use_container_width=True)
    except Exception:
        # The chart stays best-effort, but the failure is no longer invisible.
        import logging

        logging.getLogger(__name__).exception("Could not render the % change bar chart")
| # # (Commented-out alternate copy of the app — none of the commented lines below are executed.) app.py — Student Skill Radar (MongoDB, secrets-based, no CSV) | |
| # import os | |
| # from datetime import date | |
| # from typing import Dict, List | |
| # import numpy as np | |
| # import pandas as pd | |
| # import plotly.graph_objects as go | |
| # import streamlit as st | |
| # from pymongo import MongoClient | |
| # from urllib.parse import quote_plus | |
| # st.set_page_config(page_title="Student Skill Radar", layout="wide") | |
| # # ------------------- Constants ------------------- | |
| # SKILLS = [ | |
| # "Problem-Solving", | |
| # "Critical Thinking", | |
| # "Analytical Reasoning", | |
| # "Adaptability", | |
| # "Continuous Learning", | |
| # "Creativity", | |
| # "Communication", | |
| # "Collaboration", | |
| # "Community Engagement", | |
| # "Emotional Intelligence", | |
| # "Ethical Decision-Making", | |
| # "Time Management", | |
| # "Tech Aptitude", | |
| # ] | |
| # SKILL_GROUPS = { | |
| # "Problem-Solving, Critical Thinking, Analytical Reasoning": [ | |
| # "Problem-Solving", "Critical Thinking", "Analytical Reasoning" | |
| # ], | |
| # "Adaptability, Continuous Learning, Creativity": [ | |
| # "Adaptability", "Continuous Learning", "Creativity" | |
| # ], | |
| # "Time Management": ["Time Management"], | |
| # "Communication, Teamwork, Collaboration, Community Engagement": [ | |
| # "Communication", "Collaboration", "Community Engagement" | |
| # ], | |
| # "Emotional Intelligence, Ethical Decision Making": [ | |
| # "Emotional Intelligence", "Ethical Decision-Making" | |
| # ], | |
| # "Tech Aptitude": ["Tech Aptitude"], | |
| # } | |
| # # ------------------- Helpers ------------------- | |
| # def safe_mean(vals): | |
| # clean = [v for v in vals if v is not None and not pd.isna(v)] | |
| # return float(np.mean(clean)) if clean else np.nan | |
| # def to_01_or_nan(x): | |
| # try: | |
| # v = float(x) | |
| # except Exception: | |
| # return np.nan | |
| # if pd.isna(v): | |
| # return np.nan | |
| # return max(0.0, min(1.0, v)) | |
| # def aggregate_groups_row(row: pd.Series) -> Dict[str, float]: | |
| # return { | |
| # g: safe_mean([row.get(s, np.nan) for s in members]) | |
| # for g, members in SKILL_GROUPS.items() | |
| # } | |
| # def summarize(records: List[dict], level: str = "student") -> pd.DataFrame: | |
| # df = pd.DataFrame(records) if records else pd.DataFrame() | |
| # if df.empty: | |
| # return df | |
| # if level == "student+source": | |
| # df["label"] = df["student"].astype(str) + " — " + df["source"].astype(str) | |
| # else: | |
| # df["label"] = df["student"].astype(str) | |
| # # groupby mean skips NaNs by default | |
| # return df.groupby("label", dropna=False)[SKILLS].mean().reset_index() | |
| # def plot_radar(df: pd.DataFrame, grouped: bool, title: str): | |
| # if df.empty: | |
| # return go.Figure() | |
| # traces = [] | |
| # if grouped: | |
| # labels = list(SKILL_GROUPS.keys()) | |
| # for _, r in df.iterrows(): | |
| # grp = aggregate_groups_row(r) | |
| # values = [0.0 if pd.isna(grp[k]) else float(grp[k]) for k in labels] | |
| # traces.append(go.Scatterpolar( | |
| # r=values + [values[0]], | |
| # theta=labels + [labels[0]], | |
| # name=r["label"], | |
| # fill="toself", | |
| # )) | |
| # else: | |
| # labels = SKILLS | |
| # for _, r in df.iterrows(): | |
| # values = [] | |
| # for k in SKILLS: | |
| # v = r.get(k, np.nan) | |
| # values.append(0.0 if pd.isna(v) else float(v)) | |
| # traces.append(go.Scatterpolar( | |
| # r=values + [values[0]], | |
| # theta=labels + [labels[0]], | |
| # name=r["label"], | |
| # fill="toself", | |
| # )) | |
| # fig = go.Figure(traces) | |
| # fig.update_layout( | |
| # title=title or "Skill Radar", | |
| # showlegend=True, | |
| # polar=dict( | |
| # radialaxis=dict( | |
| # autorange=False, | |
| # range=[0, 1], | |
| # tick0=0, | |
| # dtick=0.2, | |
| # ticks="outside", | |
| # showline=True, | |
| # showgrid=True, | |
| # visible=True, | |
| # ) | |
| # ), | |
| # margin=dict(l=30, r=30, t=60, b=30), | |
| # ) | |
| # return fig | |
| # # ------------------- Mongo Access (secrets-only) ------------------- | |
| # def _get_secret(name: str) -> str | None: | |
| # try: | |
| # val = st.secrets.get(name) | |
| # if val is not None: | |
| # return str(val) | |
| # except Exception: | |
| # pass | |
| # return os.getenv(name) | |
| # def _build_uri(db_name: str | None) -> str | None: | |
| # user = _get_secret("MONGO_USER") | |
| # pw = _get_secret("MONGO_PASS") | |
| # cluster = _get_secret("MONGO_CLUSTER") | |
| # if not (user and pw and cluster): | |
| # return None | |
| # user_q = quote_plus(user) | |
| # pw_q = quote_plus(pw) | |
| # db_path = f"/{db_name}" if db_name else "" | |
| # return ( | |
| # f"mongodb+srv://{user_q}:{pw_q}@{cluster}{db_path}" | |
| # f"?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true" | |
| # ) | |
| # @st.cache_resource(show_spinner=False) | |
| # def _client(uri: str): | |
| # return MongoClient(uri, serverSelectionTimeoutMS=10000) | |
| # # @st.cache_data(show_spinner=False) | |
| # def mongo_distinct(uri: str, db: str, coll: str, field: str) -> List[str]: | |
| # if not uri: | |
| # return [] | |
| # try: | |
| # c = _client(uri) | |
| # vals = c[db][coll].distinct(field) | |
| # return sorted([v for v in vals if isinstance(v, str) and v.strip()]) | |
| # except Exception: | |
| # return [] | |
| # # @st.cache_data(show_spinner=False) | |
| # def mongo_records( | |
| # uri: str, | |
| # db: str, | |
| # coll: str, | |
| # student: str | None, | |
| # source: str | None, | |
| # start: str | None, | |
| # end: str | None, | |
| # ) -> List[dict]: | |
| # """Return flat rows with one column per skill; missing skills -> NaN (ignored in means).""" | |
| # if not uri: | |
| # return [] | |
| # q = {} | |
| # if student and student != "(All)": | |
| # q["student"] = student | |
| # if source and source != "(All)": | |
| # q["source"] = source | |
| # if start or end: | |
| # q["date"] = {} | |
| # if start: | |
| # q["date"]["$gte"] = start | |
| # if end: | |
| # q["date"]["$lte"] = end | |
| # try: | |
| # c = _client(uri) | |
| # proj = {"_id": 0, "student": 1, "source": 1, "date": 1, "skills": 1} | |
| # docs = list(c[db][coll].find(q, proj)) | |
| # rows = [] | |
| # for d in docs: | |
| # base = { | |
| # "student": str(d.get("student", "")), | |
| # "source": str(d.get("source", "")), | |
| # "date": str(d.get("date", "")), | |
| # } | |
| # sd = d.get("skills") or {} | |
| # for k in SKILLS: | |
| # base[k] = to_01_or_nan(sd.get(k, np.nan)) | |
| # rows.append(base) | |
| # return rows | |
| # except Exception: | |
| # return [] | |
| # # ------------------- UI ------------------- | |
| # st.title("📊 Student Skill Radar") | |
| # with st.sidebar: | |
| # st.subheader("MongoDB Settings") | |
| # db_name = st.text_input("Database name", value="student_skills") | |
| # coll_name = st.text_input("Collection name", value="responses_IFE_2025") | |
| # mongo_uri = _build_uri(db_name) | |
| # if not mongo_uri: | |
| # st.warning("Missing MONGO_USER, MONGO_PASS, or MONGO_CLUSTER in secrets/env.") | |
| # else: | |
| # try: | |
| # _client(mongo_uri).admin.command("ping") | |
| # st.success("Connected via secrets ✅") | |
| # except Exception as e: | |
| # st.error(f"Mongo connection failed: {e}") | |
| # # Filters | |
| # students = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "student") if mongo_uri else []) | |
| # sources = ["(All)"] + (mongo_distinct(mongo_uri, db_name, coll_name, "source") if mongo_uri else []) | |
| # student_choice = st.selectbox("Select student", students) | |
| # source_choice = st.selectbox("Select source/week", sources) | |
| # c1, c2 = st.columns(2) | |
| # start_dt = c1.date_input("Start date", value=None) | |
| # end_dt = c2.date_input("End date", value=None) | |
| # agg_level = st.selectbox("Aggregation level", ["student", "student+source"], index=0) | |
| # grouped = st.toggle("Grouped skills (skill clusters)", value=True) | |
| # chart_title = st.text_input("Chart title", value="") | |
| # # Convert dates to strings (YYYY-MM-DD) | |
| # start_str = start_dt.strftime("%Y-%m-%d") if isinstance(start_dt, date) else None | |
| # end_str = end_dt.strftime("%Y-%m-%d") if isinstance(end_dt, date) else None | |
| # # Fetch + aggregate | |
| # records = mongo_records(mongo_uri, db_name, coll_name, student_choice, source_choice, start_str, end_str) if mongo_uri else [] | |
| # df = summarize(records, level=agg_level) if records else pd.DataFrame() | |
| # # ------------------- Output ------------------- | |
| # fig = plot_radar(df, grouped, chart_title) | |
| # st.plotly_chart(fig, use_container_width=True) | |
| # st.caption(f"{len(df)} line(s) aggregated." if not df.empty else "No data. Adjust filters or check Mongo connection.") |