# app.py — Marks Clustering (Student + Section) + Manager-Friendly Labels & Summaries
# Adds (WITHOUT changing the clustering logic):
# ✅ Human-friendly cluster meanings (Balanced / Lab–Theory Mismatch / Final Exam Misalignment)
# ✅ Traffic-light indicator (Green/Amber/Red)
# ✅ Section list with section names + cluster meaning + key signal
# ✅ Student distribution (% in each cluster) + meaning text
# ✅ Same for sections (% of sections)
# ✅ Keeps original detailed reasons per student/section

import io
import tempfile

import numpy as np
import pandas as pd
import gradio as gr
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
import plotly.express as px

# Maximum attainable marks per assessment component.  Used both to convert
# raw marks to 0..1 percentages and to clip out-of-range values in _clean().
MAXIMA = {
    "Quiz": 10.0,
    "Mid": 20.0,
    "Lab Report": 20.0,
    "Lab Activity": 10.0,
    "Final": 40.0,
}


# -----------------------------------
# Reading / mapping / cleaning
# -----------------------------------
def _normalize_colname(c: str) -> str:
    """Normalise a column header for fuzzy matching.

    Lower-cases, converts underscores/hyphens to spaces, collapses runs of
    whitespace, and maps '0' -> 'o' so OCR-style typos such as
    'Lab 0ctivity' match 'lab octivity'.
    """
    c = str(c).strip().lower()
    c = c.replace("_", " ").replace("-", " ")
    c = c.replace("0", "o")  # fixes '0ctivity' -> 'octivity'
    c = " ".join(c.split())
    return c


def _read_excel_safely(path: str) -> pd.DataFrame:
    """Read an Excel file, tolerating one blank leading row above the headers."""
    df_try = pd.read_excel(path)
    # If every column came back "Unnamed: N", the real headers are on row 2.
    if all(str(c).startswith("Unnamed") for c in df_try.columns):
        df_try = pd.read_excel(path, header=1)
    return df_try


def _map_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Map arbitrary user column headers onto the canonical schema.

    Returns a copy of *df* with columns renamed to
    StudentID / Section / Quiz / Mid / Lab Report / Lab Activity / Final.
    StudentID is optional and auto-generated (S001, S002, ...) when absent.

    Raises:
        ValueError: if any required column cannot be matched.
    """
    cols = list(df.columns)
    # normalised header -> original header
    norm = {_normalize_colname(c): c for c in cols}

    # Accepted spellings for each logical column (compared after normalisation).
    candidates = {
        "studentid": ["studentid", "student id", "id", "student", "student number", "student no"],
        "section": ["section", "sec", "class", "group"],
        "quiz": ["quiz", "quiz (10)", "quiz(10)", "quizzes"],
        "mid": ["mid", "mid (20)", "mid(20)", "midterm", "mid term"],
        "lab report": ["lab report", "lab report(20)", "labreport", "report", "lab reports", "labreport(20)"],
        "lab activity": [
            "lab activity", "lab activity(10)", "labactivity",
            "activity", "lab act", "lab octivity(10)", "lab 0ctivity(10)"  # both will normalize to same
        ],
        "final": ["final", "final (40)", "final(40)", "final exam", "exam", "endterm", "end term"],
    }

    mapped = {}
    for logical, opts in candidates.items():
        for o in opts:
            o2 = _normalize_colname(o)
            if o2 in norm:
                mapped[logical] = norm[o2]
                break

    # StudentID optional (auto-generate)
    required = ["section", "quiz", "mid", "lab report", "lab activity", "final"]
    missing = [r for r in required if r not in mapped]
    if missing:
        raise ValueError(
            "Missing required columns (after auto-matching): "
            + ", ".join(missing)
            + "\n\nYour file should include: Section, Quiz, Mid, Lab Report, Lab Activity, Final."
        )

    df2 = df.rename(
        columns={
            mapped["section"]: "Section",
            mapped["quiz"]: "Quiz",
            mapped["mid"]: "Mid",
            mapped["lab report"]: "Lab Report",
            mapped["lab activity"]: "Lab Activity",
            mapped["final"]: "Final",
        }
    ).copy()

    if "studentid" in mapped:
        df2 = df2.rename(columns={mapped["studentid"]: "StudentID"})
    else:
        df2.insert(0, "StudentID", [f"S{i+1:03d}" for i in range(len(df2))])

    return df2


def _clean(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce types, drop incomplete rows, and clip marks into [0, max]."""
    df = df.copy()
    df["StudentID"] = df["StudentID"].astype(str).str.strip()
    df["Section"] = df["Section"].astype(str).str.strip()
    for c in ["Quiz", "Mid", "Lab Report", "Lab Activity", "Final"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["Section", "Quiz", "Mid", "Lab Report", "Lab Activity", "Final"])
    df = df[(df["StudentID"] != "") & (df["Section"] != "")]
    # Clamp impossible marks rather than dropping the row.
    for c, mx in MAXIMA.items():
        df[c] = df[c].clip(lower=0, upper=mx)
    return df


