"""IPLM data-quality audit app (Gradio).

Loads one Excel workbook at import time, lets the user pick a province /
regency / authority level, and reports completeness, zero-rate, Benford
first-digit conformity and cosine similarity against peers in the same
province.
"""
import io
import math
import os
import re

import numpy as np
import pandas as pd
import gradio as gr
from PIL import Image

# Force matplotlib non-GUI backend (HF friendly)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

from mpmath import gammainc, gamma  # pure python for chi-square p-value (optional)

# ============================================================
# CONFIG
# ============================================================
DATA_PATH = os.getenv("IPLM_DATA_PATH", "IPLM_clean_manual_131225.xlsx")

EXCLUDE_COLS_EXACT = {"kontak_wa", "npp", "tanggal_kirim", "updated_at", "created_at"}

# Expected Benford first-digit probabilities for digits 1..9.
BENFORD_P = np.array([math.log10(1 + 1/d) for d in range(1, 10)])

BENFORD_EXCLUDE_PATTERNS = [
    r"\bid\b", r"\bid_", r"_id\b", r"\bkode\b", r"\bcode\b", r"\bnpsn\b",
    r"\bnik\b", r"\bnpwp\b", r"\bkontak\b", r"\bwa\b", r"\bwhatsapp\b",
    r"\btelepon\b", r"\bphone\b", r"\bnohp\b", r"\btanggal\b", r"\bdate\b",
    r"\bwaktu\b", r"\btime\b", r"\bjam\b", r"\bcreated\b", r"\bupdated\b",
    r"\bmodified\b", r"\bsubmit\b", r"\bkirim\b", r"\bmulai\b", r"\bselesai\b",
    r"\blastpage\b", r"\bpage\b", r"\bstatus\b", r"\bnpp\b",
]

# Lower-cased string forms that are treated as missing values.
_NULL_TOKENS = ("nan", "none", "null", "")


# ============================================================
# HELPERS
# ============================================================
def canon(s: str) -> str:
    """Return *s* lower-cased with every non-alphanumeric character removed."""
    return re.sub(r"[^a-z0-9]+", "", str(s).lower())


def pick_col(df, candidates):
    """Return the first column of *df* matching one of *candidates*, or None.

    An exact match on the canonical form (see ``canon``) is tried first, in
    candidate order. Failing that, the first column (in column order) whose
    canonical name contains any canonical candidate is returned.
    """
    cols = list(df.columns)
    by_key = {canon(c): c for c in cols}
    cand_keys = [canon(cand) for cand in candidates]

    for key in cand_keys:
        if key in by_key:
            return by_key[key]

    for c in cols:
        col_key = canon(c)
        if any(key in col_key for key in cand_keys):
            return c
    return None


def detect_geo_cols(df):
    """Return ``(province_column, regency_column)``; either may be None."""
    prov = pick_col(df, ["provinsi", "propinsi", "province"])
    kab = pick_col(df, ["kab_kota", "kabkota", "kabupatenkota", "kabupaten/kota",
                        "kabupaten", "kota", "regency", "city"])
    return prov, kab


def detect_kewenangan_col(df):
    """Return the authority-level ("kewenangan") column name, or None."""
    return pick_col(df, ["kewenangan", "pu_level", "level_kewenangan",
                         "kewenangan_pengelola", "kewenangan_perpustakaan", "level"])


def load_excel(path):
    """Read *path* with openpyxl and normalize every object-dtype column.

    Values are stringified, non-breaking spaces and whitespace runs are
    collapsed to a single space, the result is stripped, and null-like
    tokens become NaN.
    """
    df = pd.read_excel(path, engine="openpyxl")
    for c in df.columns:
        if df[c].dtype == object:
            df[c] = (df[c].astype(str)
                     .str.replace("\u00a0", " ", regex=False)
                     .str.replace(r"\s+", " ", regex=True)
                     .str.strip())
            df.loc[df[c].str.lower().isin(_NULL_TOKENS), c] = np.nan
    return df


def clean_str_list(values):
    """Return stripped, non-null string forms of *values*, de-duplicated in order."""
    cleaned = []
    for v in values:
        if v is None:
            continue
        s = str(v).strip()
        if s.lower() in _NULL_TOKENS:
            continue
        cleaned.append(s)
    # dict preserves insertion order, so this keeps first occurrences.
    return list(dict.fromkeys(cleaned))


