| | |
| | |
| |
|
| | import os, io, math |
| | from datetime import datetime |
| | import pandas as pd |
| | import numpy as np |
| | import gradio as gr |
| | import matplotlib.pyplot as plt |
| | import smtplib |
| | from email.message import EmailMessage |
| |
|
| | |
# Default tuning values. They seed the keyword defaults of
# ``process_and_report`` / ``label_from_score`` and the initial positions of
# the dashboard controls.

# Attendance (%) below this value starts contributing to attendance risk.
DEFAULT_ATTENDANCE_THRESHOLD: float = 75.0
# Attendance (%) below this value is scored as maximum attendance risk.
DEFAULT_ATTENDANCE_HIGH_RISK: float = 60.0
# Percentage drop between the last two test scores that triggers a flag.
DEFAULT_TEST_DECLINE_PERCENT: float = 10.0
# Attempt count at (or above) which the attempts signal is at its maximum.
DEFAULT_MAX_ATTEMPTS: int = 3
# Combined-score cut-offs used to label a student "High" / "Medium".
DEFAULT_HIGH_CUT: float = 0.6
DEFAULT_MED_CUT: float = 0.4
| |
|
| | |
def load_df_from_file(f):
    """Load an uploaded spreadsheet into a DataFrame.

    Args:
        f: ``None``, a filesystem path (``str`` or ``os.PathLike``), or a
            file-like object. For file-like objects the ``name`` attribute,
            when present, is used to detect the format.

    Returns:
        A ``pandas.DataFrame``, or ``None`` when ``f`` is ``None``.

    Raises:
        RuntimeError: If the file cannot be parsed as Excel or CSV.
    """
    if f is None:
        return None

    # Paths carry their own name; file-like objects may expose ``.name``.
    is_path = isinstance(f, (str, os.PathLike))
    name = os.fspath(f) if is_path else getattr(f, "name", "")
    is_excel = str(name).lower().endswith((".xls", ".xlsx"))

    try:
        return pd.read_excel(f) if is_excel else pd.read_csv(f)
    except Exception as first_err:
        if is_path and not is_excel:
            # Re-reading the same path as CSV cannot give a different result.
            raise RuntimeError(f"Could not read file: {first_err}") from first_err
        # Best-effort fallback: the file may be a CSV with a misleading
        # extension, or a stream whose cursor was advanced by the first try.
        try:
            if not is_path:
                f.seek(0)
            return pd.read_csv(f)
        except Exception as e:
            # For Excel inputs the first failure is the more informative one.
            cause = first_err if is_excel else e
            raise RuntimeError(f"Could not read file: {cause}") from e
| |
|
def normalise_colnames(df):
    """Return a copy of ``df`` with recognised columns renamed canonically.

    String headers are stripped of surrounding whitespace and matched
    case-insensitively (ignoring spaces and underscores) against keyword
    rules. Each canonical name is assigned to at most one column: the first
    match wins, and a column that already carries the canonical name takes
    precedence. Later matches keep their own header, so the result never
    gains duplicate canonical columns. Non-string headers are left untouched.

    Args:
        df: Input DataFrame, or ``None``.

    Returns:
        A renamed copy of ``df``, or ``None`` when ``df`` is ``None``.
    """
    if df is None:
        return None
    df = df.copy()
    df.columns = [c.strip() if isinstance(c, str) else c for c in df.columns]

    canonical = ("Roll Number", "Name", "Attendance (%)", "Marks (%)",
                 "Fees Paid", "Attempts")
    # Columns already using a canonical name own that name.
    claimed = {c for c in df.columns if isinstance(c, str) and c in canonical}

    colmap = {}
    for c in df.columns:
        if not isinstance(c, str) or c in canonical:
            continue
        lc = c.lower().replace(" ", "").replace("_", "")
        if "roll" in lc and ("no" in lc or "number" in lc or "id" in lc):
            target = "Roll Number"
        elif lc in ("name", "studentname", "student"):
            target = "Name"
        elif "attendance" in lc:
            target = "Attendance (%)"
        elif "mark" in lc or "score" in lc or "percentage" in lc:
            target = "Marks (%)"
        elif "fee" in lc:
            target = "Fees Paid"
        elif "attempt" in lc:
            target = "Attempts"
        else:
            continue
        if target in claimed:
            # A second match would create a duplicate column; keep its header.
            continue
        claimed.add(target)
        colmap[c] = target
    return df.rename(columns=colmap)
