import gradio as gr import pandas as pd import numpy as np import re import os import uuid from typing import List, Dict, Tuple, Optional try: from rapidfuzz import process as rf_process HAS_FUZZ = True except Exception: HAS_FUZZ = False APP_TITLE = "Ward Ranking Random Assigner" DESCRIPTION = """ **Flow** 1) Upload .csv/.xlsx 2) Choose wards + set capacity 3) Check Available columns 4) Map by Auto-detect (Thai/English + fuzzy) or by numbers (1-based) 5) **Clean** → keep NAME/ID + selected wards; convert ranks to integers 6) **Show Stepwise Report** → Rank 1 results → random assign rank 1 → Rank 2 results → random assign … until done 7) (Optional) Assign button runs the full allocation too and provides CSVs to download """ WARD_CHOICES = [ ("Medical", "อายุรศาสตร์ ภาคปกติ"), ("Medical_1", "อายุรศาสตร์_1"), ("Medical_2", "อายุรศาสตร์_2"), ("Surgical", "ศัลยศาสตร์"), ("Pediatric", "เด็ก"), ("Community", "ชุมชน"), ("Psychiatric", "จิตเวช"), ("Obstetrics", "สูติศาสตร์"), ] # ===== Display labels (English-first with Thai in parentheses) ===== WARD_LABELS = { "Medical": ("Internal Medicine", "อายุรศาสตร์"), "Medical_1": ("Internal Medicine 1", "อายุรศาสตร์_1"), "Medical_2": ("Internal Medicine 2", "อายุรศาสตร์_2"), "Surgical": ("Surgery", "ศัลยศาสตร์"), "Pediatric": ("Pediatrics", "เด็ก"), "Community": ("Community Health", "ชุมชน"), "Psychiatric": ("Psychiatry", "จิตเวช"), "Obstetrics": ("Obstetrics", "สูติศาสตร์"), } def ward_display(ward_key: str) -> str: en, th = WARD_LABELS.get(ward_key, (ward_key, ward_key)) return f"{en} ({th})" # Keyword dictionary for auto mapping AUTO_MAP = { "NAME": ["ชื่อ-สกุล", "ชื่อ - สกุล", "fullname", "full name", "name", "student name"], "ID": ["รหัสนักศึกษา", "รหัส", "student id", "id", "studentid"], "Medical": ["อายุรศาสตร์", "medical"], "Medical_1": ["อายุรศาสตร์_1", "medical_1", "med_1", "med1"], "Medical_2": ["อายุรศาสตร์_2", "medical_2", "med_2", "med2"], "Surgical": ["ศัลยศาสตร์", "surgical", "surgery","surg"], "Pediatric": ["เด็ก", "pediatric", "pediatrics"], "Community": ["ชุมชน", "community"], "Psychiatric": ["จิตเวช", "psychiatric"], "Obstetrics": ["สูติศาสตร์", "obstetrics", "obgyn", "ob/gyn"], } def read_table(file) -> Tuple[Optional[pd.DataFrame], str]: if file is None: return None, "Please upload a file (.csv or .xlsx)" name = file.name.lower() if hasattr(file, "name") else "" try: if name.endswith(".csv"): df = pd.read_csv(file.name if hasattr(file, "name") else file) elif name.endswith(".xlsx"): df = pd.read_excel(file.name if hasattr(file, "name") else file) else: try: df = pd.read_csv(file) except Exception: return None, "Only .csv or .xlsx are supported" except Exception as e: return None, f"Failed to read file: {e}" df.columns = [str(c).strip() for c in df.columns] return df, "" def available_columns_text(df: pd.DataFrame) -> str: lines = ["Available columns:"] for i, c in enumerate(df.columns, start=1): lines.append(f"{i}. {c}") return "\n".join(lines) def parse_rank(value) -> Optional[int]: if pd.isna(value): return None s = str(value) m = re.search(r'(\d+)', s) if m: try: return int(m.group(1)) except ValueError: return None return None def auto_map_columns(df: pd.DataFrame, selected_wards: List[str]) -> Dict[str, int]: cols = list(df.columns) col_lower = [c.lower() for c in cols] result: Dict[str, int] = {} def find_by_keywords(keywords: List[str]) -> Optional[int]: for kw in keywords: kw_low = kw.lower() for idx, c_low in enumerate(col_lower): if kw_low in c_low: return idx + 1 if HAS_FUZZ: best_idx = None best_score = -1 for idx, c in enumerate(cols): for kw in keywords: match = rf_process.extractOne(kw, [c], score_cutoff=85) if match: _, score, _ = match if score > best_score: best_score = score best_idx = idx + 1 if best_idx is not None: return best_idx return None n_idx = find_by_keywords(AUTO_MAP["NAME"]) if n_idx: result["NAME"] = n_idx i_idx = find_by_keywords(AUTO_MAP["ID"]) if i_idx: result["ID"] = i_idx for w in selected_wards: kws = AUTO_MAP.get(w, [w]) w_idx = find_by_keywords(kws) if w_idx: result[w] = w_idx return result def build_cleaned_from_indices(df: pd.DataFrame, mapping_indices: Dict[str, int]) -> pd.DataFrame: def idx_to_name(k: str) -> str: idx = mapping_indices.get(k, None) if idx is None: return "" if not (1 <= idx <= len(df.columns)): return "" return df.columns[idx - 1] name_col = idx_to_name("NAME") id_col = idx_to_name("ID") if not name_col or not id_col: missing = [] if not name_col: missing.append("NAME") if not id_col: missing.append("ID") raise ValueError(f"Missing required columns: {', '.join(missing)}") ward_cols_src, ward_cols_dst = [], [] for w, _th in WARD_CHOICES: if w in mapping_indices: c = idx_to_name(w) if c: ward_cols_src.append(c) ward_cols_dst.append(w) keep_cols = [name_col, id_col] + ward_cols_src cleaned = df[keep_cols].copy() rename_map = {name_col: "NAME", id_col: "ID"} rename_map.update({src: dst for src, dst in zip(ward_cols_src, ward_cols_dst)}) cleaned = cleaned.rename(columns=rename_map) for c in cleaned.columns: if c not in ("NAME", "ID"): cleaned[c] = cleaned[c].apply(parse_rank).astype("Int64") # Sort โดยใช้เฉพาะตัวเลขจาก ID digits = cleaned["ID"].astype(str).str.extract(r"(\d+)", expand=False) num_id = pd.to_numeric(digits, errors="coerce") cleaned = ( cleaned.assign(_num_id=num_id) .sort_values(by="_num_id", kind="mergesort", na_position="last") .drop(columns="_num_id") .reset_index(drop=True) ) return cleaned def max_rank_in(cleaned: pd.DataFrame) -> int: wards = [w for w in cleaned.columns if w not in ("NAME", "ID")] mr = 0 for w in wards: m = cleaned[w].max(skipna=True) if pd.notna(m): mr = max(mr, int(m)) return int(mr) # ===== Stepwise simulation & reports (random each round) ===== def simulate_stepwise_report(cleaned: pd.DataFrame, capacities: Dict[str, int]) -> str: """Round-by-round report: show rank r results, then randomly assign rank r, update remaining capacity, continue.""" wards = [w for w in cleaned.columns if w not in ("NAME", "ID")] total_students = len(cleaned) cap = {w: int(capacities.get(w, 0)) for w in wards} assigned = pd.Series(index=cleaned.index, data=False) # True if assigned already assigned_ward = pd.Series(index=cleaned.index, data="", dtype="object") mr = max_rank_in(cleaned) lines = [] lines.append(f"### Total Students (จำนวนนักศึกษาทั้งหมด): {total_students}") lines.append(f"### Total Capacity (ความจุรวม): {sum(cap.values())}") lines.append("") for r in range(1, mr + 1): lines.append("\n---\n") lines.append(f"## Rank {r} Results (การแสดงผลอันดับที่ {r})\n") header = "| Ward (วอร์ด) | Remaining Capacity (ความจุคงเหลือ) | Rank {r} Count (จำนวนเลือกอันดับ {r}) | Students (รายชื่อนักศึกษา) |".format(r=r) sep = "|---|---:|---:|---|" lines += [header, sep] # Show BEFORE assignment for w in wards: names = cleaned.loc[(~assigned) & (cleaned[w] == r), "NAME"].astype(str).tolist() cnt = len(names) sample = ", ".join(names[:3]) + ("..." if cnt > 3 else "") lines.append(f"| {ward_display(w)} | {cap[w]} | {cnt} | {sample} |") # Now perform random assignment at this rank lines.append("") lines.append(f"### Allocation at Rank {r} (การสุ่มจัดสรรในอันดับที่ {r})") for w in wards: candidates_idx = cleaned.index[(~assigned) & (cleaned[w] == r)].tolist() if not candidates_idx or cap[w] <= 0: lines.append(f"- {ward_display(w)}: No allocation (ไม่มีการจัดสรร)") continue if len(candidates_idx) <= cap[w]: chosen = candidates_idx else: chosen = list(np.random.choice(candidates_idx, size=cap[w], replace=False)) assigned.loc[chosen] = True assigned_ward.loc[chosen] = w cap[w] -= len(chosen) chosen_names = cleaned.loc[chosen, "NAME"].astype(str).tolist() sample = ", ".join(chosen_names[:10]) + ("..." if len(chosen_names) > 10 else "") lines.append(f"- {ward_display(w)} : {len(chosen_names)} | {sample}") # After assignment at this rank lines.append("") lines.append("**Remaining capacity (จำนวนรับที่เหลือหลังรอบนี้):**") for w in wards: lines.append(f"- {ward_display(w)}: {cap[w]}") # Final summary lines.append("\n---\n") lines.append("## Final Summary (สรุปสุดท้าย)") for w in wards: sel_names = assigned_ward[assigned_ward == w] lines.append(f"- {ward_display(w)}: {len(sel_names)} assigned (จัดสรรแล้ว)") unassigned = cleaned.loc[~assigned, "NAME"].astype(str).tolist() lines.append(f"- Not assigned (ยังไม่ได้รับการจัดสรร): {len(unassigned)}") if unassigned: sample_un = ", ".join(unassigned[:15]) + ("..." if len(unassigned) > 15 else "") lines.append(f" - {sample_un}") return "\n".join(lines) def random_assign(cleaned: pd.DataFrame, capacities: Dict[str, int]) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, int]]: """Full allocation using the same stepwise logic (for CSV outputs).""" wards = [w for w in cleaned.columns if w not in ("NAME", "ID")] cap = {w: int(capacities.get(w, 0)) for w in wards} assigned = pd.Series(index=cleaned.index, data=pd.NA, dtype="object") choice_no = pd.Series(index=cleaned.index, data=pd.NA, dtype="Int64") mr = max_rank_in(cleaned) for r in range(1, mr + 1): if all(c <= 0 for c in cap.values()): break for w in wards: if cap[w] <= 0: continue mask = (assigned.isna()) & (cleaned[w] == r) candidates = cleaned.index[mask].tolist() if not candidates: continue if len(candidates) <= cap[w]: pick = candidates else: pick = list(np.random.choice(candidates, size=cap[w], replace=False)) assigned.loc[pick] = w choice_no.loc[pick] = r cap[w] -= len(pick) result = cleaned.copy() result["AssignedWard"] = assigned result["ChoiceNumber"] = choice_no not_assigned = result[result["AssignedWard"].isna()].copy() return result.fillna(""), not_assigned.fillna(""), cap # ===== Helpers for temp file paths ===== def _tmp(name: str) -> str: os.makedirs("/tmp", exist_ok=True) return f"/tmp/{uuid.uuid4().hex}-{name}" # ===== Gradio callbacks ===== def update_capacity_table(selected_wards: List[str]) -> pd.DataFrame: rows = [] for w, th in WARD_CHOICES: if selected_wards and w in selected_wards: rows.append([w, th, 0]) return pd.DataFrame(rows, columns=["Ward", "Thai Name", "Capacity"]) def on_upload(file, selected_wards): df, msg = read_table(file) if df is None: return gr.update(value=msg, visible=True), "", None, None, None, None, None, None, None, None, None, None avail = available_columns_text(df) auto_idx = auto_map_columns(df, selected_wards or []) def idx_or_none(key): return int(auto_idx[key]) if key in auto_idx else None name_num = idx_or_none("NAME") id_num = idx_or_none("ID") med_num = idx_or_none("Medical") med1_num = idx_or_none("Medical_1") med2_num = idx_or_none("Medical_2") surg_num = idx_or_none("Surgical") ped_num = idx_or_none("Pediatric") comm_num = idx_or_none("Community") psy_num = idx_or_none("Psychiatric") obs_num = idx_or_none("Obstetrics") return (gr.update(value="✓ File loaded", visible=True), avail, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num) def collect_mapping_numbers(name_num, id_num, ward_nums, selected_wards, n_cols): errors = [] mapping = {} def valid(num, label): if num is None: errors.append(f"- Please enter column number for {label}") return None try: num = int(num) except Exception: errors.append(f"- {label} must be a number") return None if not (1 <= num <= n_cols): errors.append(f"- {label} must be within 1–{n_cols}") return None return num nn = valid(name_num, "NAME") ii = valid(id_num, "ID") if nn: mapping["NAME"] = nn if ii: mapping["ID"] = ii for w in selected_wards: wn = valid(ward_nums.get(w, None), f"{w}") if wn: mapping[w] = wn return errors, mapping def on_clean(file, selected_wards, capacity_df, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num): if not selected_wards: return gr.update(value="Please select at least one ward.", visible=True), None, None, None df, msg = read_table(file) if df is None: return gr.update(value=msg, visible=True), None, None, None n_cols = len(df.columns) ward_nums = { "Medical": med_num, "Medical_1": med1_num, "Medical_2": med2_num, "Surgical": surg_num, "Pediatric": ped_num, "Community": comm_num, "Psychiatric": psy_num, "Obstetrics": obs_num } errors, mapping_idx = collect_mapping_numbers(name_num, id_num, ward_nums, selected_wards, n_cols) if errors: return gr.update(value="❌ Mapping invalid:\n" + "\n".join(errors), visible=True), None, None, None try: cleaned = build_cleaned_from_indices(df, mapping_idx) except Exception as e: return gr.update(value=f"❌ Error: {e}", visible=True), None, None, None cleaned_path = _tmp("cleaned.csv") cleaned.to_csv(cleaned_path, index=False, encoding="utf-8-sig") info = "✓ Cleaning completed" return gr.update(value=info, visible=True), cleaned.head(30), cleaned_path, len(cleaned) def _capacities_from_df(cleaned: pd.DataFrame, capacity_df: Optional[pd.DataFrame]) -> Dict[str, int]: if capacity_df is None or capacity_df.empty: return {w: 0 for w in cleaned.columns if w not in ("NAME", "ID")} cap_df = capacity_df.copy() cap_df.columns = ["Ward", "Thai Name", "Capacity"] cap_df = cap_df[cap_df["Ward"].isin([c for c in cleaned.columns if c not in ("NAME", "ID")])] capacities = {} for _, row in cap_df.iterrows(): try: capacities[str(row["Ward"])] = int(row["Capacity"]) except Exception: capacities[str(row["Ward"])] = 0 return capacities def on_assign(file, selected_wards, capacity_df, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num): status, cleaned_preview, cleaned_file, n_students = on_clean(file, selected_wards, capacity_df, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num) if cleaned_preview is None: return status, None, None, None, None, None df, _ = read_table(file) n_cols = len(df.columns) ward_nums = { "Medical": med_num, "Medical_1": med1_num, "Medical_2": med2_num, "Surgical": surg_num, "Pediatric": ped_num, "Community": comm_num, "Psychiatric": psy_num, "Obstetrics": obs_num } _errors, mapping_idx = collect_mapping_numbers(name_num, id_num, ward_nums, selected_wards, n_cols) cleaned = build_cleaned_from_indices(df, mapping_idx) capacities = _capacities_from_df(cleaned, capacity_df) total_capacity = sum(capacities.values()) if n_students is None: n_students = len(cleaned) if n_students > total_capacity: msg = f"❌ Students {n_students} > total capacity {total_capacity} (shortage allowed, not exceed)" return gr.update(value=msg, visible=True), None, None, None, None, None assigned, not_assigned, leftover = random_assign(cleaned, capacities) assigned_path = _tmp("assigned.csv") not_assigned_path = _tmp("not_assigned.csv") assigned.to_csv(assigned_path, index=False, encoding="utf-8-sig") not_assigned.to_csv(not_assigned_path, index=False, encoding="utf-8-sig") leftover_text = "Remaining capacity (จำนวนรับที่เหลือ):\n" + "\n".join([f"- {ward_display(k)}: {v}" for k, v in leftover.items()]) stepwise_md = simulate_stepwise_report(cleaned, capacities) return status, assigned.head(30), assigned_path, not_assigned_path, leftover_text, stepwise_md with gr.Blocks(title=APP_TITLE) as demo: gr.Markdown(f"# {APP_TITLE}") gr.Markdown(DESCRIPTION) with gr.Row(): file = gr.File(file_count="single", file_types=[".csv", ".xlsx"], label="Upload data (.csv/.xlsx)") with gr.Accordion("1) Select wards (เลือกวอร์ด)", open=True): selected_wards = gr.CheckboxGroup( choices=[w for w, _ in WARD_CHOICES], label="Select wards (เลือกได้หลายข้อ)", value=["Medical", "Surgical", "Pediatric", "Community", "Psychiatric", "Obstetrics"] ) gr.Markdown("Legend: " + ", ".join([f"**{w}** = {ward_display(w)}" for w, _ in WARD_CHOICES])) with gr.Accordion("2) Set capacity per ward (กำหนดความจุต่อวอร์ด)", open=True): capacity_df = gr.Dataframe( headers=["Ward", "Thai Name", "Capacity"], value=[], row_count=(0, "dynamic"), col_count=3, interactive=True, wrap=True, label="Fill only selected wards" ) selected_wards.change(fn=update_capacity_table, inputs=selected_wards, outputs=capacity_df) with gr.Accordion("3) Column headers & mapping (หัวคอลัมน์และการจับคู่)", open=True): status = gr.Markdown(visible=False) available = gr.Code(label="Available columns (index starts at 1)", language="markdown", interactive=False) auto_btn = gr.Button("Read & Auto-detect mapping") name_num = gr.Number(label="Column number for NAME", precision=0) id_num = gr.Number(label="Column number for ID", precision=0) with gr.Row(): med_num = gr.Number(label="Column number Medical", precision=0) med1_num = gr.Number(label="Column number Medical_1", precision=0) med2_num = gr.Number(label="Column number Medical_2", precision=0) with gr.Row(): surg_num = gr.Number(label="Column number Surgical", precision=0) ped_num = gr.Number(label="Column number Pediatric", precision=0) comm_num = gr.Number(label="Column number Community", precision=0) with gr.Row(): psy_num = gr.Number(label="Column number Psychiatric", precision=0) obs_num = gr.Number(label="Column number Obstetrics", precision=0) auto_btn.click(fn=on_upload, inputs=[file, selected_wards], outputs=[status, available, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num]) # >>> Moved CLEAN button here (before reports) <<< clean_btn = gr.Button("Clean data (ดูพรีวิว)", variant="secondary") preview = gr.Dataframe(label="Cleaned preview (first 30 rows)", visible=True) cleaned_file = gr.File(label="Download cleaned.csv") clean_btn.click( fn=on_clean, inputs=[file, selected_wards, capacity_df, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num], outputs=[status, preview, cleaned_file, gr.State()] ) assign_btn = gr.Button("Assign (สุ่มตามลำดับอันดับ)", variant="primary") assigned_preview = gr.Dataframe(label="Assigned preview (first 30 rows)") assigned_file = gr.File(label="Download assigned.csv") not_assigned_file = gr.File(label="Download not_assigned.csv") leftover_text = gr.Textbox(label="Remaining capacity summary", interactive=False) allocation_report = gr.Markdown(label="Stepwise Report (ผลการสุ่มรอบต่อรอบ)") assign_btn.click( fn=on_assign, inputs=[file, selected_wards, capacity_df, name_num, id_num, med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num], outputs=[status, assigned_preview, assigned_file, not_assigned_file, leftover_text, allocation_report] ) if __name__ == "__main__": demo.launch()