Spaces:
Sleeping
Sleeping
| import os, re, math, io | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| from PIL import Image | |
| # Force matplotlib non-GUI backend (HF friendly) | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from mpmath import gammainc, gamma # pure python for chi-square p-value (optional) | |
| # ============================================================ | |
| # CONFIG | |
| # ============================================================ | |
| DATA_PATH = os.getenv("IPLM_DATA_PATH", "IPLM_clean_manual_131225.xlsx") | |
| EXCLUDE_COLS_EXACT = {"kontak_wa", "npp", "tanggal_kirim", "updated_at", "created_at"} | |
| BENFORD_P = np.array([math.log10(1 + 1/d) for d in range(1, 10)]) | |
| BENFORD_EXCLUDE_PATTERNS = [ | |
| r"\bid\b", r"\bid_", r"_id\b", | |
| r"\bkode\b", r"\bcode\b", | |
| r"\bnpsn\b", r"\bnik\b", r"\bnpwp\b", | |
| r"\bkontak\b", r"\bwa\b", r"\bwhatsapp\b", r"\btelepon\b", r"\bphone\b", r"\bnohp\b", | |
| r"\btanggal\b", r"\bdate\b", | |
| r"\bwaktu\b", r"\btime\b", r"\bjam\b", | |
| r"\bcreated\b", r"\bupdated\b", r"\bmodified\b", | |
| r"\bsubmit\b", r"\bkirim\b", | |
| r"\bmulai\b", r"\bselesai\b", | |
| r"\blastpage\b", r"\bpage\b", | |
| r"\bstatus\b", | |
| r"\bnpp\b", | |
| ] | |
| # ============================================================ | |
| # HELPERS | |
| # ============================================================ | |
| def canon(s: str) -> str: | |
| return re.sub(r"[^a-z0-9]+", "", str(s).lower()) | |
| def pick_col(df, candidates): | |
| cols = list(df.columns) | |
| cc = {canon(c): c for c in cols} | |
| for cand in candidates: | |
| k = canon(cand) | |
| if k in cc: | |
| return cc[k] | |
| for c in cols: | |
| kc = canon(c) | |
| for cand in candidates: | |
| if canon(cand) in kc: | |
| return c | |
| return None | |
| def detect_geo_cols(df): | |
| prov = pick_col(df, ["provinsi", "propinsi", "province"]) | |
| kab = pick_col(df, ["kab_kota", "kabkota", "kabupatenkota", "kabupaten/kota", "kabupaten", "kota", "regency", "city"]) | |
| return prov, kab | |
| def detect_kewenangan_col(df): | |
| return pick_col(df, ["kewenangan", "pu_level", "level_kewenangan", "kewenangan_pengelola", "kewenangan_perpustakaan", "level"]) | |
| def load_excel(path): | |
| df = pd.read_excel(path, engine="openpyxl") | |
| for c in df.columns: | |
| if df[c].dtype == object: | |
| df[c] = (df[c].astype(str) | |
| .str.replace("\u00a0", " ", regex=False) | |
| .str.replace(r"\s+", " ", regex=True) | |
| .str.strip()) | |
| df.loc[df[c].str.lower().isin(["nan", "none", "null", ""]), c] = np.nan | |
| return df | |
| def clean_str_list(values): | |
| out = [] | |
| for v in values: | |
| if v is None: | |
| continue | |
| s = str(v).strip() | |
| if s == "" or s.lower() in ["nan", "none", "null"]: | |
| continue | |
| out.append(s) | |
| seen, uniq = set(), [] | |
| for s in out: | |
| if s not in seen: | |
| uniq.append(s) | |
| seen.add(s) | |
| return uniq | |
| def safe_numeric_cols(df, exclude=set(), min_non_na=0.25): | |
| hard = {canon(x) for x in EXCLUDE_COLS_EXACT} | |
| cols = [] | |
| for c in df.columns: | |
| if c in exclude: | |
| continue | |
| if canon(c) in hard: | |
| continue | |
| s = pd.to_numeric(df[c], errors="coerce") | |
| if s.notna().mean() >= min_non_na and s.nunique(dropna=True) >= 3: | |
| cols.append(c) | |
| return cols | |
| def is_benford_applicable(colname: str) -> bool: | |
| if canon(colname) in {canon(x) for x in EXCLUDE_COLS_EXACT}: | |
| return False | |
| name = str(colname).lower() | |
| return not any(re.search(p, name) for p in BENFORD_EXCLUDE_PATTERNS) | |
| def leading_digit_series(x: pd.Series): | |
| x = pd.to_numeric(x, errors="coerce").replace([np.inf, -np.inf], np.nan).dropna() | |
| x = x[np.abs(x) > 0] | |
| if len(x) == 0: | |
| return None | |
| def first_digit(v): | |
| v = abs(float(v)) | |
| if v == 0: | |
| return np.nan | |
| while v < 1: | |
| v *= 10 | |
| return int(str(v).replace(".", "")[0]) | |
| digs = x.apply(first_digit).dropna().astype(int) | |
| digs = digs[(digs >= 1) & (digs <= 9)] | |
| return digs | |
| # --- chi-square p-value without scipy (optional) --- | |
| def chi2_sf(x, k): | |
| # survival function = 1 - CDF for chi-square(k) | |
| # CDF uses lower incomplete gamma. SF uses upper incomplete gamma. | |
| # SF = gammainc(k/2, x/2, inf) / gamma(k/2) | |
| a = k / 2.0 | |
| return float(gammainc(a, x/2.0, math.inf) / gamma(a)) | |
| def benford_stats(x: pd.Series, min_n=50, with_p=True): | |
| digs = leading_digit_series(x) | |
| if digs is None or len(digs) < min_n: | |
| return None | |
| obs = np.array([(digs == d).sum() for d in range(1, 10)], dtype=float) | |
| n = obs.sum() | |
| exp = BENFORD_P * n | |
| obs_p = obs / n | |
| mad = float(np.mean(np.abs(obs_p - BENFORD_P))) | |
| # chi-square | |
| chi2 = float(((obs - exp) ** 2 / np.where(exp == 0, 1.0, exp)).sum()) | |
| p = chi2_sf(chi2, 8) if with_p else None | |
| return {"n": int(n), "mad": mad, "obs": obs_p, "chi2": chi2, "p_value": p} | |
| def benford_flag(mad): | |
| if mad < 0.012: | |
| return "OK" | |
| if mad < 0.015: | |
| return "WASPADA" | |
| return "RED FLAG" | |
| def fig_to_pil(fig): | |
| buf = io.BytesIO() | |
| fig.savefig(buf, format="png", dpi=160, bbox_inches="tight") | |
| plt.close(fig) | |
| buf.seek(0) | |
| return Image.open(buf).convert("RGBA") | |
| def benford_plot(obs_p): | |
| fig, ax = plt.subplots(figsize=(7, 3)) | |
| d = np.arange(1, 10) | |
| ax.bar(d - 0.2, BENFORD_P, width=0.4, label="Benford") | |
| ax.bar(d + 0.2, obs_p, width=0.4, label="Aktual") | |
| ax.set_xticks(d) | |
| ax.set_xlabel("Digit pertama") | |
| ax.set_ylabel("Proporsi") | |
| ax.legend() | |
| return fig_to_pil(fig) | |
| def scatter_plot(peer_agg, x_col, y_col): | |
| fig, ax = plt.subplots(figsize=(7, 3.5)) | |
| ax.scatter(peer_agg[x_col], peer_agg[y_col], s=18) | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel(y_col) | |
| ax.set_title("Peer Scatter (2 kolom paling variatif)") | |
| return fig_to_pil(fig) | |
| # ============================================================ | |
| # NUMPY-ONLY SIMILARITY (NO sklearn) | |
| # ============================================================ | |
| def standardize_matrix(X: np.ndarray) -> np.ndarray: | |
| mu = np.nanmean(X, axis=0) | |
| sd = np.nanstd(X, axis=0) | |
| sd = np.where(sd == 0, 1.0, sd) | |
| Z = (X - mu) / sd | |
| Z = np.nan_to_num(Z, nan=0.0, posinf=0.0, neginf=0.0) | |
| return Z | |
| def cosine_sim_row(Z: np.ndarray, idx: int) -> np.ndarray: | |
| v = Z[idx] | |
| v_norm = np.linalg.norm(v) | |
| if v_norm == 0: | |
| return np.zeros(Z.shape[0], dtype=float) | |
| norms = np.linalg.norm(Z, axis=1) | |
| norms = np.where(norms == 0, 1.0, norms) | |
| sims = (Z @ v) / (norms * v_norm) | |
| sims = np.clip(sims, -1.0, 1.0) | |
| return sims | |
| # ============================================================ | |
| # LOAD DATA ONCE | |
| # ============================================================ | |
| if not os.path.exists(DATA_PATH): | |
| raise FileNotFoundError(f"Data file not found: {DATA_PATH}. Pastikan file ada di repo, contoh: data/IPLM_clean_manual_131225.xlsx") | |
| df_raw = load_excel(DATA_PATH) | |
| prov_col, kab_col = detect_geo_cols(df_raw) | |
| kew_col = detect_kewenangan_col(df_raw) | |
| if prov_col is None or kab_col is None: | |
| raise ValueError("Kolom provinsi/kab_kota tidak terdeteksi. Pastikan ada kolom provinsi dan kab_kota.") | |
| df = df_raw.copy() | |
| df["_prov_str"] = df[prov_col].astype(str).str.strip() | |
| df["_kab_str"] = df[kab_col].astype(str).str.strip() | |
| df.loc[df["_prov_str"].str.lower().isin(["nan","none","null",""]), "_prov_str"] = np.nan | |
| df.loc[df["_kab_str"].str.lower().isin(["nan","none","null",""]), "_kab_str"] = np.nan | |
| df = df[df["_prov_str"].notna() & df["_kab_str"].notna()].copy() # prevent mixing | |
| exclude_base = {prov_col, kab_col, "_prov_str", "_kab_str"} | |
| hard_exclude_cols_in_file = {c for c in df.columns if canon(c) in {canon(x) for x in EXCLUDE_COLS_EXACT}} | |
| exclude_base = exclude_base.union(hard_exclude_cols_in_file) | |
| num_cols_all = safe_numeric_cols(df, exclude=exclude_base) | |
| benford_cols = [c for c in num_cols_all if is_benford_applicable(c)] | |
| PROVS = clean_str_list(df["_prov_str"].unique().tolist()) | |
| prov_cache_peer = {} | |
| def kabs_for_prov(pv): | |
| return clean_str_list(df.loc[df["_prov_str"] == pv, "_kab_str"].unique().tolist()) | |
| def kew_for(pv, kv): | |
| if not kew_col or kew_col not in df.columns: | |
| return ["(kewenangan tidak tersedia)"] | |
| vals = clean_str_list(df.loc[(df["_prov_str"] == pv) & (df["_kab_str"] == kv), kew_col].dropna().unique().tolist()) | |
| return vals if vals else ["(kewenangan kosong)"] | |
| def get_peer_agg_for_prov(pv): | |
| if pv in prov_cache_peer: | |
| return prov_cache_peer[pv] | |
| peer = df[df["_prov_str"] == pv] | |
| peer_agg = peer.groupby("_kab_str")[num_cols_all].apply( | |
| lambda g: g.apply(pd.to_numeric, errors="coerce").mean() | |
| ).reset_index().rename(columns={"_kab_str": "kab_kota"}) | |
| prov_cache_peer[pv] = peer_agg | |
| return peer_agg | |
| # ============================================================ | |
| # AUDIT | |
| # ============================================================ | |
| def audit(pv, kv, kw): | |
| dfx = df[(df["_prov_str"] == pv) & (df["_kab_str"] == kv)].copy() | |
| if kew_col and kew_col in dfx.columns and kw and not str(kw).startswith("("): | |
| dfx = dfx[dfx[kew_col].astype(str).str.strip() == str(kw).strip()].copy() | |
| if dfx.empty: | |
| return ("❌ Data kosong setelah filter (cek kewenangan).", pd.DataFrame(), pd.DataFrame(), None, None, pd.DataFrame()) | |
| if not num_cols_all: | |
| return ("❌ Tidak ada kolom numerik yang cukup.", pd.DataFrame(), pd.DataFrame(), None, None, pd.DataFrame()) | |
| num_all = dfx[num_cols_all].apply(pd.to_numeric, errors="coerce") | |
| completeness = float(num_all.notna().mean().mean()) | |
| zero_rate = float((num_all.fillna(0) == 0).mean().mean()) | |
| # Benford | |
| best, rows = None, [] | |
| for c in benford_cols: | |
| st = benford_stats(num_all[c], with_p=True) # p_value via mpmath (safe) | |
| if st: | |
| rows.append({ | |
| "kolom": c, "n": st["n"], "MAD": st["mad"], | |
| "flag": benford_flag(st["mad"]), | |
| "chi2": st["chi2"], | |
| "p_value": st["p_value"] | |
| }) | |
| if best is None or st["mad"] > best["mad"]: | |
| best = {"kolom": c, **st} | |
| ben_tbl = pd.DataFrame(rows).sort_values("MAD", ascending=False).head(15) if rows else pd.DataFrame() | |
| if best is None: | |
| ben_note = "Benford (applicable only): tidak ada kolom memenuhi syarat (butuh ≥50 non-zero)." | |
| ben_img = None | |
| else: | |
| ben_note = f"Benford strongest: {best['kolom']} | n={best['n']} | MAD={best['mad']:.4f} ({benford_flag(best['mad'])})" | |
| if best.get("p_value") is not None: | |
| ben_note += f" | p={best['p_value']:.3g}" | |
| ben_img = benford_plot(best["obs"]) | |
| # Similarity (numpy only, strict within prov) | |
| peer_agg = get_peer_agg_for_prov(pv) | |
| sim_tbl = pd.DataFrame() | |
| top_sim = None | |
| if peer_agg.shape[0] >= 3: | |
| X = peer_agg[num_cols_all].replace([np.inf, -np.inf], np.nan).fillna(0.0).to_numpy(float) | |
| Z = standardize_matrix(X) | |
| idx = None | |
| for i in range(len(peer_agg)): | |
| if str(peer_agg.loc[i, "kab_kota"]) == kv: | |
| idx = i | |
| break | |
| if idx is not None: | |
| sims = cosine_sim_row(Z, idx) | |
| order = np.argsort(-sims) | |
| sim_tbl = pd.DataFrame([ | |
| {"kab_kota_pembanding": str(peer_agg.loc[j, "kab_kota"]), "cosine_similarity": float(sims[j])} | |
| for j in order[1:11] | |
| ]) | |
| if not sim_tbl.empty: | |
| top_sim = float(sim_tbl["cosine_similarity"].max()) | |
| scat_img = None | |
| if peer_agg.shape[0] >= 3 and len(num_cols_all) >= 2: | |
| vars_ = peer_agg[num_cols_all].replace([np.inf, -np.inf], np.nan).fillna(0.0).var(axis=0).sort_values(ascending=False) | |
| if len(vars_) >= 2 and vars_.iloc[0] > 0 and vars_.iloc[1] > 0: | |
| x_col, y_col = vars_.index[0], vars_.index[1] | |
| scat_img = scatter_plot(peer_agg, x_col, y_col) | |
| too_perfect = (completeness > 0.98) and (zero_rate < 0.02) | |
| scorecard = pd.DataFrame([ | |
| ["Provinsi", pv, ""], | |
| ["Kab/Kota", kv, ""], | |
| ["Kewenangan", str(kw), f"Sumber: {kew_col}" if (kew_col and not str(kw).startswith("(")) else "Kewenangan tidak tersedia/kosong."], | |
| ["Completeness (numeric)", f"{completeness:.2%}", "Kelengkapan tinggi; pastikan bukan hasil imputasi/rekayasa."], | |
| ["Zero-rate (numeric)", f"{zero_rate:.2%}", "Proporsi nol dipengaruhi indikator; cek nol pada indikator inti."], | |
| ["Benford (applicable only)", "ADA" if best else "TIDAK", ben_note], | |
| ["Top similarity (peer)", f"{top_sim:.3f}" if top_sim is not None else "NA", "≥0.95 indikasi template/duplikasi."], | |
| ["Catatan pola", "WASPADA" if too_perfect else "Normal", "Jika WASPADA: cek bukti dukung/log input/konsistensi antar indikator."] | |
| ], columns=["Komponen", "Nilai", "Catatan"]) | |
| narasi = ( | |
| f"**Filter aktif:** Provinsi = `{pv}` · Kab/Kota = `{kv}` · Kewenangan = `{kw}`\n\n" | |
| f"**EXCLUDE (no analysis):** `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n\n" | |
| f"{ben_note}" | |
| ) | |
| return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl | |
| # ============================================================ | |
| # GRADIO UI | |
| # ============================================================ | |
| def ui_init(): | |
| pv = PROVS[0] if PROVS else None | |
| kabs = kabs_for_prov(pv) if pv else [] | |
| kv = kabs[0] if kabs else None | |
| kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"] | |
| kw = kews[0] if kews else None | |
| return pv, kv, kw, kabs, kews | |
| def on_prov_change(pv): | |
| kabs = kabs_for_prov(pv) if pv else [] | |
| kv = kabs[0] if kabs else None | |
| kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"] | |
| kw = kews[0] if kews else None | |
| return gr.update(choices=kabs, value=kv), gr.update(choices=kews, value=kw) | |
| def on_kab_change(pv, kv): | |
| kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"] | |
| kw = kews[0] if kews else None | |
| return gr.update(choices=kews, value=kw) | |
| def run_audit(pv, kv, kw): | |
| narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl = audit(pv, kv, kw) | |
| return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl | |
| pv0, kv0, kw0, kabs0, kews0 = ui_init() | |
| with gr.Blocks(title="IPLM Audit — Kualitas Data & Indikasi Tidak Wajar", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown( | |
| "# IPLM — Audit Kualitas Data & Indikasi Data Tidak Wajar (Satu Wilayah)\n" | |
| f"- Sumber data: `{DATA_PATH}`\n" | |
| f"- EXCLUDE (no analysis): `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n" | |
| f"- prov_col = `{prov_col}` · kab_col = `{kab_col}` · kewenangan_col = `{kew_col if kew_col else 'TIDAK ADA'}`\n" | |
| "---" | |
| ) | |
| with gr.Row(): | |
| prov = gr.Dropdown(label="Provinsi", choices=PROVS, value=pv0, filterable=True) | |
| kab = gr.Dropdown(label="Kab/Kota", choices=kabs0, value=kv0, filterable=True) | |
| kew = gr.Dropdown(label="Kewenangan", choices=kews0, value=kw0, filterable=True) | |
| prov.change(on_prov_change, inputs=prov, outputs=[kab, kew], show_progress=False) | |
| kab.change(on_kab_change, inputs=[prov, kab], outputs=kew, show_progress=False) | |
| btn = gr.Button("Run Audit", variant="primary") | |
| out_md = gr.Markdown() | |
| out_score = gr.Dataframe(label="Scorecard", interactive=False, wrap=True) | |
| out_ben_tbl = gr.Dataframe(label="Top Benford Signals (Applicable Only, max 15)", interactive=False, wrap=True) | |
| with gr.Row(): | |
| out_ben_img = gr.Image(label="Benford Plot (Strongest Applicable Column)") | |
| out_scat_img = gr.Image(label="Peer Scatter (2 kolom paling variatif)") | |
| out_sim = gr.Dataframe(label="Top Similarity (se-Provinsi)", interactive=False, wrap=True) | |
| btn.click(run_audit, inputs=[prov, kab, kew], outputs=[out_md, out_score, out_ben_tbl, out_ben_img, out_scat_img, out_sim]) | |
| demo.queue().launch() | |