def safe_numeric_cols(df, exclude=None, min_non_na=0.25):
    """Return columns of *df* that are usable as numeric indicators.

    A column qualifies when it is not in *exclude*, its canonical name is not
    in ``EXCLUDE_COLS_EXACT``, at least *min_non_na* of its values coerce to
    numbers, and it has at least 3 distinct numeric values.
    """
    exclude = set() if exclude is None else exclude
    hard = {canon(x) for x in EXCLUDE_COLS_EXACT}
    cols = []
    for c in df.columns:
        if c in exclude or canon(c) in hard:
            continue
        s = pd.to_numeric(df[c], errors="coerce")
        if s.notna().mean() >= min_non_na and s.nunique(dropna=True) >= 3:
            cols.append(c)
    return cols


def is_benford_applicable(colname: str) -> bool:
    """Return False for columns that hold identifiers, contacts, dates, etc.

    The name is checked against ``EXCLUDE_COLS_EXACT`` and against
    ``BENFORD_EXCLUDE_PATTERNS``. Patterns are tried on the raw lower-cased
    name and on a copy with separators turned into spaces: ``_`` counts as a
    word character for ``\\b``, so ``\\bkontak\\b`` would otherwise never match
    a snake_case name such as ``kontak_wa``.
    """
    if canon(colname) in {canon(x) for x in EXCLUDE_COLS_EXACT}:
        return False
    name = str(colname).lower()
    spaced = re.sub(r"[^a-z0-9]+", " ", name)
    return not any(
        re.search(p, name) or re.search(p, spaced)
        for p in BENFORD_EXCLUDE_PATTERNS
    )


def leading_digit_series(x: pd.Series):
    """Return the leading non-zero digit (1..9) of each finite non-zero value.

    Returns None when no finite non-zero numeric value remains.
    """
    x = pd.to_numeric(x, errors="coerce").replace([np.inf, -np.inf], np.nan).dropna()
    x = x[np.abs(x) > 0]
    if len(x) == 0:
        return None

    def first_digit(v):
        v = abs(float(v))
        if v == 0:
            return np.nan
        # Scale fractions up so the first character of the text form is the
        # leading significant digit.
        while v < 1:
            v *= 10
        return int(str(v).replace(".", "")[0])

    digs = x.apply(first_digit).dropna().astype(int)
    digs = digs[(digs >= 1) & (digs <= 9)]
    return digs


# --- chi-square p-value without scipy (optional) ---
def chi2_sf(x, k):
    """Return the chi-square survival function (1 - CDF) at *x* for *k* dof.

    SF = upper incomplete gamma(k/2, x/2) / gamma(k/2).
    """
    a = k / 2.0
    return float(gammainc(a, x/2.0, math.inf) / gamma(a))


def benford_stats(x: pd.Series, min_n=50, with_p=True):
    """Compute Benford first-digit statistics for *x*.

    Returns None when fewer than *min_n* usable digits exist. Otherwise
    returns a dict with ``n``, ``mad`` (mean absolute deviation of observed vs
    expected proportions), ``obs`` (observed proportions for digits 1..9),
    ``chi2`` and ``p_value`` (None when *with_p* is false).
    """
    digs = leading_digit_series(x)
    if digs is None or len(digs) < min_n:
        return None

    obs = np.array([(digs == d).sum() for d in range(1, 10)], dtype=float)
    n = obs.sum()
    if n == 0:
        # Only reachable with min_n <= 0; avoids dividing by zero below.
        return None

    exp = BENFORD_P * n
    obs_p = obs / n
    mad = float(np.mean(np.abs(obs_p - BENFORD_P)))

    # chi-square
    chi2 = float(((obs - exp) ** 2 / np.where(exp == 0, 1.0, exp)).sum())
    p = chi2_sf(chi2, 8) if with_p else None
    return {"n": int(n), "mad": mad, "obs": obs_p, "chi2": chi2, "p_value": p}


def benford_flag(mad):
    """Map a MAD value to "OK" (<0.012), "WASPADA" (<0.015) or "RED FLAG"."""
    if mad < 0.012:
        return "OK"
    if mad < 0.015:
        return "WASPADA"
    return "RED FLAG"


def fig_to_pil(fig):
    """Render *fig* to an RGBA PIL image and close the figure."""
    buf = io.BytesIO()
    try:
        fig.savefig(buf, format="png", dpi=160, bbox_inches="tight")
    finally:
        # Close even if rendering fails so pyplot does not accumulate figures.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf).convert("RGBA")


def benford_plot(obs_p):
    """Return a bar chart image comparing Benford vs observed proportions."""
    fig, ax = plt.subplots(figsize=(7, 3))
    d = np.arange(1, 10)
    ax.bar(d - 0.2, BENFORD_P, width=0.4, label="Benford")
    ax.bar(d + 0.2, obs_p, width=0.4, label="Aktual")
    ax.set_xticks(d)
    ax.set_xlabel("Digit pertama")
    ax.set_ylabel("Proporsi")
    ax.legend()
    return fig_to_pil(fig)


