DQ_analytics / app.py
irhamni's picture
Update app.py
52557ea verified
import os, re, math, io
import numpy as np
import pandas as pd
import gradio as gr
from PIL import Image
# Force matplotlib non-GUI backend (HF friendly)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from mpmath import gammainc, gamma # pure python for chi-square p-value (optional)
# ============================================================
# CONFIG
# ============================================================
DATA_PATH = os.getenv("IPLM_DATA_PATH", "IPLM_clean_manual_131225.xlsx")
EXCLUDE_COLS_EXACT = {"kontak_wa", "npp", "tanggal_kirim", "updated_at", "created_at"}
BENFORD_P = np.array([math.log10(1 + 1/d) for d in range(1, 10)])
BENFORD_EXCLUDE_PATTERNS = [
r"\bid\b", r"\bid_", r"_id\b",
r"\bkode\b", r"\bcode\b",
r"\bnpsn\b", r"\bnik\b", r"\bnpwp\b",
r"\bkontak\b", r"\bwa\b", r"\bwhatsapp\b", r"\btelepon\b", r"\bphone\b", r"\bnohp\b",
r"\btanggal\b", r"\bdate\b",
r"\bwaktu\b", r"\btime\b", r"\bjam\b",
r"\bcreated\b", r"\bupdated\b", r"\bmodified\b",
r"\bsubmit\b", r"\bkirim\b",
r"\bmulai\b", r"\bselesai\b",
r"\blastpage\b", r"\bpage\b",
r"\bstatus\b",
r"\bnpp\b",
]
# ============================================================
# HELPERS
# ============================================================
def canon(s: str) -> str:
return re.sub(r"[^a-z0-9]+", "", str(s).lower())
def pick_col(df, candidates):
cols = list(df.columns)
cc = {canon(c): c for c in cols}
for cand in candidates:
k = canon(cand)
if k in cc:
return cc[k]
for c in cols:
kc = canon(c)
for cand in candidates:
if canon(cand) in kc:
return c
return None
def detect_geo_cols(df):
prov = pick_col(df, ["provinsi", "propinsi", "province"])
kab = pick_col(df, ["kab_kota", "kabkota", "kabupatenkota", "kabupaten/kota", "kabupaten", "kota", "regency", "city"])
return prov, kab
def detect_kewenangan_col(df):
return pick_col(df, ["kewenangan", "pu_level", "level_kewenangan", "kewenangan_pengelola", "kewenangan_perpustakaan", "level"])
def load_excel(path):
df = pd.read_excel(path, engine="openpyxl")
for c in df.columns:
if df[c].dtype == object:
df[c] = (df[c].astype(str)
.str.replace("\u00a0", " ", regex=False)
.str.replace(r"\s+", " ", regex=True)
.str.strip())
df.loc[df[c].str.lower().isin(["nan", "none", "null", ""]), c] = np.nan
return df
def clean_str_list(values):
out = []
for v in values:
if v is None:
continue
s = str(v).strip()
if s == "" or s.lower() in ["nan", "none", "null"]:
continue
out.append(s)
seen, uniq = set(), []
for s in out:
if s not in seen:
uniq.append(s)
seen.add(s)
return uniq
def safe_numeric_cols(df, exclude=set(), min_non_na=0.25):
hard = {canon(x) for x in EXCLUDE_COLS_EXACT}
cols = []
for c in df.columns:
if c in exclude:
continue
if canon(c) in hard:
continue
s = pd.to_numeric(df[c], errors="coerce")
if s.notna().mean() >= min_non_na and s.nunique(dropna=True) >= 3:
cols.append(c)
return cols
def is_benford_applicable(colname: str) -> bool:
if canon(colname) in {canon(x) for x in EXCLUDE_COLS_EXACT}:
return False
name = str(colname).lower()
return not any(re.search(p, name) for p in BENFORD_EXCLUDE_PATTERNS)
def leading_digit_series(x: pd.Series):
x = pd.to_numeric(x, errors="coerce").replace([np.inf, -np.inf], np.nan).dropna()
x = x[np.abs(x) > 0]
if len(x) == 0:
return None
def first_digit(v):
v = abs(float(v))
if v == 0:
return np.nan
while v < 1:
v *= 10
return int(str(v).replace(".", "")[0])
digs = x.apply(first_digit).dropna().astype(int)
digs = digs[(digs >= 1) & (digs <= 9)]
return digs
# --- chi-square p-value without scipy (optional) ---
def chi2_sf(x, k):
# survival function = 1 - CDF for chi-square(k)
# CDF uses lower incomplete gamma. SF uses upper incomplete gamma.
# SF = gammainc(k/2, x/2, inf) / gamma(k/2)
a = k / 2.0
return float(gammainc(a, x/2.0, math.inf) / gamma(a))
def benford_stats(x: pd.Series, min_n=50, with_p=True):
digs = leading_digit_series(x)
if digs is None or len(digs) < min_n:
return None
obs = np.array([(digs == d).sum() for d in range(1, 10)], dtype=float)
n = obs.sum()
exp = BENFORD_P * n
obs_p = obs / n
mad = float(np.mean(np.abs(obs_p - BENFORD_P)))
# chi-square
chi2 = float(((obs - exp) ** 2 / np.where(exp == 0, 1.0, exp)).sum())
p = chi2_sf(chi2, 8) if with_p else None
return {"n": int(n), "mad": mad, "obs": obs_p, "chi2": chi2, "p_value": p}
def benford_flag(mad):
if mad < 0.012:
return "OK"
if mad < 0.015:
return "WASPADA"
return "RED FLAG"
def fig_to_pil(fig):
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=160, bbox_inches="tight")
plt.close(fig)
buf.seek(0)
return Image.open(buf).convert("RGBA")
def benford_plot(obs_p):
fig, ax = plt.subplots(figsize=(7, 3))
d = np.arange(1, 10)
ax.bar(d - 0.2, BENFORD_P, width=0.4, label="Benford")
ax.bar(d + 0.2, obs_p, width=0.4, label="Aktual")
ax.set_xticks(d)
ax.set_xlabel("Digit pertama")
ax.set_ylabel("Proporsi")
ax.legend()
return fig_to_pil(fig)
def scatter_plot(peer_agg, x_col, y_col):
fig, ax = plt.subplots(figsize=(7, 3.5))
ax.scatter(peer_agg[x_col], peer_agg[y_col], s=18)
ax.set_xlabel(x_col)
ax.set_ylabel(y_col)
ax.set_title("Peer Scatter (2 kolom paling variatif)")
return fig_to_pil(fig)
# ============================================================
# NUMPY-ONLY SIMILARITY (NO sklearn)
# ============================================================
def standardize_matrix(X: np.ndarray) -> np.ndarray:
mu = np.nanmean(X, axis=0)
sd = np.nanstd(X, axis=0)
sd = np.where(sd == 0, 1.0, sd)
Z = (X - mu) / sd
Z = np.nan_to_num(Z, nan=0.0, posinf=0.0, neginf=0.0)
return Z
def cosine_sim_row(Z: np.ndarray, idx: int) -> np.ndarray:
v = Z[idx]
v_norm = np.linalg.norm(v)
if v_norm == 0:
return np.zeros(Z.shape[0], dtype=float)
norms = np.linalg.norm(Z, axis=1)
norms = np.where(norms == 0, 1.0, norms)
sims = (Z @ v) / (norms * v_norm)
sims = np.clip(sims, -1.0, 1.0)
return sims
# ============================================================
# LOAD DATA ONCE
# ============================================================
if not os.path.exists(DATA_PATH):
raise FileNotFoundError(f"Data file not found: {DATA_PATH}. Pastikan file ada di repo, contoh: data/IPLM_clean_manual_131225.xlsx")
df_raw = load_excel(DATA_PATH)
prov_col, kab_col = detect_geo_cols(df_raw)
kew_col = detect_kewenangan_col(df_raw)
if prov_col is None or kab_col is None:
raise ValueError("Kolom provinsi/kab_kota tidak terdeteksi. Pastikan ada kolom provinsi dan kab_kota.")
df = df_raw.copy()
df["_prov_str"] = df[prov_col].astype(str).str.strip()
df["_kab_str"] = df[kab_col].astype(str).str.strip()
df.loc[df["_prov_str"].str.lower().isin(["nan","none","null",""]), "_prov_str"] = np.nan
df.loc[df["_kab_str"].str.lower().isin(["nan","none","null",""]), "_kab_str"] = np.nan
df = df[df["_prov_str"].notna() & df["_kab_str"].notna()].copy() # prevent mixing
exclude_base = {prov_col, kab_col, "_prov_str", "_kab_str"}
hard_exclude_cols_in_file = {c for c in df.columns if canon(c) in {canon(x) for x in EXCLUDE_COLS_EXACT}}
exclude_base = exclude_base.union(hard_exclude_cols_in_file)
num_cols_all = safe_numeric_cols(df, exclude=exclude_base)
benford_cols = [c for c in num_cols_all if is_benford_applicable(c)]
PROVS = clean_str_list(df["_prov_str"].unique().tolist())
prov_cache_peer = {}
def kabs_for_prov(pv):
return clean_str_list(df.loc[df["_prov_str"] == pv, "_kab_str"].unique().tolist())
def kew_for(pv, kv):
if not kew_col or kew_col not in df.columns:
return ["(kewenangan tidak tersedia)"]
vals = clean_str_list(df.loc[(df["_prov_str"] == pv) & (df["_kab_str"] == kv), kew_col].dropna().unique().tolist())
return vals if vals else ["(kewenangan kosong)"]
def get_peer_agg_for_prov(pv):
if pv in prov_cache_peer:
return prov_cache_peer[pv]
peer = df[df["_prov_str"] == pv]
peer_agg = peer.groupby("_kab_str")[num_cols_all].apply(
lambda g: g.apply(pd.to_numeric, errors="coerce").mean()
).reset_index().rename(columns={"_kab_str": "kab_kota"})
prov_cache_peer[pv] = peer_agg
return peer_agg
# ============================================================
# AUDIT
# ============================================================
def audit(pv, kv, kw):
dfx = df[(df["_prov_str"] == pv) & (df["_kab_str"] == kv)].copy()
if kew_col and kew_col in dfx.columns and kw and not str(kw).startswith("("):
dfx = dfx[dfx[kew_col].astype(str).str.strip() == str(kw).strip()].copy()
if dfx.empty:
return ("❌ Data kosong setelah filter (cek kewenangan).", pd.DataFrame(), pd.DataFrame(), None, None, pd.DataFrame())
if not num_cols_all:
return ("❌ Tidak ada kolom numerik yang cukup.", pd.DataFrame(), pd.DataFrame(), None, None, pd.DataFrame())
num_all = dfx[num_cols_all].apply(pd.to_numeric, errors="coerce")
completeness = float(num_all.notna().mean().mean())
zero_rate = float((num_all.fillna(0) == 0).mean().mean())
# Benford
best, rows = None, []
for c in benford_cols:
st = benford_stats(num_all[c], with_p=True) # p_value via mpmath (safe)
if st:
rows.append({
"kolom": c, "n": st["n"], "MAD": st["mad"],
"flag": benford_flag(st["mad"]),
"chi2": st["chi2"],
"p_value": st["p_value"]
})
if best is None or st["mad"] > best["mad"]:
best = {"kolom": c, **st}
ben_tbl = pd.DataFrame(rows).sort_values("MAD", ascending=False).head(15) if rows else pd.DataFrame()
if best is None:
ben_note = "Benford (applicable only): tidak ada kolom memenuhi syarat (butuh ≥50 non-zero)."
ben_img = None
else:
ben_note = f"Benford strongest: {best['kolom']} | n={best['n']} | MAD={best['mad']:.4f} ({benford_flag(best['mad'])})"
if best.get("p_value") is not None:
ben_note += f" | p={best['p_value']:.3g}"
ben_img = benford_plot(best["obs"])
# Similarity (numpy only, strict within prov)
peer_agg = get_peer_agg_for_prov(pv)
sim_tbl = pd.DataFrame()
top_sim = None
if peer_agg.shape[0] >= 3:
X = peer_agg[num_cols_all].replace([np.inf, -np.inf], np.nan).fillna(0.0).to_numpy(float)
Z = standardize_matrix(X)
idx = None
for i in range(len(peer_agg)):
if str(peer_agg.loc[i, "kab_kota"]) == kv:
idx = i
break
if idx is not None:
sims = cosine_sim_row(Z, idx)
order = np.argsort(-sims)
sim_tbl = pd.DataFrame([
{"kab_kota_pembanding": str(peer_agg.loc[j, "kab_kota"]), "cosine_similarity": float(sims[j])}
for j in order[1:11]
])
if not sim_tbl.empty:
top_sim = float(sim_tbl["cosine_similarity"].max())
scat_img = None
if peer_agg.shape[0] >= 3 and len(num_cols_all) >= 2:
vars_ = peer_agg[num_cols_all].replace([np.inf, -np.inf], np.nan).fillna(0.0).var(axis=0).sort_values(ascending=False)
if len(vars_) >= 2 and vars_.iloc[0] > 0 and vars_.iloc[1] > 0:
x_col, y_col = vars_.index[0], vars_.index[1]
scat_img = scatter_plot(peer_agg, x_col, y_col)
too_perfect = (completeness > 0.98) and (zero_rate < 0.02)
scorecard = pd.DataFrame([
["Provinsi", pv, ""],
["Kab/Kota", kv, ""],
["Kewenangan", str(kw), f"Sumber: {kew_col}" if (kew_col and not str(kw).startswith("(")) else "Kewenangan tidak tersedia/kosong."],
["Completeness (numeric)", f"{completeness:.2%}", "Kelengkapan tinggi; pastikan bukan hasil imputasi/rekayasa."],
["Zero-rate (numeric)", f"{zero_rate:.2%}", "Proporsi nol dipengaruhi indikator; cek nol pada indikator inti."],
["Benford (applicable only)", "ADA" if best else "TIDAK", ben_note],
["Top similarity (peer)", f"{top_sim:.3f}" if top_sim is not None else "NA", "≥0.95 indikasi template/duplikasi."],
["Catatan pola", "WASPADA" if too_perfect else "Normal", "Jika WASPADA: cek bukti dukung/log input/konsistensi antar indikator."]
], columns=["Komponen", "Nilai", "Catatan"])
narasi = (
f"**Filter aktif:** Provinsi = `{pv}` · Kab/Kota = `{kv}` · Kewenangan = `{kw}`\n\n"
f"**EXCLUDE (no analysis):** `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n\n"
f"{ben_note}"
)
return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl
# ============================================================
# GRADIO UI
# ============================================================
def ui_init():
pv = PROVS[0] if PROVS else None
kabs = kabs_for_prov(pv) if pv else []
kv = kabs[0] if kabs else None
kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
kw = kews[0] if kews else None
return pv, kv, kw, kabs, kews
def on_prov_change(pv):
kabs = kabs_for_prov(pv) if pv else []
kv = kabs[0] if kabs else None
kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
kw = kews[0] if kews else None
return gr.update(choices=kabs, value=kv), gr.update(choices=kews, value=kw)
def on_kab_change(pv, kv):
kews = kew_for(pv, kv) if (pv and kv) else ["(kewenangan tidak tersedia)"]
kw = kews[0] if kews else None
return gr.update(choices=kews, value=kw)
def run_audit(pv, kv, kw):
narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl = audit(pv, kv, kw)
return narasi, scorecard, ben_tbl, ben_img, scat_img, sim_tbl
pv0, kv0, kw0, kabs0, kews0 = ui_init()
with gr.Blocks(title="IPLM Audit — Kualitas Data & Indikasi Tidak Wajar", theme=gr.themes.Soft()) as demo:
gr.Markdown(
"# IPLM — Audit Kualitas Data & Indikasi Data Tidak Wajar (Satu Wilayah)\n"
f"- Sumber data: `{DATA_PATH}`\n"
f"- EXCLUDE (no analysis): `{', '.join(sorted(EXCLUDE_COLS_EXACT))}`\n"
f"- prov_col = `{prov_col}` · kab_col = `{kab_col}` · kewenangan_col = `{kew_col if kew_col else 'TIDAK ADA'}`\n"
"---"
)
with gr.Row():
prov = gr.Dropdown(label="Provinsi", choices=PROVS, value=pv0, filterable=True)
kab = gr.Dropdown(label="Kab/Kota", choices=kabs0, value=kv0, filterable=True)
kew = gr.Dropdown(label="Kewenangan", choices=kews0, value=kw0, filterable=True)
prov.change(on_prov_change, inputs=prov, outputs=[kab, kew], show_progress=False)
kab.change(on_kab_change, inputs=[prov, kab], outputs=kew, show_progress=False)
btn = gr.Button("Run Audit", variant="primary")
out_md = gr.Markdown()
out_score = gr.Dataframe(label="Scorecard", interactive=False, wrap=True)
out_ben_tbl = gr.Dataframe(label="Top Benford Signals (Applicable Only, max 15)", interactive=False, wrap=True)
with gr.Row():
out_ben_img = gr.Image(label="Benford Plot (Strongest Applicable Column)")
out_scat_img = gr.Image(label="Peer Scatter (2 kolom paling variatif)")
out_sim = gr.Dataframe(label="Top Similarity (se-Provinsi)", interactive=False, wrap=True)
btn.click(run_audit, inputs=[prov, kab, kew], outputs=[out_md, out_score, out_ben_tbl, out_ben_img, out_scat_img, out_sim])
demo.queue().launch()