Spaces:
Running
Running
"""
A/B Testing & Causal Inference Simulator — Gradio App
4-tab interactive dashboard:
Tab 1: Power Analysis — sample size calculator + power curve
Tab 2: A/B Test Analyzer — Frequentist vs Bayesian vs CUPED
Tab 3: Sequential Testing — peeking problem + mSPRT solution (pre-computed)
Tab 4: Uplift Modeling (HTE) — heterogeneous treatment effects from Hillstrom
Heavy computations (HTE training) are pre-computed and loaded from JSON.
Light computations (z-test, Bayesian) run on-the-fly with scipy only.
"""
| from __future__ import annotations | |
| import json, sys | |
| from pathlib import Path | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from scipy import stats as sp_stats | |
| # On HF Spaces, app.py is at repo root; on local it's in app/ subdirectory | |
| ROOT = Path(__file__).resolve().parent | |
| sys.path.insert(0, str(ROOT)) | |
| from src.frequentist import ( | |
| two_proportion_ztest, compute_power, required_sample_size, | |
| ) | |
| from src.bayesian import bayesian_proportion_test | |
# ── Load pre-computed results ─────────────────────────────────────────────────
def _load(path: Path):
    """
    Read a JSON file and return its decoded content.

    Loading is best-effort: the pre-computed result files are optional, so a
    file that is missing, unreadable, or not valid JSON yields ``None``
    instead of raising.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value, or ``None`` when the file could not be loaded.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    # OSError covers a missing or unreadable file; ValueError covers malformed
    # JSON (json.JSONDecodeError) and undecodable bytes (UnicodeDecodeError).
    # Anything else is a programming error and is allowed to surface.
    except (OSError, ValueError):
        return None
def _load_hte(path: Path):
    """
    Load the HTE results and make sure every model has a histogram summary.

    A legacy file stores the raw CATE list under ``<model>_cates``. Each such
    list is replaced in place by a ``<model>_hist`` entry holding 60-bin
    histogram counts/edges plus the mean, standard deviation and percentage of
    strictly positive values. Files without raw lists pass through untouched.

    Returns the loaded (possibly converted) data, or ``None`` if the file
    could not be loaded.
    """
    loaded = _load(path)
    if loaded is None:
        return None

    model_keys = ("causalforest", "x_learner", "t_learner")
    for outcome in ("conversion", "spend"):
        if outcome not in loaded:
            continue
        section = loaded[outcome]
        for model in model_keys:
            raw_key = f"{model}_cates"
            if raw_key not in section:
                continue
            # Legacy payload: summarise the raw values, then drop them.
            cates = np.array(section[raw_key], dtype=float)
            bin_counts, bin_edges = np.histogram(cates, bins=60)
            summary = {
                "counts": bin_counts.tolist(),
                "edges": bin_edges.tolist(),
                "mean": float(cates.mean()),
                "std": float(cates.std()),
                "pct_positive": float((cates > 0).mean() * 100),
            }
            section[f"{model}_hist"] = summary
            del section[raw_key]
    return loaded
# Candidate HTE inputs: the slim app file is preferred; the full results file
# is the fallback (the original note gives their sizes as 23 KB vs 5.4 MB).
_hte_app_path = ROOT.joinpath("data", "processed", "hte_app.json")
_hte_full_path = ROOT.joinpath("data", "processed", "hte_results.json")

# Each of these is None when its file could not be loaded.
ANALYSIS = _load(ROOT.joinpath("data", "processed", "analysis_results.json"))
SEQ_SIM = _load(ROOT.joinpath("data", "processed", "sequential_sim.json"))
if _hte_app_path.exists():
    HTE_DATA = _load_hte(_hte_app_path)
else:
    HTE_DATA = _load_hte(_hte_full_path)
# ── Plotting helpers ──────────────────────────────────────────────────────────
# Shared colour palette (hex strings) for every matplotlib figure in the app.
DARK_BG = "#0f172a"      # figure background (facecolor passed to plt.subplots)
PANEL_BG = "#1e293b"     # axes background; also used as legend background
PURPLE = "#7c3aed"       # main power-curve line and fill
PURPLE_L = "#a78bfa"     # treatment series, legend borders, marker lines
BLUE = "#60a5fa"         # control / baseline series
GREEN = "#10b981"        # positive values, "significant", CUPED curves
RED = "#f87171"          # negative values, "not significant"
TEXT_WHITE = "#f1f5f9"   # tick labels, axis labels, titles, annotations
GRID_COLOR = "#334155"   # grid lines and axes spines
def _style_ax(ax, title: str = "", xlabel: str = "", ylabel: str = ""):
    """
    Apply the dashboard's dark theme to *ax*.

    Sets the panel background, tick colours, spine colours and grid, then sets
    the title and axis labels for whichever of them are non-empty.
    """
    ax.set_facecolor(PANEL_BG)
    ax.tick_params(colors=TEXT_WHITE, labelsize=9)
    for side in ax.spines:
        ax.spines[side].set_edgecolor(GRID_COLOR)
    ax.grid(color=GRID_COLOR, linewidth=0.5, alpha=0.5)

    if title:
        ax.set_title(title, color=TEXT_WHITE, fontsize=11, fontweight="bold", pad=8)
    # The two axis labels share identical styling; empty strings are skipped.
    for text, set_label in ((xlabel, ax.set_xlabel), (ylabel, ax.set_ylabel)):
        if text:
            set_label(text, color=TEXT_WHITE, fontsize=10)
def _new_fig(ncols=1, figsize=None, nrows=1):
    """
    Create a dark-themed figure with an ``nrows`` x ``ncols`` grid of axes.

    When *figsize* is omitted the figure is 9 inches wide per column and
    5 inches tall. Returns ``(fig, axes)`` exactly as ``plt.subplots`` does.
    """
    size = (9 * ncols, 5) if figsize is None else figsize
    fig, axes = plt.subplots(nrows, ncols, figsize=size, facecolor=DARK_BG)
    # Widen the gaps between panels so labels of neighbouring axes don't collide.
    plt.subplots_adjust(wspace=0.35, hspace=0.4)
    return fig, axes
# ──────────────────────────────────────────────────────────────────────────────
# TAB 1: Power Analysis
# ──────────────────────────────────────────────────────────────────────────────
def power_analysis(baseline_cvr: float, mde_pct: float, alpha: float, power_target: float):
    """
    Compute the required sample size and draw the power curve (Tab 1).

    Args:
        baseline_cvr: Control conversion rate as a fraction.
        mde_pct: Minimum detectable effect in percentage points (absolute).
        alpha: Significance level.
        power_target: Desired statistical power as a fraction.

    Returns:
        ``(figure, markdown)``. When baseline plus MDE reaches 100% the figure
        is ``None`` and the markdown is an error message.
    """
    plt.close("all")  # discard figures left over from earlier callbacks

    mde = mde_pct / 100.0
    new_cvr = baseline_cvr + mde
    if new_cvr >= 1.0:
        return None, "MDE too large for baseline CVR β treatment rate would exceed 100%."

    n_req = required_sample_size(baseline_cvr, mde, alpha=alpha, power=power_target)
    actual_power = compute_power(n_req, baseline_cvr, mde, alpha=alpha)

    # Sample the curve from n=30 up to three times the required n (at least
    # 500), with a stride chosen to give roughly 300 points.
    curve_end = max(int(n_req * 3), 500)
    stride = max(1, curve_end // 300)
    sample_sizes = list(range(30, curve_end, stride))
    power_pcts = [
        compute_power(size, baseline_cvr, mde, alpha=alpha) * 100
        for size in sample_sizes
    ]

    fig, ax = _new_fig(figsize=(9, 5))
    _style_ax(ax, "Power Curve β Sample Size vs Detected Power",
              "Sample size per group", "Statistical power (%)")
    ax.plot(sample_sizes, power_pcts, color=PURPLE, linewidth=2.5, label="Power")
    ax.fill_between(sample_sizes, power_pcts, alpha=0.15, color=PURPLE)
    # Reference lines: target power (horizontal) and required n (vertical),
    # with a dot where the curve meets the required n.
    ax.axhline(power_target * 100, color=PURPLE_L, linestyle="--", alpha=0.7,
               label=f"Target: {power_target*100:.0f}%")
    ax.axvline(n_req, color=GREEN, linestyle="--", linewidth=1.8,
               label=f"Required n = {n_req:,}")
    ax.scatter([n_req], [actual_power * 100], color=GREEN, s=90, zorder=6)
    ax.set_ylim(0, 108)
    ax.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)
    plt.tight_layout()

    relative_lift = mde / baseline_cvr * 100
    total_n = n_req * 2
    summary = f"""
