"""
Missing Value Analyzer — Statistically Rigorous Pipeline
=========================================================
Phases:
1 Upload CSV & Train/Test Split
2 Missing Value Overview (train set only)
3 Per-Column Diagnostics (Tables for all tests)
4 Imputation Feasibility Gate (KDE plots, Variance %, New Outliers)
5 Final Report & Recommendations
"""
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import chi2_contingency, ttest_ind, norm, chi2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import warnings
warnings.filterwarnings("ignore")
# ─────────────────────────── Page config ────────────────────────────
# Streamlit page-level configuration: must run before any other st.* call.
st.set_page_config(
    page_title="Missing Value Analyzer",
    page_icon="🔬",
    layout="wide",
    initial_sidebar_state="expanded",
)
# ─────────────────────────── CSS ────────────────────────────────────
# NOTE(review): the CSS payload appears to have been lost in extraction —
# this markdown injects an empty string; restore the stylesheet before shipping.
st.markdown("""
""", unsafe_allow_html=True)
# ════════════════════════════════════════════════════════════════════
# SESSION STATE INIT
# ════════════════════════════════════════════════════════════════════
# Keys every page expects to find in st.session_state, with their initial values.
defaults = {"df_full": None, "df_train": None, "df_test": None, "target_col": None, "split_ratio": 0.8, "col_diagnostics": {}}
for k, v in defaults.items():
    # Seed only when absent so user state survives Streamlit reruns.
    if k not in st.session_state: st.session_state[k] = v
