# app.py — uploaded by ShinyaJ (revision e423362, verified)
import gradio as gr
import pandas as pd
import numpy as np
import re
import os
import uuid
from typing import List, Dict, Tuple, Optional
# rapidfuzz is an optional dependency: fuzzy header matching is only enabled
# when it can be imported.
try:
    from rapidfuzz import process as rf_process
    HAS_FUZZ = True
except Exception:
    # Keep the name defined so that a stray reference to ``rf_process`` can
    # never raise NameError; callers must still check HAS_FUZZ before use.
    rf_process = None
    HAS_FUZZ = False
# Title used for the Gradio page (``gr.Blocks(title=...)``) and its top heading.
APP_TITLE = "Ward Ranking Random Assigner"
# Markdown help text rendered directly below the heading; it lists the
# intended user workflow step by step.
DESCRIPTION = """
**Flow**
1) Upload .csv/.xlsx
2) Choose wards + set capacity
3) Check Available columns
4) Map by Auto-detect (Thai/English + fuzzy) or by numbers (1-based)
5) **Clean** → keep NAME/ID + selected wards; convert ranks to integers
6) **Show Stepwise Report** → Rank 1 results → random assign rank 1 → Rank 2 results → random assign … until done
7) (Optional) Assign button runs the full allocation too and provides CSVs to download
"""
# Wards offered in the UI as (key, Thai name) pairs.
# The key is used as the checkbox choice and as the ward column name in the
# cleaned table; the Thai name is shown in the capacity table.
# The list order also fixes the order of ward columns in the cleaned table.
WARD_CHOICES = [
("Medical", "อายุรศาสตร์ ภาคปกติ"),
("Medical_1", "อายุรศาสตร์_1"),
("Medical_2", "อายุรศาสตร์_2"),
("Surgical", "ศัลยศาสตร์"),
("Pediatric", "เด็ก"),
("Community", "ชุมชน"),
("Psychiatric", "จิตเวช"),
("Obstetrics", "สูติศาสตร์"),
]
# ===== Display labels (English-first with Thai in parentheses) =====
# Maps a ward key to its (English label, Thai label) pair; ward_display()
# formats these as "English (Thai)" for reports and the legend.
WARD_LABELS = {
"Medical": ("Internal Medicine", "อายุรศาสตร์"),
"Medical_1": ("Internal Medicine 1", "อายุรศาสตร์_1"),
"Medical_2": ("Internal Medicine 2", "อายุรศาสตร์_2"),
"Surgical": ("Surgery", "ศัลยศาสตร์"),
"Pediatric": ("Pediatrics", "เด็ก"),
"Community": ("Community Health", "ชุมชน"),
"Psychiatric": ("Psychiatry", "จิตเวช"),
"Obstetrics": ("Obstetrics", "สูติศาสตร์"),
}
def ward_display(ward_key: str) -> str:
    """Return the human-readable label for a ward as ``"English (Thai)"``.

    Keys that are not present in ``WARD_LABELS`` fall back to the key itself
    for both parts of the label.
    """
    labels = WARD_LABELS.get(ward_key, (ward_key, ward_key))
    english, thai = labels
    return "{} ({})".format(english, thai)