def scatter_plot(peer_agg, x_col, y_col):
    """Return a scatter image of *y_col* against *x_col* from *peer_agg*."""
    fig, ax = plt.subplots(figsize=(7, 3.5))
    ax.scatter(peer_agg[x_col], peer_agg[y_col], s=18)
    ax.set_xlabel(x_col)
    ax.set_ylabel(y_col)
    ax.set_title("Peer Scatter (2 kolom paling variatif)")
    return fig_to_pil(fig)


# ============================================================
# NUMPY-ONLY SIMILARITY (NO sklearn)
# ============================================================
def standardize_matrix(X: np.ndarray) -> np.ndarray:
    """Column-wise z-score of *X*; zero-variance columns and NaN/inf become 0."""
    mu = np.nanmean(X, axis=0)
    sd = np.nanstd(X, axis=0)
    sd = np.where(sd == 0, 1.0, sd)
    Z = (X - mu) / sd
    return np.nan_to_num(Z, nan=0.0, posinf=0.0, neginf=0.0)


def cosine_sim_row(Z: np.ndarray, idx: int) -> np.ndarray:
    """Return the cosine similarity of row *idx* of *Z* against every row.

    Returns all zeros when row *idx* has zero norm.
    """
    v = Z[idx]
    v_norm = np.linalg.norm(v)
    if v_norm == 0:
        return np.zeros(Z.shape[0], dtype=float)
    norms = np.linalg.norm(Z, axis=1)
    norms = np.where(norms == 0, 1.0, norms)
    sims = (Z @ v) / (norms * v_norm)
    return np.clip(sims, -1.0, 1.0)


# ============================================================
# LOAD DATA ONCE
# ============================================================
if not os.path.exists(DATA_PATH):
    raise FileNotFoundError(f"Data file not found: {DATA_PATH}. Pastikan file ada di repo, contoh: data/IPLM_clean_manual_131225.xlsx")

df_raw = load_excel(DATA_PATH)
prov_col, kab_col = detect_geo_cols(df_raw)
kew_col = detect_kewenangan_col(df_raw)

if prov_col is None or kab_col is None:
    raise ValueError("Kolom provinsi/kab_kota tidak terdeteksi. Pastikan ada kolom provinsi dan kab_kota.")

df = df_raw.copy()
df["_prov_str"] = df[prov_col].astype(str).str.strip()
df["_kab_str"] = df[kab_col].astype(str).str.strip()
df.loc[df["_prov_str"].str.lower().isin(_NULL_TOKENS), "_prov_str"] = np.nan
df.loc[df["_kab_str"].str.lower().isin(_NULL_TOKENS), "_kab_str"] = np.nan
df = df[df["_prov_str"].notna() & df["_kab_str"].notna()].copy()  # prevent mixing

_hard_exclude_keys = {canon(x) for x in EXCLUDE_COLS_EXACT}
exclude_base = {prov_col, kab_col, "_prov_str", "_kab_str"}
hard_exclude_cols_in_file = {c for c in df.columns if canon(c) in _hard_exclude_keys}
exclude_base = exclude_base.union(hard_exclude_cols_in_file)

num_cols_all = safe_numeric_cols(df, exclude=exclude_base)
benford_cols = [c for c in num_cols_all if is_benford_applicable(c)]

PROVS = clean_str_list(df["_prov_str"].unique().tolist())

# Per-province cache of regency-level means, filled lazily.
prov_cache_peer = {}


def kabs_for_prov(pv):
    """Return the distinct regency names recorded for province *pv*."""
    return clean_str_list(df.loc[df["_prov_str"] == pv, "_kab_str"].unique().tolist())


def kew_for(pv, kv):
    """Return authority-level choices for (*pv*, *kv*).

    A single parenthesized placeholder is returned when the column is missing
    or holds no value for that regency.
    """
    if not kew_col or kew_col not in df.columns:
        return ["(kewenangan tidak tersedia)"]
    mask = (df["_prov_str"] == pv) & (df["_kab_str"] == kv)
    vals = clean_str_list(df.loc[mask, kew_col].dropna().unique().tolist())
    return vals if vals else ["(kewenangan kosong)"]


def get_peer_agg_for_prov(pv):
    """Return (and cache) per-regency means of all numeric columns in *pv*."""
    if pv in prov_cache_peer:
        return prov_cache_peer[pv]
    peer = df[df["_prov_str"] == pv]
    peer_agg = peer.groupby("_kab_str")[num_cols_all].apply(
        lambda g: g.apply(pd.to_numeric, errors="coerce").mean()
    ).reset_index().rename(columns={"_kab_str": "kab_kota"})
    prov_cache_peer[pv] = peer_agg
    return peer_agg