| |
|
def merge_sources(att_df, test_df, fee_df):
    """Outer-merge the attendance, test and fee sheets into one DataFrame.

    Each sheet is first passed through ``normalise_colnames``. The join key
    is the first of ``"Roll Number"`` / ``"Name"`` present in every supplied
    sheet. If neither is common to all, ``"Roll Number"`` is used when any
    sheet has it, otherwise ``"Name"``, and sheets lacking the key get an
    all-NaN key column.

    Non-key columns that appear in more than one sheet (typically ``Name``)
    are coalesced: the value from the earlier sheet wins and gaps are filled
    from the later one.

    Args:
        att_df: Attendance DataFrame or ``None``.
        test_df: Test-score DataFrame or ``None``.
        fee_df: Fee DataFrame or ``None``.

    Returns:
        The merged DataFrame; an empty DataFrame when every input is ``None``.
    """
    frames = [d for d in (normalise_colnames(att_df),
                          normalise_colnames(test_df),
                          normalise_colnames(fee_df)) if d is not None]
    if not frames:
        return pd.DataFrame()

    key = next(
        (k for k in ("Roll Number", "Name")
         if all(k in d.columns for d in frames)),
        None,
    )
    if key is None:
        has_roll = any("Roll Number" in d.columns for d in frames)
        key = "Roll Number" if has_roll else "Name"

    for d in frames:
        if key not in d.columns:
            # ``d`` is a private copy from normalise_colnames, safe to mutate.
            d[key] = np.nan

    dup_suffix = "__dup"
    merged = frames[0]
    for other in frames[1:]:
        overlap = [c for c in other.columns if c != key and c in merged.columns]
        # suffixes=(False, False) makes pandas raise on any overlapping
        # column, so tag the right-hand copies and fold them back in.
        merged = pd.merge(merged, other, on=key, how="outer",
                          suffixes=("", dup_suffix))
        for col in overlap:
            dup_col = f"{col}{dup_suffix}"
            merged[col] = merged[col].combine_first(merged[dup_col])
            merged = merged.drop(columns=dup_col)
    return merged
| |
|
def parse_test_scores(df):
    """List the columns of ``df`` that hold test scores.

    A string column qualifies when its lower-cased name contains one of the
    keywords ``test``, ``exam``, ``mid``, ``quiz``, ``assessment`` or
    ``score``. ``"Marks (%)"``, when present, is appended last, so callers
    that treat the list as ordered see it as the final score. Non-string
    column labels are ignored rather than raising.

    Args:
        df: DataFrame to inspect, or ``None``.

    Returns:
        A list of column names (empty when ``df`` is ``None``).
    """
    if df is None:
        return []
    keywords = ("test", "exam", "mid", "quiz", "assessment", "score")
    reserved = ("Marks (%)", "Attendance (%)")
    test_cols = [
        c for c in df.columns
        if isinstance(c, str)
        and c not in reserved
        and any(k in c.lower() for k in keywords)
    ]
    if "Marks (%)" in df.columns:
        test_cols.append("Marks (%)")
    return test_cols
| |
|
def attendance_risk(att, threshold, high_risk):
    """Score attendance risk in the range ``[0.0, 1.0]``.

    Args:
        att: Attendance percentage. Anything ``float()`` accepts is allowed,
            plus strings with a trailing ``%`` such as ``"62%"``.
        threshold: Attendance below this value starts to carry risk.
        high_risk: Attendance below this value is maximum risk.

    Returns:
        ``1.0`` below ``high_risk``; a value scaled up to ``0.9`` between
        ``high_risk`` and ``threshold``; ``0.0`` otherwise, including for
        missing or unparseable values.
    """
    if isinstance(att, str):
        # Spreadsheets often store percentages as text like "62%".
        att = att.strip().rstrip("%").strip()
    try:
        value = float(att)
    except (TypeError, ValueError):
        return 0.0
    if math.isnan(value):
        return 0.0
    if value < high_risk:
        return 1.0
    if value < threshold:
        # Floor the span at 1.0 so near-equal cut-offs cannot blow up the ratio.
        span = max(1.0, threshold - high_risk)
        return (threshold - value) / span * 0.9
    return 0.0