# Keyword dictionary for auto mapping.
# Maps each target field (NAME, ID, or a ward key) to the Thai/English header
# keywords that auto_map_columns() looks for, case-insensitively, in the
# uploaded file's column headers. Keywords are tried in list order.
AUTO_MAP = {
"NAME": ["ชื่อ-สกุล", "ชื่อ - สกุล", "fullname", "full name", "name", "student name"],
"ID": ["รหัสนักศึกษา", "รหัส", "student id", "id", "studentid"],
"Medical": ["อายุรศาสตร์", "medical"],
"Medical_1": ["อายุรศาสตร์_1", "medical_1", "med_1", "med1"],
"Medical_2": ["อายุรศาสตร์_2", "medical_2", "med_2", "med2"],
"Surgical": ["ศัลยศาสตร์", "surgical", "surgery","surg"],
"Pediatric": ["เด็ก", "pediatric", "pediatrics"],
"Community": ["ชุมชน", "community"],
"Psychiatric": ["จิตเวช", "psychiatric"],
"Obstetrics": ["สูติศาสตร์", "obstetrics", "obgyn", "ob/gyn"],
}
def read_table(file) -> Tuple[Optional[pd.DataFrame], str]:
    """Load an uploaded table into a DataFrame.

    Args:
        file: ``None``, a filesystem path given as ``str``, or an upload
            object that exposes its path as ``.name``.

    Returns:
        ``(df, "")`` on success, or ``(None, message)`` when nothing was
        uploaded or the file could not be read. Column headers of the
        returned frame are converted to ``str`` and stripped of surrounding
        whitespace.
    """
    if file is None:
        return None, "Please upload a file (.csv or .xlsx)"

    # A plain string is already the path; otherwise look for a ``.name``
    # attribute. Anything that does not yield a string path is treated as
    # "extension unknown" and handed to pandas as-is.
    path = file if isinstance(file, str) else getattr(file, "name", None)
    if not isinstance(path, str):
        path = None
    extension_source = path.lower() if path is not None else ""

    try:
        if extension_source.endswith(".csv"):
            df = pd.read_csv(path)
        elif extension_source.endswith(".xlsx"):
            df = pd.read_excel(path)
        else:
            # Unknown extension: make one best-effort attempt as CSV.
            try:
                df = pd.read_csv(file)
            except Exception:
                return None, "Only .csv or .xlsx are supported"
    except Exception as e:
        return None, f"Failed to read file: {e}"

    df.columns = [str(c).strip() for c in df.columns]
    return df, ""
def available_columns_text(df: pd.DataFrame) -> str:
    """Return a numbered (1-based) listing of the frame's column headers.

    The first line is the fixed heading ``"Available columns:"``; each
    following line is ``"<position>. <header>"``.
    """
    numbered = [f"{position}. {header}" for position, header in enumerate(df.columns, start=1)]
    return "\n".join(["Available columns:", *numbered])
def parse_rank(value) -> Optional[int]:
    """Extract the first run of digits from a cell and return it as an int.

    Missing values (as judged by ``pd.isna``) and cells without any digit
    yield ``None``.
    """
    if pd.isna(value):
        return None
    found = re.search(r'(\d+)', str(value))
    if found is None:
        return None
    try:
        return int(found.group(1))
    except ValueError:
        return None
def auto_map_columns(df: pd.DataFrame, selected_wards: List[str]) -> Dict[str, int]:
    """Guess which column holds NAME, ID and each selected ward.

    For every key the keywords from ``AUTO_MAP`` (or the key itself when it
    has no entry) are matched case-insensitively against the headers in
    three passes: exact match, substring match, then — only when rapidfuzz
    is available — fuzzy match with a score cutoff of 85.

    A column is never handed to more than one key, and ward keys with longer
    (more specific) keywords are resolved first, so a generic keyword such as
    "medical" cannot take the column that belongs to "medical_1".

    Args:
        df: The uploaded table.
        selected_wards: Ward keys to look for.

    Returns:
        Mapping of key -> 1-based column number, containing only the keys
        for which a column was found.
    """
    cols = [str(c) for c in df.columns]
    col_lower = [c.lower() for c in cols]
    claimed = set()  # 0-based positions already given to some key

    def find_by_keywords(keywords: List[str]) -> Optional[int]:
        free = [i for i in range(len(cols)) if i not in claimed]
        lowered = [kw.lower() for kw in keywords]

        # Pass 1: a header that IS the keyword beats any partial match.
        for kw_low in lowered:
            for i in free:
                if col_lower[i] == kw_low:
                    return i + 1

        # Pass 2: keyword contained in the header (keywords in priority order).
        for kw_low in lowered:
            for i in free:
                if kw_low in col_lower[i]:
                    return i + 1

        # Pass 3: fuzzy match; keep the best-scoring free column.
        if HAS_FUZZ:
            best_idx = None
            best_score = -1
            for i in free:
                for kw in keywords:
                    match = rf_process.extractOne(kw, [cols[i]], score_cutoff=85)
                    if match and match[1] > best_score:
                        best_score = match[1]
                        best_idx = i + 1
            return best_idx
        return None

    def specificity(key: str) -> int:
        return max((len(kw) for kw in AUTO_MAP.get(key, [key])), default=0)

    wards = list(dict.fromkeys(selected_wards))  # drop duplicates, keep order
    # NAME and ID first (as before), then wards from most to least specific;
    # sorted() is stable, so equally specific wards keep their given order.
    resolution_order = ["NAME", "ID"] + sorted(wards, key=specificity, reverse=True)

    found: Dict[str, int] = {}
    for key in resolution_order:
        idx = find_by_keywords(AUTO_MAP.get(key, [key]))
        if idx:
            found[key] = idx
            claimed.add(idx - 1)

    # Report in the conventional order: NAME, ID, then wards as selected.
    return {key: found[key] for key in ["NAME", "ID", *wards] if key in found}