# ============================================================
# AUDIT
# ============================================================
def _empty_audit(message):
    """Build the 6-tuple returned by ``audit`` when nothing can be analysed."""
    return (message, pd.DataFrame(), pd.DataFrame(), None, None, pd.DataFrame())


def _top_similar_peers(peer_agg, kv, limit=10):
    """Return the *limit* peers most cosine-similar to regency *kv*.

    The target row is excluded by index rather than by sort position: with
    ties (exact duplicates, or an all-zero target vector) the target is not
    guaranteed to sort first, and dropping position 0 could hide a real peer.
    Returns an empty frame when *kv* is not present in *peer_agg*.
    """
    X = peer_agg[num_cols_all].replace([np.inf, -np.inf], np.nan).fillna(0.0).to_numpy(float)
    Z = standardize_matrix(X)

    hits = np.flatnonzero((peer_agg["kab_kota"].astype(str) == kv).to_numpy())
    if hits.size == 0:
        return pd.DataFrame()
    idx = int(hits[0])

    sims = cosine_sim_row(Z, idx)
    ranked = [int(j) for j in np.argsort(-sims, kind="stable") if int(j) != idx]
    return pd.DataFrame([
        {"kab_kota_pembanding": str(peer_agg.loc[j, "kab_kota"]),
         "cosine_similarity": float(sims[j])}
        for j in ranked[:limit]
    ])


def audit(pv, kv, kw):
    """Run the full audit for province *pv*, regency *kv*, authority *kw*.

    Returns ``(narrative_markdown, scorecard_df, benford_table_df,
    benford_image, scatter_image, similarity_df)``. Images are None when the
    corresponding analysis is not possible.
    """
    dfx = df[(df["_prov_str"] == pv) & (df["_kab_str"] == kv)].copy()
    # Placeholder choices start with "(" and mean "do not filter".
    if kew_col and kew_col in dfx.columns and kw and not str(kw).startswith("("):
        dfx = dfx[dfx[kew_col].astype(str).str.strip() == str(kw).strip()].copy()

    if dfx.empty:
        return _empty_audit("❌ Data kosong setelah filter (cek kewenangan).")
    if not num_cols_all:
        return _empty_audit("❌ Tidak ada kolom numerik yang cukup.")

    num_all = dfx[num_cols_all].apply(pd.to_numeric, errors="coerce")
    completeness = float(num_all.notna().mean().mean())
    # Missing values are counted as zeros here.
    zero_rate = float((num_all.fillna(0) == 0).mean().mean())

    # Benford
    best, rows = None, []
    for c in benford_cols:
        st = benford_stats(num_all[c], with_p=True)  # p_value via mpmath (safe)
        if not st:
            continue
        rows.append({
            "kolom": c, "n": st["n"], "MAD": st["mad"],
            "flag": benford_flag(st["mad"]),
            "chi2": st["chi2"], "p_value": st["p_value"]
        })
        if best is None or st["mad"] > best["mad"]:
            best = {"kolom": c, **st}

    ben_tbl = pd.DataFrame(rows).sort_values("MAD", ascending=False).head(15) if rows else pd.DataFrame()

    if best is None:
        ben_note = "Benford (applicable only): tidak ada kolom memenuhi syarat (butuh ≥50 non-zero)."
        ben_img = None
    else:
        ben_note = f"Benford strongest: {best['kolom']} | n={best['n']} | MAD={best['mad']:.4f} ({benford_flag(best['mad'])})"
        if best.get("p_value") is not None:
            ben_note += f" | p={best['p_value']:.3g}"
        ben_img = benford_plot(best["obs"])

    # Similarity (numpy only, strict within prov)
    peer_agg = get_peer_agg_for_prov(pv)
    sim_tbl = pd.DataFrame()
    top_sim = None
    if peer_agg.shape[0] >= 3:
        sim_tbl = _top_similar_peers(peer_agg, kv)
        if not sim_tbl.empty:
            top_sim = float(sim_tbl["cosine_similarity"].max())

    scat_img = None
    if peer_agg.shape[0] >= 3 and len(num_cols_all) >= 2:
        vars_ = (peer_agg[num_cols_all]
                 .replace([np.inf, -np.inf], np.nan)
                 .fillna(0.0)
                 .var(axis=0)
                 .sort_values(ascending=False))
        if len(vars_) >= 2 and vars_.iloc[0] > 0 and vars_.iloc[1] > 0:
            x_col, y_col = vars_.index[0], vars_.index[1]
            scat_img = scatter_plot(peer_agg, x_col, y_col)

    too_perfect = (completeness > 0.98) and (zero_rate < 0.02)

    scorecard = pd.DataFrame([
        ["Provinsi", pv, ""],
        ["Kab/Kota", kv, ""],
        ["Kewenangan", str(kw), f"Sumber: {kew_col}" if (kew_col and not str(kw).startswith("(")) else "Kewenangan tidak tersedia/kosong."],
        ["Completeness (numeric)", f"{completeness:.2%}", "Kelengkapan tinggi; pastikan bukan hasil imputasi/rekayasa."],
        ["Zero-rate (numeric)", f"{zero_rate:.2%}", "Proporsi nol dipengaruhi indikator; cek nol pada indikator inti."],
        ["Benford (applicable only)", "ADA" if best else "TIDAK", ben_note],
        ["Top similarity (peer)", f"{top_sim:.3f}" if top_sim is not None else "NA", "≥0.95 indikasi template/duplikasi."],
        ["Catatan pola", "WASPADA" if too_perfect else "Normal", "Jika WASPADA: cek bukti dukung/log input/konsistensi antar indikator."]
    ], columns=["Komponen", "Nilai", "Catatan"])

    narasi = (
        f"**Filter aktif:** Provinsi = `{pv}` · Kab/Kota = `{kv}` · Kewenangan = `{kw}`\n\n"
        f"**EXCLUDE (no analysis):** `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n\n"
        f"{ben_note}"
    )

    return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl


# ============================================================
# GRADIO UI
# ============================================================
def ui_init():
    """Return initial ``(province, regency, authority, regencies, authorities)``."""
    pv = PROVS[0] if PROVS else None
    kabs = kabs_for_prov(pv) if pv else []
    kv = kabs[0] if kabs else None
    kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
    kw = kews[0] if kews else None
    return pv, kv, kw, kabs, kews


def on_prov_change(pv):
    """Refresh the regency and authority dropdowns after a province change."""
    kabs = kabs_for_prov(pv) if pv else []
    kv = kabs[0] if kabs else None
    kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
    kw = kews[0] if kews else None
    return gr.update(choices=kabs, value=kv), gr.update(choices=kews, value=kw)


def on_kab_change(pv, kv):
    """Refresh the authority dropdown after a regency change."""
    kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
    kw = kews[0] if kews else None
    return gr.update(choices=kews, value=kw)


def run_audit(pv, kv, kw):
    """Button callback: run ``audit`` and return its six outputs unchanged."""
    narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl = audit(pv, kv, kw)
    return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl


pv0, kv0, kw0, kabs0, kews0 = ui_init()

with gr.Blocks(title="IPLM Audit — Kualitas Data & Indikasi Tidak Wajar", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "# IPLM — Audit Kualitas Data & Indikasi Data Tidak Wajar (Satu Wilayah)\n"
        f"- Sumber data: `{DATA_PATH}`\n"
        f"- EXCLUDE (no analysis): `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n"
        f"- prov_col = `{prov_col}` · kab_col = `{kab_col}` · kewenangan_col = `{kew_col if kew_col else 'TIDAK ADA'}`\n"
        "---"
    )

    with gr.Row():
        prov = gr.Dropdown(label="Provinsi", choices=PROVS, value=pv0, filterable=True)
        kab = gr.Dropdown(label="Kab/Kota", choices=kabs0, value=kv0, filterable=True)
        kew = gr.Dropdown(label="Kewenangan", choices=kews0, value=kw0, filterable=True)

    prov.change(on_prov_change, inputs=prov, outputs=[kab, kew], show_progress=False)
    kab.change(on_kab_change, inputs=[prov, kab], outputs=kew, show_progress=False)

    btn = gr.Button("Run Audit", variant="primary")

    out_md = gr.Markdown()
    out_score = gr.Dataframe(label="Scorecard", interactive=False, wrap=True)
    out_ben_tbl = gr.Dataframe(label="Top Benford Signals (Applicable Only, max 15)", interactive=False, wrap=True)

    with gr.Row():
        out_ben_img = gr.Image(label="Benford Plot (Strongest Applicable Column)")
        out_scat_img = gr.Image(label="Peer Scatter (2 kolom paling variatif)")

    out_sim = gr.Dataframe(label="Top Similarity (se-Provinsi)", interactive=False, wrap=True)

    btn.click(run_audit, inputs=[prov, kab, kew],
              outputs=[out_md, out_score, out_ben_tbl, out_ben_img, out_scat_img, out_sim])

demo.queue().launch()