| |
|
def test_decline_risk(row, test_cols, decline_pct):
    """Score the drop between a student's two most recent valid test scores.

    Args:
        row: Mapping-like record (anything with ``.get``) of one student.
        test_cols: Score column names, ordered oldest to newest.
        decline_pct: Percentage drop at which the student is flagged.

    Returns:
        ``0.0`` when fewer than two numeric scores exist, the previous score
        is zero, or the drop is below ``decline_pct``. Otherwise
        ``drop / (2 * decline_pct)`` capped at ``1.0``.
    """
    scores = []
    for col in test_cols:
        try:
            score = float(row.get(col, None))
        except (TypeError, ValueError):
            continue
        # NaN (e.g. a gap left by an outer merge) is "no score", not a score;
        # keeping it would silently cancel the comparison below.
        if not math.isnan(score):
            scores.append(score)
    if len(scores) < 2:
        return 0.0
    prev, latest = scores[-2], scores[-1]
    if prev == 0:
        return 0.0
    drop = (prev - latest) / prev * 100.0
    if decline_pct <= 0:
        # Avoid dividing by zero: with no tolerance any real drop is maximal.
        return 1.0 if drop > 0 else 0.0
    if drop >= decline_pct:
        return min(1.0, drop / (2 * decline_pct))
    return 0.0


# The name matches pytest's ``test_*`` pattern; mark it so test modules that
# import it do not try to collect it as a test case.
test_decline_risk.__test__ = False
| |
|
def attempts_risk(attempts_val, max_attempts):
    """Score risk from the number of attempts, in the range ``[0.0, 1.0]``.

    Args:
        attempts_val: Attempt count. Numbers and numeric strings (including
            ``"2.0"``) are accepted and truncated toward zero.
        max_attempts: Attempt count at which the risk reaches ``1.0``.

    Returns:
        ``0.0`` for missing, unparseable, zero or negative counts; ``1.0``
        at or above ``max_attempts``; otherwise ``attempts / max_attempts``.
    """
    try:
        # Go through float so values such as 2.0 or "2.0" (common after a
        # merge upcasts the column) are accepted.
        attempts = int(float(attempts_val))
    except (TypeError, ValueError, OverflowError):
        return 0.0
    if attempts <= 0:
        return 0.0
    if attempts >= max_attempts:
        return 1.0
    return attempts / max(1, max_attempts)
| |
|
def combine_signals(signals, mode="max", weights=None):
    """Combine per-signal risk arrays into one score per student.

    Args:
        signals: Sequence of equal-length 1-D sequences, one per signal.
        mode: ``"max"`` takes the element-wise maximum; ``"weighted"`` takes
            a weighted average using ``weights``; anything else takes the
            plain mean.
        weights: One weight per signal, used only in ``"weighted"`` mode.
            They are normalised to sum to 1. If they are ``None`` or do not
            sum to a positive number, the plain mean is used instead.

    Returns:
        A NumPy array with one combined score per student.
    """
    arr = np.asarray(signals)
    if mode == "max":
        return arr.max(axis=0)
    if mode == "weighted" and weights is not None:
        w = np.asarray(weights, dtype=float)
        total = w.sum()
        # All-zero weights would divide by zero and turn every score into
        # NaN; fall through to the unweighted mean in that case.
        if total > 0:
            return (arr * (w / total)[:, None]).sum(axis=0)
    return arr.mean(axis=0)