def build_cleaned_from_indices(df: pd.DataFrame,
                               mapping_indices: Dict[str, int]) -> pd.DataFrame:
    """Build the cleaned table: NAME, ID and one integer-rank column per ward.

    Columns are selected by position, so duplicated header names in the
    uploaded file cannot confuse the selection.

    Args:
        df: The uploaded table.
        mapping_indices: Key ("NAME", "ID" or a ward key) -> 1-based column
            number. Ward entries whose number is out of range are skipped.

    Returns:
        A new frame with columns ``NAME``, ``ID`` and the mapped wards in
        ``WARD_CHOICES`` order. Ward cells are run through ``parse_rank`` and
        stored as nullable ``Int64``. Rows are sorted by the first run of
        digits found in ``ID`` (rows without digits go last) and re-indexed
        from 0.

    Raises:
        ValueError: If NAME or ID is missing/out of range, or if the same
            source column is mapped to more than one field.
    """
    n_cols = len(df.columns)

    def position_of(key: str) -> Optional[int]:
        """Return the 0-based position mapped to *key*, or None if unusable."""
        idx = mapping_indices.get(key, None)
        if idx is None:
            return None
        idx = int(idx)
        if not (1 <= idx <= n_cols):
            return None
        return idx - 1

    name_pos = position_of("NAME")
    id_pos = position_of("ID")
    missing = [label for label, pos in (("NAME", name_pos), ("ID", id_pos)) if pos is None]
    if missing:
        raise ValueError(f"Missing required columns: {', '.join(missing)}")

    targets = ["NAME", "ID"]
    positions = [name_pos, id_pos]
    for w, _th in WARD_CHOICES:
        if w in mapping_indices:
            pos = position_of(w)
            if pos is not None:
                targets.append(w)
                positions.append(pos)

    # One source column feeding two fields would yield duplicate columns and
    # an obscure pandas failure further down; reject it with a clear message.
    owner_of: Dict[int, str] = {}
    for target, pos in zip(targets, positions):
        if pos in owner_of:
            raise ValueError(
                f"Column {pos + 1} ('{df.columns[pos]}') is mapped to both "
                f"{owner_of[pos]} and {target}"
            )
        owner_of[pos] = target

    cleaned = df.iloc[:, positions].copy()
    cleaned.columns = targets
    for w in targets[2:]:
        cleaned[w] = cleaned[w].apply(parse_rank).astype("Int64")

    # Sort using only the digits found in ID.
    digits = cleaned["ID"].astype(str).str.extract(r"(\d+)", expand=False)
    num_id = pd.to_numeric(digits, errors="coerce")
    cleaned = (
        cleaned.assign(_num_id=num_id)
        .sort_values(by="_num_id", kind="mergesort", na_position="last")
        .drop(columns="_num_id")
        .reset_index(drop=True)
    )
    return cleaned
def max_rank_in(cleaned: pd.DataFrame) -> int:
    """Return the largest rank found in any ward column, or 0 if there is none.

    Every column other than ``NAME`` and ``ID`` counts as a ward column;
    missing values are ignored.
    """
    highest = 0
    for column in cleaned.columns:
        if column in ("NAME", "ID"):
            continue
        column_max = cleaned[column].max(skipna=True)
        if not pd.isna(column_max):
            highest = max(highest, int(column_max))
    return int(highest)