# -----------------------------------
# Features + clustering
# -----------------------------------
def _features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive per-student percentage features used for clustering.

    Adds, per component, `<name>_pct` in 0..1, plus:
      mean_score / std_score            — level and spread across components
      lab_avg / theory_avg              — practical vs theory averages
      exam_gap                          — Final vs (Quiz, Mid) average
      coursework_avg                    — all non-final components
      final_vs_coursework_gap           — Final vs coursework average
      lab_theory_gap                    — lab_avg minus theory_avg
    """
    df = df.copy()
    for c, mx in MAXIMA.items():
        df[f"{c}_pct"] = df[c] / mx

    comp = [f"{c}_pct" for c in ["Quiz", "Mid", "Lab Report", "Lab Activity", "Final"]]
    df["mean_score"] = df[comp].mean(axis=1)
    df["std_score"] = df[comp].std(axis=1)
    df["lab_avg"] = df[["Lab Report_pct", "Lab Activity_pct"]].mean(axis=1)
    df["theory_avg"] = df[["Quiz_pct", "Mid_pct", "Final_pct"]].mean(axis=1)
    df["exam_gap"] = df["Final_pct"] - df[["Quiz_pct", "Mid_pct"]].mean(axis=1)
    df["coursework_avg"] = df[["Quiz_pct", "Mid_pct", "Lab Report_pct", "Lab Activity_pct"]].mean(axis=1)
    df["final_vs_coursework_gap"] = df["Final_pct"] - df["coursework_avg"]
    df["lab_theory_gap"] = df["lab_avg"] - df["theory_avg"]
    return df


def _kmeans(df_feat: pd.DataFrame, k=3):
    """Standardise features and run KMeans (fixed seed for reproducibility).

    Returns (labels, X_scaled, silhouette) where silhouette is None when it
    cannot be computed (too few rows or a single cluster).
    """
    scaler = StandardScaler()
    X = scaler.fit_transform(df_feat.values)
    model = KMeans(n_clusters=k, random_state=42, n_init=10)
    labels = model.fit_predict(X)

    sil = None
    try:
        # silhouette needs at least 2 distinct clusters and a few spare rows
        if len(df_feat) >= k + 2 and len(set(labels)) > 1:
            sil = float(silhouette_score(X, labels))
    except Exception:
        sil = None
    return labels, X, sil


def _pca_fig(X, labels, hover_df, title):
    """Project the scaled feature matrix to 2-D with PCA and return a
    Plotly scatter coloured by cluster, with *hover_df* columns as tooltips."""
    pca = PCA(n_components=2, random_state=42)
    coords = pca.fit_transform(X)
    plot_df = hover_df.copy()
    plot_df["PC1"] = coords[:, 0]
    plot_df["PC2"] = coords[:, 1]
    plot_df["Cluster"] = labels.astype(int).astype(str)
    fig = px.scatter(
        plot_df,
        x="PC1",
        y="PC2",
        color="Cluster",
        hover_data=list(hover_df.columns),
        title=title,
    )
    fig.update_layout(height=480)
    return fig


# -----------------------------------
# Reasons (kept)
# -----------------------------------
def _student_reason(r):
    """Build the per-student observation/recommendation text from a feature row."""
    s = float(r["std_score"])
    eg = float(r["final_vs_coursework_gap"])
    ltg = float(r["lab_theory_gap"])

    bullets = []
    if s <= 0.10:
        bullets.append("Low variation across assessments (consistent distribution).")
    elif s <= 0.18:
        bullets.append("Moderate variation across assessments (expected academic spread).")
    else:
        bullets.append("High variation across components (irregular distribution).")

    if abs(eg) >= 0.25:
        bullets.append("Large mismatch between final exam and coursework trend.")
    elif abs(eg) >= 0.15:
        bullets.append("Noticeable final vs coursework difference.")

    if abs(ltg) >= 0.20:
        bullets.append("Large mismatch between lab/practical and theory performance.")
    elif abs(ltg) >= 0.12:
        bullets.append("Some lab vs theory mismatch.")

    # Any single strong signal triggers the review recommendation.
    if (s > 0.18) or (abs(eg) >= 0.25) or (abs(ltg) >= 0.20):
        rec = "Recommendation: Consider academic review for assessment alignment and/or student support (not an allegation)."
    else:
        rec = "Recommendation: Pattern looks academically reasonable; routine moderation is sufficient."

    return "Key observations:\n- " + "\n- ".join(bullets) + "\n\n" + rec


def _section_reason(r):
    """Build the per-section observation/recommendation text (tighter thresholds
    than the student version, since section averages smooth out variation)."""
    s = float(r["std_score"])
    eg = float(r["final_vs_coursework_gap"])
    ltg = float(r["lab_theory_gap"])

    bullets = []
    if s <= 0.08:
        bullets.append("Uniform distribution across components in this section.")
    elif s <= 0.14:
        bullets.append("Genuine variation consistent with expected student diversity.")
    else:
        bullets.append("Higher variability across components (less uniform distribution).")

    if abs(eg) >= 0.18:
        bullets.append("Final vs coursework trend mismatch at section level.")
    if abs(ltg) >= 0.15:
        bullets.append("Lab vs theory mismatch at section level.")

    if (s > 0.14) or (abs(eg) >= 0.18) or (abs(ltg) >= 0.15):
        rec = "Recommendation: Review assessment design/moderation consistency for this section (not an allegation)."
    else:
        rec = "Recommendation: Distribution appears aligned; routine moderation is sufficient."

    return "Key observations:\n- " + "\n- ".join(bullets) + "\n\n" + rec


# -----------------------------------
# NEW: Manager-friendly cluster meanings (automatic mapping)
# -----------------------------------
def _assign_cluster_meanings(profile_df: pd.DataFrame, cluster_col: str) -> dict:
    """
    Given a cluster profile table (averages per cluster), map numeric cluster ids
    to human-friendly meaning based on data patterns.

    Rules (simple + consistent for managers):
      1) "Final Exam Misalignment" = most negative final_vs_coursework_gap
         (largest drop in final vs coursework)
      2) "Lab–Theory Mismatch" = largest positive lab_theory_gap
         (labs much stronger than theory)
      3) remaining = "Balanced / Acceptable Pattern"
    """
    if profile_df.empty:
        return {}

    dfp = profile_df.copy()

    # Ensure expected columns exist
    if "final_vs_coursework_gap" not in dfp.columns or "lab_theory_gap" not in dfp.columns:
        # fallback: all balanced
        return {int(cid): "Balanced / Acceptable Pattern" for cid in dfp[cluster_col].tolist()}

    # cluster id for "Final Exam Misalignment" (most negative gap)
    cid_final = int(dfp.loc[dfp["final_vs_coursework_gap"].idxmin(), cluster_col])
    # cluster id for "Lab–Theory Mismatch" (largest lab advantage over theory)
    cid_lab = int(dfp.loc[dfp["lab_theory_gap"].idxmax(), cluster_col])

    # Start with every cluster "Balanced", then overwrite the two flagged ids.
    # (Fix: previously only one remaining cluster was labelled, so when
    # cid_final == cid_lab a third cluster was left with no meaning at all.)
    meaning = {int(cid): "Balanced / Acceptable Pattern" for cid in dfp[cluster_col].tolist()}
    meaning[cid_lab] = "Lab–Theory Mismatch"
    meaning[cid_final] = "Final Exam Misalignment"
    return meaning


def _traffic_light(meaning: str) -> str:
    """Map a cluster meaning to its traffic-light label (⚪ for unknown)."""
    if meaning == "Balanced / Acceptable Pattern":
        return "🟢 Green"
    if meaning == "Lab–Theory Mismatch":
        return "🟡 Amber"
    if meaning == "Final Exam Misalignment":
        return "🔴 Red"
    return "⚪"


def _manager_signal(meaning: str) -> str:
    """One-line manager-facing signal for a cluster meaning ('' for unknown)."""
    if meaning == "Balanced / Acceptable Pattern":
        return "No major assessment signal detected (routine moderation only)."
    if meaning == "Lab–Theory Mismatch":
        return "Strong indication of lab–theory alignment gap (practical stronger than theory)."
    if meaning == "Final Exam Misalignment":
        return "Strong indication of final exam misalignment (final lower than coursework)."
    return ""


def _meaning_text(meaning: str, level: str) -> str:
    """
    Short, safe text for managers/students.

    *level* is the singular entity name ("Student" or "Section") used to
    open the sentence.
    """
    if meaning == "Balanced / Acceptable Pattern":
        return f"{level} shows consistent distribution across assessment components."
    if meaning == "Lab–Theory Mismatch":
        return f"{level} shows practical (lab) outcomes much stronger than theory-based assessments."
    if meaning == "Final Exam Misalignment":
        return f"{level} shows final exam outcomes significantly lower than coursework trend."
    return ""


def _build_distribution_table(labels_series: pd.Series, meaning_map: dict, entity_name: str) -> pd.DataFrame:
    """
    entity_name: "Students" or "Sections"
    Return: Meaning, TrafficLight, Count, Percentage, SimpleMeaning
    """
    total = int(labels_series.shape[0])
    counts = labels_series.value_counts().sort_index()
    rows = []
    for cid, cnt in counts.items():
        cid_int = int(cid)
        meaning = meaning_map.get(cid_int, f"Cluster {cid_int}")
        rows.append({
            "Cluster": cid_int,
            "Cluster Meaning": meaning,
            "Traffic Light": _traffic_light(meaning),
            "Count": int(cnt),
            f"% of {entity_name}": round((int(cnt) / total) * 100, 1) if total else 0.0,
            "What it means": _meaning_text(meaning, entity_name[:-1])  # Student/Section
        })
    return pd.DataFrame(rows)


# -----------------------------------
# Excel download
# -----------------------------------
def _make_excel(student_out, section_out, student_profile, section_profile,
                student_distribution, section_distribution, section_list) -> bytes:
    """Serialise all result tables into a single multi-sheet .xlsx (bytes)."""
    out = io.BytesIO()
    with pd.ExcelWriter(out, engine="openpyxl") as writer:
        student_out.to_excel(writer, index=False, sheet_name="Student_Level")
        section_out.to_excel(writer, index=False, sheet_name="Section_Level")
        student_profile.to_excel(writer, index=False, sheet_name="Student_Cluster_Profile")
        section_profile.to_excel(writer, index=False, sheet_name="Section_Cluster_Profile")
        student_distribution.to_excel(writer, index=False, sheet_name="Student_Distribution")
        section_distribution.to_excel(writer, index=False, sheet_name="Section_Distribution")
        section_list.to_excel(writer, index=False, sheet_name="Section_List_With_Meaning")
    out.seek(0)
    return out.getvalue()


# -----------------------------------
# Main analysis
# -----------------------------------
def run_analysis(file_obj):
    """End-to-end pipeline driven by the Gradio button.

    Reads the uploaded file, cleans and featurises it, clusters at student and
    section level (k=3), attaches manager-friendly meanings, and returns the
    summary text, result tables, PCA plots, and a path to the Excel report
    (in the exact order expected by the `run_btn.click` outputs list).

    Raises:
        gr.Error: on missing upload, unreadable file, or no valid rows.
    """
    if file_obj is None:
        raise gr.Error("Please upload your Excel/CSV file first.")

    name = (getattr(file_obj, "name", "") or "").lower()
    try:
        if name.endswith(".csv"):
            raw = pd.read_csv(file_obj.name)
        else:
            raw = _read_excel_safely(file_obj.name)
    except Exception as e:
        raise gr.Error(f"Could not read the file. Details: {e}")

    df = _map_columns(raw)
    df = _clean(df)
    df = _features(df)
    if df.empty:
        raise gr.Error("No valid rows after cleaning. Please check missing values.")

    # ---------------- Student-level clustering ----------------
    stud_feat = df[["mean_score", "std_score", "exam_gap", "lab_theory_gap", "final_vs_coursework_gap"]]
    stud_labels, Xs, stud_sil = _kmeans(stud_feat, k=3)
    df["StudentCluster"] = stud_labels.astype(int)
    df["StudentReason"] = df.apply(_student_reason, axis=1)

    # Student cluster profile (for mapping meanings)
    stud_profile = (
        df.groupby("StudentCluster")[
            ["Quiz_pct", "Mid_pct", "Lab Report_pct", "Lab Activity_pct", "Final_pct",
             "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]
        ]
        .mean()
        .reset_index()
        .sort_values("StudentCluster")
    )

    # ---------------- Section-level aggregation + clustering ----------------
    sec_cols = [
        "Quiz_pct", "Mid_pct", "Lab Report_pct", "Lab Activity_pct", "Final_pct",
        "mean_score", "std_score", "exam_gap", "lab_theory_gap", "final_vs_coursework_gap"
    ]
    sec = df.groupby("Section")[sec_cols].mean().reset_index()

    sec_feat = sec[["mean_score", "std_score", "exam_gap", "lab_theory_gap", "final_vs_coursework_gap"]]
    sec_labels, Xsec, sec_sil = _kmeans(sec_feat, k=3)
    sec["SectionCluster"] = sec_labels.astype(int)
    sec["SectionReason"] = sec.apply(_section_reason, axis=1)

    # Section cluster profile (for mapping meanings)
    sec_profile = (
        sec.groupby("SectionCluster")[
            ["Quiz_pct", "Mid_pct", "Lab Report_pct", "Lab Activity_pct", "Final_pct",
             "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]
        ]
        .mean()
        .reset_index()
        .sort_values("SectionCluster")
    )

    # ---------------- NEW: Map numeric cluster IDs -> meanings ----------------
    student_meaning_map = _assign_cluster_meanings(stud_profile, "StudentCluster")
    section_meaning_map = _assign_cluster_meanings(sec_profile, "SectionCluster")

    df["StudentClusterMeaning"] = df["StudentCluster"].map(lambda x: student_meaning_map.get(int(x), f"Cluster {int(x)}"))
    df["StudentTrafficLight"] = df["StudentClusterMeaning"].map(_traffic_light)
    df["StudentManagerSignal"] = df["StudentClusterMeaning"].map(_manager_signal)

    sec["SectionClusterMeaning"] = sec["SectionCluster"].map(lambda x: section_meaning_map.get(int(x), f"Cluster {int(x)}"))
    sec["SectionTrafficLight"] = sec["SectionClusterMeaning"].map(_traffic_light)
    sec["SectionManagerSignal"] = sec["SectionClusterMeaning"].map(_manager_signal)

    # ---------------- NEW: Distribution tables (percentages) ----------------
    student_distribution = _build_distribution_table(df["StudentCluster"], student_meaning_map, "Students")
    section_distribution = _build_distribution_table(sec["SectionCluster"], section_meaning_map, "Sections")

    # ---------------- NEW: Section list with section number/name in app ----------------
    section_list = sec[[
        "Section",
        "SectionCluster",
        "SectionClusterMeaning",
        "SectionTrafficLight",
        "SectionManagerSignal",
        "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap",
        "SectionReason",
    ]].copy()

    # Make key metrics more readable (%)
    for c in ["mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]:
        section_list[c] = (section_list[c] * 100).round(1)

    # ---------------- Plots (kept) ----------------
    hover_student = df[
        ["StudentID", "Section", "Quiz", "Mid", "Lab Report", "Lab Activity", "Final",
         "StudentCluster", "StudentClusterMeaning", "StudentTrafficLight",
         "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]
    ].copy()
    fig_student = _pca_fig(Xs, stud_labels, hover_student, "Student-Level Clustering (PCA)")

    hover_section = sec[
        ["Section", "SectionCluster", "SectionClusterMeaning", "SectionTrafficLight",
         "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]
    ].copy()
    fig_section = _pca_fig(Xsec, sec_labels, hover_section, "Section-Level Clustering (PCA)")

    # ---------------- Output tables (student + section) ----------------
    student_out = df[
        ["StudentID", "Section",
         "StudentCluster", "StudentClusterMeaning", "StudentTrafficLight", "StudentManagerSignal",
         "Quiz", "Mid", "Lab Report", "Lab Activity", "Final",
         "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap",
         "StudentReason"]
    ].copy()
    for c in ["mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]:
        student_out[c] = (student_out[c] * 100).round(1)

    section_out = sec[
        ["Section",
         "SectionCluster", "SectionClusterMeaning", "SectionTrafficLight", "SectionManagerSignal",
         "mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap",
         "SectionReason"]
    ].copy()
    for c in ["mean_score", "std_score", "final_vs_coursework_gap", "lab_theory_gap"]:
        section_out[c] = (section_out[c] * 100).round(1)

    # Profiles as % (skip the first column, which is the cluster id)
    stud_profile_show = stud_profile.copy()
    sec_profile_show = sec_profile.copy()
    for t in [stud_profile_show, sec_profile_show]:
        for c in t.columns:
            if c != t.columns[0]:
                t[c] = (t[c] * 100).round(1)

    # ---------------- Summary ----------------
    sil_s = f"{stud_sil:.3f}" if stud_sil is not None else "N/A"
    sil_c = f"{sec_sil:.3f}" if sec_sil is not None else "N/A"
    summary = (
        f"Students analysed: {len(df)}\n"
        f"Sections analysed: {df['Section'].nunique()}\n\n"
        f"Student-level silhouette (k=3): {sil_s}\n"
        f"Section-level silhouette (k=3): {sil_c}\n\n"
        f"Cluster numbers (0/1/2) are just IDs. Use 'Cluster Meaning' and 'Traffic Light' for interpretation.\n"
        f"Note: Results indicate patterns for review/support; they do not imply misconduct."
    )

    # ---------------- Download (Excel) ----------------
    excel_bytes = _make_excel(
        student_out, section_out,
        stud_profile_show, sec_profile_show,
        student_distribution, section_distribution,
        section_list
    )
    # Written to a named temp file because gr.File expects a filesystem path.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
    tmp.write(excel_bytes)
    tmp.flush()
    tmp.close()

    return (
        summary,
        student_distribution,
        section_distribution,
        section_list,
        student_out,
        fig_student,
        stud_profile_show,
        section_out,
        fig_section,
        sec_profile_show,
        tmp.name
    )


# -----------------------------------
# UI
# -----------------------------------
with gr.Blocks(title="Marks Clustering (Student + Section) — Manager Friendly") as demo:
    gr.Markdown(
        """