| |
|
def label_from_score(score, high_cut=DEFAULT_HIGH_CUT, med_cut=DEFAULT_MED_CUT):
    """Translate a combined risk score into a label.

    Args:
        score: Combined risk score.
        high_cut: Scores at or above this value are ``"High"``.
        med_cut: Scores at or above this value (but below ``high_cut``) are
            ``"Medium"``.

    Returns:
        ``"High"``, ``"Medium"`` or ``"Low"``.
    """
    if score >= high_cut:
        return "High"
    return "Medium" if score >= med_cut else "Low"
| |
|
def send_email_notification(to_emails, subject, body, smtp_host, smtp_port, smtp_user, smtp_pass):
    """Send a plain-text e-mail through an authenticated SMTP server.

    The connection uses implicit TLS on port 465 and STARTTLS on any other
    port. In both cases the server certificate is verified against the
    system trust store before credentials are sent.

    Args:
        to_emails: Recipient addresses, as a list or a comma-separated string.
        subject: Message subject.
        body: Plain-text message body.
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port.
        smtp_user: Login name, also used as the ``From`` address.
        smtp_pass: Login password.

    Returns:
        ``(True, "Sent")`` on success, otherwise ``(False, reason)``. This
        function never raises; failures are reported through the tuple.
    """
    import ssl  # local import: only this function needs it

    if isinstance(to_emails, str):
        # Joining a bare string would split it into single characters.
        to_emails = to_emails.split(",")
    recipients = [addr.strip() for addr in (to_emails or []) if addr and addr.strip()]
    if not recipients:
        return False, "No recipients specified"

    try:
        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = smtp_user
        msg["To"] = ", ".join(recipients)
        msg.set_content(body)

        # Without an explicit context smtplib does not verify the server
        # certificate, which would expose the password to interception.
        context = ssl.create_default_context()
        if int(smtp_port) == 465:
            with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=15, context=context) as s:
                s.login(smtp_user, smtp_pass)
                s.send_message(msg)
        else:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=15) as s:
                s.starttls(context=context)
                s.login(smtp_user, smtp_pass)
                s.send_message(msg)
        return True, "Sent"
    except Exception as e:
        # Boundary: callers expect a status tuple, never an exception.
        return False, str(e)
