# Bad-Actor-Simulation / src / streamlit_app.py
# Author: ishaq101
# Last commit: "update data and default section" (d382570)
import html
import io
from pathlib import Path

import numpy as np
import pandas as pd
import streamlit as st
# Page-level settings: browser tab title and icon, wide layout, and the
# sidebar opened on first load.
_PAGE_CONFIG = {
    "page_title": "Bad Actor Simulation",
    "page_icon": "⚠️",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_CONFIG)
# Stylesheet for the HTML snippets this script renders through st.markdown
# (section cards, welcome card, app header, dataset badge, admin banner) plus
# overrides for Streamlit's metric and sidebar elements.
_CUSTOM_CSS = """
<style>
/* ── Section card headers ─────────────────────────────────────────────────── */
.section-card {
background: #f8f9fa;
border-left: 4px solid #e63946;
border-radius: 6px;
padding: 10px 16px;
margin: 16px 0 8px 0;
}
.section-card h3 { margin: 0; font-size: 1.05rem; font-weight: 700; color: #1d3557; }
.section-card .sub { font-size: 0.78rem; color: #6c757d; margin-top: 3px; }
/* ── KPI metric cards ─────────────────────────────────────────────────────── */
[data-testid="metric-container"] {
background: #ffffff;
border: 1px solid #e9ecef;
border-radius: 10px;
padding: 14px 18px !important;
box-shadow: 0 1px 3px rgba(0,0,0,0.06);
}
[data-testid="stMetricValue"] { color: #1d3557; font-weight: 700; }
[data-testid="stMetricLabel"] { color: #6c757d; font-size: 0.82rem; }
/* ── Welcome card ─────────────────────────────────────────────────────────── */
.welcome-card {
background: linear-gradient(135deg, #1d3557 0%, #457b9d 100%);
border-radius: 12px;
padding: 32px 36px;
color: white;
margin-bottom: 24px;
}
.welcome-card h2 { margin: 0 0 8px 0; font-size: 1.4rem; color: white; }
.welcome-card p { margin: 0 0 20px 0; color: rgba(255,255,255,0.8); font-size: 0.9rem; }
.step-list { list-style: none; padding: 0; margin: 0; }
.step-list li {
display: flex; align-items: center; gap: 10px;
padding: 7px 0; border-bottom: 1px solid rgba(255,255,255,0.15);
color: rgba(255,255,255,0.9); font-size: 0.88rem;
}
.step-list li:last-child { border-bottom: none; }
.step-num {
background: #e63946; color: white; font-weight: 700;
border-radius: 50%; width: 22px; height: 22px;
display: flex; align-items: center; justify-content: center;
font-size: 0.75rem; flex-shrink: 0;
}
/* ── App header ───────────────────────────────────────────────────────────── */
.app-header { margin-bottom: 4px; }
.app-header h1 { margin: 0; font-size: 1.7rem; color: #1d3557; font-weight: 800; }
.app-header .tagline { color: #6c757d; font-size: 0.85rem; margin-top: 2px; }
.dataset-badge {
display: inline-block;
background: #e9ecef; color: #495057;
border-radius: 20px; padding: 4px 12px;
font-size: 0.78rem; margin-top: 4px;
}
/* ── Admin banner ─────────────────────────────────────────────────────────── */
.admin-banner {
background: linear-gradient(135deg, #212529 0%, #343a40 100%);
border-radius: 10px;
padding: 16px 20px;
color: white;
margin-bottom: 20px;
border-left: 4px solid #ffc107;
}
.admin-banner h3 { margin: 0 0 4px 0; font-size: 1rem; color: #ffc107; }
.admin-banner p { margin: 0; font-size: 0.82rem; color: rgba(255,255,255,0.75); }
/* ── Sidebar ──────────────────────────────────────────────────────────────── */
[data-testid="stSidebar"] { background: #f8f9fa; }
[data-testid="stSidebar"] .stButton > button {
background: #e63946 !important; color: white !important;
border: none !important; font-weight: 600 !important;
letter-spacing: 0.3px;
}
[data-testid="stSidebar"] .stButton > button:hover {
background: #c1121f !important;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Admin mode: switched on by the presence of an "admin" query parameter; the
# parameter's value is not inspected.
is_admin = "admin" in st.query_params

# Session state: on the first run of a session, create the two slots that hold
# an uploaded replacement dataset and its display label.
if "custom_df" not in st.session_state:
    st.session_state["custom_df"] = None
    st.session_state["custom_source"] = None
def _section(icon, title, subtitle=""):
    """Render a section header card.

    Args:
        icon: Text (typically an emoji) placed before the title.
        title: Heading text shown in the card's ``<h3>``.
        subtitle: Optional smaller line under the heading; omitted when falsy.

    The values are interpolated into HTML as-is and rendered with
    ``unsafe_allow_html=True``, so they are treated as markup, not plain text.
    """
    pieces = ['<div class="section-card">', f"<h3>{icon}&nbsp; {title}</h3>"]
    if subtitle:
        pieces.append(f'<div class="sub">{subtitle}</div>')
    pieces.append("</div>")
    st.markdown("".join(pieces), unsafe_allow_html=True)
# Column names an uploaded dataset must contain before it can be applied.
REQUIRED_COLS = {
    "year",
    "month",
    "site_id",
    "section",
    "model",
    "eqm_no",
    "MA",
    "MTBF",
}
@st.cache_data
def load_default():
    """Load the bundled default dataset.

    Reads the ``data_badactor`` sheet of ``bad_actor_simulation.xlsx`` located
    next to this script, strips whitespace from the column names and casts the
    ``model`` column to ``str``.

    Returns:
        The resulting ``pandas.DataFrame``.
    """
    workbook_path = Path(__file__).with_name("bad_actor_simulation.xlsx")
    frame = pd.read_excel(workbook_path, sheet_name="data_badactor")
    frame.columns = frame.columns.str.strip()
    frame["model"] = frame["model"].astype(str)
    return frame
@st.cache_data
def load_csv(data: bytes) -> pd.DataFrame:
    """Parse uploaded CSV bytes into a DataFrame.

    Column names are stripped of surrounding whitespace. The ``model`` column
    is cast to ``str`` when it exists.

    Args:
        data: Raw content of the uploaded CSV file.

    Returns:
        The parsed ``pandas.DataFrame``. A missing ``model`` column is NOT an
        error here: the frame is returned unchanged so the caller's
        required-column check can report it, instead of this function raising
        ``KeyError`` first.
    """
    df = pd.read_csv(io.BytesIO(data))
    df.columns = df.columns.astype(str).str.strip()
    if "model" in df.columns:
        df["model"] = df["model"].astype(str)
    return df
@st.cache_data
def load_xlsx(data: bytes, sheet: str) -> pd.DataFrame:
    """Parse one worksheet of uploaded XLSX bytes into a DataFrame.

    Column names are converted to ``str`` and stripped of surrounding
    whitespace. The ``model`` column is cast to ``str`` when it exists.

    Args:
        data: Raw content of the uploaded workbook.
        sheet: Name of the worksheet to read.

    Returns:
        The parsed ``pandas.DataFrame``. A missing ``model`` column is NOT an
        error here: the frame is returned unchanged so the caller's
        required-column check can report it, instead of this function raising
        ``KeyError`` first.
    """
    df = pd.read_excel(io.BytesIO(data), sheet_name=sheet)
    # Excel headers may be numbers or dates; go through str first so the
    # .str accessor neither fails nor turns such headers into NaN.
    df.columns = df.columns.astype(str).str.strip()
    if "model" in df.columns:
        df["model"] = df["model"].astype(str)
    return df
def get_xlsx_sheets(data: bytes) -> list:
    """Return the worksheet names of an uploaded workbook.

    Args:
        data: Raw content of the uploaded Excel file.

    Returns:
        List of sheet names as reported by ``pandas.ExcelFile``.

    Raises:
        ValueError: If pandas cannot recognise *data* as an Excel file.
    """
    # The context manager closes the workbook handle once the names are read.
    with pd.ExcelFile(io.BytesIO(data)) as workbook:
        return workbook.sheet_names
# ── Helpers ────────────────────────────────────────────────────────────────────
def minmax_norm(s):
    """Scale a Series linearly onto [0, 1] using its own min and max.

    Args:
        s: Numeric ``pandas.Series``.

    Returns:
        A Series on the same index. When every non-missing value is identical
        (zero range) the non-missing entries become ``0.5``. Missing values
        stay missing in both branches.
    """
    lo, hi = s.min(), s.max()
    if hi == lo:
        # Zero range would divide by zero, so use the mid-point instead.
        # Mask with notna() so a missing measurement is not turned into a
        # valid-looking 0.5 score (the regular branch keeps it as NaN too).
        return pd.Series(0.5, index=s.index).where(s.notna())
    return (s - lo) / (hi - lo)
def flag_consecutive(group, col_below, col_period, threshold):
    """Mark rows belonging to a consecutive-True run of length >= threshold.

    Rows are ordered by ``col_period``. A run is a stretch of rows whose
    ``col_below`` value is truthy and whose periods each equal the previous
    row's period plus one. Once a run reaches ``threshold`` rows, every row of
    that run (including the earlier ones) is marked.

    Args:
        group: DataFrame holding the rows of one entity.
        col_below: Name of the boolean column to test.
        col_period: Name of the integer period column.
        threshold: Minimum run length required to mark the run.

    Returns:
        Boolean ``pandas.Series`` indexed by the rows of *group* in
        period-sorted order.
    """
    # A stable sort keeps the input order of rows sharing a period, so the
    # result does not depend on the sort algorithm's tie handling.
    ordered = group.sort_values(col_period, kind="stable")
    below = ordered[col_below].to_numpy()
    periods = ordered[col_period].to_numpy()
    flagged = np.zeros(len(ordered), dtype=bool)

    run_start = None  # position where the current run began, None if no run
    prev_period = None
    for pos, (is_below, period) in enumerate(zip(below, periods)):
        if not is_below:
            run_start = None
        else:
            # Start a new run on the first below-row or after a period gap.
            if run_start is None or period != prev_period + 1:
                run_start = pos
            if pos - run_start + 1 >= threshold:
                flagged[run_start:pos + 1] = True
        prev_period = period
    return pd.Series(flagged, index=ordered.index)
# ── Default filter values ──────────────────────────────────────────────────────
# Initial state of the sidebar widgets; edit these to change what is
# pre-selected when the app loads.
DEFAULT_YEARS = [2026]
DEFAULT_SITES = [2009]
DEFAULT_SECTIONS = ["OB LOADER"]
DEFAULT_MODELS = [
    "6015B",
    "6020B",
    "EX2500-5",
    "EX3600-6",
    "PC1250SP-7",
    "PC1250SP-8",
    "PC2000-8",
    "PC4000-6",
]
DEFAULT_CONSECUTIVE_N = 2
DEFAULT_OBS_MONTH = 2
# Grouping keys shared by the scoring step and the reference tables.
# _BASIS_KEY: per-model detail stats and the Q1 threshold.
# _AGG_KEY:   section-level min/max used for normalisation (no model).
_BASIS_KEY = ["month", "site_id", "section", "model"]
_AGG_KEY = ["month", "site_id", "section"]

# Column order of the bad actor summary table; also gives an empty summary
# its columns so sorting and filtering work when nothing is flagged.
_SUMMARY_COLUMNS = [
    "eqm_no", "site_id", "section", "model",
    "flagged_months", "max_streak", "bad_actor_months", "last_bad_actor_month",
]

_STAT_FORMAT = {
    "MA_min": "{:.4f}", "MA_max": "{:.4f}", "MA_avg": "{:.4f}",
    "MTBF_min": "{:.4f}", "MTBF_max": "{:.4f}", "MTBF_avg": "{:.4f}",
}


def _longest_run(periods):
    """Return the length of the longest run of consecutive integers in *periods*."""
    ordered = sorted(periods)
    if not ordered:
        return 0
    best = current = 1
    for prev, nxt in zip(ordered, ordered[1:]):
        current = current + 1 if nxt == prev + 1 else 1
        best = max(best, current)
    return best


def _reference_stats(filt, keys):
    """Return min / max / mean of MA and MTBF per *keys* group, rounded to 4 decimals."""
    return filt.groupby(keys).agg(
        MA_min=("MA", "min"), MA_max=("MA", "max"), MA_avg=("MA", "mean"),
        MTBF_min=("MTBF", "min"), MTBF_max=("MTBF", "max"), MTBF_avg=("MTBF", "mean"),
    ).round(4).reset_index()


def _score_equipment(filt, consecutive_n):
    """Add normalised values, score, Q1 flag and bad-actor flag; return (filt, q1_df)."""
    # Reference basis 1: min-max normalise against the section-level range.
    filt["norm_MA"] = filt.groupby(_AGG_KEY)["MA"].transform(minmax_norm)
    filt["norm_MTBF"] = filt.groupby(_AGG_KEY)["MTBF"].transform(minmax_norm)
    filt["bad_actor_score"] = filt["norm_MA"] * filt["norm_MTBF"]

    # Reference basis 2: 25th-percentile threshold per month/site/section/model.
    q1_df = (
        filt.groupby(_BASIS_KEY)["bad_actor_score"]
        .quantile(0.25)
        .reset_index()
        .rename(columns={"bad_actor_score": "q1_threshold"})
    )
    filt = filt.merge(q1_df, on=_BASIS_KEY, how="left")
    filt["below_q1"] = filt["bad_actor_score"] < filt["q1_threshold"]

    # period = year*12 + month so consecutive months compare as n, n+1 across years.
    filt["period"] = filt["year"] * 12 + filt["month"]

    # Iterate the groups explicitly instead of using groupby.apply: when only
    # one equipment is present, apply() stacks the single returned Series into
    # a one-row DataFrame that cannot be assigned to a column. Starting from
    # all-False also keeps rows with a missing eqm_no (skipped by groupby) as
    # a valid boolean instead of NaN.
    is_bad = pd.Series(False, index=filt.index)
    for _, grp in filt.groupby("eqm_no"):
        flags = flag_consecutive(
            grp, col_below="below_q1", col_period="period", threshold=consecutive_n)
        is_bad.loc[flags.index] = flags.to_numpy()
    filt["is_bad_actor"] = is_bad
    return filt, q1_df


def _summarise_bad_actors(filt, obs_month):
    """Build one summary row per equipment whose last bad-actor month equals *obs_month*."""
    bad_ids = filt.loc[filt["is_bad_actor"], "eqm_no"].unique()
    rows = []
    for eid, grp in filt[filt["eqm_no"].isin(bad_ids)].groupby("eqm_no"):
        # Non-empty by construction: eid has at least one is_bad_actor row.
        months = sorted(grp.loc[grp["is_bad_actor"], "month"].unique())
        rows.append({
            "eqm_no": eid,
            "site_id": grp["site_id"].iloc[0],
            "section": grp["section"].iloc[0],
            "model": grp["model"].iloc[0],
            "flagged_months": int(grp["below_q1"].sum()),
            "max_streak": _longest_run(grp.loc[grp["below_q1"], "period"].tolist()),
            "bad_actor_months": ", ".join(str(int(m)) for m in months),
            "last_bad_actor_month": int(months[-1]),
        })
    # Explicit columns: a frame built from an empty list has none, and
    # sort_values would raise KeyError before the "no bad actors" message.
    summary = pd.DataFrame(rows, columns=_SUMMARY_COLUMNS).sort_values(
        ["section", "max_streak"], ascending=[True, False])
    return summary[summary["last_bad_actor_month"] == obs_month].reset_index(drop=True)


def _render_bad_actor_tab(summary, filt, consecutive_n, obs_month):
    """Render the bad actor table and the per-section bad actor rate."""
    _section("⚠️", "Bad Actor List",
             f"Min {consecutive_n} consecutive month(s) Β· last flagged = Month {obs_month}")
    if summary.empty:
        st.success(f"No bad actors with last flagged month = Month {obs_month}.")
        return
    n_bad = len(summary)
    st.markdown(
        f'<p style="color:#e63946;font-weight:600;margin:4px 0 12px">'
        f'{n_bad} equipment flagged</p>',
        unsafe_allow_html=True,
    )
    st.dataframe(summary, use_container_width=True)

    _section("πŸ“ˆ", "Bad Actor Rate by Section")
    for sec, grp in summary.groupby("section"):
        sec_total = filt.loc[filt["section"] == sec, "eqm_no"].nunique()
        sec_rate = len(grp) / sec_total if sec_total else 0
        st.caption(f"{sec} β€” {len(grp)} / {sec_total} ({sec_rate*100:.1f}%)")
        # st.progress rejects values above 1.0; the caption shows the raw rate.
        st.progress(min(sec_rate, 1.0))


def _render_reference_tab(filt, q1_df):
    """Render the normalisation reference and Q1 threshold tables."""
    _section("πŸ“", "Normalisation Reference",
             "min / max / avg of MA & MTBF used for normalization β€” aggregated across models")
    st.dataframe(
        _reference_stats(filt, _AGG_KEY).style.format(_STAT_FORMAT),
        use_container_width=True,
    )
    with st.expander("Detail per model"):
        st.dataframe(
            _reference_stats(filt, _BASIS_KEY).style.format(_STAT_FORMAT),
            use_container_width=True,
        )

    pivot_idx = [k for k in _BASIS_KEY if k not in ("model", "month")]
    q1_pivot = q1_df.pivot_table(
        index=pivot_idx, columns="month", values="q1_threshold", aggfunc="mean"
    ).round(4)
    q1_pivot.columns = [f"Month {int(c)}" for c in q1_pivot.columns]
    _section("πŸ“‰", "Q1 Threshold Table",
             "25th percentile of bad actor score β€” aggregated across models")
    st.dataframe(q1_pivot, use_container_width=True)
    with st.expander("Detail per model"):
        q1_pivot_detail = q1_df.pivot_table(
            index=[k for k in _BASIS_KEY if k != "month"],
            columns="month", values="q1_threshold"
        ).round(4)
        q1_pivot_detail.columns = [f"Month {int(c)}" for c in q1_pivot_detail.columns]
        st.dataframe(q1_pivot_detail, use_container_width=True)


def _render_scored_tab(filt):
    """Render the row-level scored data with bad-actor rows highlighted."""
    _section("πŸ”’", "Scored Data",
             "norm_MA Γ— norm_MTBF = bad_actor_score Β· rows in red = bad actor")
    show_cols = [
        "year", "month", "site_id", "section", "model", "eqm_no",
        "MA", "MTBF", "norm_MA", "norm_MTBF",
        "bad_actor_score", "q1_threshold", "below_q1", "is_bad_actor",
    ]
    scored = (
        filt[show_cols]
        .sort_values(["site_id", "section", "month", "eqm_no"])
        .reset_index(drop=True)
    )

    def _highlight(row):
        color = "background-color: #ffe0e0" if row["is_bad_actor"] else ""
        return [color] * len(row)

    st.dataframe(
        scored.style
        .format({
            "norm_MA": "{:.4f}", "norm_MTBF": "{:.4f}",
            "bad_actor_score": "{:.4f}", "q1_threshold": "{:.4f}",
        })
        .apply(_highlight, axis=1),
        use_container_width=True,
        height=420,
    )


def run_simulation(years, sites, sections, models, consecutive_n, obs_month):
    """Score the active dataset, flag bad actors and render the result tabs.

    Reads the module-level ``df`` (the dataset selected in the sidebar), so it
    must be called after ``df`` has been assigned.

    Args:
        years: Years to include.
        sites: Site ids to include.
        sections: Sections to include.
        models: Models to include; the model filter is skipped when the list
            contains ``"ALL"``.
        consecutive_n: Minimum number of consecutive below-Q1 months for an
            equipment to count as a bad actor.
        obs_month: Observation cutoff; only months ``<= obs_month`` are
            evaluated, and the summary keeps equipment whose last bad-actor
            month equals it.

    Returns:
        None. Output is written to the Streamlit page; a warning is shown and
        nothing else is rendered when the filters match no rows.
    """
    # 1. Filter (observation cutoff: only months 1..obs_month).
    mask = (
        df["year"].isin(years)
        & df["site_id"].isin(sites)
        & df["section"].isin(sections)
        & (df["month"] <= obs_month)
    )
    if "ALL" not in models:
        mask &= df["model"].isin(models)
    filt = df[mask].copy()
    if filt.empty:
        st.warning("No data found for the selected filters.")
        return

    # 2. Score rows and derive the summary table.
    filt, q1_df = _score_equipment(filt, consecutive_n)
    summary = _summarise_bad_actors(filt, obs_month)

    # 3. KPI row.
    total_eqm = filt["eqm_no"].nunique()
    n_bad = len(summary)
    rate = n_bad / total_eqm * 100 if total_eqm else 0
    k1, k2, k3 = st.columns(3)
    k1.metric("Equipment Evaluated", f"{total_eqm:,}")
    k2.metric("Bad Actors Detected", f"{n_bad:,}")
    k3.metric("Bad Actor Rate", f"{rate:.1f}%")
    st.divider()

    # 4. Tabs.
    tab1, tab2, tab3 = st.tabs([
        "⚠️ Bad Actors",
        "πŸ“Š Reference Basis",
        "πŸ”’ Scored Data",
    ])
    with tab1:
        _render_bad_actor_tab(summary, filt, consecutive_n, obs_month)
    with tab2:
        _render_reference_tab(filt, q1_df)
    with tab3:
        _render_scored_tab(filt)
# ── Sidebar controls ───────────────────────────────────────────────────────────
# Builds the sidebar and defines the names the main area reads afterwards:
# df, data_source, sel_years, sel_sites, sel_sections, sel_models, obs_month,
# consecutive_n and run.
with st.sidebar:
    st.markdown(
        '<p style="font-size:1.15rem;font-weight:800;color:#1d3557;'
        'border-left:4px solid #e63946;padding-left:10px;margin-bottom:12px">'
        'Simulation Controls</p>',
        unsafe_allow_html=True,
    )

    # ── Dataset management (admin only) ───────────────────────────────────────
    if is_admin:
        with st.expander("πŸ—‚οΈ Dataset", expanded=True):
            # Show which dataset is currently active.
            if st.session_state.custom_df is None:
                st.info("Menggunakan **default** dataset\n\n`bad_actor_simulation.xlsx`")
            else:
                st.success(
                    f"**Custom dataset aktif**\n\n"
                    f"{st.session_state.custom_source}\n\n"
                    f"{len(st.session_state.custom_df):,} rows"
                )
                # Dropping the custom dataset falls back to the bundled default.
                if st.button("πŸ—‘οΈ Hapus & kembali ke default", use_container_width=True):
                    st.session_state.custom_df = None
                    st.session_state.custom_source = None
                    st.rerun()

            st.markdown("---")
            st.caption("Upload file untuk mengganti dataset aktif:")
            upload = st.file_uploader(
                "CSV atau XLSX",
                type=["csv", "xlsx"],
                label_visibility="collapsed",
            )
            if upload is not None:
                payload = upload.read()
                suffix = upload.name.rsplit(".", 1)[-1].lower()
                if suffix == "csv":
                    candidate_df = load_csv(payload)
                    candidate_source = upload.name
                else:
                    # Ask which sheet to use only when the workbook has several.
                    sheet_names = get_xlsx_sheets(payload)
                    if len(sheet_names) == 1:
                        chosen_sheet = sheet_names[0]
                    else:
                        chosen_sheet = st.selectbox("Pilih sheet", sheet_names)
                    candidate_df = load_xlsx(payload, chosen_sheet)
                    candidate_source = f"{upload.name} [sheet: {chosen_sheet}]"

                absent_cols = REQUIRED_COLS - set(candidate_df.columns)
                if absent_cols:
                    st.error(f"Kolom tidak ditemukan: {', '.join(sorted(absent_cols))}")
                else:
                    st.caption(f"Preview β€” {len(candidate_df):,} rows, {len(candidate_df.columns)} cols")
                    st.dataframe(candidate_df.head(3), use_container_width=True)
                    # The candidate only becomes active after explicit confirmation.
                    if st.button("βœ… Terapkan dataset ini", use_container_width=True):
                        st.session_state.custom_df = candidate_df
                        st.session_state.custom_source = candidate_source
                        st.rerun()
        st.divider()

    # ── Determine active dataframe ─────────────────────────────────────────────
    # An applied upload takes precedence over the bundled default file.
    _custom = st.session_state.custom_df
    df = load_default() if _custom is None else _custom
    data_source = st.session_state.custom_source or "bad_actor_simulation.xlsx (default)"

    # ── Filters ────────────────────────────────────────────────────────────────
    with st.expander("πŸŽ›οΈ Filters", expanded=True):
        all_years = sorted(df["year"].dropna().unique().tolist())
        all_sites = sorted(df["site_id"].dropna().unique().tolist())
        all_sections = sorted(df["section"].dropna().unique().tolist())
        all_models = ["ALL", *sorted(df["model"].dropna().astype(str).unique().tolist())]

        def _default(lst, vals):
            """Keep the preferred *vals* that exist in *lst*; else fall back to its first item."""
            kept = [v for v in vals if v in lst]
            return kept or lst[:1]

        sel_years = st.multiselect(
            "πŸ—“οΈ Year(s)", all_years, default=_default(all_years, DEFAULT_YEARS))
        obs_month = st.slider(
            "πŸ“… Observation Month (cutoff)", 1, 12, DEFAULT_OBS_MONTH,
            help="Only data up to this month is included in the evaluation.")
        sel_sites = st.multiselect(
            "🏭 Site(s)", all_sites, default=_default(all_sites, DEFAULT_SITES))
        sel_sections = st.multiselect(
            "πŸ”§ Section(s)", all_sections,
            default=_default(all_sections, DEFAULT_SECTIONS))
        sel_models = st.multiselect(
            "🚜 Model(s) (ALL = no model filter)",
            all_models, default=_default(all_models, DEFAULT_MODELS))
        consecutive_n = st.slider("πŸ” Min Consecutive Months", 1, 3, DEFAULT_CONSECUTIVE_N)

    st.markdown("<br>", unsafe_allow_html=True)
    run = st.button("β–Ά Run Simulation", type="primary", use_container_width=True)
    st.caption("Adjust filters above, then click Run.")
# ── Main area ──────────────────────────────────────────────────────────────────
# Admin banner (shown only in admin mode).
if is_admin:
    st.markdown(
        '<div class="admin-banner">'
        '<h3>βš™οΈ Admin Mode</h3>'
        '<p>Dataset management aktif. Gunakan panel <strong>Dataset</strong> di sidebar '
        'untuk upload, replace, atau hapus dataset. '
        'File default (<code>bad_actor_simulation.xlsx</code>) tidak akan pernah diubah.</p>'
        '</div>',
        unsafe_allow_html=True,
    )

# Header row: app title on the left, active dataset and month range on the right.
hcol1, hcol2 = st.columns([3, 1])
with hcol1:
    st.markdown(
        '<div class="app-header">'
        '<h1>⚠️ Bad Actor Simulation</h1>'
        '<div class="tagline">Equipment reliability scoring based on normalised MA Γ— MTBF</div>'
        '</div>',
        unsafe_allow_html=True,
    )
with hcol2:
    # data_source can contain an uploaded file name and sheet name, i.e. text
    # chosen by whoever uploaded the file. Escape it so it is shown as text
    # rather than interpreted as markup by the unsafe_allow_html rendering.
    st.markdown(
        f'<div style="text-align:right;padding-top:8px">'
        f'<span class="dataset-badge">πŸ“ {html.escape(data_source)}</span><br>'
        f'<span class="dataset-badge" style="margin-top:4px;display:inline-block">'
        f'πŸ“… Month 1 – {obs_month}</span>'
        f'</div>',
        unsafe_allow_html=True,
    )
st.divider()

if run:
    # Every multiselect must have at least one value before running.
    if not sel_years or not sel_sites or not sel_sections or not sel_models:
        st.warning("Please select at least one value for each filter.")
    else:
        run_simulation(sel_years, sel_sites, sel_sections, sel_models,
                       consecutive_n, obs_month)
else:
    # Nothing requested yet: show the onboarding card.
    st.markdown("""
<div class="welcome-card">
<h2>Welcome to the Simulation Console</h2>
<p>Identify equipment that consistently underperforms relative to its peers.</p>
<ul class="step-list">
<li><span class="step-num">1</span> Expand <strong>Filters</strong> to set year, site, section, model, and observation month.</li>
<li><span class="step-num">2</span> Click <strong>Run Simulation</strong> to compute scores and flag bad actors.</li>
</ul>
</div>
""", unsafe_allow_html=True)