# ===== Stepwise simulation & reports (random each round) =====
def simulate_stepwise_report(cleaned: pd.DataFrame, capacities: Dict[str, int]) -> str:
    """Run the rank-by-rank allocation and describe every round in Markdown.

    For each rank from 1 up to the highest rank present, the report first
    shows, per ward, the remaining capacity and the still-unassigned students
    who gave the ward that rank. The students are then placed: everyone when
    they fit, otherwise a random draw of as many as the ward has room for.
    Remaining capacities are listed after each round and a summary closes
    the report.

    Args:
        cleaned: Table with ``NAME``, ``ID`` and one rank column per ward.
        capacities: Ward key -> number of places; wards not listed count as 0.

    Returns:
        The report as a single Markdown string.
    """
    ward_cols = [col for col in cleaned.columns if col not in ("NAME", "ID")]
    remaining = {ward: int(capacities.get(ward, 0)) for ward in ward_cols}
    is_assigned = pd.Series(index=cleaned.index, data=False)  # True once a student is placed
    placed_in = pd.Series(index=cleaned.index, data="", dtype="object")
    last_rank = max_rank_in(cleaned)

    def preview(names, limit):
        """Join at most *limit* names, adding an ellipsis when some are left out."""
        return ", ".join(names[:limit]) + ("..." if len(names) > limit else "")

    report = [
        f"### Total Students (จำนวนนักศึกษาทั้งหมด): {len(cleaned)}",
        f"### Total Capacity (ความจุรวม): {sum(remaining.values())}",
        "",
    ]

    for rank in range(1, last_rank + 1):
        report.append("\n---\n")
        report.append(f"## Rank {rank} Results (การแสดงผลอันดับที่ {rank})\n")
        report.append(
            "| Ward (วอร์ด) | Remaining Capacity (ความจุคงเหลือ) | Rank {r} Count (จำนวนเลือกอันดับ {r}) | Students (รายชื่อนักศึกษา) |".format(r=rank)
        )
        report.append("|---|---:|---:|---|")

        # Situation BEFORE this round's allocation.
        for ward in ward_cols:
            waiting = cleaned.loc[(~is_assigned) & (cleaned[ward] == rank), "NAME"].astype(str).tolist()
            report.append(f"| {ward_display(ward)} | {remaining[ward]} | {len(waiting)} | {preview(waiting, 3)} |")

        # Allocation for this rank; wards are processed in column order.
        report.append("")
        report.append(f"### Allocation at Rank {rank} (การสุ่มจัดสรรในอันดับที่ {rank})")
        for ward in ward_cols:
            pool = cleaned.index[(~is_assigned) & (cleaned[ward] == rank)].tolist()
            if not pool or remaining[ward] <= 0:
                report.append(f"- {ward_display(ward)}: No allocation (ไม่มีการจัดสรร)")
                continue
            if len(pool) > remaining[ward]:
                # Oversubscribed: draw exactly as many students as there are places.
                winners = list(np.random.choice(pool, size=remaining[ward], replace=False))
            else:
                winners = pool
            is_assigned.loc[winners] = True
            placed_in.loc[winners] = ward
            remaining[ward] -= len(winners)
            winner_names = cleaned.loc[winners, "NAME"].astype(str).tolist()
            report.append(f"- {ward_display(ward)} : {len(winner_names)} | {preview(winner_names, 10)}")

        # Capacities left once the round is over.
        report.append("")
        report.append("**Remaining capacity (จำนวนรับที่เหลือหลังรอบนี้):**")
        report.extend(f"- {ward_display(ward)}: {remaining[ward]}" for ward in ward_cols)

    # Final summary.
    report.append("\n---\n")
    report.append("## Final Summary (สรุปสุดท้าย)")
    for ward in ward_cols:
        placed_here = placed_in[placed_in == ward]
        report.append(f"- {ward_display(ward)}: {len(placed_here)} assigned (จัดสรรแล้ว)")
    leftover_names = cleaned.loc[~is_assigned, "NAME"].astype(str).tolist()
    report.append(f"- Not assigned (ยังไม่ได้รับการจัดสรร): {len(leftover_names)}")
    if leftover_names:
        report.append(f" - {preview(leftover_names, 15)}")
    return "\n".join(report)