# Marks Clustering Dashboard (Unsupervised) — Student & Section Level

Upload your Excel/CSV marks file.

This app produces **Student-level + Section-level clustering (3 clusters)** with:
- **Cluster Meaning** (easy interpretation)
- **Traffic Light** (Green/Amber/Red)
- **% distribution** of students and sections
- Detailed reasons per student/section

**Note:** This is assessment-pattern analysis for QA/moderation and student support (no allegations).
""".strip()
    )

    file_in = gr.File(label="Upload Excel/CSV", file_types=[".xlsx", ".xls", ".csv"])
    run_btn = gr.Button("Run Clustering", variant="primary")

    summary = gr.Textbox(label="Summary", lines=9)

    with gr.Tabs():
        with gr.Tab("Manager Summary"):
            gr.Markdown("## Student Distribution (by Cluster Meaning)")
            student_dist_df = gr.Dataframe(wrap=True, interactive=False)
            gr.Markdown("## Section Distribution (by Cluster Meaning)")
            section_dist_df = gr.Dataframe(wrap=True, interactive=False)
            gr.Markdown("## Section List (Section → Cluster Meaning)")
            section_list_df = gr.Dataframe(wrap=True, interactive=False)

        with gr.Tab("Student-Level"):
            student_df = gr.Dataframe(label="Student-Level Results (with meanings & reasons)", wrap=True)
            student_plot = gr.Plot(label="Student PCA Plot")
            student_profile = gr.Dataframe(label="Student Cluster Profile (avg %)", wrap=True)

        with gr.Tab("Section-Level"):
            section_df = gr.Dataframe(label="Section-Level Results (with meanings & reasons)", wrap=True)
            section_plot = gr.Plot(label="Section PCA Plot")
            section_profile = gr.Dataframe(label="Section Cluster Profile (avg %)", wrap=True)

        with gr.Tab("Download"):
            dl = gr.File(label="Download Excel Report (includes distributions + section list)")

    run_btn.click(
        run_analysis,
        inputs=[file_in],
        outputs=[
            summary,
            student_dist_df, section_dist_df, section_list_df,
            student_df, student_plot, student_profile,
            section_df, section_plot, section_profile,
            dl
        ],
    )

if __name__ == "__main__":
    demo.launch()