| |
| """Run a standard alpha robustness matrix on top of the standalone robust backtester.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import subprocess |
| import sys |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import pandas as pd |
|
|
|
|
# Repository root, resolved three directory levels up from this file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# Standalone robustness runner that is invoked once per matrix case.
RUNNER_PATH = PROJECT_ROOT / "deploy" / "v2" / "jsonl_alpha_robustness.py"
|
|
|
|
@dataclass(frozen=True)
class MatrixCase:
    """One immutable robustness-matrix case: identity, grouping, and runner params."""

    case_id: str  # unique identifier; also used as the case's output sub-directory name
    case_group: str  # sensitivity family (e.g. "fee", "top_k") tagged onto merged outputs
    description: str  # human-readable summary printed in logs and carried into CSVs
    params: dict[str, Any]  # parameter set translated into runner CLI flags
|
|
|
|
| def _baseline_params() -> dict[str, Any]: |
| return { |
| "backtest_engine": "custom", |
| "top_k": 5, |
| "rebalance_freq": 5, |
| "custom_weight_mode": "equal", |
| "position_size": 1.0, |
| "max_pos_each_stock": 0.2, |
| "max_daily_volume_participation": 0.0, |
| "max_daily_amount_participation": 0.0, |
| "buy_fee": 0.0013, |
| "sell_fee": 0.0013, |
| "enforce_cash_limit": True, |
| "score_transform": "identity", |
| "score_clip": 3.0, |
| "universe_filter": "none", |
| "universe_top_n": 0, |
| "universe_lookback_days": 20, |
| "redistribute_unfilled_cash": False, |
| } |
|
|
|
|
def _make_standard_cases() -> list[MatrixCase]:
    """Build the ordered standard robustness matrix (baseline plus sensitivity sweeps)."""
    base = _baseline_params()
    # Shared variant: alpha-score weighting with unfilled cash redistributed.
    alpha_weighted = {
        **base,
        "custom_weight_mode": "alpha_score",
        "redistribute_unfilled_cash": True,
    }

    def from_base(**overrides: Any) -> dict[str, Any]:
        # Baseline parameters with selected keys overridden.
        return {**base, **overrides}

    def from_alpha(**overrides: Any) -> dict[str, Any]:
        # Alpha-score-weighted parameters with selected keys overridden.
        return {**alpha_weighted, **overrides}

    return [
        MatrixCase("baseline_replay", "baseline", "Baseline replay with fair equal-weight + 20% cap", base),
        MatrixCase("topk_10", "top_k", "TOP_K sensitivity: 10", from_base(top_k=10)),
        MatrixCase("topk_15", "top_k", "TOP_K sensitivity: 15", from_base(top_k=15)),
        MatrixCase("weight_alpha_score_cap20", "weighting", "Alpha-score weighting with 20% cap", alpha_weighted),
        MatrixCase("weight_alpha_score_no_cap", "weighting", "Alpha-score weighting with no per-name cap", from_alpha(max_pos_each_stock=1.0)),
        MatrixCase("fee_0bps", "fee", "Fee sensitivity: 0 bps per side", from_base(buy_fee=0.0, sell_fee=0.0)),
        MatrixCase("fee_10bps", "fee", "Fee sensitivity: 10 bps per side", from_base(buy_fee=0.0010, sell_fee=0.0010)),
        MatrixCase("fee_20bps", "fee", "Fee sensitivity: 20 bps per side", from_base(buy_fee=0.0020, sell_fee=0.0020)),
        MatrixCase("fee_30bps", "fee", "Fee sensitivity: 30 bps per side", from_base(buy_fee=0.0030, sell_fee=0.0030)),
        MatrixCase("fee_50bps", "fee", "Fee sensitivity: 50 bps per side", from_base(buy_fee=0.0050, sell_fee=0.0050)),
        MatrixCase("rebalance_10d", "rebalance", "Rebalance sensitivity: every 10 trading days", from_base(rebalance_freq=10)),
        MatrixCase("rebalance_20d", "rebalance", "Rebalance sensitivity: every 20 trading days", from_base(rebalance_freq=20)),
        MatrixCase("score_rank", "score_transform", "Score-transform robustness: rank transform under alpha-score weighting", from_alpha(score_transform="rank")),
        MatrixCase("score_zscore", "score_transform", "Score-transform robustness: zscore transform under alpha-score weighting", from_alpha(score_transform="zscore")),
        MatrixCase("score_rank_zscore", "score_transform", "Score-transform robustness: rank_zscore under alpha-score weighting", from_alpha(score_transform="rank_zscore")),
        MatrixCase("frozen_recent_2026_ytd", "frozen_recent", "Frozen recent monitor on 2026 YTD", from_base(start_date="2026-01-01")),
    ]
|
|
|
|
| def _filter_cases(cases: list[MatrixCase], case_filter: set[str] | None, case_limit: int) -> list[MatrixCase]: |
| filtered = [case for case in cases if not case_filter or case.case_id in case_filter] |
| if case_limit > 0: |
| filtered = filtered[:case_limit] |
| return filtered |
|
|
|
|
| def _bool_flag(enabled: bool, flag: str) -> list[str]: |
| return [flag] if enabled else [] |
|
|
|
|
def _build_case_command(
    *,
    jsonl_path: Path,
    output_dir: Path,
    period: str,
    data_path: Path | None,
    backtest_workers: int,
    label_forward_days: int,
    trade_guard_config: str | None,
    capture_detail_artifacts: bool,
    case: MatrixCase,
) -> list[str]:
    """Translate one MatrixCase into the argv list for the standalone runner script."""
    params = case.params
    argv: list[str] = [sys.executable, str(RUNNER_PATH)]
    argv += ["--jsonl", str(jsonl_path)]
    argv += ["--period", period]
    argv += ["--output-dir", str(output_dir)]
    argv += ["--backtest-workers", str(backtest_workers)]
    argv += ["--label-forward-days", str(label_forward_days)]
    # Ordered mapping of runner flag -> case-parameter key; order matters for
    # reproducible, diffable command lines.
    param_flags = (
        ("--backtest-engine", "backtest_engine"),
        ("--top-k", "top_k"),
        ("--rebalance-freq", "rebalance_freq"),
        ("--custom-weight-mode", "custom_weight_mode"),
        ("--position-size", "position_size"),
        ("--max-pos-each-stock", "max_pos_each_stock"),
        ("--max-daily-volume-participation", "max_daily_volume_participation"),
        ("--max-daily-amount-participation", "max_daily_amount_participation"),
        ("--buy-fee", "buy_fee"),
        ("--sell-fee", "sell_fee"),
        ("--score-transform", "score_transform"),
        ("--score-clip", "score_clip"),
        ("--universe-filter", "universe_filter"),
        ("--universe-top-n", "universe_top_n"),
        ("--universe-lookback-days", "universe_lookback_days"),
    )
    for flag, key in param_flags:
        argv += [flag, str(params[key])]
    # Optional valued flags: only emitted when a value is present.
    if data_path is not None:
        argv += ["--data-path", str(data_path)]
    for key, flag in (("start_date", "--start-date"), ("end_date", "--end-date")):
        if params.get(key):
            argv += [flag, str(params[key])]
    if trade_guard_config:
        argv += ["--trade-guard-config", trade_guard_config]
    # Boolean switches: present when truthy, absent otherwise.
    if params.get("enforce_cash_limit"):
        argv.append("--enforce-cash-limit")
    if params.get("redistribute_unfilled_cash"):
        argv.append("--redistribute-unfilled-cash")
    if capture_detail_artifacts:
        argv.append("--capture-detail-artifacts")
    return argv
|
|
|
|
| def _load_case_manifest(case_dir: Path) -> dict[str, Any]: |
| manifest_path = case_dir / "robust_manifest.json" |
| if not manifest_path.exists(): |
| return {} |
| return json.loads(manifest_path.read_text(encoding="utf-8")) |
|
|
|
|
| def _read_csv(path: Path) -> pd.DataFrame: |
| if not path.exists(): |
| return pd.DataFrame() |
| try: |
| return pd.read_csv(path) |
| except pd.errors.EmptyDataError: |
| return pd.DataFrame() |
|
|
|
|
def _merge_case_frames(case_records: list[dict[str, Any]], filename: str) -> pd.DataFrame:
    """Concatenate one artifact CSV across all cases, tagging rows with case metadata."""
    tagged: list[pd.DataFrame] = []
    for record in case_records:
        frame = _read_csv(Path(record["case_dir"]) / filename)
        if frame.empty:
            continue
        # Insert at position 0 in reverse order so the final column order is
        # case_id, case_group, case_description.
        for column, key in (("case_description", "description"), ("case_group", "case_group"), ("case_id", "case_id")):
            frame.insert(0, column, record[key])
        tagged.append(frame)
    if not tagged:
        return pd.DataFrame()
    return pd.concat(tagged, ignore_index=True)
|
|
|
|
| def _safe_bool(series: pd.Series) -> pd.Series: |
| return series.fillna(False).astype(bool) |
|
|
|
|
def _compute_coverage_summary(case_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build one coverage/sparsity diagnostics row per (case, seed_name).

    For each case directory, cross-references three optional detail artifacts
    (portfolio_daily.csv, signal_selection_daily.csv, rebalance_plan.csv) and
    summarises portfolio-day coverage, cash-weight drag, end-of-day rebalance
    target counts versus the manifest's top_k, and zero-score rates among the
    selected top-k signals. Missing artifacts degrade to zero/None columns.
    """
    rows: list[dict[str, Any]] = []
    for record in case_records:
        case_dir = Path(record["case_dir"])
        manifest = _load_case_manifest(case_dir)
        # `or 0` covers both a missing manifest ({}) and a null/absent top_k.
        top_k = int(manifest.get("top_k") or 0)

        portfolio_df = _read_csv(case_dir / "portfolio_daily.csv")
        signal_df = _read_csv(case_dir / "signal_selection_daily.csv")
        plan_df = _read_csv(case_dir / "rebalance_plan.csv")

        # Union of seed names observed in any of the three artifacts.
        seed_names: set[str] = set()
        for frame in (portfolio_df, signal_df, plan_df):
            if "seed_name" in frame.columns and not frame.empty:
                seed_names.update(frame["seed_name"].dropna().astype(str).unique().tolist())

        for seed_name in sorted(seed_names):
            row: dict[str, Any] = {
                "case_id": record["case_id"],
                "case_group": record["case_group"],
                "case_description": record["description"],
                "seed_name": seed_name,
                "top_k": top_k,
            }

            # NOTE(review): these filters assume every non-empty frame carries a
            # "seed_name" column; a non-empty frame without one would raise
            # KeyError here — confirm the upstream runner always writes it.
            seed_port = portfolio_df[portfolio_df["seed_name"].astype(str) == seed_name].copy() if not portfolio_df.empty else pd.DataFrame()
            seed_signal = signal_df[signal_df["seed_name"].astype(str) == seed_name].copy() if not signal_df.empty else pd.DataFrame()
            seed_plan = plan_df[plan_df["seed_name"].astype(str) == seed_name].copy() if not plan_df.empty else pd.DataFrame()

            # --- Portfolio coverage and cash-weight statistics ---
            if not seed_port.empty:
                row["n_portfolio_days"] = int(len(seed_port))
                row["n_rebalance_days"] = int(_safe_bool(seed_port["is_rebalance"]).sum()) if "is_rebalance" in seed_port.columns else 0
                row["n_trade_days"] = int(_safe_bool(seed_port["had_trade"]).sum()) if "had_trade" in seed_port.columns else 0
                # .get() yields None when the column is absent; to_numeric with
                # errors="coerce" turns that into NaN rather than raising.
                row["cash_weight_mean"] = float(pd.to_numeric(seed_port.get("cash_weight"), errors="coerce").mean())
                row["cash_weight_p95"] = float(pd.to_numeric(seed_port.get("cash_weight"), errors="coerce").quantile(0.95))
                row["cash_weight_max"] = float(pd.to_numeric(seed_port.get("cash_weight"), errors="coerce").max())
            else:
                row["n_portfolio_days"] = 0
                row["n_rebalance_days"] = 0
                row["n_trade_days"] = 0
                row["cash_weight_mean"] = None
                row["cash_weight_p95"] = None
                row["cash_weight_max"] = None

            # --- End-of-day rebalance-plan target counts vs top_k ---
            if not seed_plan.empty and "date" in seed_plan.columns:
                # Collapse plan rows to one row per date; max keeps the final
                # end-of-day value for each column.
                per_day_plan = (
                    seed_plan.groupby("date", as_index=False)
                    .agg(
                        target_count_eod=("target_count_eod", "max"),
                        unallocated_cash_eod=("unallocated_cash_eod", "max"),
                        invested_value_eod=("invested_value_eod", "max"),
                    )
                )
                counts = pd.to_numeric(per_day_plan["target_count_eod"], errors="coerce")
                row["mean_target_count_eod"] = float(counts.mean())
                row["median_target_count_eod"] = float(counts.median())
                row["min_target_count_eod"] = float(counts.min())
                row["max_target_count_eod"] = float(counts.max())
                row["rebalance_days_lt_topk"] = int((counts < top_k).sum()) if top_k > 0 else 0
                row["pct_rebalance_days_lt_topk"] = float((counts < top_k).mean()) if top_k > 0 and len(counts) else 0.0
                row["unallocated_cash_eod_mean"] = float(pd.to_numeric(per_day_plan["unallocated_cash_eod"], errors="coerce").mean())
            else:
                row["mean_target_count_eod"] = None
                row["median_target_count_eod"] = None
                row["min_target_count_eod"] = None
                row["max_target_count_eod"] = None
                row["rebalance_days_lt_topk"] = 0
                row["pct_rebalance_days_lt_topk"] = 0.0
                row["unallocated_cash_eod_mean"] = None

            # --- Signal-selection sparsity: top-k membership and zero scores ---
            if not seed_signal.empty and "trade_date" in seed_signal.columns:
                # Prefer the explicit top-k marker; fall back to all rows when absent.
                if "topk_by_score" in seed_signal.columns:
                    topk_rows = seed_signal[_safe_bool(seed_signal["topk_by_score"])].copy()
                else:
                    topk_rows = seed_signal.copy()
                if not topk_rows.empty:
                    # Per trade date: number of selected names and how many of
                    # them carry an (effectively) zero score (|score| <= 1e-12).
                    per_trade_date = topk_rows.groupby("trade_date", as_index=False).agg(
                        topk_names=("instrument", "count"),
                        zero_score_names=("score", lambda s: int((pd.to_numeric(s, errors="coerce").fillna(0.0).abs() <= 1e-12).sum())),
                    )
                    counts = pd.to_numeric(per_trade_date["topk_names"], errors="coerce")
                    row["signal_trade_dates"] = int(len(per_trade_date))
                    row["mean_topk_names_per_signal_day"] = float(counts.mean())
                    row["min_topk_names_per_signal_day"] = float(counts.min())
                    row["pct_signal_days_lt_topk"] = float((counts < top_k).mean()) if top_k > 0 else 0.0
                    # Days where every selected name had a zero score.
                    row["all_zero_score_days"] = int((per_trade_date["zero_score_names"] == per_trade_date["topk_names"]).sum())
                    row["pct_all_zero_score_days"] = float((per_trade_date["zero_score_names"] == per_trade_date["topk_names"]).mean())
                    row["zero_score_row_rate"] = float(
                        (
                            pd.to_numeric(topk_rows["score"], errors="coerce").fillna(0.0).abs() <= 1e-12
                        ).mean()
                    )
                else:
                    row["signal_trade_dates"] = 0
                    row["mean_topk_names_per_signal_day"] = 0.0
                    row["min_topk_names_per_signal_day"] = 0.0
                    row["pct_signal_days_lt_topk"] = 0.0
                    row["all_zero_score_days"] = 0
                    row["pct_all_zero_score_days"] = 0.0
                    row["zero_score_row_rate"] = 0.0
            else:
                row["signal_trade_dates"] = 0
                row["mean_topk_names_per_signal_day"] = 0.0
                row["min_topk_names_per_signal_day"] = 0.0
                row["pct_signal_days_lt_topk"] = 0.0
                row["all_zero_score_days"] = 0
                row["pct_all_zero_score_days"] = 0.0
                row["zero_score_row_rate"] = 0.0

            rows.append(row)
    return pd.DataFrame(rows)
