| """Summarize all-real-task stratification sensitivity runs.""" |
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| from collections import Counter |
| from pathlib import Path |
|
|
| from make_tables import METHOD_LABELS |
|
|
|
|
# Result-file name template for each task key. The ``{strata}`` placeholder is
# filled in with a stratification name and the resulting file name is looked up
# inside the input directory.
TASK_FILES = {
    "cifar10": "exp2_2_softmax_cifar10_strata_{strata}_fixed.json",
    "topics": "exp2_5_topics_K10_strata_{strata}_fixed.json",
    "affectivetext": "exp2_6_affective_text_strata_{strata}_fixed.json",
    "samson": "exp2_3_hyperspectral_samson_nmf_strata_{strata}_fixed.json",
    "utkface": "exp2_4_age_ldl_K10_strata_{strata}_fixed.json",
    "pbmc": "pbmc_sensitivity_exp2_1_bulk_deconv_{strata}_fixed.json",
}
|
|
# Human-readable label for each task key; stored in every output row as
# "task_label" and used as the first column of the generated tables.
DISPLAY_NAMES = {
    "cifar10": "CIFAR-10",
    "topics": "Topics",
    "affectivetext": "AffectiveText",
    "samson": "Samson",
    "utkface": "UTKFace",
    "pbmc": "PBMC",
}
|
|
# Stratification names substituted into the file-name templates, listed in the
# column order of the generated tables.
STRATA_ORDER = ["boundary", "entropy", "dominant", "kmeans"]
# Minimum repetition count a result file must report for each task; files that
# report fewer repetitions (or none at all) are flagged "incomplete".
EXPECTED_REPS = {
    "cifar10": 50,
    "topics": 50,
    "affectivetext": 50,
    "samson": 50,
    "utkface": 50,
    "pbmc": 200,
}
|
|
|
|
def extract_n_rep(data: dict) -> int | None:
    """Return the repetition count recorded in a result file's config.

    The keys ``n_rep`` and ``n_reps`` are tried, in that order, first at the
    top level of ``data["config"]``, then inside its ``conformal`` section,
    then inside its ``evaluation`` section. The first match wins and is
    converted with ``int``.

    Args:
        data: Parsed result file.

    Returns:
        The repetition count, or ``None`` when none of the searched
        locations contains one.
    """
    root = data.get("config", {})
    # ``None`` stands for the top-level config itself; sections are resolved
    # lazily so a later section is only touched when earlier ones had no match.
    for section_name in (None, "conformal", "evaluation"):
        section = root if section_name is None else root.get(section_name, {})
        for name in ("n_rep", "n_reps"):
            if name in section:
                return int(section[name])
    return None
|
|
|
|
def load_summary(path: Path) -> tuple[dict, float, int | None]:
    """Load one result file and return what the ranking step needs.

    Args:
        path: JSON result file for a single task/stratification run.

    Returns:
        A ``(summary, alpha, n_rep)`` tuple:

        * ``summary`` is the value stored under ``"summary"``, or under
          ``"aggregated"`` when ``"summary"`` is absent. An empty dict is
          returned when neither yields a value, so callers always receive
          a dict.
        * ``alpha`` is read from ``config["alpha"]``, else from
          ``config["conformal"]["alpha"]``, else defaults to ``0.1``.
        * ``n_rep`` is whatever ``extract_n_rep`` finds (possibly ``None``).
    """
    # Decode explicitly as UTF-8 instead of relying on the locale default;
    # this matches the encoding used for every file this script writes.
    data = json.loads(path.read_text(encoding="utf-8"))

    summary = data.get("summary", data.get("aggregated"))
    if summary is None:
        summary = {}
    config = data.get("config", {})

    if "alpha" in config:
        alpha = float(config["alpha"])
    elif "conformal" in config and "alpha" in config["conformal"]:
        alpha = float(config["conformal"]["alpha"])
    else:
        alpha = 0.1

    return summary, alpha, extract_n_rep(data)
|
|
|
|
def metric_mean(entry: dict, key: str) -> float:
    """Return the mean of metric ``key`` from one method's summary entry.

    A metric may be stored either as a mapping with a ``"mean"`` field or as
    a bare number (anything ``float`` accepts).

    Args:
        entry: Summary entry of a single method.
        key: Metric name to read.

    Returns:
        The metric mean as a float. NaN is returned when the metric is
        absent or null, or when it is a mapping whose ``"mean"`` is absent
        or null.
    """
    value = entry.get(key)
    if isinstance(value, dict):
        value = value.get("mean")
    # A JSON ``null`` is treated like a missing metric rather than crashing
    # in ``float(None)``.
    if value is None:
        return float("nan")
    return float(value)