def random_assign(cleaned: pd.DataFrame,
                  capacities: Dict[str, int]) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, int]]:
    """Allocate students to wards rank by rank (used for the CSV outputs).

    For rank 1, 2, ... up to the highest rank present, each ward (in column
    order) takes the still-unassigned students who gave it that rank: all of
    them when they fit, otherwise a random draw of as many as it has places.
    The loop stops early once no ward has capacity left.

    Args:
        cleaned: Table with ``NAME``, ``ID`` and one rank column per ward.
        capacities: Ward key -> number of places; wards not listed count as 0.

    Returns:
        ``(result, not_assigned, leftover)`` where ``result`` is ``cleaned``
        plus ``AssignedWard`` and ``ChoiceNumber`` columns, ``not_assigned``
        holds the rows that received no ward, and ``leftover`` maps each ward
        to its unused capacity. In both frames missing values are replaced by
        empty strings and all columns have object dtype.
    """
    wards = [w for w in cleaned.columns if w not in ("NAME", "ID")]
    cap = {w: int(capacities.get(w, 0)) for w in wards}
    assigned = pd.Series(index=cleaned.index, data=pd.NA, dtype="object")
    choice_no = pd.Series(index=cleaned.index, data=pd.NA, dtype="Int64")

    # Highest rank present in any ward column (0 when there is none).
    column_maxima = (cleaned[w].max(skipna=True) for w in wards)
    mr = max((int(m) for m in column_maxima if pd.notna(m)), default=0)

    for r in range(1, mr + 1):
        if all(c <= 0 for c in cap.values()):
            break
        for w in wards:
            if cap[w] <= 0:
                continue
            # A missing rank compares as NA; treat it as "did not pick this rank".
            wants_ward = cleaned[w].eq(r).fillna(False).astype(bool)
            mask = (assigned.isna() & wants_ward).to_numpy()
            candidates = cleaned.index[mask].tolist()
            if not candidates:
                continue
            if len(candidates) <= cap[w]:
                pick = candidates
            else:
                pick = list(np.random.choice(candidates, size=cap[w], replace=False))
            assigned.loc[pick] = w
            choice_no.loc[pick] = r
            cap[w] -= len(pick)

    result = cleaned.copy()
    result["AssignedWard"] = assigned
    result["ChoiceNumber"] = choice_no
    unplaced = result["AssignedWard"].isna()

    # Nullable Int64 columns reject "" as a fill value (TypeError), so cast to
    # object before blanking out the missing cells.
    filled = result.astype(object).fillna("")
    return filled, filled[unplaced].copy(), cap
# ===== Helpers for temp file paths =====
def _tmp(name: str) -> str:
    """Return a fresh, collision-free path for *name* in the temp directory.

    The directory comes from ``tempfile.gettempdir()`` instead of a
    hard-coded ``/tmp``, so the helper also works where ``/tmp`` does not
    exist or ``TMPDIR`` points elsewhere. The file itself is not created.
    """
    import tempfile  # local import: only this helper needs it

    tmp_dir = tempfile.gettempdir()
    os.makedirs(tmp_dir, exist_ok=True)
    # A random hex prefix keeps concurrent exports from overwriting each other.
    return os.path.join(tmp_dir, f"{uuid.uuid4().hex}-{name}")
# ===== Gradio callbacks =====
def update_capacity_table(selected_wards: List[str]) -> pd.DataFrame:
    """Build the capacity table for the currently selected wards.

    One row per selected ward, in ``WARD_CHOICES`` order, each starting with
    a capacity of 0. An empty or missing selection gives an empty table.
    """
    chosen = selected_wards or []
    table_rows = [[key, thai_name, 0] for key, thai_name in WARD_CHOICES if key in chosen]
    return pd.DataFrame(table_rows, columns=["Ward", "Thai Name", "Capacity"])