| |
|
| | |
def process_and_report(att_file, test_file, fee_file,
                       attendance_threshold=DEFAULT_ATTENDANCE_THRESHOLD,
                       attendance_high_risk=DEFAULT_ATTENDANCE_HIGH_RISK,
                       decline_pct=DEFAULT_TEST_DECLINE_PERCENT,
                       max_attempts=DEFAULT_MAX_ATTEMPTS,
                       combine_mode="max",
                       weight_att=0.5, weight_test=0.3, weight_attempt=0.2,
                       notify=False, notify_threshold="High", notify_emails="", smtp_overrides=None):
    """Load the uploaded sheets, score every student and optionally notify.

    Args:
        att_file, test_file, fee_file: Uploaded files (or falsy to skip).
        attendance_threshold: Attendance (%) below which risk starts.
        attendance_high_risk: Attendance (%) below which risk is maximal.
        decline_pct: Percentage test-score drop that triggers a flag.
        max_attempts: Attempt count at which attempts risk is maximal.
        combine_mode: ``"max"`` or anything else for a weighted average.
        weight_att, weight_test, weight_attempt: Weights for weighted mode.
        notify: Whether to e-mail a summary of flagged students.
        notify_threshold: Minimum risk label to include in the e-mail.
            ``"Medium"`` includes Medium and High. An unrecognised label is
            matched literally.
        notify_emails: Comma-separated recipient addresses.
        smtp_overrides: Optional dict with ``host``/``port``/``user``/``pass``.
            When falsy, ``SMTP_HOST``/``SMTP_PORT``/``SMTP_USER``/``SMTP_PASS``
            environment variables are used.

    Returns:
        ``(df, meta)``. On failure ``df`` is empty and ``meta`` is
        ``{"error": message}``. On success ``df`` is the merged data with
        the risk columns added and ``meta`` is
        ``{"summary": label_counts, "notify_result": (ok, message) or None}``.
    """
    try:
        att_df = load_df_from_file(att_file) if att_file else None
        test_df = load_df_from_file(test_file) if test_file else None
        fee_df = load_df_from_file(fee_file) if fee_file else None
        # Merging can fail too (e.g. incompatible key types); report it the
        # same way as a load failure instead of crashing the caller.
        merged = merge_sources(att_df, test_df, fee_df)
    except Exception as e:
        return pd.DataFrame(), {"error": str(e)}
    if merged.empty:
        return pd.DataFrame(), {"error": "No data loaded. Upload at least one sheet."}

    merged = merged.reset_index(drop=True)
    merged['Attendance (%)'] = merged.get('Attendance (%)', np.nan)
    merged['Fees Paid'] = merged.get('Fees Paid', merged.get('FeesPaid', merged.get('Fees', np.nan)))

    # Determine score columns before any *_Risk columns are added.
    test_cols = parse_test_scores(merged)
    rows = [row for _, row in merged.iterrows()]
    att_risks = np.array(
        [attendance_risk(r.get('Attendance (%)', np.nan), attendance_threshold, attendance_high_risk)
         for r in rows], dtype=float)
    test_risks = np.array(
        [test_decline_risk(r, test_cols, decline_pct) for r in rows], dtype=float)
    attempt_risks = np.array(
        [attempts_risk(r.get('Attempts', 0), max_attempts) for r in rows], dtype=float)
    merged['Attendance_Risk'] = att_risks
    merged['Test_Decline_Risk'] = test_risks
    merged['Attempts_Risk'] = attempt_risks

    signals = [att_risks, test_risks, attempt_risks]
    if combine_mode == "max":
        combined = combine_signals(signals, mode="max")
    else:
        weights = [weight_att, weight_test, weight_attempt]
        combined = combine_signals(signals, mode="weighted", weights=weights)
    merged['Combined_Risk_Score'] = combined
    merged['Risk_Label'] = merged['Combined_Risk_Score'].apply(label_from_score)

    # "0.0" is included because an outer merge upcasts integer columns to
    # float, turning an unpaid marker of 0 into 0.0.
    unpaid_markers = ("no", "n", "false", "0", "0.0", "unpaid")
    reasons = []
    for _, row in merged.iterrows():
        r = []
        if row['Attendance_Risk'] >= 0.7:
            r.append(f"Low attendance ({row.get('Attendance (%)', 'NA')}%)")
        elif row['Attendance_Risk'] > 0:
            r.append(f"Attendance below threshold ({row.get('Attendance (%)', 'NA')}%)")
        if row['Test_Decline_Risk'] > 0:
            r.append("Recent test decline")
        if row['Attempts_Risk'] > 0:
            r.append(f"High attempts ({row.get('Attempts','NA')})")
        if str(row.get('Fees Paid', '')).strip().lower() in unpaid_markers:
            r.append("Fees unpaid")
        reasons.append("; ".join(r) if r else "None")
    merged['Flag_Reason'] = reasons
    summary = merged['Risk_Label'].value_counts().to_dict()

    notif_result = None
    if notify:
        # Treat the threshold as a minimum severity, not an exact match.
        severity = ["Low", "Medium", "High"]
        if notify_threshold in severity:
            wanted = severity[severity.index(notify_threshold):]
        else:
            wanted = [notify_threshold]
        notify_rows = merged[merged['Risk_Label'].isin(wanted)]

        if smtp_overrides:
            smtp = smtp_overrides
        else:
            try:
                env_port = int(os.environ.get("SMTP_PORT", 587))
            except ValueError:
                env_port = 587  # malformed or empty SMTP_PORT
            smtp = {
                "host": os.environ.get("SMTP_HOST"),
                "port": env_port,
                "user": os.environ.get("SMTP_USER"),
                "pass": os.environ.get("SMTP_PASS"),
            }

        emails = [e.strip() for e in str(notify_emails).split(",") if e.strip()]
        has_smtp = smtp.get("user") and smtp.get("pass") and smtp.get("host")
        if notify_rows.empty or not has_smtp:
            notif_result = (False, "Notification missing SMTP settings or no flagged students")
        elif not emails:
            notif_result = (False, "No notification recipient emails provided")
        else:
            subject = f"[Early Warning] {len(notify_rows)} students flagged {notify_threshold}"
            body_lines = ["Students flagged:\n"]
            for _, r in notify_rows.iterrows():
                body_lines.append(f"{r.get('Name','')}\t{r.get('Roll Number','')}\t{r.get('Risk_Label')}\tReasons: {r.get('Flag_Reason')}")
            ok, msg = send_email_notification(emails, subject, "\n".join(body_lines),
                                              smtp["host"], smtp.get("port", 587),
                                              smtp["user"], smtp["pass"])
            notif_result = (ok, msg)
    return merged, {"summary": summary, "notify_result": notif_result}