# ════════════════════════════════════════════════════════════════════
# STATISTICAL TEST HELPERS
# ════════════════════════════════════════════════════════════════════
def littles_mcar_test(df: pd.DataFrame, cols_with_missing: list) -> dict:
    """Approximate Little's MCAR test for the given columns.

    For each numeric column with missing values, compares the group means of
    every *other* numeric column between missing/observed rows, accumulating a
    chi² statistic with one degree of freedom per comparison.

    Returns a dict with keys chi2, df, p_value, verdict, reject_mcar — or a
    minimal dict with verdict "Insufficient numeric data" when no comparison
    was possible.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    chi2_total, df_total = 0.0, 0
    for col in cols_with_missing:
        if col not in numeric_cols: continue
        missing_mask = df[col].isnull()
        # Need at least 5 rows in both the missing and the observed group.
        if missing_mask.sum() < 5 or (~missing_mask).sum() < 5: continue
        for other in numeric_cols:
            if other == col: continue
            g1, g2 = df.loc[missing_mask, other].dropna(), df.loc[~missing_mask, other].dropna()
            if len(g1) < 3 or len(g2) < 3: continue
            grand_mean, grand_var = df[other].mean(), df[other].var()
            if grand_var < 1e-12: continue  # constant column carries no signal
            # Between-group sum of squares scaled by the overall variance.
            chi2_total += (len(g1)*(g1.mean() - grand_mean)**2 + len(g2)*(g2.mean() - grand_mean)**2) / grand_var
            df_total += 1
    if df_total == 0: return {"chi2": None, "p_value": None, "verdict": "Insufficient numeric data"}
    # Survival function is numerically stabler than 1 - cdf in the upper tail.
    p_val = chi2.sf(chi2_total, df_total)
    verdict = "Fail to reject MCAR" if p_val >= 0.05 else "Reject MCAR"
    return {"chi2": round(chi2_total, 4), "df": df_total, "p_value": round(p_val, 4), "verdict": verdict, "reject_mcar": p_val < 0.05}
def feature_dependency_tests(df: pd.DataFrame, col: str) -> dict:
    """Test whether missingness of `col` depends on each other feature (MAR probe).

    Numeric features: z-test when both groups have n >= 30, else Welch t-test.
    Categorical features: chi² on the (missing-indicator × feature) crosstab.

    Returns {"results": per-feature dicts, "n_significant", "total_tested",
    "sig_pct", "signal"}, or an "Insufficient data" stub when one of the
    missing/observed groups has fewer than 5 rows.
    """
    missing_mask = df[col].isnull()
    if missing_mask.sum() < 5 or (~missing_mask).sum() < 5: return {"results": {}, "n_significant": 0, "signal": "Insufficient data"}
    results = {}
    for other in df.columns:
        if other == col: continue
        g_miss, g_obs = df.loc[missing_mask, other].dropna(), df.loc[~missing_mask, other].dropna()
        if len(g_miss) < 3 or len(g_obs) < 3: continue
        try:
            if pd.api.types.is_numeric_dtype(df[other]):
                n1, n2 = len(g_miss), len(g_obs)
                if min(n1, n2) >= 30:
                    se = np.sqrt(g_miss.var()/n1 + g_obs.var()/n2)
                    if se < 1e-12: continue  # both groups constant — no test possible
                    z_stat = (g_miss.mean() - g_obs.mean()) / se
                    # Two-sided p; sf(|z|) is stabler than 1 - cdf(|z|) in the tail.
                    p_val = 2 * norm.sf(abs(z_stat))
                    test_name, stat = "z-test", round(z_stat, 4)
                else:
                    t_stat, p_val = ttest_ind(g_miss, g_obs, equal_var=False)
                    test_name, stat = "Welch t-test", round(t_stat, 4)
                results[other] = {"test": test_name, "stat": stat, "p_value": round(p_val, 4), "significant": p_val < 0.05, "type": "numeric"}
            else:
                ct = pd.crosstab(missing_mask.astype(int), df[other])
                if ct.shape[0] < 2 or ct.shape[1] < 2: continue  # degenerate table
                chi2_stat, p_val, _, _ = chi2_contingency(ct)
                results[other] = {"test": "chi²", "stat": round(chi2_stat, 4), "p_value": round(p_val, 4), "significant": p_val < 0.05, "type": "categorical"}
        except Exception: continue  # best-effort: skip features whose test blows up
    n_sig = sum(1 for r in results.values() if r["significant"])
    sig_pct = n_sig / max(len(results), 1) * 100
    signal = "No features differ significantly" if sig_pct == 0 else f"{n_sig}/{len(results)} features differ (p<0.05)"
    return {"results": results, "n_significant": n_sig, "total_tested": len(results), "sig_pct": round(sig_pct, 1), "signal": signal}
def target_dependency_test(df: pd.DataFrame, col: str, target_col: str) -> dict:
    """Test whether missingness of `col` is associated with the target (MNAR probe).

    Numeric target: z-test (both groups n >= 30) or Welch t-test, plus the
    relative mean difference in percent. Categorical target: chi² on the
    crosstab, plus the difference (in percentage points) of the share of the
    observed group's modal category between the two groups.
    """
    missing_mask = df[col].isnull()
    if missing_mask.sum() < 5 or (~missing_mask).sum() < 5: return {"p_value": None, "signal": "Insufficient data", "significant": False}
    try:
        g_miss, g_obs = df.loc[missing_mask, target_col].dropna(), df.loc[~missing_mask, target_col].dropna()
        # Target may itself be NaN in one group — guard before using the groups.
        if g_miss.empty or g_obs.empty:
            return {"p_value": None, "signal": "Insufficient data", "significant": False}
        if pd.api.types.is_numeric_dtype(df[target_col]):
            n1, n2 = len(g_miss), len(g_obs)
            if min(n1, n2) >= 30:
                se = np.sqrt(g_miss.var()/n1 + g_obs.var()/n2)
                if se < 1e-12: return {"p_value": None, "signal": "Zero variance", "significant": False}
                z_stat = (g_miss.mean() - g_obs.mean()) / se
                p_val = 2 * norm.sf(abs(z_stat))  # sf = 1 - cdf, tail-stable
            else:
                _, p_val = ttest_ind(g_miss, g_obs, equal_var=False)
            diff_pct = abs(g_miss.mean() - g_obs.mean()) / max(abs(g_obs.mean()), 1e-9) * 100
        else:
            ct = pd.crosstab(missing_mask.astype(int), df[target_col])
            _, p_val, _, _ = chi2_contingency(ct)
            # BUG FIX: previously the modal proportion of EACH group was taken
            # (value_counts().iloc[0]), which could compare two different
            # categories. Compare the share of the SAME category instead.
            top_cat = g_obs.value_counts().idxmax()
            p1 = (g_miss == top_cat).mean() * 100
            p2 = (g_obs == top_cat).mean() * 100
            diff_pct = abs(p1 - p2)
        sig = p_val < 0.05
        signal = f"Not significant (p={p_val:.4f})" if not sig else f"Significant — target differs by {diff_pct:.1f}%"
        return {"p_value": round(p_val, 4), "significant": sig, "diff_pct": round(diff_pct, 2), "signal": signal}
    except Exception as e: return {"p_value": None, "signal": f"Error: {e}", "significant": False}
def classify_mechanism(t_feat, t_target, little):
    """Fold the three diagnostics into a (mechanism, confidence, explanation) verdict.

    Priority: target dependency (MNAR) > feature dependency (MAR) >
    Little's-test rejection (weak MAR) > MCAR.
    """
    target_significant = t_target.get("significant", False)
    target_diff = t_target.get("diff_pct", 0)
    feature_sig_pct = t_feat.get("sig_pct", 0)
    if target_significant:
        if target_diff >= 10:
            return "MNAR", "High", "Missingness strongly correlates with the outcome."
        if target_diff >= 5:
            return "MNAR", "Moderate", "Moderate dependency on target. Treat conservatively as MNAR."
    if feature_sig_pct > 30:
        return "MAR", "High", "Strong dependency on observed features detected."
    if feature_sig_pct > 0:
        return "MAR", "Moderate", "Weak but present dependency on observed features."
    if little.get("reject_mcar"):
        return "MAR", "Low", "Little's test rejects MCAR, but feature tests show weak dependency."
    return "MCAR", "High", "No statistical evidence of systematic missingness."
def run_single_diagnostic(df, col, target_col):
    """Run all three diagnostics for one column and cache the verdict in session state."""
    little = littles_mcar_test(df, [col])
    t_feat = feature_dependency_tests(df, col)
    if col == target_col:
        # Testing the target against itself is meaningless — record a stub.
        t_target = {"p_value": None, "significant": False, "signal": "Skipped (Is Target)", "diff_pct": 0}
    else:
        t_target = target_dependency_test(df, col, target_col)
    mech, conf, expl = classify_mechanism(t_feat, t_target, little)
    st.session_state["col_diagnostics"][col] = {
        "mechanism": mech,
        "confidence": conf,
        "explanation": expl,
        "miss_pct": round(df[col].isnull().mean() * 100, 2),
        "dtype": str(df[col].dtype),
        "little": little,
        "t_feat": t_feat,
        "t_target": t_target,
    }
# ════════════════════════════════════════════════════════════════════
# IMPUTATION SIMULATION HELPERS
# ════════════════════════════════════════════════════════════════════
def feasibility_checks(df: pd.DataFrame, col: str, target_col: str, impute_method: str) -> dict:
    """Simulate imputing `col` with `impute_method` and measure the statistical damage.

    Returns {"applicable": False} for non-numeric or near-empty columns;
    otherwise a dict with "imputed_series" plus "skewness", "outliers",
    "variance" and "correlation" sub-dicts, each carrying a "verdict" of
    ok / warn / fail.
    """
    series = df[col].dropna()
    # Feasibility metrics only make sense for numeric columns with enough observed data.
    if len(series) < 5 or not pd.api.types.is_numeric_dtype(df[col]):
        return {"applicable": False}
    # NOTE(review): "escalate_to_knn" and "reasons" are initialised but never
    # updated below — presumably leftovers; confirm before removing.
    results = {"applicable": True, "escalate_to_knn": False, "reasons": []}
    # ── 1. Impute ──
    if impute_method == "Mean": imputed_series = df[col].fillna(series.mean())
    elif impute_method == "Median": imputed_series = df[col].fillna(series.median())
    else:
        # KNN / MICE operate jointly on all numeric features (target excluded),
        # in standardized space so no single feature dominates the distances.
        numeric_cols = [c for c in df.select_dtypes(include=[np.number]).columns if c != target_col]
        X_num = df[numeric_cols].copy()
        try:
            scaler = StandardScaler()
            X_scaled = pd.DataFrame(scaler.fit_transform(X_num), columns=X_num.columns)
            imputer = KNNImputer(n_neighbors=5) if impute_method == "KNN" else IterativeImputer(random_state=42, max_iter=10)
            X_imputed_scaled = pd.DataFrame(imputer.fit_transform(X_scaled), columns=X_num.columns)
            X_imputed = pd.DataFrame(scaler.inverse_transform(X_imputed_scaled), columns=X_num.columns)
            imputed_series = X_imputed[col]
        except Exception:
            # Model-based imputer failed — fall back to a plain median fill.
            imputed_series = df[col].fillna(series.median())
    results["imputed_series"] = imputed_series
    # ── 2. Skewness & Outliers ──
    skew = series.skew()
    # 1.5×IQR fences before and after imputation; a shrunken IQR can make
    # previously-normal points look like fresh outliers.
    Q1_b, Q3_b = series.quantile(0.25), series.quantile(0.75)
    IQR_b = Q3_b - Q1_b
    outliers_before = ((series < Q1_b - 1.5*IQR_b) | (series > Q3_b + 1.5*IQR_b)).sum()
    Q1_a, Q3_a = imputed_series.quantile(0.25), imputed_series.quantile(0.75)
    IQR_a = Q3_a - Q1_a
    outliers_after = ((imputed_series < Q1_a - 1.5*IQR_a) | (imputed_series > Q3_a + 1.5*IQR_a)).sum()
    new_outliers = max(0, outliers_after - outliers_before)
    if impute_method == "Mean":
        # Mean fill is unsafe on skewed data.
        skew_verdict = "fail" if abs(skew) > 1 else "ok"
    elif impute_method == "Median":
        skew_verdict = "warn" if abs(skew) > 3 else "ok"
    else:
        # Model-based imputers tolerate skew.
        skew_verdict = "ok"
    results["skewness"] = {"verdict": skew_verdict, "value": skew, "msg": f"Skewness = {skew:.3f}"}
    # Warn when imputation manufactures outliers on more than 5% of observed rows.
    if new_outliers > (len(series) * 0.05):
        out_verdict = "warn"
    else:
        out_verdict = "ok"
    results["outliers"] = {
        "verdict": out_verdict,
        "new_outliers": new_outliers,
        "outliers_before": outliers_before,
        "outliers_after": outliers_after
    }
    # ── 3. Variance Impact ──
    # Positive var_drop_pct = variance collapsed; thresholds 10% / 20%.
    var_before = series.var()
    var_after = imputed_series.var()
    var_drop_pct = (var_before - var_after) / var_before * 100 if var_before > 1e-12 else 0
    if var_drop_pct <= 10: var_verdict, var_msg = "ok", f"Variance Change: {var_drop_pct:.1f}%"
    elif var_drop_pct <= 20: var_verdict, var_msg = "warn", f"Variance Change: {var_drop_pct:.1f}%"
    else: var_verdict, var_msg = "fail", f"Variance Change: {var_drop_pct:.1f}%"
    results["variance"] = {"verdict": var_verdict, "msg": var_msg, "var_drop_pct": var_drop_pct}
    # ── 4. Correlation Preservation ──
    # Pearson r of `col` against each other numeric feature, before (pairwise
    # complete rows) vs. after imputation; capped at 10 features for speed.
    numeric_others = [c for c in df.select_dtypes(include=[np.number]).columns if c != col and c != target_col]
    corr_results, max_corr_shift, sign_flip = {}, 0.0, False
    for other in numeric_others[:10]:
        s_before = df[[col, other]].dropna()
        if len(s_before) < 5: continue
        r_before = s_before[col].corr(s_before[other])
        r_after = imputed_series.corr(df[other])
        delta = abs(r_before - r_after)
        # A sign flip only counts when the original correlation was non-trivial.
        flipped = (r_before * r_after < 0) and (abs(r_before) > 0.1)
        corr_results[other] = {"r_before": round(r_before, 4), "r_after": round(r_after, 4), "delta": round(delta, 4), "sign_flip": flipped}
        max_corr_shift = max(max_corr_shift, delta)
        if flipped: sign_flip = True
    if max_corr_shift <= 0.05 and not sign_flip: corr_verdict, corr_msg = "ok", f"Max Δ = {max_corr_shift:.3f} — Correlation well preserved"
    elif sign_flip: corr_verdict, corr_msg = "fail", f"Sign flip detected! Correlation direction reversed."
    elif max_corr_shift <= 0.10: corr_verdict, corr_msg = "warn", f"Max Δ = {max_corr_shift:.3f} — Moderate correlation shift"
    else: corr_verdict, corr_msg = "fail", f"Max Δ = {max_corr_shift:.3f} — Large correlation shift detected"
    results["correlation"] = {"details": corr_results, "verdict": corr_verdict, "msg": corr_msg, "max_shift": round(max_corr_shift, 4)}
    return results
def get_auto_recommendation(df, col, target, mechanism, miss_pct, dtype):
    """Pick the best imputation strategy for `col`, with explicit labeling.

    Decision order: drop thresholds first, then the categorical fallback, then
    a quick Median feasibility probe to choose between simple fills (mean /
    median) and model-based imputation (KNN / MICE). The label may carry a
    " + Missing Indicator" suffix when the mechanism is MNAR, or MAR with
    >= 10% missing.
    """
    needs_indicator = (mechanism == "MNAR") or (mechanism == "MAR" and miss_pct >= 10)
    indicator_suffix = " + Missing Indicator" if needs_indicator else ""
    # Too little observed data to impute reliably.
    if miss_pct > 70:
        return "Drop Column"
    # MCAR with tiny loss: row deletion is unbiased and simplest.
    if mechanism == "MCAR" and miss_pct <= 5:
        return "Drop Rows"
    # Categorical / non-numeric columns can only take the mode.
    if not pd.api.types.is_numeric_dtype(df[col]):
        return f"Mode Imputation{indicator_suffix}"
    # Numeric: run a quick Median feasibility probe to decide.
    feas_med = feasibility_checks(df, col, target, "Median")
    if not feas_med.get("applicable"):
        return f"Median Imputation{indicator_suffix}"
    var_ok = feas_med["variance"]["var_drop_pct"] <= 20
    corr_ok = feas_med["correlation"]["verdict"] != "fail"
    skew_val = abs(feas_med["skewness"].get("value", 0))
    if var_ok and corr_ok:
        # Simple fills are statistically safe; choose by skew.
        return f"Mean Imputation{indicator_suffix}" if skew_val <= 1 else f"Median Imputation{indicator_suffix}"
    # Simple fills damage variance/correlation — escalate to model-based imputation.
    return f"MICE Imputer{indicator_suffix}" if miss_pct > 30 else f"KNN Imputer{indicator_suffix}"
# ════════════════════════════════════════════════════════════════════
# SIDEBAR NAVIGATION
# ════════════════════════════════════════════════════════════════════
# Page labels; the sidebar radio below doubles as the app's router.
STEPS = ["1 · Upload & Split", "2 · Overview", "3 · Column Diagnostics", "4 · Feasibility Gate", "5 · Final Report"]
with st.sidebar:
    st.markdown("## 🔬 Missing Value Analyzer")
    st.markdown("---")
    # `step` is read by the dispatch at the bottom of the file.
    step = st.radio("Navigate:", STEPS, label_visibility="collapsed")
    st.markdown("---")
    # Quick status once a dataset has been split.
    if st.session_state.get("df_train") is not None:
        st.markdown(f"**Train set:** {st.session_state['df_train'].shape[0]} rows × {st.session_state['df_train'].shape[1]} cols")
        st.markdown(f"**Diagnosed:** {len(st.session_state['col_diagnostics'])} columns")
    st.markdown("Analysis runs on TRAIN SET only to prevent data leakage.", unsafe_allow_html=True)
# ════════════════════════════════════════════════════════════════════
# STEP 1 — UPLOAD & SPLIT
# ════════════════════════════════════════════════════════════════════
def render_step1():
    """Step 1 — upload a CSV, choose the target column, and train/test split.

    Stores df_full / df_train / df_test / target_col in session state and
    resets any previous diagnostics.
    """
    # NOTE(review): the original header markup was mangled in extraction
    # (a single-quoted string spanning lines is a syntax error); rebuilt as a
    # triple-quoted string preserving the visible text.
    st.markdown('''
📂 Step 1 — Upload CSV & Train/Test Split
''', unsafe_allow_html=True)
    uploaded = st.file_uploader("Choose a CSV file", type=["csv"])
    if not uploaded: return st.info("👆 Upload a CSV file to begin.")
    df = pd.read_csv(uploaded)
    st.success(f"✅ Loaded **{uploaded.name}**")
    col1, col2 = st.columns(2)
    # Default target guess: the last column.
    target = col1.selectbox("Target column (Y):", df.columns.tolist(), index=len(df.columns)-1)
    split_pct = col2.slider("Train size:", 50, 95, 80, 5, format="%d%%")
    if st.button("✅ Confirm & Split", type="primary"):
        df_train, df_test = train_test_split(df, train_size=split_pct/100.0, random_state=42)
        # Reset diagnostics so stale verdicts from a previous dataset never leak in.
        st.session_state.update({"df_full": df, "df_train": df_train.reset_index(drop=True), "df_test": df_test.reset_index(drop=True), "target_col": target, "col_diagnostics": {}})
        st.success("✅ Split complete!")
        st.dataframe(df_train.head(), use_container_width=True)
# ════════════════════════════════════════════════════════════════════
# STEP 2 — OVERVIEW
# ════════════════════════════════════════════════════════════════════
def render_step2():
    """Step 2 — missing-value overview of the TRAIN split: summary table,
    missingness heatmap, and pairwise missingness-correlation heatmap."""
    # NOTE(review): header markup was mangled in extraction; rebuilt as a
    # valid triple-quoted string preserving the visible text.
    st.markdown('''📊 Step 2 — Missing Value Overview
''', unsafe_allow_html=True)
    df = st.session_state.get("df_train")
    if df is None: return st.warning("⚠️ Please complete Step 1.")
    miss_cols = [c for c in df.columns if df[c].isnull().any()]
    if not miss_cols: return st.success("🎉 No missing values!")
    # ── Summary table ──
    summary = pd.DataFrame({
        "Missing Count": df[miss_cols].isnull().sum(),
        "Missing %": (df[miss_cols].isnull().sum() / len(df) * 100).round(2)
    }).sort_values("Missing %", ascending=False)
    st.dataframe(
        summary.style.background_gradient(cmap="YlOrRd", subset=["Missing %"]),
        use_container_width=True
    )
    st.markdown("---")
    # ── Missingness Heatmap ──
    st.markdown("### 🗺️ Missingness Heatmap")
    st.caption("Each dark stripe = a missing value in that row. Aligned stripes across columns = rows missing together (MAR signal).")
    fig_h, ax_h = plt.subplots(figsize=(14, max(3, len(miss_cols) * 0.6)))
    fig_h.patch.set_facecolor('#f8f8f8')
    ax_h.set_facecolor('#f0f0f0')
    miss_matrix = df[miss_cols].isnull().astype(int)
    # Subsample rows for performance if large
    if len(miss_matrix) > 2000:
        miss_matrix = miss_matrix.sample(2000, random_state=42).reset_index(drop=True)
    ax_h.imshow(
        miss_matrix.T.values,
        aspect='auto',
        cmap=sns.color_palette(["#f0f0f0", "#17172b"], as_cmap=True),
        interpolation='none'
    )
    ax_h.set_yticks(range(len(miss_cols)))
    ax_h.set_yticklabels(miss_cols, fontsize=10)
    ax_h.set_xlabel("Row index (sampled)" if len(df) > 2000 else "Row index", fontsize=10)
    ax_h.set_title("Missing Value Pattern (dark = missing)", fontsize=12, fontweight='bold', pad=10)
    ax_h.spines[['top','right','bottom','left']].set_visible(False)
    plt.tight_layout()
    st.pyplot(fig_h, use_container_width=True)
    plt.close()
    st.markdown("---")
    # ── Missingness Correlation Heatmap ──
    st.markdown("### 🔗 Missingness Correlation")
    st.caption("Correlation between missing patterns of columns. Values near 1.0 = these columns tend to be missing in the same rows — strong MAR signal.")
    if len(miss_cols) >= 2:
        miss_indicator = df[miss_cols].isnull().astype(int)
        corr_matrix = miss_indicator.corr()
        fig_c, ax_c = plt.subplots(figsize=(max(6, len(miss_cols) * 1.2), max(5, len(miss_cols) * 1.0)))
        fig_c.patch.set_facecolor('#f8f8f8')
        mask = np.zeros_like(corr_matrix, dtype=bool)
        mask[np.triu_indices_from(mask, k=1)] = True  # show lower triangle only
        sns.heatmap(
            corr_matrix,
            mask=mask,
            annot=True,
            fmt=".2f",
            cmap="RdYlGn",
            vmin=-1, vmax=1,
            center=0,
            ax=ax_c,
            square=True,
            linewidths=0.5,
            linecolor='white',
            annot_kws={"size": 10, "weight": "bold"},
            cbar_kws={"shrink": 0.8}
        )
        ax_c.set_title("Pairwise Missingness Correlation", fontsize=12, fontweight='bold', pad=12)
        ax_c.tick_params(axis='x', rotation=45, labelsize=10)
        ax_c.tick_params(axis='y', rotation=0, labelsize=10)
        plt.tight_layout()
        st.pyplot(fig_c, use_container_width=True)
        plt.close()
        # Interpretation callout — strongest pairwise co-missingness.
        # NOTE(review): the pair is selected by |r| but the thresholds below
        # compare the SIGNED value, so a strongly negative pair falls into the
        # "low" branch — confirm whether that is intended.
        max_corr_pair = None
        max_val = 0
        for i in range(len(miss_cols)):
            for j in range(i):
                val = abs(corr_matrix.iloc[i, j])
                if val > max_val:
                    max_val = val
                    max_corr_pair = (miss_cols[i], miss_cols[j], corr_matrix.iloc[i, j])
        if max_corr_pair:
            c1, c2, v = max_corr_pair
            if v >= 0.9:
                st.markdown(f'''🚨 Very high missingness correlation ({v:.2f}) between {c1} and {c2} — these rows go missing together. Strong MAR signal; consider joint imputation (KNN/MICE).
''', unsafe_allow_html=True)
            elif v >= 0.5:
                st.markdown(f'''⚠️ Moderate missingness correlation ({v:.2f}) between {c1} and {c2} — partial co-occurrence of missingness detected.
''', unsafe_allow_html=True)
            else:
                st.markdown(f'''✅ Low missingness correlation (max {v:.2f}) — columns appear to be missing independently.
''', unsafe_allow_html=True)
    else:
        st.info("Only one column with missing values — correlation requires at least two.")
# ════════════════════════════════════════════════════════════════════
# STEP 3 — DIAGNOSTICS
# ════════════════════════════════════════════════════════════════════
def render_step3():
    """Step 3 — per-column diagnostics: run the three statistical tests and
    show the mechanism verdict plus detail tables for the selected column."""
    # NOTE(review): header markup was mangled in extraction; rebuilt as a
    # valid triple-quoted string preserving the visible text.
    st.markdown('''🧪 Step 3 — Per-Column Diagnostics
''', unsafe_allow_html=True)
    df, target = st.session_state.get("df_train"), st.session_state.get("target_col")
    if df is None: return st.warning("⚠️ Please complete Step 1.")
    miss_cols = [c for c in df.columns if df[c].isnull().any()]
    if not miss_cols: return st.success("🎉 No missing values.")
    col1, col2 = st.columns([1, 4])
    selected_col = col1.selectbox("Select column to view:", miss_cols)
    run_single = col1.button("▶ Run Diagnostics")
    run_all = col2.button("▶ Run ALL columns", type="primary")
    if run_single:
        run_single_diagnostic(df, selected_col, target)
    if run_all:
        progress = st.progress(0, text="Running diagnostics...")
        for i, c in enumerate(miss_cols):
            run_single_diagnostic(df, c, target)
            progress.progress((i+1)/len(miss_cols), text=f"Diagnosing: {c}")
        progress.empty()
        st.success(f"✅ Diagnosed {len(miss_cols)} columns.")
    # Only render detail tables once this column has been diagnosed.
    if selected_col in st.session_state["col_diagnostics"]:
        res = st.session_state["col_diagnostics"][selected_col]
        little, t_feat, t_target = res["little"], res["t_feat"], res["t_target"]
        st.markdown("---")
        # ── Mechanism verdict card ──
        # NOTE(review): card_class fed a CSS class in the lost HTML; kept for
        # when the markup is restored.
        card_class = {"MCAR":"card-mcar","MAR":"card-mar","MNAR":"card-mnar"}[res["mechanism"]]
        emoji = {"MCAR":"🟢","MAR":"🟠","MNAR":"🔴"}[res["mechanism"]]
        # NOTE(review): the original f-string concatenation was mangled in
        # extraction; rebuilt as one triple-quoted f-string with the same fields.
        st.markdown(
            f'''{emoji} Mechanism: {res["mechanism"]} — {res["confidence"]} Confidence
{res["explanation"]}
Missing: {res["miss_pct"]}% | dtype: {res["dtype"]}''',
            unsafe_allow_html=True
        )
        # ══ TEST 1: Little's MCAR ══
        st.markdown('', unsafe_allow_html=True)
        with st.expander("ℹ️ What does this test measure?", expanded=False):
            st.markdown("""