def on_upload(file, selected_wards):
    """Read the uploaded file and pre-fill the column-number inputs.

    Returns a 12-tuple: status update, the numbered column listing, then the
    detected 1-based column number (or ``None``) for NAME, ID and the eight
    wards in the fixed order used by the UI. When the file cannot be read,
    the status carries the error message, the listing is empty and all
    numbers are ``None``.
    """
    df, msg = read_table(file)
    if df is None:
        return (gr.update(value=msg, visible=True), "") + (None,) * 10

    listing = available_columns_text(df)
    detected = auto_map_columns(df, selected_wards or [])

    # Order must match the Number inputs wired to this callback.
    field_order = (
        "NAME", "ID",
        "Medical", "Medical_1", "Medical_2",
        "Surgical", "Pediatric", "Community",
        "Psychiatric", "Obstetrics",
    )
    numbers = tuple(int(detected[key]) if key in detected else None for key in field_order)
    return (gr.update(value="✓ File loaded", visible=True), listing) + numbers
def collect_mapping_numbers(name_num, id_num, ward_nums, selected_wards, n_cols):
    """Validate the column numbers entered by the user.

    Args:
        name_num: Column number entered for NAME.
        id_num: Column number entered for ID.
        ward_nums: Ward key -> entered column number.
        selected_wards: Ward keys that must have a number.
        n_cols: Number of columns in the uploaded table.

    Returns:
        ``(errors, mapping)``: a list of human-readable problems (empty when
        everything is valid) and a dict of key -> 1-based column number that
        holds only the entries which passed validation.
    """
    problems = []

    def checked(raw, label):
        """Return *raw* as a valid column number, or record why it is not one."""
        if raw is None:
            problems.append(f"- Please enter column number for {label}")
            return None
        try:
            number = int(raw)
        except Exception:
            problems.append(f"- {label} must be a number")
            return None
        if number < 1 or number > n_cols:
            problems.append(f"- {label} must be within 1–{n_cols}")
            return None
        return number

    accepted = {}
    for key, raw in (("NAME", name_num), ("ID", id_num)):
        number = checked(raw, key)
        if number:
            accepted[key] = number
    for ward in selected_wards:
        number = checked(ward_nums.get(ward, None), f"{ward}")
        if number:
            accepted[ward] = number
    return problems, accepted
def on_clean(file, selected_wards, capacity_df, name_num, id_num,
             med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num):
    """Validate the mapping, build the cleaned table and export it as CSV.

    Returns a 4-tuple: status update, the first 30 cleaned rows, the path of
    the exported CSV and the number of students. On any failure the status
    carries the message and the other three items are ``None``.
    ``capacity_df`` is accepted for signature parity with ``on_assign`` but
    is not used here.
    """
    def fail(message):
        return gr.update(value=message, visible=True), None, None, None

    if not selected_wards:
        return fail("Please select at least one ward.")

    df, msg = read_table(file)
    if df is None:
        return fail(msg)

    ward_keys = ("Medical", "Medical_1", "Medical_2", "Surgical",
                 "Pediatric", "Community", "Psychiatric", "Obstetrics")
    ward_values = (med_num, med1_num, med2_num, surg_num,
                   ped_num, comm_num, psy_num, obs_num)
    ward_nums = dict(zip(ward_keys, ward_values))

    errors, mapping_idx = collect_mapping_numbers(
        name_num, id_num, ward_nums, selected_wards, len(df.columns)
    )
    if errors:
        return fail("❌ Mapping invalid:\n" + "\n".join(errors))

    try:
        cleaned = build_cleaned_from_indices(df, mapping_idx)
    except Exception as e:
        return fail(f"❌ Error: {e}")

    cleaned_path = _tmp("cleaned.csv")
    # utf-8-sig writes a BOM at the start of the file.
    cleaned.to_csv(cleaned_path, index=False, encoding="utf-8-sig")
    return (
        gr.update(value="✓ Cleaning completed", visible=True),
        cleaned.head(30),
        cleaned_path,
        len(cleaned),
    )