| |
|
| | |
def build_plot(df):
    """Build a bar chart of how many students fall in each risk label.

    Args:
        df: Scored DataFrame containing a ``Risk_Label`` column.

    Returns:
        A matplotlib ``Figure``, or ``None`` when ``df`` is ``None``, empty,
        or has no ``Risk_Label`` column.
    """
    if df is None or df.empty or 'Risk_Label' not in df.columns:
        return None
    counts = df['Risk_Label'].value_counts().reindex(['High', 'Medium', 'Low']).fillna(0)
    fig, ax = plt.subplots(figsize=(6, 3))
    ax.bar(counts.index, counts.values)
    ax.set_title("Risk distribution")
    ax.set_ylabel("Number of students")
    # Lay out this figure explicitly rather than pyplot's "current" one.
    fig.tight_layout()
    # Deregister from pyplot so repeated runs do not accumulate open
    # figures; the Figure object itself stays usable by the caller.
    plt.close(fig)
    return fig
| |
|
def df_to_colored_html(df):
    """Render the report columns of ``df`` as an HTML table.

    Rows labelled ``High`` get a red background and ``Medium`` rows a yellow
    one. String cell values are HTML-escaped because they come from uploaded
    spreadsheets.

    Args:
        df: Scored DataFrame, or ``None``.

    Returns:
        An HTML string; ``"<p>No data</p>"`` when ``df`` is ``None`` or empty.
    """
    if df is None or df.empty:
        return "<p>No data</p>"

    wanted = ['Roll Number', 'Name', 'Attendance (%)', 'Marks (%)', 'Fees Paid',
              'Combined_Risk_Score', 'Risk_Label', 'Flag_Reason']
    view = df[[c for c in wanted if c in df.columns]].copy()

    def row_style(r):
        label = r.get("Risk_Label", "Low")
        if label == "High":
            return 'background:#ffcccc'
        if label == "Medium":
            return 'background:#fff2cc'
        return ''

    styled = view.style.apply(lambda r: [row_style(r)] * len(r), axis=1)
    styled = styled.format(escape="html")
    # Styler.hide_index() was removed in pandas 2.0; prefer its replacement
    # and fall back only on older pandas versions that lack it.
    if hasattr(styled, "hide"):
        styled = styled.hide(axis="index")
    else:
        styled = styled.hide_index()
    return styled.to_html()