**Little's MCAR test** checks if missingness is completely random.
- **H₀ (null):** Data is Missing Completely At Random (MCAR)
- **p ≥ 0.05:** Fail to reject → data may be MCAR
- **p < 0.05:** Reject → systematic missingness detected
""")
        little_rows = [{
            "Test": "Little's MCAR",
            "χ² Statistic": little.get("chi2", "N/A"),
            "Degrees of Freedom": little.get("df", "N/A"),
            "p-value": little.get("p_value", "N/A"),
            "Verdict": little.get("verdict", "N/A"),
            "Reject MCAR?": "✅ Yes — systematic" if little.get("reject_mcar") else "❌ No — may be MCAR"
        }]
        st.dataframe(pd.DataFrame(little_rows), use_container_width=True, hide_index=True)
        # ══ TEST 2: Target Dependency ══
        st.markdown('', unsafe_allow_html=True)
        with st.expander("ℹ️ What does this test measure?", expanded=False):
            st.markdown("""
Tests if the **target variable** has different values when this column is missing vs. observed.
- **Numeric target:** z-test or Welch t-test
- **Categorical target:** Chi-squared test
- **Significant (p<0.05) + large diff % → MNAR** (missingness depends on outcome)
""")
        tgt_rows = [{
            "Test Applied": "z-test / Welch t-test / Chi²",
            "p-value": t_target.get("p_value", "N/A"),
            "Target Diff %": f'{t_target.get("diff_pct", 0):.1f}%' if t_target.get("diff_pct") is not None else "N/A",
            "Significant (p<0.05)?": "✅ Yes" if t_target.get("significant") else "❌ No",
            "Interpretation": t_target.get("signal", "N/A")
        }]
        st.dataframe(pd.DataFrame(tgt_rows), use_container_width=True, hide_index=True)
        # ══ TEST 3: Feature Dependency ══
        st.markdown('', unsafe_allow_html=True)
        with st.expander("ℹ️ What does this test measure?", expanded=False):
            st.markdown("""