def _capacities_from_df(cleaned: pd.DataFrame, capacity_df: Optional[pd.DataFrame]) -> Dict[str, int]:
    """Turn the editable capacity table into a ward -> capacity dict.

    Args:
        cleaned: Cleaned table; every column except ``NAME``/``ID`` is a ward.
        capacity_df: Three-column table (ward key, Thai name, capacity) or
            ``None``.

    Returns:
        A dict with one entry for every ward column of ``cleaned``. Wards
        that are not listed in the table, or whose capacity cannot be read as
        a number, get 0. Numeric text such as ``"5.0"`` is accepted,
        fractions are truncated and negative values are clamped to 0 so they
        cannot reduce the total capacity. Rows for unknown wards are ignored.

    Raises:
        ValueError: Propagated from pandas when a non-empty ``capacity_df``
            does not have exactly three columns.
    """
    wards = [c for c in cleaned.columns if c not in ("NAME", "ID")]
    capacities = {w: 0 for w in wards}
    if capacity_df is None or capacity_df.empty:
        return capacities

    cap_df = capacity_df.copy()
    cap_df.columns = ["Ward", "Thai Name", "Capacity"]
    # Cells may arrive as text; unreadable ones become NaN instead of raising.
    numeric = pd.to_numeric(cap_df["Capacity"], errors="coerce")
    for ward, value in zip(cap_df["Ward"], numeric):
        key = str(ward).strip()
        if key not in capacities:
            continue
        try:
            capacities[key] = max(0, int(value))
        except (TypeError, ValueError, OverflowError):
            # NaN / NA / infinity: treat as "no places".
            capacities[key] = 0
    return capacities
def on_assign(file, selected_wards, capacity_df, name_num, id_num,
              med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num):
    """Clean the data, run the random allocation and produce all outputs.

    Returns a 6-tuple: status update, the first 30 assigned rows, path of
    ``assigned.csv``, path of ``not_assigned.csv``, a remaining-capacity
    summary and the stepwise Markdown report. When cleaning fails, or when
    there are more students than total capacity, the status carries the
    message and the other five items are ``None``.
    """
    status, cleaned_preview, cleaned_file, n_students = on_clean(
        file, selected_wards, capacity_df, name_num, id_num,
        med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num)
    if cleaned_preview is None:
        return status, None, None, None, None, None

    # on_clean only hands back a 30-row preview, so rebuild the full table.
    # Its validation already passed for these same inputs.
    df, _ = read_table(file)
    n_cols = len(df.columns)
    ward_nums = {
        "Medical": med_num, "Medical_1": med1_num, "Medical_2": med2_num,
        "Surgical": surg_num, "Pediatric": ped_num, "Community": comm_num,
        "Psychiatric": psy_num, "Obstetrics": obs_num
    }
    _errors, mapping_idx = collect_mapping_numbers(name_num, id_num, ward_nums, selected_wards, n_cols)
    cleaned = build_cleaned_from_indices(df, mapping_idx)

    capacities = _capacities_from_df(cleaned, capacity_df)
    total_capacity = sum(capacities.values())
    if n_students is None:
        n_students = len(cleaned)
    if n_students > total_capacity:
        msg = f"❌ Students {n_students} > total capacity {total_capacity} (shortage allowed, not exceed)"
        return gr.update(value=msg, visible=True), None, None, None, None, None

    # Remember the RNG state so the report below can replay the very same
    # draws; otherwise it would describe a different random allocation than
    # the one written to the CSV files.
    rng_state = np.random.get_state()
    assigned, not_assigned, leftover = random_assign(cleaned, capacities)

    assigned_path = _tmp("assigned.csv")
    not_assigned_path = _tmp("not_assigned.csv")
    assigned.to_csv(assigned_path, index=False, encoding="utf-8-sig")
    not_assigned.to_csv(not_assigned_path, index=False, encoding="utf-8-sig")
    leftover_text = "Remaining capacity (จำนวนรับที่เหลือ):\n" + "\n".join(
        [f"- {ward_display(k)}: {v}" for k, v in leftover.items()]
    )

    # NOTE: the replay relies on random_assign and simulate_stepwise_report
    # calling np.random.choice in the same order with the same candidates
    # (rank by rank, ward by ward, only for oversubscribed wards).
    np.random.set_state(rng_state)
    stepwise_md = simulate_stepwise_report(cleaned, capacities)
    return status, assigned.head(30), assigned_path, not_assigned_path, leftover_text, stepwise_md