|
|
|
|
def rank_methods(summary: dict, alpha: float, coverage_tol: float) -> tuple[list[str], list[str]]:
    """Rank methods by max disparity, then mean radius, then coverage.

    Methods are ordered by ascending ``max_disparity``, ties broken by
    ascending ``mean_radius`` and then by descending ``marginal_coverage``.
    A metric that evaluates to NaN (for example because it is missing from
    the entry) is ranked as the worst possible value for that criterion, so
    the ordering stays well defined.

    Args:
        summary: Mapping from method name to that method's summary entry.
        alpha: Level whose complement ``1 - alpha`` is the nominal coverage.
        coverage_tol: Slack subtracted from the nominal coverage when
            deciding whether a method is valid.

    Returns:
        ``(valid_sorted, raw_sorted)``: the ranking restricted to methods
        whose mean marginal coverage is at least ``1 - alpha - coverage_tol``,
        and the ranking of all methods.
    """
    import math

    nominal = 1.0 - alpha
    threshold = nominal - coverage_tol

    def nan_last(value: float) -> float:
        # NaN compares false against everything, which would make the sort
        # order arbitrary; +inf pushes such entries to the end instead.
        return math.inf if math.isnan(value) else value

    def sort_key(method: str) -> tuple[float, float, float]:
        entry = summary[method]
        return (
            nan_last(metric_mean(entry, "max_disparity")),
            nan_last(metric_mean(entry, "mean_radius")),
            nan_last(-metric_mean(entry, "marginal_coverage")),
        )

    raw_sorted = sorted(summary, key=sort_key)
    # ``sorted`` is stable, so filtering the full ranking gives the same order
    # as ranking the valid subset on its own.
    valid_sorted = [
        method for method in raw_sorted
        if metric_mean(summary[method], "marginal_coverage") >= threshold
    ]
    return valid_sorted, raw_sorted
|
|
|
|
def summarize(input_dir: Path, coverage_tol: float) -> tuple[list[dict], list[dict]]:
    """Build the per-method detail rows and the per-stratification winner rows.

    For every task in ``TASK_FILES`` and every name in ``STRATA_ORDER`` the
    matching result file under ``input_dir`` is examined and exactly one
    winner row is emitted, with ``status`` set to:

    * ``"missing"`` when the file does not exist;
    * ``"incomplete"`` when the file reports no repetition count or fewer
      repetitions than ``EXPECTED_REPS`` requires;
    * ``"ok"`` when at least one method is valid, carrying the best valid
      method and both rankings;
    * ``"no_valid_method"`` otherwise.

    For files that pass the repetition check, one detail row per method is
    emitted in raw-rank order. After the strata of a task have been processed,
    a ``"task_summary"`` row (``strata == "_summary"``) is added if the task
    had at least one winner; ``winner_stable`` is true only when the most
    common winner won under every entry of ``STRATA_ORDER``.

    Args:
        input_dir: Directory holding the result files.
        coverage_tol: Tolerance below nominal coverage still counted as valid.

    Returns:
        ``(detail_rows, winner_rows)``.
    """
    detail_rows: list[dict] = []
    winner_rows: list[dict] = []

    for task, template in TASK_FILES.items():
        task_winners: list[str] = []

        for strata in STRATA_ORDER:
            result_file = input_dir / template.format(strata=strata)
            # Identification columns shared by every row emitted for this cell.
            ident = {
                "task": task,
                "task_label": DISPLAY_NAMES[task],
                "strata": strata,
            }

            if not result_file.exists():
                winner_rows.append({**ident, "status": "missing"})
                continue

            summary, alpha, n_rep = load_summary(result_file)
            required_reps = EXPECTED_REPS[task]
            if n_rep is None or n_rep < required_reps:
                winner_rows.append({
                    **ident,
                    "status": "incomplete",
                    "observed_n_rep": n_rep,
                    "expected_n_rep": required_reps,
                })
                continue

            valid_rank, raw_rank = rank_methods(summary, alpha, coverage_tol)
            nominal = 1.0 - alpha
            top_raw = raw_rank[0] if raw_rank else ""

            for position, method in enumerate(raw_rank, start=1):
                entry = summary[method]
                coverage = metric_mean(entry, "marginal_coverage")
                detail_rows.append({
                    **ident,
                    "method": method,
                    "rank_raw": position,
                    "is_valid": coverage >= nominal - coverage_tol,
                    "marginal_coverage": coverage,
                    "max_disparity": metric_mean(entry, "max_disparity"),
                    "worst_stratum_coverage": metric_mean(entry, "worst_stratum_coverage"),
                    "mean_radius": metric_mean(entry, "mean_radius"),
                    "runtime_sec": metric_mean(entry, "runtime_sec"),
                    "n_rep": n_rep,
                })

            if not valid_rank:
                winner_rows.append({
                    **ident,
                    "status": "no_valid_method",
                    "best_raw_method": top_raw,
                    "valid_ranking": [],
                    "raw_ranking": raw_rank,
                })
                continue

            best = valid_rank[0]
            task_winners.append(best)
            best_entry = summary[best]
            winner_rows.append({
                **ident,
                "status": "ok",
                "best_valid_method": best,
                "best_valid_coverage": metric_mean(best_entry, "marginal_coverage"),
                "best_valid_disparity": metric_mean(best_entry, "max_disparity"),
                "best_valid_radius": metric_mean(best_entry, "mean_radius"),
                "best_raw_method": top_raw,
                "valid_ranking": valid_rank,
                "raw_ranking": raw_rank,
            })

        if not task_winners:
            continue

        tally = Counter(task_winners)
        modal, modal_count = tally.most_common(1)[0]
        winner_rows.append({
            "task": task,
            "task_label": DISPLAY_NAMES[task],
            "strata": "_summary",
            "status": "task_summary",
            "modal_best_valid_method": modal,
            "modal_count": modal_count,
            "winner_stable": modal_count == len(STRATA_ORDER),
            "winner_set": sorted(tally),
        })

    return detail_rows, winner_rows