For each other feature, tests if values differ **significantly** between rows where this column is missing vs. observed.
- **Numeric features:** z-test (n≥30) or Welch t-test
- **Categorical features:** Chi-squared test
- **Many significant features (>30%) → MAR** (missingness explained by observed data)
""")
        # Summary row first
        summary_cols = st.columns(3)
        summary_cols[0].metric("Features Tested", t_feat.get("total_tested", 0))
        summary_cols[1].metric("Significant (p<0.05)", t_feat.get("n_significant", 0))
        summary_cols[2].metric("% Significant", f'{t_feat.get("sig_pct", 0):.1f}%')
        if t_feat["results"]:
            rows = []
            for f, r in t_feat["results"].items():
                rows.append({
                    "Feature": f,
                    "Data Type": r["type"].capitalize(),
                    "Test Used": r["test"],
                    "Test Statistic": r["stat"],
                    "p-value": r["p_value"],
                    "p < 0.05?": "✅ Significant" if r["significant"] else "—"
                })
            feat_df = pd.DataFrame(rows).sort_values("p-value")
            def highlight_sig(row):
                # Tint rows whose feature differs significantly.
                if row["p < 0.05?"] == "✅ Significant":
                    return ["background-color:#ffe4e1; color:#900000"] * len(row)
                return [""] * len(row)
            st.dataframe(
                feat_df.style.apply(highlight_sig, axis=1),
                use_container_width=True,
                hide_index=True
            )
        else:
            st.info("No feature dependency results available (insufficient data or no other columns).")
        # ══ Decision Logic Summary ══
        st.markdown('', unsafe_allow_html=True)
        logic_rows = [
            {"Rule Check": "Little's test rejects MCAR?", "Result": "✅ Yes" if little.get("reject_mcar") else "❌ No"},
            {"Rule Check": "Target differs significantly?", "Result": "✅ Yes" if t_target.get("significant") else "❌ No"},
            {"Rule Check": "Target diff magnitude", "Result": f'{t_target.get("diff_pct", 0):.1f}% difference'},
            {"Rule Check": "% of features with significant diff", "Result": f'{t_feat.get("sig_pct", 0):.1f}%'},
            {"Rule Check": "→ Final Mechanism", "Result": f'{res["mechanism"]} ({res["confidence"]} confidence)'},
        ]
        st.dataframe(pd.DataFrame(logic_rows), use_container_width=True, hide_index=True)
# ════════════════════════════════════════════════════════════════════
# STEP 4 — FEASIBILITY GATE (Interactive)
# ════════════════════════════════════════════════════════════════════
def render_step4():
    """Step 4 — simulate an imputation method on one numeric column and show
    the statistical impact: variance, new outliers, correlation shift, skew,
    plus KDE/box-plot comparisons."""
    # NOTE(review): header markup was mangled in extraction; rebuilt as a
    # valid triple-quoted string preserving the visible text.
    st.markdown('''⚖️ Step 4 — Imputation Feasibility Gate
''', unsafe_allow_html=True)
    with st.expander("📚 Theory & Guide: Why test imputation mathematically? (Click to expand)"):
        st.markdown("""