### Required sample size: **{n_req:,} per group** ({total_n:,} total)
| Parameter | Value |
|:---|:---|
| Baseline conversion rate | {baseline_cvr*100:.2f}% |
| Treatment conversion rate | {new_cvr*100:.2f}% |
| Minimum Detectable Effect | +{mde_pct:.2f}pp ({relative_lift:.1f}% relative) |
| Significance level (Ξ±) | {alpha*100:.0f}% |
| Target power | {power_target*100:.0f}% |
| Actual power at n = {n_req:,} | {actual_power*100:.1f}% |
**How to read:** This curve shows how statistical power grows as you collect more data.
At n = **{n_req:,}** per group you have a **{actual_power*100:.0f}% chance** of detecting
the {mde_pct:.1f}pp lift β if it truly exists β and only a {alpha*100:.0f}% chance of a false positive.
> **Practical tip:** Running fewer than {n_req//2:,} per group means you're more likely to *miss*
> a real effect than to find it. Running more than {n_req*2:,} per group rarely helps.
"""
    return fig, summary
# ──────────────────────────────────────────────────────────────────────────────
# TAB 2: A/B Test Analyzer (Frequentist + Bayesian + CUPED demo)
# ──────────────────────────────────────────────────────────────────────────────
def _cuped_effective_n(n: int, rho: float) -> int:
    """Return the CUPED-equivalent sample size ``n / (1 - rho**2)``."""
    # The (1 - rho^2) term is floored at 0.01 so rho close to 1 cannot divide
    # by (almost) zero, and the result is never allowed below 10.
    return max(int(n / max(1 - rho**2, 0.01)), 10)


def ab_test_analyze(n_a: int, conv_a: int, n_b: int, conv_b: int,
                    alpha: float, corr_cuped: float):
    """
    Analyse one A/B test with frequentist, Bayesian and CUPED views (Tab 2).

    Args:
        n_a: Users in control.
        conv_a: Conversions in control.
        n_b: Users in treatment.
        conv_b: Conversions in treatment.
        alpha: Significance level, used for the z-test and for every power
            figure shown in the CUPED section.
        corr_cuped: Assumed correlation between the pre-experiment covariate
            and the outcome, used for the theoretical CUPED numbers.

    Returns:
        ``(fig1, fig2, results_md)``: the rate/posterior figure, the CUPED
        figure and a markdown summary.
    """
    plt.close("all")  # discard figures left over from earlier callbacks

    # Clamp inputs to a usable range. A missing value (None, which a cleared
    # number field may send) is treated as 0 instead of crashing int().
    n_a = max(int(n_a or 0), 1)
    n_b = max(int(n_b or 0), 1)
    conv_a = max(0, min(int(conv_a or 0), n_a))
    conv_b = max(0, min(int(conv_b or 0), n_b))
    p_a = conv_a / n_a
    p_b = conv_b / n_b

    # ── Frequentist and Bayesian analyses ──
    freq = two_proportion_ztest(n_a, conv_a, n_b, conv_b, alpha=alpha)
    bayes = bayesian_proportion_test(n_a, conv_a, n_b, conv_b, n_samples=50_000)

    # ── Figure 1: conversion-rate bars + posterior densities ──
    fig1, (ax1, ax2) = _new_fig(ncols=2, figsize=(12, 5))

    _style_ax(ax1, "Conversion Rate Comparison", "", "Conversion rate (%)")
    bar_colors = [BLUE, PURPLE_L]
    bars = ax1.bar(["Control (A)", "Treatment (B)"], [p_a * 100, p_b * 100],
                   color=bar_colors, alpha=0.85, width=0.45)
    for bar, n, c in zip(bars, [n_a, n_b], [conv_a, conv_b]):
        p = c / n
        se = (p * (1 - p) / n) ** 0.5
        # 1.96 standard errors: normal-approximation 95% interval
        ax1.errorbar(bar.get_x() + bar.get_width() / 2, p * 100,
                     yerr=1.96 * se * 100, fmt="none",
                     color=TEXT_WHITE, capsize=5, linewidth=1.8)
        ax1.text(bar.get_x() + bar.get_width() / 2, p * 100 + 0.05,
                 f"{p*100:.2f}%", ha="center", va="bottom",
                 color=TEXT_WHITE, fontsize=10, fontweight="bold")
    # The title doubles as the significance verdict (overrides the one above).
    sig_color = GREEN if freq.significant else RED
    sig_label = ("SIGNIFICANT" if freq.significant else "NOT SIGNIFICANT") + f" (p={freq.p_value:.4f})"
    ax1.set_title(sig_label, color=sig_color, fontsize=11, fontweight="bold")

    _style_ax(ax2, f"Posterior Distributions | P(B > A) = {bayes.prob_b_beats_a:.1%}",
              "Conversion rate", "Posterior density")
    margin = 0.06
    lo = max(0, min(p_a, p_b) - margin)
    hi = min(1, max(p_a, p_b) + margin)
    x = np.linspace(lo, hi, 400)
    # Beta(1 + successes, 1 + failures): posterior under a uniform Beta(1, 1) prior
    a_a = 1 + conv_a
    b_a = 1 + n_a - conv_a
    a_b = 1 + conv_b
    b_b = 1 + n_b - conv_b
    pdf_a = sp_stats.beta.pdf(x, a_a, b_a)
    pdf_b = sp_stats.beta.pdf(x, a_b, b_b)
    ax2.fill_between(x, pdf_a, alpha=0.35, color=BLUE)
    ax2.fill_between(x, pdf_b, alpha=0.35, color=PURPLE_L)
    ax2.plot(x, pdf_a, color=BLUE, linewidth=2.2, label="Control (A)")
    ax2.plot(x, pdf_b, color=PURPLE_L, linewidth=2.2, label="Treatment (B)")
    ax2.axvline(p_a, color=BLUE, linestyle=":", alpha=0.7)
    ax2.axvline(p_b, color=PURPLE_L, linestyle=":", alpha=0.7)
    ax2.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)
    plt.tight_layout()

    # ── Figure 2: theoretical CUPED power (closed form, no simulation) ──
    # CUPED scales variance by (1 - rho^2), which is treated here as having
    # n / (1 - rho^2) samples. All power values use the caller's alpha so they
    # agree with the z-test decision reported in the summary.
    corrs = np.linspace(0, 0.95, 40)
    # Use the observed gap as the effect to detect; fall back to 0.01 when the
    # two rates are (nearly) identical.
    mde = abs(p_b - p_a) if abs(p_b - p_a) > 0.001 else 0.01
    raw_power_val = compute_power(n_a, p_a, mde, alpha=alpha)
    cuped_powers_theory = [
        compute_power(_cuped_effective_n(n_a, c), p_a, mde, alpha=alpha) * 100
        for c in corrs
    ]
    raw_power_line = [raw_power_val * 100] * len(corrs)
    var_reduction_at_current = corr_cuped ** 2 * 100
    n_savings_at_current = int(n_a * corr_cuped**2)

    fig2, (ax3, ax4) = _new_fig(ncols=2, figsize=(12, 5))

    # Left panel: power with and without CUPED across correlations
    _style_ax(ax3, "CUPED Theoretical Power Gain",
              "Pre-post metric correlation (Ο)", "Statistical power (%)")
    ax3.plot(corrs, raw_power_line, color=BLUE, linewidth=2, linestyle="--",
             label="Without CUPED")
    ax3.plot(corrs, cuped_powers_theory, color=GREEN, linewidth=2.5,
             label="With CUPED (theoretical)")
    ax3.fill_between(corrs, raw_power_line, cuped_powers_theory,
                     alpha=0.18, color=GREEN, label="Power gain")
    ax3.axvline(corr_cuped, color=PURPLE_L, linestyle=":", linewidth=1.8,
                label=f"Current Ο = {corr_cuped:.2f}")
    ax3.set_ylim(0, 108)
    ax3.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)

    # Right panel: sample-size reduction (rho^2) across correlations
    _style_ax(ax4, "Sample Size Savings from CUPED",
              "Pre-post metric correlation (Ο)", "Sample size reduction (%)")
    savings_pct = [c**2 * 100 for c in corrs]
    ax4.plot(corrs, savings_pct, color=GREEN, linewidth=2.5)
    ax4.fill_between(corrs, 0, savings_pct, alpha=0.18, color=GREEN)
    ax4.axvline(corr_cuped, color=PURPLE_L, linestyle=":", linewidth=1.8,
                label=f"Ο={corr_cuped:.2f} β save {var_reduction_at_current:.0f}%")
    ax4.axhline(var_reduction_at_current, color=PURPLE_L, linestyle=":", alpha=0.6)
    ax4.set_ylim(0, 105)
    ax4.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)
    plt.tight_layout()

    # ── Markdown summary ──
    sig_icon = "β " if freq.significant else "β"
    bay_icon = ("π’ Deploy B" if bayes.prob_b_beats_a > 0.95
                else ("π‘ Lean B" if bayes.prob_b_beats_a > 0.5 else "π΄ Keep A"))
    cuped_power_at_rho = compute_power(
        _cuped_effective_n(n_a, corr_cuped), p_a, mde, alpha=alpha
    ) * 100
    results_md = f"""
## Results Summary
| | **Frequentist** | **Bayesian (Beta-Binomial)** |
|:---|:---|:---|
| Test | Two-proportion Z-test | Conjugate posterior update |
| Statistic | z = {freq.statistic:+.3f} | P(B > A) = {bayes.prob_b_beats_a:.1%} |
| p-value / Expected loss | p = {freq.p_value:.5f} | Loss(deploy B) = {bayes.expected_loss_choosing_b:.5f} |
| 95% Interval for lift | [{freq.ci_lower:+.4f}, {freq.ci_upper:+.4f}] | [{bayes.ci_lower:+.4f}, {bayes.ci_upper:+.4f}] |
| Effect size | {freq.effect_name} = {freq.effect_size:.3f} | β |
| Decision (Ξ± = {alpha}) | {sig_icon} {"Reject Hβ" if freq.significant else "Fail to reject Hβ"} | {bay_icon} |
**Observed lift:** {conv_a:,}/{n_a:,} = {p_a:.2%} β {conv_b:,}/{n_b:,} = {p_b:.2%}
(**{freq.observed_diff:+.4f}pp absolute**, {freq.relative_lift:+.1f}% relative)
---
### CUPED Variance Reduction (at Ο = {corr_cuped:.2f}) β Theoretical
| Metric | Without CUPED | With CUPED |
|:---|:---|:---|
| Power | {raw_power_val*100:.1f}% | {cuped_power_at_rho:.1f}% |
| Effective sample size | {n_a:,} | {min(int(n_a / max(1-corr_cuped**2, 0.01)), n_a*10):,} equivalent |
| Sample size savings | β | ~{var_reduction_at_current:.0f}% ({n_savings_at_current:,} fewer users) |
> **Why CUPED?** CUPED uses a *pre-experiment* metric (e.g., last month's purchases) to
> remove user-level noise, reducing outcome variance by ΟΒ². At Ο = {corr_cuped:.2f},
> you could achieve the same power with **~{var_reduction_at_current:.0f}% fewer users**.
> _(Theoretical: Deng et al. 2013, Microsoft KDD)_
"""
    return fig1, fig2, results_md
# ──────────────────────────────────────────────────────────────────────────────
# TAB 3: Sequential Testing
# ──────────────────────────────────────────────────────────────────────────────
def sequential_testing():
    """
    Render the Sequential Testing tab from the pre-computed simulation (Tab 3).

    Returns:
        ``(figure, markdown)``. When the simulation JSON was not loaded the
        figure is ``None`` and the markdown says how to generate it.
    """
    plt.close("all")  # discard figures left over from earlier callbacks
    if SEQ_SIM is None:
        return None, "Pre-computed sequential simulation not found.\nRun: python scripts/run_analysis.py"

    peek = SEQ_SIM["peeking_simulation"]
    alpha, trad_fpr, msprt_fpr, n_exp = (
        peek[key]
        for key in ("alpha", "traditional_fpr", "msprt_fpr", "n_experiments")
    )
    # Optional sections: an absent one becomes an empty dict.
    speed_s = SEQ_SIM.get("detection_speed_small_effect", {})
    speed_m = SEQ_SIM.get("detection_speed_medium_effect", {})
    hist_s = SEQ_SIM.get("stopping_times_small_hist", {})
    hist_m = SEQ_SIM.get("stopping_times_medium_hist", {})

    fig, (ax1, ax2) = _new_fig(ncols=2, figsize=(13, 5.5))

    # Left panel: false-positive rate of each method against the nominal level
    _style_ax(ax1, f"False Positive Rate Under Hβ\n({n_exp:,} simulations, peeking at 25/50/75/100%)",
              "", "False positive rate (%)")
    fprs = [trad_fpr * 100, msprt_fpr * 100]
    bars = ax1.bar(["Traditional\n(with peeking)", "mSPRT\n(always valid)"], fprs,
                   color=[RED, GREEN], alpha=0.85, width=0.42)
    ax1.axhline(alpha * 100, color=TEXT_WHITE, linestyle="--", alpha=0.5,
                linewidth=1.5, label=f"Nominal Ξ± = {alpha*100:.0f}%")
    for bar, val in zip(bars, fprs):
        ax1.text(bar.get_x() + bar.get_width() / 2, val + 0.4,
                 f"{val:.1f}%", ha="center", color=TEXT_WHITE,
                 fontweight="bold", fontsize=14)
    ax1.set_ylim(0, max(fprs) * 1.4)  # headroom for the value labels
    ax1.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)

    # Right panel: stopping-time histograms, drawn only when both are present
    _style_ax(ax2, "mSPRT Stops Earlier for Larger Effects\n(Under Hβ: True effect exists)",
              "Stopping time (n observations)", "Count of experiments")
    if hist_s and hist_m:
        small_edges = hist_s["bin_edges"]
        small_centers = [(left + right) / 2
                         for left, right in zip(small_edges, small_edges[1:])]
        small_width = (small_edges[1] - small_edges[0]) * 0.45
        ax2.bar(small_centers, hist_s["counts"], width=small_width, color=BLUE, alpha=0.65,
                label=f"Small effect (d=0.2) power={speed_s.get('power',0):.0%}")

        medium_edges = hist_m["bin_edges"]
        medium_centers = [(left + right) / 2
                          for left, right in zip(medium_edges, medium_edges[1:])]
        medium_width = (medium_edges[1] - medium_edges[0]) * 0.45
        ax2.bar(medium_centers, hist_m["counts"], width=medium_width, color=PURPLE_L, alpha=0.65,
                label=f"Medium effect (d=0.5) power={speed_m.get('power',0):.0%}")
        ax2.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)
    plt.tight_layout(pad=2.5)

    summary = f"""
## Sequential Testing (mSPRT β Always Valid Inference)
**Reference:** Johari, Pekelis & Walsh (2015) β *"Always Valid Inference"* (arXiv:1512.04922)
---
### The Peeking Problem
Traditional A/B testing requires you to decide your sample size *before* the experiment.
If you check significance midway and stop early (a common practice!), you inflate the false
positive rate far above the nominal Ξ±.
| Method | False Positive Rate | vs Nominal Ξ± = 5% |
|:---|:---|:---|
| Traditional + 4 peeks | **{trad_fpr*100:.1f}%** | {trad_fpr/alpha:.1f}Γ inflation |
| mSPRT (any time) | **{msprt_fpr*100:.1f}%** | β Controlled |
Simulation: {n_exp:,} experiments, true null, 4 peeks at 25/50/75/100% of n=1,000.
---
### How mSPRT Works
Instead of a p-value, mSPRT maintains a martingale **M_t** (e-value):
```
M_t = β(ΟΒ²/(ΟΒ²+tΒ·ΟΒ²)) Β· exp( (tXΜ)Β² Β· ΟΒ² / (2ΟΒ²(ΟΒ²+tΒ·ΟΒ²)) )
```
- Under Hβ: E[M_t] = 1 for *all* t
- Under Hβ: M_t β β (detection is guaranteed)
- **Reject Hβ when M_t β₯ 1/Ξ±** β valid at any stopping time
---
### Detection Speed Comparison (right chart)
| Effect size | Power (mSPRT) | Median stop time | Fixed-n would need |
|:---|:---|:---|:---|
| Small (Cohen's d = 0.2) | {speed_s.get('power',0):.0%} | {speed_s.get('median_stopping_time','?')} obs | 500 obs |
| Medium (Cohen's d = 0.5) | {speed_m.get('power',0):.0%} | {speed_m.get('median_stopping_time','?')} obs | 500 obs |
mSPRT detects medium effects at **median {speed_m.get('median_stopping_time','?')} observations** β potentially
much earlier than the fixed-n design, without inflating false positives.
"""
    return fig, summary
# ──────────────────────────────────────────────────────────────────────────────
# TAB 4: Uplift Modeling (HTE)
# ──────────────────────────────────────────────────────────────────────────────
# Maps each dropdown label shown in Tab 4 to the key prefix used in the HTE
# JSON (e.g. "<key>_hist"). The keys of this dict populate the dropdown, so a
# label changed here must also be changed wherever it is used as a default.
_MODEL_MAP = {
    "CausalForest (DML) β Recommended": "causalforest",
    "X-Learner": "x_learner",
    "T-Learner": "t_learner",
}
def plot_hte(model_label: str, outcome_label: str):
    """
    Plot the pre-computed heterogeneous-treatment-effect results (Tab 4).

    Args:
        model_label: Dropdown label of the estimator. Unknown labels fall back
            to the causal-forest results.
        outcome_label: Dropdown label of the outcome. Any label containing
            "Conversion" selects the conversion results, otherwise spend.

    Returns:
        ``(figure, markdown)``. When the HTE JSON was not loaded the figure is
        ``None`` and the markdown says how to generate it.
    """
    plt.close("all")  # discard figures left over from earlier callbacks
    if HTE_DATA is None:
        return None, "Pre-computed HTE results not found.\nRun: python scripts/run_hte.py"

    outcome_key = "conversion" if "Conversion" in outcome_label else "spend"
    data = HTE_DATA.get(outcome_key, {})
    mk = _MODEL_MAP.get(model_label, "causalforest")
    # Every section is optional; a missing one leaves its panel or table empty.
    hist = data.get(f"{mk}_hist", {})
    segs = data.get("segment_summaries", {}).get(mk, [])
    overall = data.get("overall_ate", {}).get(mk, {})

    fig, (ax1, ax2) = _new_fig(ncols=2, figsize=(13, 5.5))

    # ── CATE distribution ──
    _style_ax(ax1, f"CATE Distribution β {model_label} [{outcome_label}]",
              "Individual treatment effect (CATE)", "Count")
    if hist:
        edges = hist["edges"]
        counts = hist["counts"]
        ctrs = [(left + right) / 2 for left, right in zip(edges, edges[1:])]
        widths = [(right - left) * 0.9 for left, right in zip(edges, edges[1:])]
        # Colour each bin by the sign of its centre
        bar_colors = [GREEN if c > 0 else RED for c in ctrs]
        ax1.bar(ctrs, counts, width=widths, color=bar_colors, alpha=0.75)
        ate = hist.get("mean", 0.0)
        ax1.axvline(0, color=TEXT_WHITE, linestyle=":", alpha=0.4, linewidth=1.5)
        ax1.axvline(ate, color=GREEN, linestyle="-", linewidth=2.0,
                    label=f"ATE = {ate:.5f}")
        pct_pos = hist.get("pct_positive", 0)
        # Replaces the generic title set above with the share of positive CATEs
        ax1.set_title(f"CATE Distribution | {pct_pos:.1f}% users benefit",
                      color=TEXT_WHITE, fontsize=11, fontweight="bold")
        ax1.legend(facecolor=PANEL_BG, edgecolor=PURPLE_L, labelcolor=TEXT_WHITE, fontsize=9)

    # ── Segment bar chart ──
    _style_ax(ax2, f"Average CATE by Segment β {model_label}", "Mean CATE", "")
    if segs:
        segs_sorted = sorted(segs, key=lambda x: x["cate_mean"], reverse=True)
        labels = [s["segment"] for s in segs_sorted]
        values = [s["cate_mean"] for s in segs_sorted]
        seg_colors = [GREEN if v > 0 else RED for v in values]
        bars = ax2.barh(labels, values, color=seg_colors, alpha=0.8)
        ax2.axvline(0, color=TEXT_WHITE, alpha=0.3, linewidth=1)
        # Gap between bar tip and label: 2% of the largest magnitude,
        # computed once rather than on every iteration.
        label_pad = max(abs(v) for v in values) * 0.02
        for bar, val in zip(bars, values):
            grows_right = val >= 0
            # Anchor the label beyond the bar tip and align it away from the
            # bar, so labels on negative bars extend left instead of running
            # back over the bar.
            ax2.text(val + label_pad if grows_right else val - label_pad,
                     bar.get_y() + bar.get_height() / 2,
                     f"{val:+.5f}", va="center",
                     ha="left" if grows_right else "right",
                     color=TEXT_WHITE, fontsize=8)
    plt.tight_layout(pad=2.5)

    # ── Summary markdown ──
    top3 = sorted(segs, key=lambda x: x["cate_mean"], reverse=True)[:3]
    bot3 = sorted(segs, key=lambda x: x["cate_mean"])[:3]
    ate_val = overall.get("ate_mean", 0)
    pct_pos = overall.get("pct_positive", 0)
    naive = data.get("naive_ate", 0)

    def _seg_rows(slist):
        # One markdown table row per segment; empty string for an empty list
        return "\n".join(f"| {s['segment']} | {s['cate_mean']:+.5f} | {s['n']:,} |"
                         for s in slist)

    summary = f"""
## Uplift Modeling / HTE Results β {model_label} [{outcome_label}]
**Dataset:** Hillstrom Email Marketing β 64,000 customers, 3-arm RCT (2008)
**Treatment:** Any e-mail (Men's or Women's) vs. No e-mail (control)
---
### Average Treatment Effect
| Estimator | ATE | % Users Benefiting |
|:---|:---|:---|
| Naive (difference in means) | {naive:+.5f} | β |
| {model_label} | **{ate_val:+.5f}** | **{pct_pos:.1f}%** |
---
### Top 3 Segments to Target (highest CATE)
| Segment | Est. CATE | n |
|:---|:---|:---|
{_seg_rows(top3)}
### Lowest Responding Segments
| Segment | Est. CATE | n |
|:---|:---|:---|
{_seg_rows(bot3)}
---
### How to Use This
1. **Ranking:** Sort users by their estimated CATE β target those above a threshold.
2. **Budget constraint:** With a fixed email budget, send only to the top-N% by CATE.
3. **A/B validation:** Run a follow-up experiment *only* on the high-CATE segment to verify.
> **Key insight:** The average treatment effect ({naive:+.5f}) can mask huge heterogeneity.
> Some segments may show 2β3Γ the average response β targeting them delivers
> the same conversion uplift at a fraction of the marketing cost.
---
*Three estimators compared (select via dropdown):*
- **CausalForest DML** β doubly-robust SOTA; orthogonalizes Y and T residuals before fitting causal forest
- **X-Learner** β cross-fitted CATE; better for imbalanced treatment/control sizes
- **T-Learner** β separate outcome models per arm; simple baseline
"""
    return fig, summary
# ──────────────────────────────────────────────────────────────────────────────
# Gradio Layout
# ──────────────────────────────────────────────────────────────────────────────
# Markdown header rendered at the top of the page, above the tabs: a short
# pitch, a table describing each tab, and the dataset attribution.
_DESC = """
## A/B Testing & Causal Inference Simulator
A four-tab interactive dashboard demonstrating **state-of-the-art experimentation methods**
used at companies like Netflix, Spotify, Microsoft, and Airbnb.
| Tab | Method | What it demonstrates |
|:---|:---|:---|
| 1 Power Analysis | Z-test power formula | Sample size planning |
| 2 A/B Test Analyzer | Frequentist Β· Bayesian Β· CUPED | Multi-method comparison |
| 3 Sequential Testing | mSPRT (Always-Valid Inference) | Safe continuous monitoring |
| 4 Uplift Modeling | CausalForest Β· X-Learner Β· T-Learner | Heterogeneous treatment effects |
**Dataset (Tabs 3 & 4):** [Hillstrom E-mail Analytics Challenge](https://www.minethatdata.com/)
β 64,000 customers, 3-arm RCT, 2008.
"""
# Assemble the UI. Components are created inside nested context managers, so
# the statement order and nesting below determine the on-screen layout.
# NOTE(review): the original indentation was lost; the nesting here is a
# reconstruction. In particular, confirm whether each tab's button belongs
# inside its Row/Column and whether the event wiring sits at tab level.
with gr.Blocks(
    title="A/B Testing & Causal Inference Simulator",
    theme=gr.themes.Base(primary_hue="violet", secondary_hue="blue", neutral_hue="slate"),
) as demo:
    gr.Markdown(_DESC)
    # ── TAB 1: Power Analysis ──
    with gr.Tab("1. Power Analysis"):
        gr.Markdown("""
### Sample Size & Power Calculator
Compute the required experiment size before running your A/B test.
A well-powered experiment is the foundation of valid inference.
""")
        with gr.Row():
            # Inputs on the left (narrow column), outputs on the right (wide)
            with gr.Column(scale=1):
                t1_baseline = gr.Slider(0.01, 0.50, value=0.10, step=0.01,
                                        label="Baseline conversion rate (Control CVR)")
                t1_mde = gr.Slider(0.10, 10.0, value=2.0, step=0.1,
                                   label="Minimum Detectable Effect (pp, absolute)")
                t1_alpha = gr.Dropdown([0.01, 0.05, 0.10], value=0.05,
                                       label="Significance level (Ξ±)")
                t1_power = gr.Slider(0.70, 0.95, value=0.80, step=0.05,
                                     label="Target power (1 - Ξ²)")
                t1_btn = gr.Button("Calculate", variant="primary")
            with gr.Column(scale=2):
                t1_plot = gr.Plot()
                t1_md = gr.Markdown()
        # The same callback is wired to the button and to page load, so the tab
        # shows results for the default inputs without a click.
        t1_btn.click(
            fn=power_analysis,
            inputs=[t1_baseline, t1_mde, t1_alpha, t1_power],
            outputs=[t1_plot, t1_md],
        )
        demo.load(
            fn=power_analysis,
            inputs=[t1_baseline, t1_mde, t1_alpha, t1_power],
            outputs=[t1_plot, t1_md],
        )
    # ── TAB 2: A/B Test Analyzer ──
    with gr.Tab("2. A/B Test Analyzer"):
        gr.Markdown("""
### Multi-Method A/B Test Analysis
Enter observed results from any A/B test to compare **Frequentist**, **Bayesian**,
and **CUPED** methods side-by-side.
""")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("#### Group A (Control)")
                t2_na = gr.Number(value=21306, label="Users in Control (n_A)", precision=0)
                t2_ca = gr.Number(value=121, label="Conversions in Control", precision=0)
                gr.Markdown("#### Group B (Treatment)")
                t2_nb = gr.Number(value=42694, label="Users in Treatment (n_B)", precision=0)
                t2_cb = gr.Number(value=457, label="Conversions in Treatment", precision=0)
                t2_alpha = gr.Dropdown([0.01, 0.05, 0.10], value=0.05,
                                       label="Significance level (Ξ±)")
                t2_corr = gr.Slider(0.0, 0.95, value=0.5, step=0.05,
                                    label="Pre-post metric correlation (for CUPED demo, Ο)")
                t2_btn = gr.Button("Analyze", variant="primary")
            with gr.Column(scale=2):
                t2_plot1 = gr.Plot(label="Conversion rates & Posteriors")
                t2_plot2 = gr.Plot(label="CUPED Power Gain vs Correlation")
                t2_md = gr.Markdown()
        # Input order must match ab_test_analyze's positional parameters.
        t2_btn.click(
            fn=ab_test_analyze,
            inputs=[t2_na, t2_ca, t2_nb, t2_cb, t2_alpha, t2_corr],
            outputs=[t2_plot1, t2_plot2, t2_md],
        )
        demo.load(
            fn=ab_test_analyze,
            inputs=[t2_na, t2_ca, t2_nb, t2_cb, t2_alpha, t2_corr],
            outputs=[t2_plot1, t2_plot2, t2_md],
        )
    # ── TAB 3: Sequential Testing ──
    with gr.Tab("3. Sequential Testing"):
        gr.Markdown("""
### mSPRT β Always Valid Inference
Traditional A/B testing breaks if you peek at results mid-experiment.
The **mixture Sequential Probability Ratio Test** (mSPRT) lets you monitor continuously
without inflating the false positive rate. Pre-computed on 3,000 simulated experiments.
""")
        t3_plot = gr.Plot()
        t3_md = gr.Markdown()
        # No inputs and no button: this tab is rendered once, on page load.
        demo.load(fn=sequential_testing, inputs=[], outputs=[t3_plot, t3_md])
    # ── TAB 4: Uplift Modeling ──
    with gr.Tab("4. Uplift Modeling (HTE)"):
        gr.Markdown("""
### Heterogeneous Treatment Effect (HTE) Estimation
Not all users respond equally to a treatment. **Uplift modeling** estimates each user's
individual treatment effect (CATE) using three ML-based causal estimators from
[Microsoft EconML](https://github.com/py-why/EconML).
**Data:** Hillstrom E-mail Marketing β 64,000 customers, randomized 3-arm experiment.
**Treatment:** Any marketing email vs. no email.
""")
        with gr.Row():
            t4_model = gr.Dropdown(
                list(_MODEL_MAP.keys()),
                value="CausalForest (DML) β Recommended",
                label="Causal estimator",
            )
            t4_outcome = gr.Dropdown(
                ["Conversion (binary)", "Spend (continuous)"],
                value="Conversion (binary)",
                label="Outcome variable",
            )
            t4_btn = gr.Button("Show Results", variant="primary")
        t4_plot = gr.Plot()
        t4_md = gr.Markdown()
        t4_btn.click(
            fn=plot_hte,
            inputs=[t4_model, t4_outcome],
            outputs=[t4_plot, t4_md],
        )
        demo.load(
            fn=plot_hte,
            inputs=[t4_model, t4_outcome],
            outputs=[t4_plot, t4_md],
        )
    # Footer with author, repository and method credits
    gr.Markdown("""
---
Built by [Muhammad Fikri Wahidin](https://github.com/Fikri645) Β·
[GitHub](https://github.com/Fikri645/ab-testing-causal) Β·
Methods: CUPED (Microsoft 2013) Β· mSPRT (Johari et al. 2015) Β·
CausalForestDML (Athey & Wager 2019, via EconML)
""")
# Start the server only when run as a script. Binds to all interfaces on
# port 7860 so the app is reachable from outside the host or container.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)