|
|
|
|
def write_csv(path: Path, rows: list[dict]) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 CSV file with a header line.

    The columns are the union of all row keys in first-seen order, so rows
    do not need to share the same keys; a key missing from a row is written
    as an empty field. The parent directory is created if needed.

    Args:
        path: Destination file.
        rows: Rows to write. When empty, nothing is written and any existing
            file at ``path`` is left untouched.
    """
    if not rows:
        return

    # Taking the header from the first row alone would make DictWriter raise
    # on any later row that carries an extra key.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))

    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
|
|
|
|
def write_markdown(path: Path, winner_rows: list[dict]) -> None:
    """Render the winner rows as a Markdown table with one line per task.

    Each stratification cell shows the best valid method followed by its max
    disparity and marginal coverage. A cell without an ``"ok"`` row shows the
    reason instead: ``incomplete``, ``no valid method``, or ``missing`` (used
    when there is no row at all or the row has any other status). The last
    column reflects the task's ``_summary`` row: ``yes (<method>)`` when the
    winner is stable, the comma-separated winner set when it is not, and
    ``pending`` when there is no summary row.

    Args:
        path: Destination Markdown file (written as UTF-8).
        winner_rows: Rows carrying at least ``task``, ``strata`` and
            ``status`` keys.
    """
    # Statuses with their own cell text; anything else is reported as missing.
    placeholder_text = {
        "incomplete": "incomplete",
        "no_valid_method": "no valid method",
    }

    per_task: dict = {}
    for row in winner_rows:
        per_task.setdefault(row["task"], {})[row["strata"]] = row

    lines = [
        "# Real-Task Stratification Sensitivity Summary",
        "",
        "Rows report the best valid method under each fixed stratification.",
        "A method is considered valid if mean marginal coverage is at least nominal minus the configured tolerance.",
        "",
        "| Task | Boundary | Entropy | Dominant | KMeans | Stable winner? |",
        "|---|---|---|---|---|---|",
    ]

    for task in TASK_FILES:
        task_rows = per_task.get(task, {})

        cells = []
        for strata in STRATA_ORDER:
            row = task_rows.get(strata)
            status = row.get("status") if row else None
            if status != "ok":
                cells.append(placeholder_text.get(status, "missing"))
                continue
            cells.append(
                f"{row['best_valid_method']} "
                f"({row['best_valid_disparity']:.3f}, cov={row['best_valid_coverage']:.3f})"
            )

        summary = task_rows.get("_summary", {})
        stable = summary.get("winner_stable")
        if stable is True:
            stable_text = f"yes ({summary.get('modal_best_valid_method')})"
        elif stable is False:
            stable_text = ", ".join(summary.get("winner_set", []))
        else:
            stable_text = "pending"

        joined_cells = " | ".join(cells)
        lines.append(f"| {DISPLAY_NAMES[task]} | {joined_cells} | {stable_text} |")

    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
def latex_escape(text: str) -> str:
    """Escape ``\\``, ``_``, ``%``, ``&`` and ``#`` for use in LaTeX text.

    A backslash becomes ``\\textbackslash{}``; the other four characters are
    prefixed with a backslash. All remaining characters, including ``$``,
    ``{``, ``}``, ``~`` and ``^``, are passed through unchanged.
    """
    # One translation pass: replacement text is never re-scanned, so the
    # backslashes introduced by the escapes are not escaped again.
    replacements = str.maketrans({
        "\\": "\\textbackslash{}",
        "_": "\\_",
        "%": "\\%",
        "&": "\\&",
        "#": "\\#",
    })
    return text.translate(replacements)
|
|
|
|
def format_cell(row: dict | None) -> str:
    """Format one stratification cell of the LaTeX table.

    Args:
        row: Winner row for a task/stratification pair, or ``None`` when
            there is none.

    Returns:
        ``"incomplete"`` for a row with that status, ``"--"`` for a missing
        or empty row or any other non-``"ok"`` status, and otherwise the
        escaped method label followed by ``(disparity, coverage)``, each
        with three decimals.
    """
    status = row.get("status") if row else None
    if status == "incomplete":
        return "incomplete"
    if status != "ok":
        return "--"

    raw_name = row["best_valid_method"]
    label = METHOD_LABELS.get(raw_name, raw_name)
    disparity = row["best_valid_disparity"]
    coverage = row["best_valid_coverage"]
    return f"{latex_escape(label)} ({disparity:.3f}, {coverage:.3f})"
|
|
|
|
def write_latex(path: Path, winner_rows: list[dict]) -> None:
    """Render the winner rows as a LaTeX ``table*`` with one line per task.

    Each line holds the task label, one ``format_cell`` result for each of
    the boundary, entropy, dominant and kmeans rows, and a stability column
    derived from the task's ``_summary`` row: ``Stable: <label>`` when the
    winner is stable, ``Mixed: <labels>`` when it is not, and ``Pending``
    when there is no summary row. Method names are mapped through
    ``METHOD_LABELS`` where a label exists.

    Args:
        path: Destination ``.tex`` file (written as UTF-8).
        winner_rows: Rows carrying at least ``task`` and ``strata`` keys.
    """
    by_task: dict = {}
    for entry in winner_rows:
        by_task.setdefault(entry["task"], {})[entry["strata"]] = entry

    header = [
        "% Auto-generated by scripts/summarize_real_strata_sensitivity.py",
        "\\begin{table*}[t]",
        "\\centering",
        "\\caption{Real-task stratification sensitivity across fixed alternative strata. Each cell reports the best valid method under that stratification, shown as method name with $(\\text{max disparity}, \\text{marginal coverage})$. A method is treated as valid when mean marginal coverage is at least nominal minus the configured tolerance.}",
        "\\label{tab:real-strata-sensitivity}",
        "\\scriptsize",
        "\\resizebox{\\textwidth}{!}{%",
        "\\begin{tabular}{@{}lccccp{3.3cm}@{}}",
        "\\toprule",
        "Task & Boundary & Entropy & Dominant & KMeans & Winner stability \\\\",
        "\\midrule",
    ]
    footer = [
        "\\bottomrule",
        "\\end{tabular}",
        "}",
        "\\end{table*}",
    ]

    body = []
    for task in TASK_FILES:
        task_rows = by_task.get(task, {})
        task_summary = task_rows.get("_summary", {})
        stable = task_summary.get("winner_stable")

        if stable is True:
            modal = task_summary.get("modal_best_valid_method", "")
            stability_text = f"Stable: {METHOD_LABELS.get(modal, modal)}"
        elif stable is False:
            labels = (
                METHOD_LABELS.get(name, name)
                for name in task_summary.get("winner_set", [])
            )
            stability_text = "Mixed: " + ", ".join(labels)
        else:
            stability_text = "Pending"

        columns = [latex_escape(DISPLAY_NAMES[task])]
        columns.extend(
            format_cell(task_rows.get(strata))
            for strata in ("boundary", "entropy", "dominant", "kmeans")
        )
        columns.append(latex_escape(stability_text))
        body.append(" & ".join(columns) + " \\\\")

    path.write_text("\n".join(header + body + footer) + "\n", encoding="utf-8")
|
|
|
|
def main() -> None:
    """Parse the command line, summarize the runs and write all four outputs.

    Outputs are the per-method detail CSV, the winner rows as indented JSON,
    the Markdown summary table and the LaTeX table. Every path and the
    coverage tolerance can be overridden on the command line.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-dir", default="results/tables")
    parser.add_argument("--coverage-tol", type=float, default=0.02)

    # Output locations, registered in the order they appear in ``--help``.
    output_defaults = {
        "--detail-csv": "results/tables/real_strata_sensitivity_detail.csv",
        "--winner-json": "results/tables/real_strata_sensitivity_winners.json",
        "--winner-md": "results/tables/real_strata_sensitivity_summary.md",
        "--winner-tex": "paper/rewrite_2026/latex/generated_real_strata_sensitivity.tex",
    }
    for flag, default in output_defaults.items():
        parser.add_argument(flag, default=default)

    args = parser.parse_args()

    detail_rows, winner_rows = summarize(Path(args.input_dir), args.coverage_tol)

    write_csv(Path(args.detail_csv), detail_rows)
    winners_json = json.dumps(winner_rows, indent=2)
    Path(args.winner_json).write_text(winners_json, encoding="utf-8")
    write_markdown(Path(args.winner_md), winner_rows)
    write_latex(Path(args.winner_tex), winner_rows)


if __name__ == "__main__":
    main()
|
|