Why test imputation mathematically?
Single-value imputations (like filling blanks with Mean or Median) are dangerous if overused. They can:
- Collapse Variance: If you fill 20% of the data with the same number, the spread of your data shrinks unnaturally.
- Create Artificial Outliers: Because the variance (IQR) shrank, real valid data points at the edges suddenly look like outliers!
- Destroy Correlation: Assigning a median weight to someone without considering their height breaks the natural relationship between features.
KNN and MICE solve this by acting like mini machine-learning models — they look at other features to make an educated guess, preserving variance and correlations.
""", unsafe_allow_html=True)
    df, target = st.session_state.get("df_train"), st.session_state.get("target_col")
    col_diag = st.session_state.get("col_diagnostics", {})
    if not col_diag: return st.warning("⚠️ Please run diagnostics in Step 3 first.")
    numeric_diag = {c: v for c, v in col_diag.items() if pd.api.types.is_numeric_dtype(df[c])}
    if not numeric_diag: return st.info("No numeric columns available.")
    col1, col2 = st.columns([1, 2])
    selected_col = col1.selectbox("Select numeric column:", list(numeric_diag.keys()))
    impute_choice = col2.radio("Simulate impact of:", ["Mean", "Median", "KNN", "MICE"], horizontal=True)
    if st.button(f"▶ Simulate {impute_choice} Imputation", type="primary"):
        with st.spinner(f"Running {impute_choice} simulation (may take a moment for KNN/MICE)..."):
            feas = feasibility_checks(df, selected_col, target, impute_choice)
        if not feas.get("applicable"):
            return st.error("Column not applicable for numeric feasibility checks.")
        ICONS = {"ok": "✅", "warn": "⚠️", "fail": "❌"}
        # NOTE(review): COLORS fed CSS classes in the lost HTML; kept for when
        # the markup is restored.
        COLORS = {"ok": "stat-ok", "warn": "stat-warn", "fail": "stat-fail"}
        # ── Big Stats Banner ──
        st.markdown("### 📊 Imputation Impact — Key Statistics")
        m1, m2, m3, m4 = st.columns(4)
        var_pct = feas["variance"]["var_drop_pct"]
        var_verd = feas["variance"]["verdict"]
        new_out = feas["outliers"]["new_outliers"]
        out_verd = feas["outliers"]["verdict"]
        corr_verd = feas["correlation"]["verdict"]
        corr_max = feas["correlation"]["max_shift"]
        skew_val = feas["skewness"]["value"]
        skew_verd = feas["skewness"]["verdict"]
        # NOTE(review): the four metric cards below were mangled f-string
        # concatenations; rebuilt as triple-quoted f-strings with the same fields.
        m1.markdown(
            f'''-{var_pct:.1f}%
Variance Change
{ICONS[var_verd]} {"Safe" if var_verd=="ok" else "Caution" if var_verd=="warn" else "High Risk"}''',
            unsafe_allow_html=True
        )
        m2.markdown(
            f'''+{new_out}
New Outliers Created
{ICONS[out_verd]} Before: {feas["outliers"]["outliers_before"]} → After: {feas["outliers"]["outliers_after"]}''',
            unsafe_allow_html=True
        )
        m3.markdown(
            f'''Δ{corr_max:.3f}
Max Corr. Shift
{ICONS[corr_verd]} {corr_verd.capitalize()}''',
            unsafe_allow_html=True
        )
        m4.markdown(
            f'''{skew_val:.3f}
Skewness
{ICONS[skew_verd]} {"Low" if abs(skew_val)<=1 else "Moderate" if abs(skew_val)<=3 else "High"} skew''',
            unsafe_allow_html=True
        )
        st.markdown("---")
        # ── KDE Plots — Two clear separate charts ──
        st.markdown("### 📈 Distribution Comparison (KDE)")
        series = df[selected_col].dropna()
        imputed = feas["imputed_series"]
        miss_pct_col = df[selected_col].isnull().mean() * 100
        fig, axes = plt.subplots(1, 2, figsize=(16, 5))
        fig.patch.set_facecolor('#fafafa')
        # Plot 1: Overlapping KDE
        ax = axes[0]
        ax.set_facecolor('#f8f8f8')
        try:
            from scipy.stats import gaussian_kde
            # Original KDE
            kde_orig = gaussian_kde(series.values, bw_method='scott')
            x_range = np.linspace(min(series.min(), imputed.min()), max(series.max(), imputed.max()), 300)
            ax.fill_between(x_range, kde_orig(x_range), alpha=0.35, color='#17172b', label='Original (observed only)')
            ax.plot(x_range, kde_orig(x_range), color='#17172b', lw=2.5)
            # Imputed KDE
            kde_imp = gaussian_kde(imputed.values, bw_method='scott')
            ax.fill_between(x_range, kde_imp(x_range), alpha=0.35, color='#d6336c', label=f'After {impute_choice}')
            ax.plot(x_range, kde_imp(x_range), color='#d6336c', lw=2.5, linestyle='--')
        except Exception:
            # KDE can fail on degenerate data — fall back to histograms.
            ax.hist(series.values, bins=25, alpha=0.5, color='#17172b', label='Original', density=True)
            ax.hist(imputed.values, bins=25, alpha=0.4, color='#d6336c', label=f'After {impute_choice}', density=True)
        ax.set_title(f'KDE: Original vs After {impute_choice}\n({miss_pct_col:.1f}% was missing)', fontsize=13, fontweight='bold', pad=12)
        ax.set_xlabel(selected_col, fontsize=11)
        ax.set_ylabel('Density', fontsize=11)
        ax.legend(fontsize=10)
        ax.grid(axis='y', alpha=0.3)
        ax.spines[['top','right']].set_visible(False)
        # Plot 2: Box plots side by side
        ax2 = axes[1]
        ax2.set_facecolor('#f8f8f8')
        bp = ax2.boxplot(
            [series.values, imputed.values],
            labels=['Original\n(non-missing)', f'After\n{impute_choice}'],
            patch_artist=True,
            widths=0.5,
            medianprops=dict(color='#d6336c', linewidth=2.5),
            flierprops=dict(marker='o', markerfacecolor='#d6336c', markersize=5, alpha=0.5),
            whiskerprops=dict(linewidth=1.5),
            capprops=dict(linewidth=1.5),
        )
        bp['boxes'][0].set_facecolor('#c8d8f0')
        bp['boxes'][1].set_facecolor('#f5c6d0')
        # Annotate variance change
        ax2.set_title(
            f'Spread & Outliers\nVariance Change: {var_pct:.1f}% | New Outliers: +{new_out}',
            fontsize=13, fontweight='bold', pad=12
        )
        ax2.set_ylabel('Value', fontsize=11)
        ax2.grid(axis='y', alpha=0.3)
        ax2.spines[['top','right']].set_visible(False)
        plt.tight_layout(pad=2.5)
        st.pyplot(fig, use_container_width=True)
        plt.close()
        # ── Correlation Details ──
        st.markdown("---")
        st.markdown("#### 🔗 Correlation Preservation Details")
        st.markdown(f'''{ICONS[corr_verd]} {feas["correlation"]["msg"]}
''', unsafe_allow_html=True)
        if feas["correlation"]["details"]:
            rows = [{
                "Feature": f,
                "r (before)": r["r_before"],
                "r (after)": r["r_after"],
                "Δ (shift)": r["delta"],
                "Sign Flip?": "🚨 YES" if r["sign_flip"] else "No"
            } for f, r in feas["correlation"]["details"].items()]
            corr_df = pd.DataFrame(rows).sort_values("Δ (shift)", ascending=False)
            def highlight_corr(row):
                # Red tint for sign flips; softer tint for large shifts.
                if row["Sign Flip?"] == "🚨 YES": return ["background-color:#fde8e8; color:#900000"] * len(row)
                if row["Δ (shift)"] > 0.10: return ["background-color:#fff0ed; color:#900000"] * len(row)
                return [""] * len(row)
            st.dataframe(corr_df.style.apply(highlight_corr, axis=1), use_container_width=True, hide_index=True)
# ════════════════════════════════════════════════════════════════════
# STEP 5 — FINAL REPORT
# ════════════════════════════════════════════════════════════════════
def render_step5():
    """Step 5 — final report: one row per diagnosed column with mechanism,
    confidence, and the auto-recommended imputation strategy, plus counts."""
    # NOTE(review): header markup was mangled in extraction; rebuilt as a
    # valid triple-quoted string preserving the visible text.
    st.markdown('''📋 Step 5 — Final Diagnostic Report
''', unsafe_allow_html=True)
    df, target = st.session_state.get("df_train"), st.session_state.get("target_col")
    col_diag = st.session_state.get("col_diagnostics", {})
    if not col_diag: return st.warning("⚠️ Run diagnostics in Step 3 first.")
    # ── Legend ──
    with st.expander("📖 How to read the Recommended Strategy column"):
        st.markdown("""