# Build the Gradio UI and wire the callbacks.
# NOTE(review): the nesting of components under each accordion is
# reconstructed from the statement order — confirm the intended layout.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}")
    gr.Markdown(DESCRIPTION)

    with gr.Row():
        file = gr.File(file_count="single", file_types=[".csv", ".xlsx"], label="Upload data (.csv/.xlsx)")

    # Wards ticked when the page first loads.
    default_wards = ["Medical", "Surgical", "Pediatric", "Community", "Psychiatric", "Obstetrics"]

    with gr.Accordion("1) Select wards (เลือกวอร์ด)", open=True):
        selected_wards = gr.CheckboxGroup(
            choices=[w for w, _ in WARD_CHOICES],
            label="Select wards (เลือกได้หลายข้อ)",
            value=default_wards,
        )
        gr.Markdown("Legend: " + ", ".join([f"**{w}** = {ward_display(w)}" for w, _ in WARD_CHOICES]))

    with gr.Accordion("2) Set capacity per ward (กำหนดความจุต่อวอร์ด)", open=True):
        capacity_df = gr.Dataframe(
            headers=["Ward", "Thai Name", "Capacity"],
            # Seed the table from the default selection: the change event
            # below only fires once the user toggles a ward, so an empty
            # initial value would leave pre-selected wards without a row.
            value=update_capacity_table(default_wards),
            row_count=(0, "dynamic"),
            col_count=3,
            interactive=True,
            wrap=True,
            label="Fill only selected wards",
        )
        selected_wards.change(fn=update_capacity_table, inputs=selected_wards, outputs=capacity_df)

    with gr.Accordion("3) Column headers & mapping (หัวคอลัมน์และการจับคู่)", open=True):
        status = gr.Markdown(visible=False)
        available = gr.Code(label="Available columns (index starts at 1)", language="markdown", interactive=False)
        auto_btn = gr.Button("Read & Auto-detect mapping")
        name_num = gr.Number(label="Column number for NAME", precision=0)
        id_num = gr.Number(label="Column number for ID", precision=0)
        with gr.Row():
            med_num = gr.Number(label="Column number Medical", precision=0)
            med1_num = gr.Number(label="Column number Medical_1", precision=0)
            med2_num = gr.Number(label="Column number Medical_2", precision=0)
        with gr.Row():
            surg_num = gr.Number(label="Column number Surgical", precision=0)
            ped_num = gr.Number(label="Column number Pediatric", precision=0)
            comm_num = gr.Number(label="Column number Community", precision=0)
        with gr.Row():
            psy_num = gr.Number(label="Column number Psychiatric", precision=0)
            obs_num = gr.Number(label="Column number Obstetrics", precision=0)
        auto_btn.click(
            fn=on_upload,
            inputs=[file, selected_wards],
            outputs=[status, available, name_num, id_num,
                     med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num],
        )

    # >>> Moved CLEAN button here (before reports) <<<
    clean_btn = gr.Button("Clean data (ดูพรีวิว)", variant="secondary")
    preview = gr.Dataframe(label="Cleaned preview (first 30 rows)", visible=True)
    cleaned_file = gr.File(label="Download cleaned.csv")
    # on_clean returns the student count as its fourth value; it is stored in
    # a State component that no other callback reads.
    n_students_state = gr.State()
    clean_btn.click(
        fn=on_clean,
        inputs=[file, selected_wards, capacity_df, name_num, id_num,
                med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num],
        outputs=[status, preview, cleaned_file, n_students_state],
    )

    assign_btn = gr.Button("Assign (สุ่มตามลำดับอันดับ)", variant="primary")
    assigned_preview = gr.Dataframe(label="Assigned preview (first 30 rows)")
    assigned_file = gr.File(label="Download assigned.csv")
    not_assigned_file = gr.File(label="Download not_assigned.csv")
    leftover_text = gr.Textbox(label="Remaining capacity summary", interactive=False)
    allocation_report = gr.Markdown(label="Stepwise Report (ผลการสุ่มรอบต่อรอบ)")
    assign_btn.click(
        fn=on_assign,
        inputs=[file, selected_wards, capacity_df, name_num, id_num,
                med_num, med1_num, med2_num, surg_num, ped_num, comm_num, psy_num, obs_num],
        outputs=[status, assigned_preview, assigned_file, not_assigned_file, leftover_text, allocation_report],
    )

if __name__ == "__main__":
    demo.launch()