| |
|
def on_run(att_file_, test_file_, fee_file_,
           att_thresh_, att_high_, decline_pct_, max_attempts_,
           combine_mode_, weight_att_, weight_test_, weight_attempt_,
           notify_, notify_emails_, smtp_host_, smtp_port_, smtp_user_, smtp_pass_):
    """Handle the Run button: score the uploads and build the four outputs.

    Returns:
        ``(html, figure, summary_text, csv_path)``, in the same order as the
        ``outputs`` wired to the button. On error the figure and CSV path
        are ``None``.
    """
    import html as html_lib
    import tempfile
    from datetime import timezone

    smtp_overrides = None
    if smtp_user_ and smtp_pass_ and smtp_host_:
        smtp_overrides = {
            "host": smtp_host_,
            # The port box may be cleared by the user; default to 587.
            "port": int(smtp_port_ or 587),
            "user": smtp_user_,
            "pass": smtp_pass_,
        }
    df, meta = process_and_report(
        att_file=att_file_, test_file=test_file_, fee_file=fee_file_,
        attendance_threshold=float(att_thresh_), attendance_high_risk=float(att_high_),
        decline_pct=float(decline_pct_), max_attempts=int(max_attempts_ or DEFAULT_MAX_ATTEMPTS),
        combine_mode=combine_mode_, weight_att=float(weight_att_), weight_test=float(weight_test_), weight_attempt=float(weight_attempt_),
        notify=notify_, notify_threshold="High", notify_emails=notify_emails_, smtp_overrides=smtp_overrides
    )
    if isinstance(meta, dict) and meta.get("error"):
        # Four outputs are wired to this handler, so return four values.
        # The message may echo file content, so escape it before embedding.
        error_html = f"<pre style='color:red'>{html_lib.escape(str(meta['error']))}</pre>"
        return error_html, None, str(meta), None

    table_html = df_to_colored_html(df)
    fig = build_plot(df)

    # gr.File serves files by path, so write the report to disk instead of
    # handing back an in-memory buffer.
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    csv_path = os.path.join(tempfile.mkdtemp(), f"risk_report_{stamp}.csv")
    df.to_csv(csv_path, index=False)
    return table_html, fig, str(meta), csv_path


# NOTE(review): the original indentation was lost, so the placement of
# components inside the row and its two columns is reconstructed; confirm
# against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("## Student Early-Warning Dashboard")
    with gr.Row():
        with gr.Column():
            att_file = gr.File(label="Attendance", file_types=['.csv', '.xlsx'])
            test_file = gr.File(label="Tests", file_types=['.csv', '.xlsx'])
            fee_file = gr.File(label="Fees", file_types=['.csv', '.xlsx'])
            run_btn = gr.Button("Run")
            download = gr.File()
        with gr.Column():
            att_thresh = gr.Slider(50, 100, value=DEFAULT_ATTENDANCE_THRESHOLD, label="Attendance threshold")
            att_high = gr.Slider(20, 80, value=DEFAULT_ATTENDANCE_HIGH_RISK, label="High-risk attendance cutoff")
            decline_pct = gr.Slider(1, 50, value=DEFAULT_TEST_DECLINE_PERCENT, label="Test decline % to flag")
            max_attempts = gr.Number(value=DEFAULT_MAX_ATTEMPTS, label="Max attempts before flag", precision=0)
            combine_mode = gr.Radio(["max", "weighted"], value="max", label="Combine mode")
            weight_att = gr.Slider(0.0, 1.0, value=0.5, label="Weight: attendance (only for weighted)")
            weight_test = gr.Slider(0.0, 1.0, value=0.3, label="Weight: test decline")
            weight_attempt = gr.Slider(0.0, 1.0, value=0.2, label="Weight: attempts")
            notify = gr.Checkbox(False, label="Send email notifications to mentors (uses SMTP env vars or enter below)")
            notify_emails = gr.Textbox(label="Notify emails (comma-separated)")
            smtp_host = gr.Textbox(label="SMTP host (optional override)")
            smtp_port = gr.Number(value=587, label="SMTP port (optional override)", precision=0)
            smtp_user = gr.Textbox(label="SMTP user (optional override)")
            smtp_pass = gr.Textbox(type="password", label="SMTP pass (optional override)")
    result_html = gr.HTML()
    risk_plot = gr.Plot()
    summary_box = gr.Textbox()

    run_btn.click(fn=on_run,
                  inputs=[att_file, test_file, fee_file,
                          att_thresh, att_high, decline_pct, max_attempts,
                          combine_mode, weight_att, weight_test, weight_attempt,
                          notify, notify_emails, smtp_host, smtp_port, smtp_user, smtp_pass],
                  outputs=[result_html, risk_plot, summary_box, download])
| |
|
# Start the web server only when run as a script, so importing this module
# (e.g. to reuse the scoring helpers) does not launch the app.
if __name__ == "__main__":
    demo.launch()
| |
|