| Label | Meaning |
|-------|---------|
| **Drop Rows** | MCAR + <5% missing — safe to delete affected rows |
| **Drop Column** | >70% missing — too little data to impute reliably |
| **Mean Imputation** | Low-skew numeric, variance loss is acceptable |
| **Median Imputation** | Skewed numeric; median is more robust than mean |
| **Mode Imputation** | Categorical / non-numeric columns |
| **KNN Imputer** | Moderate missingness; feature relationships preserved |
| **MICE Imputer** | High missingness (>30%); multiple-imputation approach |
| **+ Missing Indicator** | Added when mechanism is MNAR, or MAR ≥ 10% missing — add a binary flag column `col_missing` alongside imputed values |
""")
    table_rows = []
    for col, res in col_diag.items():
        rec_string = get_auto_recommendation(df, col, target, res["mechanism"], res["miss_pct"], res["dtype"])
        table_rows.append({
            "Column": col,
            "dtype": res["dtype"],
            "Missing %": f'{res["miss_pct"]:.1f}%',
            "Mechanism": res["mechanism"],
            "Confidence": res["confidence"],
            "Recommended Strategy": rec_string
        })
    # BUG FIX: "Missing %" holds formatted strings, so a plain sort is
    # lexicographic ("9.0%" > "80.0%"). Sort on the numeric value instead.
    report_df = pd.DataFrame(table_rows).sort_values(
        "Missing %", ascending=False,
        key=lambda s: s.str.rstrip("%").astype(float)
    )
    def color_rows(row):
        # Tint each row by its diagnosed mechanism.
        mech_colors = {
            "MNAR": "background-color:#fff0ed; color:#000",
            "MAR": "background-color:#fffaeb; color:#000",
            "MCAR": "background-color:#edfaf3; color:#000"
        }
        return [mech_colors.get(row["Mechanism"], "")] * len(row)
    st.dataframe(
        report_df.style.apply(color_rows, axis=1),
        use_container_width=True,
        hide_index=True
    )
    # ── Summary counts ──
    st.markdown("---")
    c1, c2, c3 = st.columns(3)
    mcar_n = sum(1 for r in col_diag.values() if r["mechanism"] == "MCAR")
    mar_n = sum(1 for r in col_diag.values() if r["mechanism"] == "MAR")
    mnar_n = sum(1 for r in col_diag.values() if r["mechanism"] == "MNAR")
    # NOTE(review): the metric-card HTML bodies were lost in extraction (the
    # originals rendered empty f'' strings while computing these counts);
    # showing plain-text counts until the markup is restored.
    c1.markdown(f'🟢 MCAR columns: {mcar_n}', unsafe_allow_html=True)
    c2.markdown(f'🟠 MAR columns: {mar_n}', unsafe_allow_html=True)
    c3.markdown(f'🔴 MNAR columns: {mnar_n}', unsafe_allow_html=True)
# Route the sidebar selection to its page renderer (one entry per STEPS item).
_PAGE_RENDERERS = dict(zip(STEPS, (render_step1, render_step2, render_step3, render_step4, render_step5)))
_renderer = _PAGE_RENDERERS.get(step)
if _renderer is not None:
    _renderer()