|
|
|
|
def _write_outputs(output_root: Path, case_records: list[dict[str, Any]]) -> None:
    """Write the case table, merged per-artifact CSVs, and coverage summary."""
    pd.DataFrame(case_records).to_csv(output_root / "matrix_cases.csv", index=False)

    # Every per-case artifact is concatenated across cases into merged_<name>.
    for artifact in ("summary.csv", "trials.csv", "summary_yearly.csv", "trials_yearly.csv", "aggregate_yearly.csv"):
        merged = _merge_case_frames(case_records, artifact)
        merged.to_csv(output_root / f"merged_{artifact}", index=False)

    _compute_coverage_summary(case_records).to_csv(output_root / "coverage_sparsity_summary.csv", index=False)
|
|
|
|
| def _case_metadata(case: MatrixCase, case_dir: Path) -> dict[str, Any]: |
| return { |
| "case_id": case.case_id, |
| "case_group": case.case_group, |
| "description": case.description, |
| "case_dir": str(case_dir), |
| **case.params, |
| } |
|
|
|
|
def main() -> None:
    """CLI entry point: run every selected robustness case, then merge outputs.

    Parses arguments, builds the standard case matrix (optionally filtered by
    --case-filter and capped by --case-limit), invokes the standalone runner
    once per case as a subprocess, and finally writes merged CSVs plus
    matrix_manifest.json under --output-root.

    Raises:
        SystemExit: when no cases survive filtering.
        subprocess.CalledProcessError: when any case run exits non-zero
            (check=True aborts the whole matrix on the first failure).
    """
    parser = argparse.ArgumentParser(description="Run the standard alpha robustness matrix")
    parser.add_argument("--jsonl", required=True, help="Alpha pack JSONL file")
    parser.add_argument("--output-root", required=True, help="Directory that will hold all case outputs")
    parser.add_argument("--period", default="test", choices=["train", "val", "test"])
    parser.add_argument("--data-path", default=None, help="Optional daily_pv.h5 path")
    parser.add_argument("--backtest-workers", type=int, default=1, help="Worker count passed into each case run")
    parser.add_argument("--label-forward-days", type=int, default=5)
    parser.add_argument("--trade-guard-config", default=None)
    parser.add_argument("--case-filter", default="", help="Comma-separated case IDs for debugging / partial runs")
    parser.add_argument("--case-limit", type=int, default=0, help="Optional cap after filtering")
    parser.add_argument("--capture-detail-artifacts", action="store_true", help="Capture full detail artifacts for every case")
    parser.add_argument("--dry-run", action="store_true", help="Print planned commands without executing them")
    args = parser.parse_args()

    jsonl_path = Path(args.jsonl).expanduser().resolve()
    output_root = Path(args.output_root).expanduser().resolve()
    data_path = Path(args.data_path).expanduser().resolve() if args.data_path else None
    # Split the comma-separated filter into a set of non-empty, trimmed IDs.
    case_filter = {item.strip() for item in str(args.case_filter).split(",") if item.strip()}

    # `case_filter or None` maps the empty set to "no filtering".
    cases = _filter_cases(_make_standard_cases(), case_filter or None, int(args.case_limit))
    if not cases:
        raise SystemExit("No robustness cases selected.")

    output_root.mkdir(parents=True, exist_ok=True)
    case_records: list[dict[str, Any]] = []

    print(f"jsonl={jsonl_path}", flush=True)
    print(f"output_root={output_root}", flush=True)
    print(f"n_cases={len(cases)}", flush=True)

    for idx, case in enumerate(cases, start=1):
        # Each case writes into its own sub-directory keyed by case_id.
        case_dir = output_root / "cases" / case.case_id
        case_dir.mkdir(parents=True, exist_ok=True)
        cmd = _build_case_command(
            jsonl_path=jsonl_path,
            output_dir=case_dir,
            period=args.period,
            data_path=data_path,
            backtest_workers=max(int(args.backtest_workers), 1),
            label_forward_days=int(args.label_forward_days),
            trade_guard_config=args.trade_guard_config,
            capture_detail_artifacts=bool(args.capture_detail_artifacts),
            case=case,
        )
        print(f"\n[{idx}/{len(cases)}] {case.case_id} :: {case.description}", flush=True)
        print(" ".join(cmd), flush=True)
        if not args.dry_run:
            subprocess.run(cmd, check=True)
        # Metadata is recorded even under --dry-run, so merged outputs are
        # still written (from whatever artifacts already exist, if any).
        case_records.append(_case_metadata(case, case_dir))

    _write_outputs(output_root, case_records)
    (output_root / "matrix_manifest.json").write_text(
        json.dumps(
            {
                "jsonl": str(jsonl_path),
                "period": args.period,
                "data_path": str(data_path) if data_path else None,
                "capture_detail_artifacts": bool(args.capture_detail_artifacts),
                "cases": case_records,
            },
            ensure_ascii=False,
            indent=2,
        )
        + "\n",
        encoding="utf-8",
    )

    print("\nSaved matrix outputs:", flush=True)
    print(output_root / "matrix_cases.csv", flush=True)
    print(output_root / "merged_summary.csv", flush=True)
    print(output_root / "merged_summary_yearly.csv", flush=True)
    print(output_root / "coverage_sparsity_summary.csv", flush=True)
|
|
|
|
# Script entry point: delegate to main() only when executed directly.
if __name__ == "__main__":
    main()
|
|