# Provenance: robust-AAE/scripts/jsonl_alpha_robustness.py
# Commit 79e6483 ("Add standalone alpha robustness matrix bundle", author PuLam, verified)
#!/usr/bin/env python3
"""Run baseline backtests for every factor in a JSONL file.
This script is intended for lightweight Kaggle usage:
- provide a JSONL file with {"name": ..., "expr": ...} rows
- run standalone backtests without model inference
- save summary, yearly breakdown, and optional detail artifacts
"""
from __future__ import annotations
import argparse
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
import json
import multiprocessing as mp
import os
from pathlib import Path
import random
import re
import sys
import time
from typing import Any
import pandas as pd
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Lazily-bound backtest entry points; populated by _ensure_backtest_imports()
# so the CLI can parse args / print help without importing the backtest stack.
configure_periods = None
execute_expression = None
load_data = None
# Train/validation/test date windows used to configure backtest periods.
PERIOD_CONFIGS = {
    "train": {"start": "2016-01-01", "end": "2020-12-31"},
    "val": {"start": "2021-01-01", "end": "2021-12-31"},
    "test": {"start": "2022-01-01", "end": "2026-12-31"},
}
# Year -> labeled market regime; years not listed map to "neutral"
# (see _market_regime).
YEAR_REGIMES = {
    2022: "bearish",
    2025: "bullish",
}
# Heavy per-run detail payload keys stripped from summary artifacts
# (see _strip_detail_keys).
DETAIL_KEYS = {"yearly_metrics", "trade_log", "stock_contrib", "holding_log", "portfolio_log", "signal_selection_log"}
# Context columns attached to every detail frame to identify the run/factor.
DETAIL_CONTEXT_COLUMNS = [
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]
# Column order for the per-instrument contribution artifact.
STOCK_CONTRIB_COLUMNS = [
    "year",
    "market_regime",
    "instrument",
    "contribution_return",
    "abs_contribution_return",
    "realized_pnl",
    "start_value",
    "end_value",
    "buy_trades",
    "sell_trades",
    "shares_bought",
    "shares_sold",
    "buy_cash_outflow",
    "sell_net_proceeds",
    "holding_days",
    "avg_shares_held",
    "ending_shares",
    "avg_market_value",
    "max_market_value",
    "market_volume_sum",
    "market_amount_sum",
    "rank",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]
# Column order for the per-fill trade log artifact.
TRADE_LOG_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "action",
    "instrument",
    "shares",
    "current_shares",
    "target_shares",
    "requested_shares",
    "filled_shares",
    "unfilled_shares",
    "fill_ratio",
    "price",
    "order_value",
    "filled_value",
    "redistributed_notional",
    "gross_notional",
    "net_proceeds",
    "cash_outflow",
    "transaction_cost",
    "realized_pnl",
    "days_held",
    "holdings_count_before",
    "holdings_count_after",
    "market_volume",
    "market_amount",
    "volume_participation",
    "amount_participation",
    "clip_reason",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]
# Column order for the daily per-instrument holdings artifact.
HOLDING_LOG_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "instrument",
    "shares_held",
    "market_value",
    "close_price",
    "market_volume",
    "market_amount",
    "portfolio_value",
    "cash_eod",
    "weight",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]
# Column order for the daily portfolio-level artifact.
PORTFOLIO_DAILY_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "portfolio_value",
    "cash_eod",
    "cash_weight",
    "n_held",
    "portfolio_return",
    "benchmark_return",
    "excess_return",
    "is_rebalance",
    "had_trade",
    "buy_trades",
    "sell_trades",
    "gross_turnover",
    "transaction_cost",
    "fill_ratio_mean",
    "fill_ratio_min",
    "volume_participation_max",
    "amount_participation_max",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]
# Rebalance-day log shares the daily portfolio schema.
REBALANCE_LOG_COLUMNS = PORTFOLIO_DAILY_COLUMNS
# Column order for the per-rebalance target/fill plan artifact.
REBALANCE_PLAN_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "portfolio_value",
    "cash_eod",
    "cash_weight",
    "invested_value_eod",
    "unallocated_cash_eod",
    "gross_turnover",
    "transaction_cost",
    "had_trade",
    "buy_trades",
    "sell_trades",
    "target_count_eod",
    "target_total_value_eod",
    "target_list_eod",
    "instrument",
    "target_rank_eod",
    "target_value_eod",
    "target_weight_eod",
    "shares_held_eod",
    "trade_actions",
    "current_shares_ref",
    "target_shares_ref",
    "requested_shares_total",
    "filled_shares_total",
    "unfilled_shares_total",
    "requested_notional_total",
    "filled_notional_total",
    "buy_requested_shares",
    "buy_filled_shares",
    "sell_requested_shares",
    "sell_filled_shares",
    "fill_ratio_mean",
    "clip_reasons",
    *DETAIL_CONTEXT_COLUMNS,
]
# Column order for rebalance-to-rebalance window returns
# (see _build_rebalance_window_frame).
REBALANCE_WINDOW_RETURN_COLUMNS = [
    "window_index",
    "window_start_date",
    "window_end_date",
    "year",
    "market_regime",
    "n_days",
    "is_partial_window",
    "had_trade_any",
    "cash_weight_start",
    "cash_weight_end",
    "portfolio_return_compounded",
    "benchmark_return_compounded",
    "excess_compounded_return",
    "portfolio_return_sum",
    "benchmark_return_sum",
    "excess_return_sum",
    "mean_daily_portfolio_return",
    "mean_daily_benchmark_return",
    "mean_daily_excess_return",
    "cumulative_portfolio_return_to_end",
    "cumulative_benchmark_return_to_end",
    "cumulative_excess_compounded_return_to_end",
    "full_period_window_reconstructed_return",
    "full_period_window_reconstruction_error",
    *DETAIL_CONTEXT_COLUMNS,
]
# Column order for the signal-selection audit artifact.
SIGNAL_SELECTION_COLUMNS = [
    "signal_date",
    "trade_date",
    "instrument",
    "score",
    "trade_score_rank",
    "top5_by_score",
    "topk_by_score",
    "buy_gate",
    "force_exit",
    "defer_sell",
    "selected_eod",
    "eod_hold_rank",
    "shares_held_eod",
    "market_value_eod",
    "weight_eod",
    "had_trade",
    "trade_actions",
    "requested_shares_total",
    "filled_shares_total",
    "requested_notional_total",
    "filled_notional_total",
    "fill_ratio_mean",
    "clip_reason",
    *DETAIL_CONTEXT_COLUMNS,
]
# Column order for the cash/cost efficiency ranking artifact.
ALPHA_CASH_COST_RANKING_COLUMNS = [
    "candidate_scope",
    "seed_name",
    "factor_name",
    "factor_expr",
    "ir",
    "performance_return",
    "benchmark_performance_return",
    "excess_compounded_return",
    "portfolio_nav_mdd",
    "turnover_ratio",
    "transaction_cost",
    "gross_turnover",
    "return_per_turnover",
    "cash_weight_mean",
    "cash_weight_median",
    "cash_weight_p95",
    "round2_redistributed_notional",
    "round2_buy_trade_count",
    "round2_rebalance_days",
    "rank_cash_weight_mean_asc",
    "rank_transaction_cost_asc",
    "rank_return_per_turnover_desc",
    "cash_cost_efficiency_rank",
]
def _parse_trade_guard_config(raw: str | None) -> dict[str, Any] | None:
value = (raw or "").strip()
if not value or value.lower() in {"none", "null", "off", "false", "0"}:
return None
if value.lower() in {"vn", "default", "true", "1"}:
return {}
parsed = json.loads(value)
if parsed is None:
return None
if not isinstance(parsed, dict):
raise ValueError("--trade-guard-config must be none/null, 'vn', or a JSON object")
return parsed
def _ensure_backtest_imports() -> None:
    """Lazily bind the backtest executor entry points into module globals.

    The import is deferred so the CLI can start (arg parsing, --help)
    without the heavy backtest stack being importable. Idempotent: once
    all three globals are bound, subsequent calls are no-ops.
    """
    global configure_periods, execute_expression, load_data
    already_bound = (
        configure_periods is not None
        and execute_expression is not None
        and load_data is not None
    )
    if already_bound:
        return
    from backtest.robust_factor_executor import (
        configure_periods as _configure_periods,
        execute_expression as _execute_expression,
        load_data as _load_data,
    )
    configure_periods = _configure_periods
    execute_expression = _execute_expression
    load_data = _load_data
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
text = path.read_text(encoding="utf-8").strip()
if not text:
return []
if path.suffix.lower() == ".json" or text[:1] == "[":
payload = json.loads(text)
if isinstance(payload, list):
return [row for row in payload if isinstance(row, dict)]
if isinstance(payload, dict):
rows = payload.get("rows")
if isinstance(rows, list):
return [row for row in rows if isinstance(row, dict)]
return [payload]
raise ValueError(f"Unsupported JSON payload in {path}")
rows: list[dict[str, Any]] = []
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
rows.append(json.loads(line))
return rows
def _group_rows_by_seed(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
grouped: dict[str, list[dict[str, Any]]] = {}
for idx, row in enumerate(rows, start=1):
seed_name = str(
row.get("seed_name")
or row.get("root_seed_name")
or row.get("parent_seed_name")
or row.get("name")
or f"seed_{idx}"
).strip()
grouped.setdefault(seed_name, []).append(row)
return grouped
def _sample_rows(rows: list[dict[str, Any]], sample_size: int, sample_seed: int) -> list[dict[str, Any]]:
if sample_size <= 0 or sample_size >= len(rows):
return list(rows)
grouped = _group_rows_by_seed(rows)
structured_grouping = any(
row.get("seed_name")
or row.get("seed_version_name")
or row.get("candidate_scope")
or row.get("factor_name")
or row.get("expression")
for row in rows
)
if structured_grouping and 0 < sample_size < len(grouped):
rng = random.Random(sample_seed)
selected_seed_names = set(rng.sample(sorted(grouped.keys()), sample_size))
sampled_rows: list[dict[str, Any]] = []
for row in rows:
seed_name = str(
row.get("seed_name")
or row.get("root_seed_name")
or row.get("parent_seed_name")
or row.get("name")
or ""
).strip()
if seed_name in selected_seed_names:
sampled_rows.append(row)
return sampled_rows
rng = random.Random(sample_seed)
return rng.sample(rows, sample_size)
def _infer_turn_from_version_name(seed_name: str, seed_version_name: str) -> int | None:
version = str(seed_version_name or "").strip()
seed = str(seed_name or "").strip()
if not version or not seed or version == seed:
return None
match = re.fullmatch(rf"{re.escape(seed)}_v(\d+)", version)
if match:
return int(match.group(1))
return None
def _build_output_dir(base_output_dir: str | None, jsonl_path: Path) -> Path:
if base_output_dir:
return Path(base_output_dir).expanduser()
if Path("/kaggle/working").exists():
return Path("/kaggle/working/aae_v2") / f"jsonl_backtest_{jsonl_path.stem}"
return PROJECT_ROOT / "data" / f"jsonl_backtest_{jsonl_path.stem}"
def _build_data_quality_report(df: pd.DataFrame, data_path: str | None) -> dict[str, Any]:
index_names = list(df.index.names) if isinstance(df.index, pd.MultiIndex) else [str(df.index.name)]
dates = (
pd.DatetimeIndex(df.index.get_level_values("datetime")).sort_values()
if isinstance(df.index, pd.MultiIndex) and "datetime" in df.index.names
else pd.DatetimeIndex(df.index).sort_values()
)
instruments = (
pd.Index(df.index.get_level_values("instrument").astype(str)).unique().sort_values()
if isinstance(df.index, pd.MultiIndex) and "instrument" in df.index.names
else pd.Index([], dtype=object)
)
coverage_by_year: list[dict[str, Any]] = []
if len(dates):
years = pd.Series(dates.year, index=range(len(dates)))
if len(instruments):
inst_series = pd.Series(df.index.get_level_values("instrument").astype(str), index=range(len(df)))
else:
inst_series = pd.Series([], dtype=str)
for year in sorted(years.dropna().unique()):
mask = years == year
coverage_by_year.append(
{
"year": int(year),
"n_rows": int(mask.sum()),
"n_days": int(pd.DatetimeIndex(dates[mask.to_numpy()]).nunique()),
"n_instruments": int(inst_series[mask].nunique()) if not inst_series.empty else 0,
}
)
missing_rate_by_column = {
str(col): round(float(pd.to_numeric(df[col], errors="coerce").isna().mean() if pd.api.types.is_numeric_dtype(df[col]) else df[col].isna().mean()), 6)
for col in df.columns
}
nonpositive_rate_by_column: dict[str, float] = {}
for col in df.columns:
series = pd.to_numeric(df[col], errors="coerce")
if series.notna().any():
nonpositive_rate_by_column[str(col)] = round(float((series <= 0).mean()), 6)
return {
"data_path": data_path or "repo default",
"index_names": index_names,
"index_is_unique": bool(df.index.is_unique),
"duplicate_index_rows": int(df.index.duplicated().sum()),
"index_monotonic_increasing": bool(df.index.is_monotonic_increasing),
"n_rows": int(len(df)),
"n_columns": int(len(df.columns)),
"n_days": int(dates.nunique()) if len(dates) else 0,
"n_instruments": int(len(instruments)),
"date_start": dates.min().strftime("%Y-%m-%d") if len(dates) else None,
"date_end": dates.max().strftime("%Y-%m-%d") if len(dates) else None,
"columns_present": list(map(str, df.columns)),
"core_field_availability": {
"open": "$open" in df.columns,
"close": "$close" in df.columns,
"high": "$high" in df.columns,
"low": "$low" in df.columns,
"volume": "$volume" in df.columns,
"amount": "$amount" in df.columns,
"bench_return": "$bench_return" in df.columns,
},
"missing_rate_by_column": missing_rate_by_column,
"nonpositive_rate_by_column": nonpositive_rate_by_column,
"coverage_by_year": coverage_by_year,
"adjustment_status": "unknown_not_verified_from_h5",
"notes": [
"Report is structural/data-health oriented; it does not prove economic correctness.",
"Adjustment status is not inferred from HDF content alone and should be verified from the data pipeline/vendor contract.",
],
}
def _market_regime(year: int) -> str:
    """Return the labeled market regime for *year*; 'neutral' when unlabeled."""
    regime = YEAR_REGIMES.get(int(year))
    return regime if regime is not None else "neutral"
def _is_missing(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str):
return not value.strip()
try:
return bool(pd.isna(value))
except Exception:
return False
def _clean_scalar(value: Any) -> Any:
    """Normalize a scalar: missing -> None, numpy-style scalars -> native Python.

    Anything exposing a working ``.item()`` (numpy/pandas scalars) is
    unboxed; a failing ``.item()`` falls through to the raw value.
    """
    if _is_missing(value):
        return None
    item = getattr(value, "item", None)
    if item is not None:
        try:
            return item()
        except Exception:
            pass
    return value
def _optional_int(value: Any) -> int | None:
    """Coerce *value* to int via float (so "3.0" works); None when impossible."""
    cleaned = _clean_scalar(value)
    if cleaned is None:
        return None
    try:
        return int(float(cleaned))
    except Exception:
        return None
def _metric_payload(result: dict[str, Any]) -> dict[str, Any]:
ir = float(result.get("ir", 0.0) or 0.0)
ic_mean = float(result.get("ic_mean", 0.0) or 0.0)
icir = float(result.get("icir", 0.0) or 0.0)
rank_ic_mean = float(result.get("rank_ic_mean", 0.0) or 0.0)
rank_icir = float(result.get("rank_icir", 0.0) or 0.0)
annualized_return = float(result.get("annualized_return", 0.0) or 0.0)
mdd = float(result.get("mdd", 0.0) or 0.0)
benchmark_performance_return = float(result.get("benchmark_performance_return", 0.0) or 0.0)
excess_compounded_return = float(result.get("excess_compounded_return", 0.0) or 0.0)
portfolio_nav_mdd = float(result.get("portfolio_nav_mdd", 0.0) or 0.0)
return {
"success": bool(result.get("success", False)),
"backtest_engine": str(result.get("backtest_engine", "")),
"label_forward_days": int(result.get("label_forward_days", 0) or 0),
"ir": ir,
"ic_mean": ic_mean,
"ic_std": float(result.get("ic_std", 0.0) or 0.0),
"icir": icir,
"rank_ic_mean": rank_ic_mean,
"rank_ic_std": float(result.get("rank_ic_std", 0.0) or 0.0),
"rank_icir": rank_icir,
"aer": annualized_return,
"annualized_return": annualized_return,
"annualized_volatility": float(result.get("annualized_volatility", 0.0) or 0.0),
"performance_return": float(result.get("performance_return", 0.0) or 0.0),
"benchmark_performance_return": benchmark_performance_return,
"excess_compounded_return": excess_compounded_return,
"sharpe": float(result.get("sharpe", 0.0) or 0.0),
"winrate": float(result.get("winrate", 0.0) or 0.0),
"mdd": mdd,
"excess_mdd": float(result.get("excess_mdd", mdd) or 0.0),
"portfolio_nav_mdd": portfolio_nav_mdd,
"drawdown_duration_max": int(result.get("drawdown_duration_max", 0) or 0),
"drawdown_duration_mean": float(result.get("drawdown_duration_mean", 0.0) or 0.0),
"drawdown_duration_median": float(result.get("drawdown_duration_median", 0.0) or 0.0),
"total_return": float(result.get("total_return", 0.0) or 0.0),
"final_value": float(result.get("final_value", 0.0) or 0.0),
"n_days": int(result.get("n_days", 0) or 0),
"n_ic_days": int(result.get("n_ic_days", 0) or 0),
"exec_time": float(result.get("exec_time", 0.0) or 0.0),
"yearly_metrics": result.get("yearly_metrics") or {},
"trade_log": result.get("trade_log") or [],
"stock_contrib": result.get("stock_contrib") or [],
"holding_log": result.get("holding_log") or [],
"portfolio_log": result.get("portfolio_log") or [],
"signal_selection_log": result.get("signal_selection_log") or [],
"qlib_warnings": result.get("qlib_warnings") or [],
"trade_guard_config": result.get("trade_guard_config"),
"rebalance_mode": result.get("rebalance_mode", "dropout"),
"transaction_cost": float(result.get("transaction_cost", 0.0) or 0.0),
"gross_turnover": float(result.get("gross_turnover", 0.0) or 0.0),
"turnover_ratio": float(result.get("turnover_ratio", 0.0) or 0.0),
"error": result.get("error"),
# run22-style aliases for easier comparison
"information_ratio": ir,
"IC": ic_mean,
"ICIR": icir,
"rank_ic": rank_ic_mean,
"max_drawdown": mdd,
"portfolio_nav_max_drawdown": portfolio_nav_mdd,
}
def _to_float_or_none(value: Any) -> float | None:
    """Best-effort float conversion; None for missing or unconvertible input."""
    cleaned = _clean_scalar(value)
    if cleaned is None:
        return None
    try:
        return float(cleaned)
    except Exception:
        return None
def _series_stat(values: pd.Series, op: str) -> float | int | None:
clean = pd.to_numeric(values, errors="coerce").dropna()
if clean.empty:
return None
if op == "mean":
return float(clean.mean())
if op == "median":
return float(clean.median())
if op == "min":
return float(clean.min())
if op == "max":
return float(clean.max())
if op == "p95":
return float(clean.quantile(0.95))
if op == "p05":
return float(clean.quantile(0.05))
raise ValueError(f"Unsupported stat op: {op}")
def _ensure_detail_context_columns(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *frame* with all context columns present and typed.

    Text context columns become strings (missing -> ""); the ordinal
    context columns become nullable Int64.
    """
    out = frame.copy()
    for name in DETAIL_CONTEXT_COLUMNS:
        if name not in out.columns:
            out[name] = pd.NA
    for name in ("seed_name", "candidate_scope", "factor_name", "factor_expr"):
        out[name] = out[name].where(out[name].notna(), "").astype(str)
    for name in ("turn", "call_index", "proposal_rank"):
        out[name] = pd.to_numeric(out[name], errors="coerce").astype("Int64")
    return out
def _annotate_portfolio_log_with_trade_summary(
    portfolio_log: list[dict[str, Any]],
    trade_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge per-day trade aggregates into the daily portfolio log rows.

    Returns the portfolio log as a list of dicts with per-date trade
    summary columns attached (had_trade, buy/sell counts, turnover, cost,
    fill-ratio and participation stats). Dates with no trades receive the
    defaults applied at the end. Returns [] for an empty portfolio log.
    """
    if not portfolio_log:
        return []
    portfolio_df = pd.DataFrame(portfolio_log).copy()
    if portfolio_df.empty:
        return []
    # Normalize dates to plain YYYY-MM-DD strings so the merge key matches.
    portfolio_df["date"] = pd.to_datetime(portfolio_df["date"]).dt.strftime("%Y-%m-%d")
    if trade_log:
        trade_df = pd.DataFrame(trade_log).copy()
        if not trade_df.empty:
            trade_df["date"] = pd.to_datetime(trade_df["date"]).dt.strftime("%Y-%m-%d")
            # Ensure every aggregated column exists before numeric coercion.
            for col in [
                "gross_notional",
                "transaction_cost",
                "fill_ratio",
                "volume_participation",
                "amount_participation",
            ]:
                if col not in trade_df.columns:
                    trade_df[col] = pd.NA
            for col in [
                "gross_notional",
                "transaction_cost",
                "fill_ratio",
                "volume_participation",
                "amount_participation",
            ]:
                if col in trade_df.columns:
                    trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
            action_series = trade_df["action"].astype(str) if "action" in trade_df.columns else pd.Series("", index=trade_df.index)
            trade_df["buy_flag"] = (action_series == "buy").astype(int)
            trade_df["sell_flag"] = (action_series == "sell").astype(int)
            # One summary row per trading date.
            # NOTE(review): .agg references the "action" column directly; a
            # trade_log without "action" would raise here even though
            # action_series has a fallback — confirm upstream always sets it.
            trade_summary = (
                trade_df.groupby("date", dropna=False)
                .agg(
                    had_trade=("action", "size"),
                    buy_trades=("buy_flag", "sum"),
                    sell_trades=("sell_flag", "sum"),
                    gross_turnover=("gross_notional", "sum"),
                    transaction_cost=("transaction_cost", "sum"),
                    fill_ratio_mean=("fill_ratio", "mean"),
                    fill_ratio_min=("fill_ratio", "min"),
                    volume_participation_max=("volume_participation", "max"),
                    amount_participation_max=("amount_participation", "max"),
                )
                .reset_index()
            )
            # "size" counts rows per date; any trade flips the flag to True.
            trade_summary["had_trade"] = trade_summary["had_trade"].fillna(0).astype(int) > 0
            portfolio_df = portfolio_df.merge(trade_summary, on="date", how="left")
    # Defaults for dates with no trades (left-merge NaNs) or when the trade
    # log was empty and the summary columns were never created.
    defaults = {
        "had_trade": False,
        "buy_trades": 0,
        "sell_trades": 0,
        "gross_turnover": 0.0,
        "transaction_cost": 0.0,
        "fill_ratio_mean": None,
        "fill_ratio_min": None,
        "volume_participation_max": None,
        "amount_participation_max": None,
    }
    for key, default in defaults.items():
        if key not in portfolio_df.columns:
            portfolio_df[key] = default
        else:
            # Existing columns are filled per-type; None-default stat columns
            # are deliberately left with NaN where no trade occurred.
            if isinstance(default, bool):
                portfolio_df[key] = portfolio_df[key].where(portfolio_df[key].notna(), False).astype(bool)
            elif isinstance(default, int):
                portfolio_df[key] = pd.to_numeric(portfolio_df[key], errors="coerce").fillna(0).astype(int)
            elif default == 0.0:
                portfolio_df[key] = pd.to_numeric(portfolio_df[key], errors="coerce").fillna(0.0)
    return portfolio_df.to_dict("records")
def _enrich_result_diagnostics(payload: dict[str, Any]) -> dict[str, Any]:
    """Augment a metric payload in place with reconstruction diagnostics.

    Adds performance-return reconstruction from daily returns, rebalance
    window stats, cash-weight stats, rebalance-day holdings stats,
    turnover/cost reconstructions from the trade log, round-2 cash
    redistribution counters, and efficiency ratios. Missing inputs yield
    None/0 defaults. Returns the same dict for chaining.
    """
    # Attach per-day trade aggregates to the portfolio log first.
    portfolio_log = _annotate_portfolio_log_with_trade_summary(
        payload.get("portfolio_log") or [],
        payload.get("trade_log") or [],
    )
    payload["portfolio_log"] = portfolio_log
    portfolio_df = pd.DataFrame(portfolio_log)
    trade_df = pd.DataFrame(payload.get("trade_log") or [])
    if not portfolio_df.empty:
        for col in [
            "portfolio_value",
            "cash_eod",
            "cash_weight",
            "n_held",
            "portfolio_return",
            "benchmark_return",
            "excess_return",
            "gross_turnover",
            "transaction_cost",
            "fill_ratio_mean",
            "fill_ratio_min",
        ]:
            if col in portfolio_df.columns:
                portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce")
        # Derive cash weight when only its components are present; a zero
        # portfolio value becomes NA rather than a division error.
        if "cash_weight" not in portfolio_df.columns and {"cash_eod", "portfolio_value"}.issubset(portfolio_df.columns):
            portfolio_df["cash_weight"] = portfolio_df["cash_eod"] / portfolio_df["portfolio_value"].replace(0, pd.NA)
        if "is_rebalance" in portfolio_df.columns:
            portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool)
        else:
            portfolio_df["is_rebalance"] = False
        # Cross-check reported performance_return against compounded dailies.
        returns = pd.to_numeric(portfolio_df.get("portfolio_return"), errors="coerce").dropna()
        if not returns.empty:
            reconstructed = float((1.0 + returns).prod() - 1.0)
            payload["performance_return_reconstructed"] = reconstructed
            payload["performance_return_reconstruction_error"] = reconstructed - float(payload.get("performance_return", 0.0) or 0.0)
        window_df = _build_rebalance_window_frame(portfolio_df)
        payload["rebalance_window_count"] = int(len(window_df))
        if not window_df.empty:
            # Compounding per-window returns should reproduce the full period.
            window_reconstructed = float((1.0 + pd.to_numeric(window_df["portfolio_return_compounded"], errors="coerce").fillna(0.0)).prod() - 1.0)
            payload["rebalance_window_return_reconstructed"] = window_reconstructed
            payload["rebalance_window_return_reconstruction_error"] = window_reconstructed - float(payload.get("performance_return", 0.0) or 0.0)
            payload["rebalance_window_days_mean"] = _series_stat(window_df["n_days"], "mean")
            payload["rebalance_window_days_median"] = _series_stat(window_df["n_days"], "median")
            payload["rebalance_window_days_max"] = _series_stat(window_df["n_days"], "max")
        else:
            payload["rebalance_window_return_reconstructed"] = None
            payload["rebalance_window_return_reconstruction_error"] = None
            payload["rebalance_window_days_mean"] = None
            payload["rebalance_window_days_median"] = None
            payload["rebalance_window_days_max"] = None
        cash_weight = pd.to_numeric(portfolio_df.get("cash_weight"), errors="coerce").dropna()
        payload["cash_weight_mean"] = _series_stat(cash_weight, "mean")
        payload["cash_weight_median"] = _series_stat(cash_weight, "median")
        payload["cash_weight_p95"] = _series_stat(cash_weight, "p95")
        rebalance_df = portfolio_df[portfolio_df["is_rebalance"]].copy()
        payload["rebalance_days"] = int(len(rebalance_df))
        if not rebalance_df.empty:
            n_held = pd.to_numeric(rebalance_df.get("n_held"), errors="coerce").dropna()
            payload["rebalance_holdings_min"] = _series_stat(n_held, "min")
            payload["rebalance_holdings_max"] = _series_stat(n_held, "max")
            payload["rebalance_holdings_mean"] = _series_stat(n_held, "mean")
            payload["rebalance_holdings_median"] = _series_stat(n_held, "median")
            payload["rebalance_days_with_trade"] = int(
                (rebalance_df["had_trade"] if "had_trade" in rebalance_df.columns else pd.Series(False, index=rebalance_df.index))
                .fillna(False)
                .astype(bool)
                .sum()
            )
        else:
            payload["rebalance_holdings_min"] = None
            payload["rebalance_holdings_max"] = None
            payload["rebalance_holdings_mean"] = None
            payload["rebalance_holdings_median"] = None
            payload["rebalance_days_with_trade"] = 0
    else:
        # No portfolio log at all: null out every portfolio-derived stat.
        payload["performance_return_reconstructed"] = None
        payload["performance_return_reconstruction_error"] = None
        payload["rebalance_window_count"] = 0
        payload["rebalance_window_return_reconstructed"] = None
        payload["rebalance_window_return_reconstruction_error"] = None
        payload["rebalance_window_days_mean"] = None
        payload["rebalance_window_days_median"] = None
        payload["rebalance_window_days_max"] = None
        payload["cash_weight_mean"] = None
        payload["cash_weight_median"] = None
        payload["cash_weight_p95"] = None
        payload["rebalance_days"] = 0
        payload["rebalance_holdings_min"] = None
        payload["rebalance_holdings_max"] = None
        payload["rebalance_holdings_mean"] = None
        payload["rebalance_holdings_median"] = None
        payload["rebalance_days_with_trade"] = 0
    if not trade_df.empty:
        for col in ["transaction_cost", "gross_notional", "filled_value", "fill_ratio"]:
            if col in trade_df.columns:
                trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
        # Prefer gross_notional; fall back to filled_value for turnover.
        gross_turnover_col = "gross_notional" if "gross_notional" in trade_df.columns else "filled_value"
        reconstructed_turnover = float(trade_df[gross_turnover_col].fillna(0.0).sum()) if gross_turnover_col in trade_df.columns else 0.0
        reconstructed_cost = float(trade_df["transaction_cost"].fillna(0.0).sum()) if "transaction_cost" in trade_df.columns else 0.0
        payload["gross_turnover_reconstructed"] = reconstructed_turnover
        payload["gross_turnover_reconstruction_error"] = reconstructed_turnover - float(payload.get("gross_turnover", 0.0) or 0.0)
        payload["transaction_cost_reconstructed"] = reconstructed_cost
        payload["transaction_cost_reconstruction_error"] = reconstructed_cost - float(payload.get("transaction_cost", 0.0) or 0.0)
        fill_ratio_source = trade_df["fill_ratio"] if "fill_ratio" in trade_df.columns else pd.Series(dtype=float)
        fill_ratio = pd.to_numeric(fill_ratio_source, errors="coerce").dropna()
        payload["fill_ratio_mean"] = _series_stat(fill_ratio, "mean")
        payload["fill_ratio_p05"] = _series_stat(fill_ratio, "p05")
        clip_reason_source = trade_df["clip_reason"] if "clip_reason" in trade_df.columns else pd.Series("", index=trade_df.index, dtype=object)
        clip_reason_series = clip_reason_source.fillna("").astype(str)
        redistributed_source = (
            trade_df["redistributed_notional"]
            if "redistributed_notional" in trade_df.columns
            else pd.Series(0.0, index=trade_df.index, dtype=float)
        )
        redistributed_series = pd.to_numeric(redistributed_source, errors="coerce").fillna(0.0)
        # "Round 2" trades = second-pass cash redistribution buys, detected by
        # positive redistributed notional or the explicit clip-reason tag.
        round2_mask = redistributed_series.gt(0.0) | clip_reason_series.str.contains("round2_cash_redistribution", regex=False)
        payload["round2_buy_trade_count"] = int(round2_mask.sum())
        payload["round2_redistributed_notional"] = float(redistributed_series.sum()) if "redistributed_notional" in trade_df.columns else (
            float(trade_df.loc[round2_mask, gross_turnover_col].fillna(0.0).sum())
            if gross_turnover_col in trade_df.columns
            else 0.0
        )
        payload["round2_rebalance_days"] = int(
            trade_df.loc[round2_mask, "date"].astype(str).nunique()
        ) if "date" in trade_df.columns else 0
    else:
        payload["gross_turnover_reconstructed"] = None
        payload["gross_turnover_reconstruction_error"] = None
        payload["transaction_cost_reconstructed"] = None
        payload["transaction_cost_reconstruction_error"] = None
        payload["fill_ratio_mean"] = None
        payload["fill_ratio_p05"] = None
        payload["round2_buy_trade_count"] = 0
        payload["round2_redistributed_notional"] = 0.0
        payload["round2_rebalance_days"] = 0
    # Efficiency ratios, guarded against zero/None denominators.
    turnover_ratio = _to_float_or_none(payload.get("turnover_ratio"))
    performance_return = _to_float_or_none(payload.get("performance_return"))
    if turnover_ratio is not None and abs(turnover_ratio) > 1e-12 and performance_return is not None:
        payload["return_per_turnover"] = float(performance_return / turnover_ratio)
    else:
        payload["return_per_turnover"] = None
    gross_turnover = _to_float_or_none(payload.get("gross_turnover"))
    transaction_cost = _to_float_or_none(payload.get("transaction_cost"))
    if gross_turnover is not None and gross_turnover > 1e-12 and transaction_cost is not None:
        payload["cost_to_turnover"] = float(transaction_cost / gross_turnover)
    else:
        payload["cost_to_turnover"] = None
    return payload
def _strip_detail_keys(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *payload* without the heavy detail-log keys."""
    slim: dict[str, Any] = {}
    for key, value in payload.items():
        if key in DETAIL_KEYS:
            continue
        slim[key] = value
    return slim
def _rows_to_frame(rows: list[dict[str, Any]], columns: list[str] | None = None) -> pd.DataFrame:
if rows:
frame = pd.DataFrame(rows)
if columns:
ordered_columns = list(columns) + [col for col in frame.columns if col not in columns]
frame = frame.reindex(columns=ordered_columns)
return frame
return pd.DataFrame(columns=columns or [])
def _build_rebalance_window_frame(portfolio_df: pd.DataFrame) -> pd.DataFrame:
    """Split the daily portfolio log into rebalance-to-rebalance windows.

    For each context group (seed/factor/...), each output row summarizes one
    window: compounded and summed portfolio/benchmark/excess returns, mean
    daily returns, cumulative returns up to the window end, and a full-period
    reconstruction check backfilled after the group's windows are built.
    Returns an empty frame with the canonical columns for empty input.
    """
    if portfolio_df is None or portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    frame = _ensure_detail_context_columns(portfolio_df)
    frame = frame.copy()
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date"]).sort_values(DETAIL_CONTEXT_COLUMNS + ["date"], na_position="last").reset_index(drop=True)
    if frame.empty:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    # Returns default to 0.0; cash_weight stays NA so missingness survives.
    for col in ["portfolio_return", "benchmark_return", "cash_weight"]:
        if col in frame.columns:
            frame[col] = pd.to_numeric(frame[col], errors="coerce").fillna(0.0 if col != "cash_weight" else pd.NA)
        else:
            frame[col] = 0.0 if col != "cash_weight" else pd.NA
    frame["excess_return"] = pd.to_numeric(frame.get("excess_return"), errors="coerce")
    frame["is_rebalance"] = frame.get("is_rebalance", False)
    frame["is_rebalance"] = frame["is_rebalance"].fillna(False).astype(bool)
    frame["had_trade"] = frame.get("had_trade", False)
    frame["had_trade"] = frame["had_trade"].fillna(False).astype(bool)
    rows: list[dict[str, Any]] = []
    group_cols = DETAIL_CONTEXT_COLUMNS
    for group_key, grp in frame.groupby(group_cols, dropna=False, sort=False):
        grp = grp.sort_values("date").reset_index(drop=True)
        if grp.empty:
            continue
        # Window boundaries are rebalance days; force a window at row 0 so
        # the stretch before the first rebalance is still covered.
        starts = grp.index[grp["is_rebalance"]].tolist()
        if not starts or starts[0] != 0:
            starts = [0] + starts
        starts = sorted(set(int(idx) for idx in starts))
        # Nominal window length (first gap), used to flag a short tail window.
        expected_window_days = starts[1] - starts[0] if len(starts) >= 2 else None
        full_portfolio_return = float((1.0 + grp["portfolio_return"].astype(float)).prod() - 1.0)
        full_benchmark_return = float((1.0 + grp["benchmark_return"].astype(float)).prod() - 1.0)
        # NOTE(review): computed but unused in this function.
        full_excess_compounded_return = full_portfolio_return - full_benchmark_return
        window_returns: list[float] = []
        # Scalar group keys arrive unwrapped; normalize to a tuple for zip.
        context = {
            col: value for col, value in zip(group_cols, group_key if isinstance(group_key, tuple) else (group_key,))
        }
        for window_idx, start_idx in enumerate(starts, start=1):
            # Window spans this start up to (not including) the next start;
            # the final window runs to the end of the group.
            end_idx = starts[window_idx] - 1 if window_idx < len(starts) else len(grp) - 1
            window = grp.iloc[start_idx : end_idx + 1].copy()
            if window.empty:
                continue
            portfolio_returns = window["portfolio_return"].astype(float)
            benchmark_returns = window["benchmark_return"].astype(float)
            excess_returns = portfolio_returns - benchmark_returns
            portfolio_comp = float((1.0 + portfolio_returns).prod() - 1.0)
            benchmark_comp = float((1.0 + benchmark_returns).prod() - 1.0)
            excess_comp = portfolio_comp - benchmark_comp
            window_returns.append(portfolio_comp)
            # Cumulative returns from the group start through this window end.
            upto_end = grp.iloc[: end_idx + 1].copy()
            cumulative_portfolio = float((1.0 + upto_end["portfolio_return"].astype(float)).prod() - 1.0)
            cumulative_benchmark = float((1.0 + upto_end["benchmark_return"].astype(float)).prod() - 1.0)
            rows.append(
                {
                    "window_index": int(window_idx),
                    "window_start_date": window["date"].iloc[0].strftime("%Y-%m-%d"),
                    "window_end_date": window["date"].iloc[-1].strftime("%Y-%m-%d"),
                    "year": int(window["date"].iloc[0].year),
                    "market_regime": str(window.get("market_regime", pd.Series(["neutral"])).iloc[0]),
                    "n_days": int(len(window)),
                    "is_partial_window": bool(expected_window_days is not None and window_idx == len(starts) and len(window) < expected_window_days),
                    "had_trade_any": bool(window["had_trade"].any()),
                    "cash_weight_start": _to_float_or_none(window.get("cash_weight", pd.Series([None])).iloc[0]),
                    "cash_weight_end": _to_float_or_none(window.get("cash_weight", pd.Series([None])).iloc[-1]),
                    "portfolio_return_compounded": portfolio_comp,
                    "benchmark_return_compounded": benchmark_comp,
                    "excess_compounded_return": excess_comp,
                    "portfolio_return_sum": float(portfolio_returns.sum()),
                    "benchmark_return_sum": float(benchmark_returns.sum()),
                    "excess_return_sum": float(excess_returns.sum()),
                    "mean_daily_portfolio_return": float(portfolio_returns.mean()),
                    "mean_daily_benchmark_return": float(benchmark_returns.mean()),
                    "mean_daily_excess_return": float(excess_returns.mean()),
                    "cumulative_portfolio_return_to_end": cumulative_portfolio,
                    "cumulative_benchmark_return_to_end": cumulative_benchmark,
                    "cumulative_excess_compounded_return_to_end": cumulative_portfolio - cumulative_benchmark,
                    "full_period_window_reconstructed_return": None,  # filled after loop
                    "full_period_window_reconstruction_error": None,  # filled after loop
                    **context,
                }
            )
        # Back-fill the full-period reconstruction onto this group's rows only
        # (the last len(window_returns) entries of `rows`).
        full_window_reconstructed = float((1.0 + pd.Series(window_returns, dtype=float)).prod() - 1.0) if window_returns else 0.0
        full_window_error = full_window_reconstructed - full_portfolio_return
        for idx in range(len(rows) - len(window_returns), len(rows)):
            rows[idx]["full_period_window_reconstructed_return"] = full_window_reconstructed
            rows[idx]["full_period_window_reconstruction_error"] = full_window_error
    if not rows:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    return _rows_to_frame(rows, REBALANCE_WINDOW_RETURN_COLUMNS)
def _build_rebalance_plan_frame(
    trade_log_rows: list[dict[str, Any]],
    holding_log_rows: list[dict[str, Any]],
    portfolio_log_rows: list[dict[str, Any]],
) -> pd.DataFrame:
    """Build one row per (detail context, rebalance date, instrument) plan record.

    Pipeline:
      1. Normalize the daily portfolio log and keep only rows flagged
         ``is_rebalance``; deduplicate to one summary row per context+date.
      2. Aggregate end-of-day holdings per instrument on those dates
         (target value/weight/shares plus a value-descending rank).
      3. Aggregate the trade log per instrument on those dates
         (requested/filled/unfilled shares and notionals, split by side).
      4. Outer-merge holdings and trades, attach the per-date summary, and
         coerce/normalize into ``REBALANCE_PLAN_COLUMNS``.

    Returns an empty frame with the expected columns when there is no
    portfolio log or no rebalance day.
    """
    portfolio_df = _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS)
    if portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_PLAN_COLUMNS)
    portfolio_df = _ensure_detail_context_columns(portfolio_df)
    # Dates are normalized to "YYYY-MM-DD" strings so merges across the three
    # logs use an exact string key (invalid dates coerce to NaT -> NaN).
    portfolio_df["date"] = pd.to_datetime(portfolio_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
    portfolio_df["is_rebalance"] = portfolio_df.get("is_rebalance", False)
    portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool)
    # Only rebalance days contribute plan rows.
    portfolio_df = portfolio_df[portfolio_df["is_rebalance"]].copy()
    if portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_PLAN_COLUMNS)
    for col in [
        "portfolio_value",
        "cash_eod",
        "cash_weight",
        "gross_turnover",
        "transaction_cost",
    ]:
        if col in portfolio_df.columns:
            portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce")
    had_trade_source = portfolio_df["had_trade"] if "had_trade" in portfolio_df.columns else pd.Series(False, index=portfolio_df.index)
    portfolio_df["had_trade"] = had_trade_source.fillna(False).astype(bool)
    for col in ["buy_trades", "sell_trades"]:
        if col in portfolio_df.columns:
            portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce").fillna(0).astype(int)
        else:
            portfolio_df[col] = 0
    summary_cols = DETAIL_CONTEXT_COLUMNS + [
        "date",
        "year",
        "market_regime",
        "portfolio_value",
        "cash_eod",
        "cash_weight",
        "gross_turnover",
        "transaction_cost",
        "had_trade",
        "buy_trades",
        "sell_trades",
    ]
    # keep="last" so the final log entry wins if a context+date was logged twice.
    rebalance_summary = portfolio_df[summary_cols].drop_duplicates(subset=DETAIL_CONTEXT_COLUMNS + ["date"], keep="last").copy()
    rebalance_summary["invested_value_eod"] = rebalance_summary["portfolio_value"] - rebalance_summary["cash_eod"]
    rebalance_summary["unallocated_cash_eod"] = rebalance_summary["cash_eod"]
    holdings_df = _rows_to_frame(holding_log_rows, HOLDING_LOG_COLUMNS)
    if not holdings_df.empty:
        holdings_df = _ensure_detail_context_columns(holdings_df)
        holdings_df["date"] = pd.to_datetime(holdings_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
        # Inner merge restricts holdings to the rebalance dates selected above.
        holdings_df = holdings_df.merge(rebalance_summary[DETAIL_CONTEXT_COLUMNS + ["date"]], on=DETAIL_CONTEXT_COLUMNS + ["date"], how="inner")
        for col in ["market_value", "weight", "shares_held"]:
            if col in holdings_df.columns:
                holdings_df[col] = pd.to_numeric(holdings_df[col], errors="coerce")
        hold_instrument = (
            holdings_df.groupby(DETAIL_CONTEXT_COLUMNS + ["date", "instrument"], dropna=False)
            .agg(
                target_value_eod=("market_value", "sum"),
                target_weight_eod=("weight", "sum"),
                shares_held_eod=("shares_held", "sum"),
            )
            .reset_index()
        )
        # Rank instruments within each context+date: largest value first,
        # instrument name as a deterministic tiebreaker.
        hold_instrument = hold_instrument.sort_values(
            DETAIL_CONTEXT_COLUMNS + ["date", "target_value_eod", "instrument"],
            ascending=[True] * (len(DETAIL_CONTEXT_COLUMNS) + 1) + [False, True],
            na_position="last",
        )
        hold_instrument["target_rank_eod"] = (
            hold_instrument.groupby(DETAIL_CONTEXT_COLUMNS + ["date"], dropna=False).cumcount() + 1
        )
        hold_summary = (
            hold_instrument.groupby(DETAIL_CONTEXT_COLUMNS + ["date"], dropna=False)
            .agg(
                target_count_eod=("instrument", "nunique"),
                target_total_value_eod=("target_value_eod", "sum"),
                # Serialized as JSON so the whole target list fits one cell.
                target_list_eod=("instrument", lambda s: json.dumps(list(map(str, s.tolist())), ensure_ascii=False)),
            )
            .reset_index()
        )
    else:
        hold_instrument = pd.DataFrame(columns=DETAIL_CONTEXT_COLUMNS + ["date", "instrument", "target_value_eod", "target_weight_eod", "shares_held_eod", "target_rank_eod"])
        hold_summary = pd.DataFrame(columns=DETAIL_CONTEXT_COLUMNS + ["date", "target_count_eod", "target_total_value_eod", "target_list_eod"])
    trade_df = _rows_to_frame(trade_log_rows, TRADE_LOG_COLUMNS)
    if not trade_df.empty:
        trade_df = _ensure_detail_context_columns(trade_df)
        trade_df["date"] = pd.to_datetime(trade_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
        trade_df = trade_df.merge(rebalance_summary[DETAIL_CONTEXT_COLUMNS + ["date"]], on=DETAIL_CONTEXT_COLUMNS + ["date"], how="inner")
        for col in [
            "shares",
            "filled_shares",
            "requested_shares",
            "unfilled_shares",
            "price",
            "order_value",
            "filled_value",
            "gross_notional",
            "current_shares",
            "target_shares",
            "fill_ratio",
        ]:
            if col in trade_df.columns:
                trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
        # "*_norm" columns reconcile logs that may only carry a subset of the
        # request/fill fields: fall back requested -> filled -> raw shares.
        trade_df["requested_shares_norm"] = pd.to_numeric(
            trade_df.get("requested_shares", trade_df.get("filled_shares", trade_df.get("shares", 0.0))),
            errors="coerce",
        ).fillna(pd.to_numeric(trade_df.get("filled_shares", trade_df.get("shares", 0.0)), errors="coerce"))
        trade_df["filled_shares_norm"] = pd.to_numeric(
            trade_df.get("filled_shares", trade_df.get("shares", 0.0)),
            errors="coerce",
        ).fillna(pd.to_numeric(trade_df.get("shares", 0.0), errors="coerce"))
        # Unfilled defaults to max(requested - filled, 0) when not logged.
        trade_df["unfilled_shares_norm"] = pd.to_numeric(
            trade_df.get("unfilled_shares"),
            errors="coerce",
        ).fillna((trade_df["requested_shares_norm"] - trade_df["filled_shares_norm"]).clip(lower=0.0))
        order_value = pd.to_numeric(trade_df.get("order_value"), errors="coerce")
        gross_notional = pd.to_numeric(trade_df.get("gross_notional", trade_df.get("filled_value")), errors="coerce")
        price = pd.to_numeric(trade_df.get("price"), errors="coerce")
        trade_df["requested_notional_norm"] = order_value.fillna(trade_df["requested_shares_norm"] * price).fillna(gross_notional)
        trade_df["filled_notional_norm"] = pd.to_numeric(trade_df.get("filled_value"), errors="coerce").fillna(gross_notional).fillna(trade_df["filled_shares_norm"] * price)
        action_series = trade_df["action"].astype(str) if "action" in trade_df.columns else pd.Series("", index=trade_df.index)
        trade_df["buy_requested_shares"] = trade_df["requested_shares_norm"].where(action_series == "buy", 0.0)
        trade_df["buy_filled_shares"] = trade_df["filled_shares_norm"].where(action_series == "buy", 0.0)
        trade_df["sell_requested_shares"] = trade_df["requested_shares_norm"].where(action_series == "sell", 0.0)
        trade_df["sell_filled_shares"] = trade_df["filled_shares_norm"].where(action_series == "sell", 0.0)
        # NOTE(review): DataFrame.get("clip_reason", "") returns the scalar ""
        # (which has no .fillna) if that column were ever absent; presumably
        # _rows_to_frame always materializes every TRADE_LOG_COLUMNS column —
        # confirm before relying on this with other frame sources.
        trade_df["clip_reason_text"] = trade_df.get("clip_reason", "").fillna("").astype(str)
        # First non-null value within a group; used for reference snapshots
        # (current/target shares) that repeat on each trade row.
        def _first_valid(values: pd.Series) -> Any:
            cleaned = values.dropna()
            return cleaned.iloc[0] if not cleaned.empty else None
        trade_instrument = (
            trade_df.groupby(DETAIL_CONTEXT_COLUMNS + ["date", "instrument"], dropna=False)
            .agg(
                # dict.fromkeys preserves first-seen order while deduplicating.
                trade_actions=("action", lambda s: "|".join(dict.fromkeys(str(value) for value in s if str(value)))),
                current_shares_ref=("current_shares", _first_valid),
                target_shares_ref=("target_shares", _first_valid),
                requested_shares_total=("requested_shares_norm", "sum"),
                filled_shares_total=("filled_shares_norm", "sum"),
                unfilled_shares_total=("unfilled_shares_norm", "sum"),
                requested_notional_total=("requested_notional_norm", "sum"),
                filled_notional_total=("filled_notional_norm", "sum"),
                buy_requested_shares=("buy_requested_shares", "sum"),
                buy_filled_shares=("buy_filled_shares", "sum"),
                sell_requested_shares=("sell_requested_shares", "sum"),
                sell_filled_shares=("sell_filled_shares", "sum"),
                fill_ratio_mean=("fill_ratio", "mean"),
                clip_reasons=("clip_reason_text", lambda s: "|".join(sorted({value for value in s if value}))),
            )
            .reset_index()
        )
    else:
        trade_instrument = pd.DataFrame(
            columns=DETAIL_CONTEXT_COLUMNS
            + [
                "date",
                "instrument",
                "trade_actions",
                "current_shares_ref",
                "target_shares_ref",
                "requested_shares_total",
                "filled_shares_total",
                "unfilled_shares_total",
                "requested_notional_total",
                "filled_notional_total",
                "buy_requested_shares",
                "buy_filled_shares",
                "sell_requested_shares",
                "sell_filled_shares",
                "fill_ratio_mean",
                "clip_reasons",
            ]
        )
    # Outer merge: keep instruments that were held but not traded and vice versa.
    plan_instrument = hold_instrument.merge(
        trade_instrument,
        on=DETAIL_CONTEXT_COLUMNS + ["date", "instrument"],
        how="outer",
    )
    plan_df = rebalance_summary.merge(hold_summary, on=DETAIL_CONTEXT_COLUMNS + ["date"], how="left")
    plan_df = plan_df.merge(plan_instrument, on=DETAIL_CONTEXT_COLUMNS + ["date"], how="left")
    plan_df["target_count_eod"] = pd.to_numeric(plan_df.get("target_count_eod"), errors="coerce").fillna(0).astype(int)
    plan_df["target_total_value_eod"] = pd.to_numeric(plan_df.get("target_total_value_eod"), errors="coerce")
    plan_df["target_list_eod"] = plan_df.get("target_list_eod").fillna("[]")
    # Per-column NaN policy after the outer merges: totals default to 0.0,
    # reference/rank/ratio columns stay NaN (default None -> coerce only).
    numeric_fill_defaults = {
        "target_rank_eod": None,
        "target_value_eod": 0.0,
        "target_weight_eod": 0.0,
        "shares_held_eod": 0.0,
        "current_shares_ref": None,
        "target_shares_ref": None,
        "requested_shares_total": 0.0,
        "filled_shares_total": 0.0,
        "unfilled_shares_total": 0.0,
        "requested_notional_total": 0.0,
        "filled_notional_total": 0.0,
        "buy_requested_shares": 0.0,
        "buy_filled_shares": 0.0,
        "sell_requested_shares": 0.0,
        "sell_filled_shares": 0.0,
        "fill_ratio_mean": None,
    }
    for col, default in numeric_fill_defaults.items():
        if col not in plan_df.columns:
            plan_df[col] = default
        elif default is not None:
            plan_df[col] = pd.to_numeric(plan_df[col], errors="coerce").fillna(default)
        else:
            plan_df[col] = pd.to_numeric(plan_df[col], errors="coerce")
    if "trade_actions" not in plan_df.columns:
        plan_df["trade_actions"] = ""
    else:
        plan_df["trade_actions"] = plan_df["trade_actions"].fillna("")
    if "clip_reasons" not in plan_df.columns:
        plan_df["clip_reasons"] = ""
    else:
        plan_df["clip_reasons"] = plan_df["clip_reasons"].fillna("")
    return _rows_to_frame(plan_df.to_dict("records"), REBALANCE_PLAN_COLUMNS)
def _candidate_from_jsonl_row(row: dict[str, Any], idx: int) -> dict[str, Any]:
    """Normalize one raw JSONL record into the internal candidate schema.

    Coalesces the several aliases a row may use for the expression, seed
    name, and version name, resolves the candidate scope, and carries any
    pre-existing ``original_*`` metrics through unchanged (cleaned).
    """

    def _first_truthy(*values: Any, default: str = "") -> str:
        # Mirrors `str(a or b or ... or default)`.
        for value in values:
            if value:
                return str(value)
        return str(default)

    def _first_present(*values: Any) -> Any:
        # First value that is not None (0 / "" are kept).
        for value in values:
            if value is not None:
                return value
        return None

    expr = _first_truthy(row.get("expr"), row.get("factor_expr"), row.get("expression")).strip()
    source = _first_truthy(row.get("source"), default="input").strip() or "input"
    seed_version_name = _first_truthy(
        row.get("seed_version_name"), row.get("factor_name"), row.get("name")
    ).strip()
    seed_name_value = _first_truthy(
        row.get("seed_name"),
        row.get("root_seed_name"),
        row.get("parent_seed_name"),
        row.get("name"),
        default=f"seed_{idx}",
    ).strip()

    def _resolve_scope() -> str:
        # Explicit scope on the row always wins.
        declared = str(row.get("candidate_scope") or "").strip()
        if declared:
            return declared
        if source in {"summary_seed", "seed_baseline"}:
            return "seed_baseline"
        # A version name matching the seed name marks the seed itself;
        # a differing one marks a derived trial.
        if seed_version_name:
            return "seed_baseline" if seed_version_name == seed_name_value else "trial"
        # No version name: rows without an explicit seed are baselines.
        if not row.get("seed_name"):
            return "seed_baseline"
        return "trial"

    candidate_scope = _resolve_scope()
    root_seed_name = seed_name_value
    factor_name = str(seed_version_name or root_seed_name or f"factor_{idx}")
    # Seed baselines default their seed_expr to their own expression.
    seed_expr = str(row.get("seed_expr") or (expr if candidate_scope == "seed_baseline" else "")).strip()
    inferred_turn = _infer_turn_from_version_name(root_seed_name, factor_name)
    candidate: dict[str, Any] = {
        "input_index": idx,
        "source": source,
        "candidate_scope": candidate_scope,
        "seed_name": root_seed_name,
        "seed_expr": seed_expr,
        "factor_name": factor_name,
        "factor_expr": expr,
        "turn": _optional_int(_first_present(row.get("turn"), row.get("best_ir_turn"), inferred_turn)),
        "call_index": _optional_int(_first_present(row.get("call_index"), row.get("best_ir_call_index"))),
        "proposal_rank": _optional_int(_first_present(row.get("proposal_rank"), row.get("best_ir_proposal_rank"))),
    }
    # Metrics already computed upstream are preserved under an original_ prefix.
    for metric_key in (
        "success",
        "backtest_engine",
        "performance_return",
        "benchmark_performance_return",
        "excess_compounded_return",
        "ir",
        "ic_mean",
        "icir",
        "rank_ic_mean",
        "rank_icir",
        "aer",
        "annualized_return",
        "annualized_volatility",
        "sharpe",
        "winrate",
        "mdd",
        "excess_mdd",
        "portfolio_nav_mdd",
        "total_return",
    ):
        if metric_key in row:
            candidate[f"original_{metric_key}"] = _clean_scalar(row.get(metric_key))
    for key, value in row.items():
        if str(key).startswith("original_"):
            candidate[str(key)] = _clean_scalar(value)
    return candidate
def _empty_baseline(seed_name: str, seed_expr: str, backtest_engine: str) -> dict[str, Any]:
    """Return a placeholder failed-baseline payload for a seed with no result."""
    placeholder = _metric_payload({"success": False, "backtest_engine": backtest_engine})
    placeholder["source"] = "missing_baseline"
    placeholder["candidate_scope"] = "seed_baseline"
    placeholder["seed_name"] = seed_name
    placeholder["seed_expr"] = seed_expr
    # The seed doubles as its own factor when no trial ever ran.
    placeholder["factor_name"] = seed_name
    placeholder["factor_expr"] = seed_expr
    for optional_key in ("turn", "call_index", "proposal_rank"):
        placeholder[optional_key] = None
    return placeholder
def _sort_key(row: dict[str, Any]) -> tuple[int, int, int, int]:
return (
int(row.get("input_index") or 10**9),
int(row.get("proposal_rank") or 10**9),
int(row.get("turn") or 10**9),
int(row.get("call_index") or 10**9),
)
def _evaluate_candidate_task(
    *,
    candidate: dict[str, Any],
    period: str,
    label_forward_days: int,
    data_path: str | None,
    backtest_engine: str,
    top_k: int,
    n_drop: int,
    trade_guard_config: dict[str, Any] | None,
    rebalance_mode: str,
    custom_weight_mode: str,
    redistribute_unfilled_cash: bool,
    position_size: float,
    max_pos_each_stock: float,
    lot_size: int,
    max_daily_volume_participation: float,
    max_daily_amount_participation: float,
    enforce_cash_limit: bool,
    rebalance_freq: int,
    cost_buy: float,
    cost_sell: float,
    score_transform: str,
    score_clip: float,
    universe_filter: str,
    universe_top_n: int,
    universe_lookback_days: int,
    start_date: str | None,
    end_date: str | None,
    capture_details: bool,
) -> dict[str, Any]:
    """Run one standalone backtest for *candidate* and return a flat payload.

    Resolves the backtest imports lazily and configures the period map on
    every call, so the function is self-sufficient when invoked in a fresh
    worker process.  Any exception from the backtest is converted into a
    failed payload (echoing the run configuration for debugging) rather
    than propagated, so one bad expression cannot abort a batch.
    """
    # Each call (re)binds the lazy module-level backtest hooks and pushes the
    # canonical period configuration before executing.
    _ensure_backtest_imports()
    configure_periods(PERIOD_CONFIGS)
    try:
        result = execute_expression(
            str(candidate.get("factor_expr", "")),
            data_path=data_path,
            period=period,
            start_date=start_date,
            end_date=end_date,
            label_forward_days=label_forward_days,
            backtest_engine=backtest_engine,
            top_k=top_k,
            n_drop=n_drop,
            position_size=position_size,
            max_pos_each_stock=max_pos_each_stock,
            lot_size=lot_size,
            max_daily_volume_participation=max_daily_volume_participation,
            max_daily_amount_participation=max_daily_amount_participation,
            rebalance_freq=rebalance_freq,
            cost_buy=cost_buy,
            cost_sell=cost_sell,
            capture_details=capture_details,
            trade_guard_config=trade_guard_config,
            rebalance_mode=rebalance_mode,
            custom_weight_mode=custom_weight_mode,
            redistribute_unfilled_cash=redistribute_unfilled_cash,
            enforce_cash_limit=enforce_cash_limit,
            score_transform=score_transform,
            score_clip=score_clip,
            universe_filter=universe_filter,
            universe_top_n=universe_top_n,
            universe_lookback_days=universe_lookback_days,
        )
        payload = _metric_payload(result)
    except Exception as exc:
        # Failed payloads still carry the full run configuration so the
        # output rows remain comparable and the failure is reproducible.
        payload = _metric_payload(
            {
                "success": False,
                "backtest_engine": backtest_engine,
                "top_k": int(top_k),
                "n_drop": int(n_drop),
                "position_size": float(position_size),
                "max_pos_each_stock": float(max_pos_each_stock),
                "lot_size": int(lot_size),
                "max_daily_volume_participation": float(max_daily_volume_participation),
                "max_daily_amount_participation": float(max_daily_amount_participation),
                "rebalance_freq": int(rebalance_freq),
                "cost_buy": float(cost_buy),
                "cost_sell": float(cost_sell),
                "custom_weight_mode": custom_weight_mode,
                "redistribute_unfilled_cash": bool(redistribute_unfilled_cash),
                "enforce_cash_limit": bool(enforce_cash_limit),
                "score_transform": score_transform,
                "score_clip": float(score_clip),
                "universe_filter": universe_filter,
                "universe_top_n": int(universe_top_n),
                "universe_lookback_days": int(universe_lookback_days),
                "label_forward_days": label_forward_days,
                "error": f"backtest_error: {exc}",
            }
        )
    payload = _enrich_result_diagnostics(payload)
    # Candidate identity fields (seed_name, factor_expr, turn, ...) take
    # precedence over any same-named keys produced by the backtest payload.
    payload.update(candidate)
    return payload
def _flatten_yearly_metrics(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
success: bool,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
yearly_metrics: dict[str, Any],
) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for year_key in sorted(yearly_metrics.keys(), key=lambda x: int(x)):
metrics = yearly_metrics[year_key]
year = int(year_key)
rows.append(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"success": bool(success),
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
"year": year,
"market_regime": metrics.get("market_regime", _market_regime(year)),
"performance_return": float(metrics.get("performance_return", 0.0) or 0.0),
"benchmark_performance_return": float(metrics.get("benchmark_performance_return", 0.0) or 0.0),
"excess_compounded_return": float(metrics.get("excess_compounded_return", 0.0) or 0.0),
"ir": float(metrics.get("ir", 0.0) or 0.0),
"ic_mean": float(metrics.get("ic_mean", 0.0) or 0.0),
"icir": float(metrics.get("icir", 0.0) or 0.0),
"rank_ic_mean": float(metrics.get("rank_ic_mean", 0.0) or 0.0),
"rank_icir": float(metrics.get("rank_icir", 0.0) or 0.0),
"aer": float(metrics.get("annualized_return", 0.0) or 0.0),
"annualized_return": float(metrics.get("annualized_return", 0.0) or 0.0),
"sharpe": float(metrics.get("sharpe", 0.0) or 0.0),
"winrate": float(metrics.get("winrate", 0.0) or 0.0),
"mdd": float(metrics.get("mdd", 0.0) or 0.0),
"excess_mdd": float(metrics.get("excess_mdd", metrics.get("mdd", 0.0)) or 0.0),
"portfolio_nav_mdd": float(metrics.get("portfolio_nav_mdd", 0.0) or 0.0),
"drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0),
"drawdown_duration_mean": float(metrics.get("drawdown_duration_mean", 0.0) or 0.0),
"drawdown_duration_median": float(metrics.get("drawdown_duration_median", 0.0) or 0.0),
"annualized_volatility": float(metrics.get("annualized_volatility", 0.0) or 0.0),
"n_days": int(metrics.get("n_days", 0) or 0),
"n_ic_days": int(metrics.get("n_ic_days", 0) or 0),
"avg_holdings_count": float(metrics.get("avg_holdings_count", 0.0) or 0.0),
"max_holdings_count": int(metrics.get("max_holdings_count", 0) or 0),
"buy_trades": int(metrics.get("buy_trades", 0) or 0),
"sell_trades": int(metrics.get("sell_trades", 0) or 0),
"shares_bought": float(metrics.get("shares_bought", 0.0) or 0.0),
"shares_sold": float(metrics.get("shares_sold", 0.0) or 0.0),
"buy_gross_notional": float(metrics.get("buy_gross_notional", 0.0) or 0.0),
"sell_gross_notional": float(metrics.get("sell_gross_notional", 0.0) or 0.0),
"buy_cash_outflow": float(metrics.get("buy_cash_outflow", 0.0) or 0.0),
"sell_net_proceeds": float(metrics.get("sell_net_proceeds", 0.0) or 0.0),
"buy_transaction_cost": float(metrics.get("buy_transaction_cost", 0.0) or 0.0),
"sell_transaction_cost": float(metrics.get("sell_transaction_cost", 0.0) or 0.0),
"transaction_cost": float(metrics.get("transaction_cost", 0.0) or 0.0),
"gross_turnover": float(metrics.get("gross_turnover", 0.0) or 0.0),
}
)
return rows
def _build_trial_rows(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
    """Flatten non-baseline results into rows annotated with seed baseline metrics."""
    # (row column, baseline metric key) pairs copied onto every trial row.
    seed_metric_map = (
        ("seed_ir", "ir"),
        ("seed_ic", "ic_mean"),
        ("seed_aer", "aer"),
        ("seed_sharpe", "sharpe"),
        ("seed_winrate", "winrate"),
        ("seed_mdd", "mdd"),
        ("seed_excess_mdd", "excess_mdd"),
        ("seed_portfolio_nav_mdd", "portfolio_nav_mdd"),
        ("seed_performance_return", "performance_return"),
        ("seed_benchmark_performance_return", "benchmark_performance_return"),
        ("seed_excess_compounded_return", "excess_compounded_return"),
    )
    trial_rows: list[dict[str, Any]] = []
    for record in sorted(results, key=_sort_key):
        if record.get("candidate_scope") == "seed_baseline":
            continue
        seed = str(record.get("seed_name") or record.get("factor_name") or "")
        baseline = baselines.get(seed)
        if not baseline:
            # Fall back to a synthetic failed baseline when none was captured.
            baseline = _empty_baseline(seed, str(record.get("seed_expr") or ""), str(record.get("backtest_engine") or ""))
        flat = _strip_detail_keys(record)
        flat["seed_name"] = seed
        flat["seed_expr"] = baseline.get("factor_expr") or record.get("seed_expr") or ""
        for column, metric_key in seed_metric_map:
            flat[column] = baseline.get(metric_key)
        trial_rows.append(flat)
    return trial_rows
def _build_baseline_rows(results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Extract seed-baseline results as flat rows (detail payloads stripped)."""

    def _flatten(record: dict[str, Any]) -> dict[str, Any]:
        flat = _strip_detail_keys(record)
        flat["seed_error"] = record.get("error")
        # Warnings are serialized so the row stays CSV/JSON friendly.
        flat["qlib_warnings_json"] = json.dumps(record.get("qlib_warnings") or [], ensure_ascii=False)
        return flat

    ordered = sorted(results, key=_sort_key)
    return [_flatten(record) for record in ordered if record.get("candidate_scope") == "seed_baseline"]
def _build_summary_rows(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]], backtest_engine: str) -> list[dict[str, Any]]:
    """Aggregate per-candidate results into one wide summary row per seed.

    For each seed: copies the baseline's metrics under ``seed_*`` columns,
    picks per-metric best trials (each "best_*" maximizes its own metric
    independently, except ``best_excess_mdd`` and friends which follow the
    best-IR trial), records the per-turn best-IR trajectory, and computes
    improvement deltas of best-vs-seed.
    """
    by_seed: dict[str, list[dict[str, Any]]] = {}
    for result in results:
        by_seed.setdefault(str(result.get("seed_name") or result.get("factor_name") or ""), []).append(result)
    summary_rows: list[dict[str, Any]] = []
    for seed_name in sorted(by_seed.keys()):
        seed_results = sorted(by_seed[seed_name], key=_sort_key)
        baseline = baselines.get(seed_name)
        if baseline is None:
            # No baseline run captured for this seed: synthesize a failed one.
            seed_expr = str(seed_results[0].get("seed_expr") or seed_results[0].get("factor_expr") or "")
            baseline = _empty_baseline(seed_name, seed_expr, backtest_engine)
        trials = [row for row in seed_results if row.get("candidate_scope") != "seed_baseline"]
        valid = [row for row in trials if row.get("success")]
        # NOTE(review): these maxes assume successful rows always carry numeric
        # metrics; float(None) would raise — confirm _metric_payload guarantees it.
        best = max(valid, key=lambda row: float(row.get("ir", float("-inf")))) if valid else None
        best_ic = max(valid, key=lambda row: float(row.get("ic_mean", float("-inf")))) if valid else None
        best_aer = max(valid, key=lambda row: float(row.get("aer", float("-inf")))) if valid else None
        best_sharpe = max(valid, key=lambda row: float(row.get("sharpe", float("-inf")))) if valid else None
        best_winrate = max(valid, key=lambda row: float(row.get("winrate", float("-inf")))) if valid else None
        best_mdd = max(valid, key=lambda row: float(row.get("mdd", float("-inf")))) if valid else None
        # Per-turn best-IR trajectory (ir_path/expr_path are JSON-serialized below).
        turn_values = sorted({int(row["turn"]) for row in trials if row.get("turn") is not None})
        turn_records: list[dict[str, Any]] = []
        for turn in turn_values:
            turn_rows = [row for row in trials if row.get("turn") == turn]
            turn_valid = [row for row in turn_rows if row.get("success")]
            turn_best = max(turn_valid, key=lambda row: float(row.get("ir", float("-inf")))) if turn_valid else None
            turn_records.append(
                {
                    "turn": turn,
                    "n_calls": len(turn_rows),
                    "n_valid": len(turn_valid),
                    "best_ir": turn_best.get("ir") if turn_best else None,
                    "best_expr": turn_best.get("factor_expr") if turn_best else None,
                }
            )
        ir_path = [record["best_ir"] for record in turn_records]
        expr_path = [record["best_expr"] for record in turn_records]
        calls_per_turn = [record["n_calls"] for record in turn_records]
        valid_per_turn = [record["n_valid"] for record in turn_records]
        seed_ir = float(baseline.get("ir", 0.0) or 0.0)
        # "Paper" comparison clamps a negative seed IR to 0.0, so a trial must
        # be strictly positive to beat a failing seed.
        paper_baseline = max(0.0, seed_ir)
        summary_rows.append(
            {
                "seed_name": seed_name,
                "seed_expr": baseline.get("factor_expr") or baseline.get("seed_expr") or "",
                "seed_success": bool(baseline.get("success", False)),
                "seed_error": baseline.get("error"),
                "seed_qlib_warnings": json.dumps(baseline.get("qlib_warnings") or [], ensure_ascii=False),
                "seed_rebalance_mode": baseline.get("rebalance_mode"),
                "seed_performance_return": float(baseline.get("performance_return", 0.0) or 0.0),
                "seed_benchmark_performance_return": float(baseline.get("benchmark_performance_return", 0.0) or 0.0),
                "seed_excess_compounded_return": float(baseline.get("excess_compounded_return", 0.0) or 0.0),
                "seed_ir": seed_ir,
                "seed_ic": float(baseline.get("ic_mean", 0.0) or 0.0),
                "seed_aer": float(baseline.get("aer", 0.0) or 0.0),
                "seed_sharpe": float(baseline.get("sharpe", 0.0) or 0.0),
                "seed_winrate": float(baseline.get("winrate", 0.0) or 0.0),
                "seed_mdd": float(baseline.get("mdd", 0.0) or 0.0),
                "seed_excess_mdd": float(baseline.get("excess_mdd", baseline.get("mdd", 0.0)) or 0.0),
                "seed_portfolio_nav_mdd": float(baseline.get("portfolio_nav_mdd", 0.0) or 0.0),
                "seed_drawdown_duration_max": int(baseline.get("drawdown_duration_max", 0) or 0),
                "seed_drawdown_duration_mean": float(baseline.get("drawdown_duration_mean", 0.0) or 0.0),
                "seed_transaction_cost": float(baseline.get("transaction_cost", 0.0) or 0.0),
                "seed_gross_turnover": float(baseline.get("gross_turnover", 0.0) or 0.0),
                "seed_turnover_ratio": float(baseline.get("turnover_ratio", 0.0) or 0.0),
                "seed_return_per_turnover": baseline.get("return_per_turnover"),
                "seed_cash_weight_mean": baseline.get("cash_weight_mean"),
                "seed_cash_weight_median": baseline.get("cash_weight_median"),
                "seed_cash_weight_p95": baseline.get("cash_weight_p95"),
                "seed_round2_redistributed_notional": baseline.get("round2_redistributed_notional"),
                "seed_round2_buy_trade_count": baseline.get("round2_buy_trade_count"),
                "seed_round2_rebalance_days": baseline.get("round2_rebalance_days"),
                "seed_rebalance_holdings_mean": baseline.get("rebalance_holdings_mean"),
                "seed_rebalance_holdings_median": baseline.get("rebalance_holdings_median"),
                "seed_rebalance_holdings_min": baseline.get("rebalance_holdings_min"),
                "seed_rebalance_holdings_max": baseline.get("rebalance_holdings_max"),
                "seed_rebalance_window_count": baseline.get("rebalance_window_count"),
                "seed_rebalance_window_days_mean": baseline.get("rebalance_window_days_mean"),
                "seed_rebalance_window_days_median": baseline.get("rebalance_window_days_median"),
                "seed_rebalance_window_days_max": baseline.get("rebalance_window_days_max"),
                "seed_rebalance_window_return_reconstruction_error": baseline.get("rebalance_window_return_reconstruction_error"),
                "seed_performance_return_reconstruction_error": baseline.get("performance_return_reconstruction_error"),
                "seed_transaction_cost_reconstruction_error": baseline.get("transaction_cost_reconstruction_error"),
                "seed_gross_turnover_reconstruction_error": baseline.get("gross_turnover_reconstruction_error"),
                "n_calls": len(trials),
                "n_valid": len(valid),
                # "best_*" below mixes sources: IC/AER/sharpe/winrate/mdd come from
                # their own per-metric winners, the rest from the best-IR trial.
                "best_performance_return": best.get("performance_return") if best else None,
                "best_benchmark_performance_return": best.get("benchmark_performance_return") if best else None,
                "best_excess_compounded_return": best.get("excess_compounded_return") if best else None,
                "best_ir": best.get("ir") if best else None,
                "best_ic": best_ic.get("ic_mean") if best_ic else None,
                "best_aer": best_aer.get("aer") if best_aer else None,
                "best_sharpe": best_sharpe.get("sharpe") if best_sharpe else None,
                "best_winrate": best_winrate.get("winrate") if best_winrate else None,
                "best_mdd": best_mdd.get("mdd") if best_mdd else None,
                "best_excess_mdd": best.get("excess_mdd") if best else None,
                "best_portfolio_nav_mdd": best.get("portfolio_nav_mdd") if best else None,
                "best_drawdown_duration_max": best.get("drawdown_duration_max") if best else None,
                "best_drawdown_duration_mean": best.get("drawdown_duration_mean") if best else None,
                "best_transaction_cost": best.get("transaction_cost") if best else None,
                "best_gross_turnover": best.get("gross_turnover") if best else None,
                "best_turnover_ratio": best.get("turnover_ratio") if best else None,
                "best_return_per_turnover": best.get("return_per_turnover") if best else None,
                "best_cash_weight_mean": best.get("cash_weight_mean") if best else None,
                "best_cash_weight_median": best.get("cash_weight_median") if best else None,
                "best_cash_weight_p95": best.get("cash_weight_p95") if best else None,
                "best_round2_redistributed_notional": best.get("round2_redistributed_notional") if best else None,
                "best_round2_buy_trade_count": best.get("round2_buy_trade_count") if best else None,
                "best_round2_rebalance_days": best.get("round2_rebalance_days") if best else None,
                "best_rebalance_holdings_mean": best.get("rebalance_holdings_mean") if best else None,
                "best_rebalance_holdings_median": best.get("rebalance_holdings_median") if best else None,
                "best_rebalance_holdings_min": best.get("rebalance_holdings_min") if best else None,
                "best_rebalance_holdings_max": best.get("rebalance_holdings_max") if best else None,
                "best_rebalance_window_count": best.get("rebalance_window_count") if best else None,
                "best_rebalance_window_days_mean": best.get("rebalance_window_days_mean") if best else None,
                "best_rebalance_window_days_median": best.get("rebalance_window_days_median") if best else None,
                "best_rebalance_window_days_max": best.get("rebalance_window_days_max") if best else None,
                "best_rebalance_window_return_reconstruction_error": best.get("rebalance_window_return_reconstruction_error") if best else None,
                "best_performance_return_reconstruction_error": best.get("performance_return_reconstruction_error") if best else None,
                "best_transaction_cost_reconstruction_error": best.get("transaction_cost_reconstruction_error") if best else None,
                "best_gross_turnover_reconstruction_error": best.get("gross_turnover_reconstruction_error") if best else None,
                "best_factor_name": best.get("factor_name") if best else None,
                "best_factor_expr": best.get("factor_expr") if best else None,
                "best_ir_turn": best.get("turn") if best else None,
                "best_ir_call_index": best.get("call_index") if best else None,
                "best_ir_proposal_rank": best.get("proposal_rank") if best else None,
                "stop_reason": "backtest_only_replay",
                "turns_executed": len(turn_records),
                "ir_path": json.dumps(ir_path, ensure_ascii=False),
                "expr_path": json.dumps(expr_path, ensure_ascii=False),
                "calls_per_turn": json.dumps(calls_per_turn, ensure_ascii=False),
                "valid_per_turn": json.dumps(valid_per_turn, ensure_ascii=False),
                "beat_seed_paper_ir": bool(best and float(best.get("ir", 0.0) or 0.0) > paper_baseline),
                "ir_improvement_over_paper_seed": (float(best.get("ir", 0.0) or 0.0) - paper_baseline) if best else None,
                "ic_improvement_over_seed": (float(best_ic.get("ic_mean", 0.0) or 0.0) - float(baseline.get("ic_mean", 0.0) or 0.0)) if best_ic else None,
                "aer_improvement_over_seed": (float(best_aer.get("aer", 0.0) or 0.0) - float(baseline.get("aer", 0.0) or 0.0)) if best_aer else None,
                "sharpe_improvement_over_seed": (float(best_sharpe.get("sharpe", 0.0) or 0.0) - float(baseline.get("sharpe", 0.0) or 0.0)) if best_sharpe else None,
                "winrate_improvement_over_seed": (float(best_winrate.get("winrate", 0.0) or 0.0) - float(baseline.get("winrate", 0.0) or 0.0)) if best_winrate else None,
                "mdd_improvement_over_seed": (float(best_mdd.get("mdd", 0.0) or 0.0) - float(baseline.get("mdd", 0.0) or 0.0)) if best_mdd else None,
                "excess_compounded_return_improvement_over_seed": (float(best.get("excess_compounded_return", 0.0) or 0.0) - float(baseline.get("excess_compounded_return", 0.0) or 0.0)) if best else None,
                "portfolio_nav_mdd_improvement_over_seed": (float(best.get("portfolio_nav_mdd", 0.0) or 0.0) - float(baseline.get("portfolio_nav_mdd", 0.0) or 0.0)) if best else None,
                # trade_guard_* deltas compare the rerun metrics against any
                # original_* metrics carried in from the input JSONL.
                "original_seed_ir": baseline.get("original_ir"),
                "original_best_ir": best.get("original_ir") if best else None,
                "trade_guard_ir_delta_seed": seed_ir - float(baseline.get("original_ir", 0.0) or 0.0) if baseline.get("original_ir") is not None else None,
                "trade_guard_ir_delta_best": (float(best.get("ir", 0.0) or 0.0) - float(best.get("original_ir", 0.0) or 0.0)) if best and best.get("original_ir") is not None else None,
            }
        )
    return summary_rows
def _build_summary_yearly_rows(
    *,
    seed_name: str,
    seed_expr: str,
    seed_metrics: dict[str, Any],
    seed_trials: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Build one summary row per year comparing the seed against its trials.

    For every year present in the seed's or any trial's yearly metrics, emit a
    row with the seed baseline metrics, trial aggregates (valid rate,
    beat-seed rate, pass@3 / pass@5) and the best-IR successful trial for that
    year (``None`` placeholders when no successful trial covers the year).
    """

    def _ir_of(metrics: dict[str, Any]) -> float:
        # None-safe IR lookup: a missing or null "ir" ranks below any real
        # value instead of raising TypeError on float(None).
        value = metrics.get("ir")
        return float(value) if value is not None else float("-inf")

    seed_yearly = seed_metrics.get("yearly_metrics") or {}
    year_keys: set[str] = set(seed_yearly.keys())
    for trial in seed_trials:
        year_keys.update((trial.get("yearly_metrics") or {}).keys())
    rows: list[dict[str, Any]] = []
    for year_key in sorted(year_keys, key=lambda x: int(x)):
        year = int(year_key)
        regime = _market_regime(year)
        seed_year = seed_yearly.get(year_key) or {}
        year_trials = [
            (trial, (trial.get("yearly_metrics") or {}).get(year_key))
            for trial in seed_trials
            if (trial.get("yearly_metrics") or {}).get(year_key)
        ]
        valid_year_trials = [(trial, metrics) for trial, metrics in year_trials if trial.get("success")]
        best_pair = max(valid_year_trials, key=lambda item: _ir_of(item[1])) if valid_year_trials else None
        baseline_ir = float(seed_year.get("ir", 0.0) or 0.0)
        beat_flags = [
            bool(trial.get("success")) and _ir_of(metrics) > baseline_ir
            for trial, metrics in year_trials
        ]
        # pass@k counts only *successful* trials among the first k proposals,
        # matching beat_seed_raw_ir in _compute_pass_metrics (previously an
        # unsuccessful trial with a high recorded IR could pass here).
        pass_at_3 = any(
            bool(trial.get("success")) and _ir_of(metrics) > baseline_ir
            for trial, metrics in year_trials
            if int(trial.get("proposal_rank", 10**9) or 10**9) <= 3
        )
        pass_at_5 = any(
            bool(trial.get("success")) and _ir_of(metrics) > baseline_ir
            for trial, metrics in year_trials
            if int(trial.get("proposal_rank", 10**9) or 10**9) <= 5
        )
        valid_rate = float(len(valid_year_trials) / len(year_trials)) if year_trials else 0.0
        beat_seed_rate = float(sum(beat_flags) / len(year_trials)) if year_trials else 0.0
        row = {
            "seed_name": seed_name,
            "seed_expr": seed_expr,
            "year": year,
            "market_regime": regime,
            "seed_performance_return": float(seed_year.get("performance_return", 0.0) or 0.0),
            "seed_benchmark_performance_return": float(seed_year.get("benchmark_performance_return", 0.0) or 0.0),
            "seed_excess_compounded_return": float(seed_year.get("excess_compounded_return", 0.0) or 0.0),
            "seed_ir": baseline_ir,
            "seed_ic": float(seed_year.get("ic_mean", 0.0) or 0.0),
            "seed_icir": float(seed_year.get("icir", 0.0) or 0.0),
            "seed_aer": float(seed_year.get("annualized_return", 0.0) or 0.0),
            "seed_sharpe": float(seed_year.get("sharpe", 0.0) or 0.0),
            "seed_winrate": float(seed_year.get("winrate", 0.0) or 0.0),
            "seed_mdd": float(seed_year.get("mdd", 0.0) or 0.0),
            "seed_excess_mdd": float(seed_year.get("excess_mdd", seed_year.get("mdd", 0.0)) or 0.0),
            "seed_portfolio_nav_mdd": float(seed_year.get("portfolio_nav_mdd", 0.0) or 0.0),
            "seed_drawdown_duration_max": int(seed_year.get("drawdown_duration_max", 0) or 0),
            "seed_drawdown_duration_mean": float(seed_year.get("drawdown_duration_mean", 0.0) or 0.0),
            "seed_avg_holdings_count": float(seed_year.get("avg_holdings_count", 0.0) or 0.0),
            "seed_max_holdings_count": int(seed_year.get("max_holdings_count", 0) or 0),
            "n_calls": len(year_trials),
            "n_valid": len(valid_year_trials),
            "n_wins": int(sum(beat_flags)),
            # Paper definition: VR is Valid Ratio, not beat-seed rate.
            "vr": valid_rate,
            "valid_rate": valid_rate,
            "beat_seed_rate": beat_seed_rate,
            "pass_at_3": bool(pass_at_3),
            "pass_at_5": bool(pass_at_5),
        }
        if best_pair:
            trial, metrics = best_pair
            row.update(
                {
                    "best_factor_name": trial.get("factor_name"),
                    "best_factor_expr": trial.get("factor_expr"),
                    "best_turn": trial.get("turn"),
                    "best_call_index": trial.get("call_index"),
                    "best_proposal_rank": trial.get("proposal_rank"),
                    "best_performance_return": float(metrics.get("performance_return", 0.0) or 0.0),
                    "best_benchmark_performance_return": float(metrics.get("benchmark_performance_return", 0.0) or 0.0),
                    "best_excess_compounded_return": float(metrics.get("excess_compounded_return", 0.0) or 0.0),
                    "best_ir": float(metrics.get("ir", 0.0) or 0.0),
                    "best_ic": float(metrics.get("ic_mean", 0.0) or 0.0),
                    "best_icir": float(metrics.get("icir", 0.0) or 0.0),
                    "best_aer": float(metrics.get("annualized_return", 0.0) or 0.0),
                    "best_sharpe": float(metrics.get("sharpe", 0.0) or 0.0),
                    "best_winrate": float(metrics.get("winrate", 0.0) or 0.0),
                    "best_mdd": float(metrics.get("mdd", 0.0) or 0.0),
                    "best_excess_mdd": float(metrics.get("excess_mdd", metrics.get("mdd", 0.0)) or 0.0),
                    "best_portfolio_nav_mdd": float(metrics.get("portfolio_nav_mdd", 0.0) or 0.0),
                    "best_drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0),
                    "best_drawdown_duration_mean": float(metrics.get("drawdown_duration_mean", 0.0) or 0.0),
                    "best_avg_holdings_count": float(metrics.get("avg_holdings_count", 0.0) or 0.0),
                    "best_max_holdings_count": int(metrics.get("max_holdings_count", 0) or 0),
                }
            )
        else:
            row.update(
                {
                    "best_factor_name": None,
                    "best_factor_expr": None,
                    "best_turn": None,
                    "best_call_index": None,
                    "best_proposal_rank": None,
                    "best_performance_return": None,
                    "best_benchmark_performance_return": None,
                    "best_excess_compounded_return": None,
                    "best_ir": None,
                    "best_ic": None,
                    "best_icir": None,
                    "best_aer": None,
                    "best_sharpe": None,
                    "best_winrate": None,
                    "best_mdd": None,
                    "best_excess_mdd": None,
                    "best_portfolio_nav_mdd": None,
                    "best_drawdown_duration_max": None,
                    "best_drawdown_duration_mean": None,
                    "best_avg_holdings_count": None,
                    "best_max_holdings_count": None,
                }
            )
        rows.append(row)
    return rows
def _build_alpha_cash_cost_ranking_frame(summary_rows: list[dict[str, Any]], prefix: str) -> pd.DataFrame:
    """Rank seed or best candidates by cash usage and cost efficiency.

    ``prefix`` selects which summary columns feed the ranking ("seed" or
    "best").  Candidates are ordered by the mean of three component ranks:
    low average cash weight, low transaction cost, and high return per unit
    of turnover.
    """
    if prefix not in {"seed", "best"}:
        raise ValueError(f"Unsupported ranking prefix: {prefix}")
    if not summary_rows:
        return pd.DataFrame(columns=ALPHA_CASH_COST_RANKING_COLUMNS)
    source = pd.DataFrame(summary_rows).copy()
    # Every metric column in the output maps to f"{prefix}_{metric}" in the
    # summary frame; keep this list in output-column order.
    metric_cols = [
        "ir",
        "performance_return",
        "benchmark_performance_return",
        "excess_compounded_return",
        "portfolio_nav_mdd",
        "turnover_ratio",
        "transaction_cost",
        "gross_turnover",
        "return_per_turnover",
        "cash_weight_mean",
        "cash_weight_median",
        "cash_weight_p95",
        "round2_redistributed_notional",
        "round2_buy_trade_count",
        "round2_rebalance_days",
    ]
    column_data: dict[str, Any] = {
        "candidate_scope": "seed_baseline" if prefix == "seed" else "best_ir_candidate",
        "seed_name": source.get("seed_name"),
        "factor_name": source.get("seed_name" if prefix == "seed" else "best_factor_name"),
        "factor_expr": source.get("seed_expr" if prefix == "seed" else "best_factor_expr"),
    }
    for metric in metric_cols:
        column_data[metric] = source.get(f"{prefix}_{metric}")
    frame = pd.DataFrame(column_data)
    if prefix == "best":
        # Drop seeds that never produced a usable best candidate.
        has_signal = (
            frame["factor_expr"].notna()
            | frame["ir"].notna()
            | frame["performance_return"].notna()
        )
        frame = frame[has_signal].copy()
    if frame.empty:
        return pd.DataFrame(columns=ALPHA_CASH_COST_RANKING_COLUMNS)
    for metric in metric_cols:
        frame[metric] = pd.to_numeric(frame[metric], errors="coerce")
    # Component ranks: missing values always rank last.
    frame["rank_cash_weight_mean_asc"] = frame["cash_weight_mean"].rank(method="min", ascending=True, na_option="bottom")
    frame["rank_transaction_cost_asc"] = frame["transaction_cost"].rank(method="min", ascending=True, na_option="bottom")
    frame["rank_return_per_turnover_desc"] = frame["return_per_turnover"].rank(method="min", ascending=False, na_option="bottom")
    component_ranks = ["rank_cash_weight_mean_asc", "rank_transaction_cost_asc", "rank_return_per_turnover_desc"]
    frame["cash_cost_efficiency_rank"] = frame[component_ranks].mean(axis=1)
    frame = frame.sort_values(
        ["cash_cost_efficiency_rank", "return_per_turnover", "ir", "seed_name"],
        ascending=[True, False, False, True],
        na_position="last",
    ).reset_index(drop=True)
    return _rows_to_frame(frame.to_dict("records"), ALPHA_CASH_COST_RANKING_COLUMNS)
def _build_yearly_outputs(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Produce (summary_yearly_rows, trial_yearly_rows) grouped by seed.

    Trials are flattened per year via ``_flatten_yearly_metrics``; per-seed
    yearly summaries come from ``_build_summary_yearly_rows``, falling back
    to an empty baseline when no baseline run exists for a seed.
    """
    grouped: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
    for result in results:
        grouped[str(result.get("seed_name") or result.get("factor_name") or "")].append(result)
    summary_yearly_rows: list[dict[str, Any]] = []
    trial_yearly_rows: list[dict[str, Any]] = []
    for seed_name in sorted(grouped):
        trials = [row for row in grouped[seed_name] if row.get("candidate_scope") != "seed_baseline"]
        baseline = baselines.get(seed_name)
        if baseline is None:
            fallback_expr = str(trials[0].get("seed_expr") or "") if trials else ""
            baseline = _empty_baseline(seed_name, fallback_expr, "")
        for trial in sorted(trials, key=_sort_key):
            trial_yearly_rows.extend(
                _flatten_yearly_metrics(
                    seed_name=seed_name,
                    candidate_scope=str(trial.get("candidate_scope") or "trial"),
                    factor_name=str(trial.get("factor_name") or ""),
                    factor_expr=str(trial.get("factor_expr") or ""),
                    success=bool(trial.get("success")),
                    turn=trial.get("turn"),
                    call_index=trial.get("call_index"),
                    proposal_rank=trial.get("proposal_rank"),
                    yearly_metrics=trial.get("yearly_metrics") or {},
                )
            )
        summary_yearly_rows.extend(
            _build_summary_yearly_rows(
                seed_name=seed_name,
                seed_expr=str(baseline.get("factor_expr") or baseline.get("seed_expr") or ""),
                seed_metrics=baseline,
                seed_trials=trials,
            )
        )
    return summary_yearly_rows, trial_yearly_rows
def _flatten_stock_contrib(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
stock_contrib: list[dict[str, Any]],
stock_contrib_topk: int,
) -> list[dict[str, Any]]:
if not stock_contrib:
return []
contrib_df = pd.DataFrame(stock_contrib)
if contrib_df.empty:
return []
contrib_df = contrib_df.sort_values(["year", "abs_contribution_return", "instrument"], ascending=[True, False, True])
contrib_df["rank"] = contrib_df.groupby("year").cumcount() + 1
if stock_contrib_topk > 0:
contrib_df = contrib_df[contrib_df["rank"] <= stock_contrib_topk]
rows: list[dict[str, Any]] = []
for record in contrib_df.to_dict("records"):
record = dict(record)
record.update(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
}
)
rows.append(record)
return rows
def _flatten_trade_log(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
trade_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for record in trade_log or []:
row = dict(record)
row.update(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
}
)
rows.append(row)
return rows
def _flatten_holding_log(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
holding_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for record in holding_log or []:
row = dict(record)
row.update(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
}
)
rows.append(row)
return rows
def _flatten_portfolio_log(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
portfolio_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for record in portfolio_log or []:
row = dict(record)
row.update(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
}
)
rows.append(row)
return rows
def _flatten_signal_selection_log(
*,
seed_name: str,
candidate_scope: str,
factor_name: str,
factor_expr: str,
turn: int | None,
call_index: int | None,
proposal_rank: int | None,
signal_selection_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for record in signal_selection_log or []:
row = dict(record)
row.update(
{
"seed_name": seed_name,
"candidate_scope": candidate_scope,
"factor_name": factor_name,
"factor_expr": factor_expr,
"turn": turn,
"call_index": call_index,
"proposal_rank": proposal_rank,
}
)
rows.append(row)
return rows
def _compute_pass_metrics(summary_rows: list[dict[str, Any]], trial_rows: list[dict[str, Any]]) -> dict[str, float]:
if not summary_rows or not trial_rows:
return {
"vr_global": 0.0,
"vr_seed_mean": 0.0,
"valid_rate_global": 0.0,
"valid_rate_seed_mean": 0.0,
"beat_seed_rate_global": 0.0,
"beat_seed_rate_seed_mean": 0.0,
"pass_at_3": 0.0,
"pass_at_5": 0.0,
}
summary_df = pd.DataFrame(summary_rows)
trials_df = pd.DataFrame(trial_rows)
if summary_df.empty or trials_df.empty:
return {
"vr_global": 0.0,
"vr_seed_mean": 0.0,
"valid_rate_global": 0.0,
"valid_rate_seed_mean": 0.0,
"beat_seed_rate_global": 0.0,
"beat_seed_rate_seed_mean": 0.0,
"pass_at_3": 0.0,
"pass_at_5": 0.0,
}
trials_df = trials_df.copy()
trials_df["proposal_rank"] = pd.to_numeric(trials_df["proposal_rank"], errors="coerce").fillna(10**9)
trials_df["seed_ir"] = pd.to_numeric(trials_df["seed_ir"], errors="coerce").fillna(0.0)
trials_df["ir"] = pd.to_numeric(trials_df["ir"], errors="coerce").fillna(float("-inf"))
trials_df["success"] = trials_df["success"].astype(bool)
trials_df["beat_seed_raw_ir"] = trials_df["success"] & (trials_df["ir"] > trials_df["seed_ir"])
def pass_at_k(k: int) -> float:
passed = trials_df[trials_df["proposal_rank"] <= k].groupby("seed_name")["beat_seed_raw_ir"].any()
return float(passed.reindex(summary_df["seed_name"], fill_value=False).mean())
n_calls = pd.to_numeric(summary_df["n_calls"], errors="coerce").replace(0, pd.NA)
n_valid = pd.to_numeric(summary_df["n_valid"], errors="coerce")
valid_rate_global = float(trials_df["success"].mean())
valid_rate_seed_mean = float((n_valid / n_calls).fillna(0.0).mean())
beat_seed_rate_global = float(trials_df["beat_seed_raw_ir"].mean())
beat_seed_rate_seed_mean = float(
trials_df.groupby("seed_name")["beat_seed_raw_ir"]
.mean()
.reindex(summary_df["seed_name"], fill_value=0.0)
.mean()
)
return {
# Paper definition: VR is Valid Ratio, i.e. executable generated alphas.
"vr_global": valid_rate_global,
"vr_seed_mean": valid_rate_seed_mean,
"valid_rate_global": valid_rate_global,
"valid_rate_seed_mean": valid_rate_seed_mean,
"beat_seed_rate_global": beat_seed_rate_global,
"beat_seed_rate_seed_mean": beat_seed_rate_seed_mean,
"pass_at_3": pass_at_k(3),
"pass_at_5": pass_at_k(5),
}
def _series_mean(rows: list[dict[str, Any]], key: str) -> float | None:
values = pd.to_numeric(pd.Series([row.get(key) for row in rows]), errors="coerce").dropna()
return float(values.mean()) if not values.empty else None
def _build_aggregate(
    *,
    summary_rows: list[dict[str, Any]],
    trial_rows: list[dict[str, Any]],
    start_date: str | None,
    end_date: str | None,
    elapsed_sec: float,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Build the run-level aggregate record from per-seed summary rows.

    Combines pass/valid-rate metrics from ``_compute_pass_metrics`` with
    column means over all seeds (``mean_seed_*``), means over seeds that
    produced a best candidate (``mean_best_*``), improvement deltas, and an
    echo of the backtest configuration taken from ``args`` so the output is
    self-describing.

    Parameters
    ----------
    summary_rows : one summary dict per seed.
    trial_rows : one dict per trial call, across all seeds.
    start_date / end_date : backtest window bounds, recorded as-is.
    elapsed_sec : wall-clock duration of the run, rounded to 0.1 s.
    args : parsed CLI namespace; most options are read via ``getattr`` with
        defaults so older argument sets still produce a complete record.
    """
    # Seeds that produced a usable best candidate; best_* means use only these.
    valid_summaries = [row for row in summary_rows if row.get("best_ir") is not None]
    pass_metrics = _compute_pass_metrics(summary_rows, trial_rows)
    return {
        "n_seeds": len(summary_rows),
        "n_valid_seed_runs": len([row for row in summary_rows if row.get("seed_success")]),
        # max(..., 1) guards the division when there are no seeds at all.
        "beat_rate_ir": float(sum(1 for row in summary_rows if row.get("beat_seed_paper_ir")) / max(len(summary_rows), 1)),
        "vr_global": pass_metrics["vr_global"],
        "vr_seed_mean": pass_metrics["vr_seed_mean"],
        "valid_rate_global": pass_metrics["valid_rate_global"],
        "valid_rate_seed_mean": pass_metrics["valid_rate_seed_mean"],
        "beat_seed_rate_global": pass_metrics["beat_seed_rate_global"],
        "beat_seed_rate_seed_mean": pass_metrics["beat_seed_rate_seed_mean"],
        "pass_at_3": pass_metrics["pass_at_3"],
        "pass_at_5": pass_metrics["pass_at_5"],
        # Seed-baseline column means over all seeds (None when a column has
        # no numeric values).
        "mean_seed_performance_return": _series_mean(summary_rows, "seed_performance_return"),
        "mean_seed_benchmark_performance_return": _series_mean(summary_rows, "seed_benchmark_performance_return"),
        "mean_seed_excess_compounded_return": _series_mean(summary_rows, "seed_excess_compounded_return"),
        "mean_seed_ir": _series_mean(summary_rows, "seed_ir"),
        "mean_seed_ic": _series_mean(summary_rows, "seed_ic"),
        "mean_seed_aer": _series_mean(summary_rows, "seed_aer"),
        "mean_seed_sharpe": _series_mean(summary_rows, "seed_sharpe"),
        "mean_seed_winrate": _series_mean(summary_rows, "seed_winrate"),
        "mean_seed_mdd": _series_mean(summary_rows, "seed_mdd"),
        "mean_seed_excess_mdd": _series_mean(summary_rows, "seed_excess_mdd"),
        "mean_seed_portfolio_nav_mdd": _series_mean(summary_rows, "seed_portfolio_nav_mdd"),
        "mean_seed_drawdown_duration_max": _series_mean(summary_rows, "seed_drawdown_duration_max"),
        "mean_seed_drawdown_duration_mean": _series_mean(summary_rows, "seed_drawdown_duration_mean"),
        "mean_seed_transaction_cost": _series_mean(summary_rows, "seed_transaction_cost"),
        "mean_seed_gross_turnover": _series_mean(summary_rows, "seed_gross_turnover"),
        "mean_seed_turnover_ratio": _series_mean(summary_rows, "seed_turnover_ratio"),
        "mean_seed_return_per_turnover": _series_mean(summary_rows, "seed_return_per_turnover"),
        "mean_seed_cash_weight_mean": _series_mean(summary_rows, "seed_cash_weight_mean"),
        "mean_seed_cash_weight_median": _series_mean(summary_rows, "seed_cash_weight_median"),
        "mean_seed_cash_weight_p95": _series_mean(summary_rows, "seed_cash_weight_p95"),
        "mean_seed_round2_redistributed_notional": _series_mean(summary_rows, "seed_round2_redistributed_notional"),
        "mean_seed_round2_buy_trade_count": _series_mean(summary_rows, "seed_round2_buy_trade_count"),
        "mean_seed_round2_rebalance_days": _series_mean(summary_rows, "seed_round2_rebalance_days"),
        "mean_seed_rebalance_holdings_mean": _series_mean(summary_rows, "seed_rebalance_holdings_mean"),
        "mean_seed_rebalance_holdings_median": _series_mean(summary_rows, "seed_rebalance_holdings_median"),
        "mean_seed_rebalance_window_count": _series_mean(summary_rows, "seed_rebalance_window_count"),
        "mean_seed_rebalance_window_days_mean": _series_mean(summary_rows, "seed_rebalance_window_days_mean"),
        "mean_seed_rebalance_window_days_median": _series_mean(summary_rows, "seed_rebalance_window_days_median"),
        "mean_seed_rebalance_window_days_max": _series_mean(summary_rows, "seed_rebalance_window_days_max"),
        "mean_seed_rebalance_window_return_reconstruction_error": _series_mean(summary_rows, "seed_rebalance_window_return_reconstruction_error"),
        "mean_seed_perf_return_reconstruction_error": _series_mean(summary_rows, "seed_performance_return_reconstruction_error"),
        "mean_seed_transaction_cost_reconstruction_error": _series_mean(summary_rows, "seed_transaction_cost_reconstruction_error"),
        "mean_seed_gross_turnover_reconstruction_error": _series_mean(summary_rows, "seed_gross_turnover_reconstruction_error"),
        # Best-candidate column means, restricted to valid_summaries.
        "mean_best_performance_return": _series_mean(valid_summaries, "best_performance_return"),
        "mean_best_benchmark_performance_return": _series_mean(valid_summaries, "best_benchmark_performance_return"),
        "mean_best_excess_compounded_return": _series_mean(valid_summaries, "best_excess_compounded_return"),
        "mean_best_ir": _series_mean(valid_summaries, "best_ir"),
        "mean_best_ic": _series_mean(valid_summaries, "best_ic"),
        "mean_best_aer": _series_mean(valid_summaries, "best_aer"),
        "mean_best_sharpe": _series_mean(valid_summaries, "best_sharpe"),
        "mean_best_winrate": _series_mean(valid_summaries, "best_winrate"),
        "mean_best_mdd": _series_mean(valid_summaries, "best_mdd"),
        "mean_best_excess_mdd": _series_mean(valid_summaries, "best_excess_mdd"),
        "mean_best_portfolio_nav_mdd": _series_mean(valid_summaries, "best_portfolio_nav_mdd"),
        "mean_best_drawdown_duration_max": _series_mean(valid_summaries, "best_drawdown_duration_max"),
        "mean_best_drawdown_duration_mean": _series_mean(valid_summaries, "best_drawdown_duration_mean"),
        "mean_best_transaction_cost": _series_mean(valid_summaries, "best_transaction_cost"),
        "mean_best_gross_turnover": _series_mean(valid_summaries, "best_gross_turnover"),
        "mean_best_turnover_ratio": _series_mean(valid_summaries, "best_turnover_ratio"),
        "mean_best_return_per_turnover": _series_mean(valid_summaries, "best_return_per_turnover"),
        "mean_best_cash_weight_mean": _series_mean(valid_summaries, "best_cash_weight_mean"),
        "mean_best_cash_weight_median": _series_mean(valid_summaries, "best_cash_weight_median"),
        "mean_best_cash_weight_p95": _series_mean(valid_summaries, "best_cash_weight_p95"),
        "mean_best_round2_redistributed_notional": _series_mean(valid_summaries, "best_round2_redistributed_notional"),
        "mean_best_round2_buy_trade_count": _series_mean(valid_summaries, "best_round2_buy_trade_count"),
        "mean_best_round2_rebalance_days": _series_mean(valid_summaries, "best_round2_rebalance_days"),
        "mean_best_rebalance_holdings_mean": _series_mean(valid_summaries, "best_rebalance_holdings_mean"),
        "mean_best_rebalance_holdings_median": _series_mean(valid_summaries, "best_rebalance_holdings_median"),
        "mean_best_rebalance_window_count": _series_mean(valid_summaries, "best_rebalance_window_count"),
        "mean_best_rebalance_window_days_mean": _series_mean(valid_summaries, "best_rebalance_window_days_mean"),
        "mean_best_rebalance_window_days_median": _series_mean(valid_summaries, "best_rebalance_window_days_median"),
        "mean_best_rebalance_window_days_max": _series_mean(valid_summaries, "best_rebalance_window_days_max"),
        "mean_best_rebalance_window_return_reconstruction_error": _series_mean(valid_summaries, "best_rebalance_window_return_reconstruction_error"),
        "mean_best_perf_return_reconstruction_error": _series_mean(valid_summaries, "best_performance_return_reconstruction_error"),
        "mean_best_transaction_cost_reconstruction_error": _series_mean(valid_summaries, "best_transaction_cost_reconstruction_error"),
        "mean_best_gross_turnover_reconstruction_error": _series_mean(valid_summaries, "best_gross_turnover_reconstruction_error"),
        # Improvement deltas (best minus seed), averaged over valid seeds.
        "mean_ir_improvement": _series_mean(valid_summaries, "ir_improvement_over_paper_seed"),
        "mean_ic_improvement": _series_mean(valid_summaries, "ic_improvement_over_seed"),
        "mean_aer_improvement": _series_mean(valid_summaries, "aer_improvement_over_seed"),
        "mean_sharpe_improvement": _series_mean(valid_summaries, "sharpe_improvement_over_seed"),
        "mean_winrate_improvement": _series_mean(valid_summaries, "winrate_improvement_over_seed"),
        "mean_mdd_improvement": _series_mean(valid_summaries, "mdd_improvement_over_seed"),
        "mean_excess_compounded_return_improvement": _series_mean(valid_summaries, "excess_compounded_return_improvement_over_seed"),
        "mean_portfolio_nav_mdd_improvement": _series_mean(valid_summaries, "portfolio_nav_mdd_improvement_over_seed"),
        "mean_calls": _series_mean(summary_rows, "n_calls"),
        "mean_valid_calls": _series_mean(summary_rows, "n_valid"),
        "mean_trade_guard_ir_delta_seed": _series_mean(summary_rows, "trade_guard_ir_delta_seed"),
        "mean_trade_guard_ir_delta_best": _series_mean(summary_rows, "trade_guard_ir_delta_best"),
        # Run metadata and backtest-configuration echo (defaults mirror the
        # CLI defaults so older argument namespaces still serialize fully).
        "backtest_start_date": start_date,
        "backtest_end_date": end_date,
        "period": args.period,
        "backtest_engine": args.backtest_engine,
        "backtest_workers": int(max(args.backtest_workers, 1)),
        "top_k": int(getattr(args, "top_k", 10)),
        "n_drop": int(getattr(args, "n_drop", 2)),
        "position_size": float(getattr(args, "position_size", 1.0)),
        "max_pos_each_stock": float(getattr(args, "max_pos_each_stock", 1.0)),
        "lot_size": int(getattr(args, "lot_size", 100)),
        "max_daily_volume_participation": float(getattr(args, "max_daily_volume_participation", 0.0)),
        "max_daily_amount_participation": float(getattr(args, "max_daily_amount_participation", 0.0)),
        "rebalance_freq": int(getattr(args, "rebalance_freq", 5)),
        "cost_buy": float(getattr(args, "buy_fee", 0.0013)),
        "cost_sell": float(getattr(args, "sell_fee", 0.0013)),
        "rebalance_mode": getattr(args, "rebalance_mode", "dropout"),
        "custom_weight_mode": getattr(args, "custom_weight_mode", "equal"),
        "redistribute_unfilled_cash": bool(getattr(args, "redistribute_unfilled_cash", False)),
        "enforce_cash_limit": bool(getattr(args, "enforce_cash_limit", False)),
        "score_transform": getattr(args, "score_transform", "identity"),
        "score_clip": float(getattr(args, "score_clip", 3.0)),
        "universe_filter": getattr(args, "universe_filter", "none"),
        "universe_top_n": int(getattr(args, "universe_top_n", 0)),
        "universe_lookback_days": int(getattr(args, "universe_lookback_days", 20)),
        "trade_guard_config": _parse_trade_guard_config(getattr(args, "trade_guard_config", "")),
        # Data path resolution order: CLI flag, then env vars, then repo default.
        "data_path": args.data_path or os.environ.get("ALPHAEVO_DATA_PATH") or os.environ.get("AAE_DATA_PATH") or os.environ.get("DAILY_PV_PATH") or "repo default",
        "jsonl": str(args.jsonl),
        "elapsed_sec": round(float(elapsed_sec), 1),
    }
def _build_aggregate_yearly(summary_yearly_rows: list[dict[str, Any]]) -> pd.DataFrame:
if not summary_yearly_rows:
return pd.DataFrame()
df = pd.DataFrame(summary_yearly_rows)
out_rows: list[dict[str, Any]] = []
for year, grp in df.groupby("year"):
n_calls = pd.to_numeric(grp["n_calls"], errors="coerce").fillna(0)
n_valid = pd.to_numeric(grp["n_valid"], errors="coerce").fillna(0)
n_wins = pd.to_numeric(grp.get("n_wins", pd.Series(0, index=grp.index)), errors="coerce").fillna(0)
valid_rate_seed = pd.to_numeric(
grp.get("valid_rate", grp.get("vr", pd.Series(0, index=grp.index))),
errors="coerce",
).fillna(0)
beat_seed_rate_seed = pd.to_numeric(
grp.get("beat_seed_rate", pd.Series(0, index=grp.index)),
errors="coerce",
).fillna(0)
out_rows.append(
{
"year": int(year),
"market_regime": grp["market_regime"].iloc[0],
"n_seeds": int(len(grp)),
"n_calls": int(n_calls.sum()),
"n_valid": int(n_valid.sum()),
"n_wins": int(n_wins.sum()),
"vr_seed_mean": float(valid_rate_seed.mean()),
"vr_global": float(n_valid.sum() / max(n_calls.sum(), 1)),
"valid_rate_seed_mean": float(valid_rate_seed.mean()),
"valid_rate_global": float(n_valid.sum() / max(n_calls.sum(), 1)),
"beat_seed_rate_seed_mean": float(beat_seed_rate_seed.mean()),
"beat_seed_rate_global": float(n_wins.sum() / max(n_calls.sum(), 1)),
"pass_at_3": float(grp["pass_at_3"].astype(bool).mean()),
"pass_at_5": float(grp["pass_at_5"].astype(bool).mean()),
"mean_seed_performance_return": float(pd.to_numeric(grp["seed_performance_return"], errors="coerce").mean()),
"mean_seed_benchmark_performance_return": float(pd.to_numeric(grp["seed_benchmark_performance_return"], errors="coerce").mean()),
"mean_seed_excess_compounded_return": float(pd.to_numeric(grp["seed_excess_compounded_return"], errors="coerce").mean()),
"mean_seed_ir": float(pd.to_numeric(grp["seed_ir"], errors="coerce").mean()),
"mean_seed_ic": float(pd.to_numeric(grp["seed_ic"], errors="coerce").mean()),
"mean_seed_aer": float(pd.to_numeric(grp["seed_aer"], errors="coerce").mean()),
"mean_seed_sharpe": float(pd.to_numeric(grp["seed_sharpe"], errors="coerce").mean()),
"mean_seed_winrate": float(pd.to_numeric(grp["seed_winrate"], errors="coerce").mean()),
"mean_seed_mdd": float(pd.to_numeric(grp["seed_mdd"], errors="coerce").mean()),
"mean_seed_excess_mdd": float(pd.to_numeric(grp["seed_excess_mdd"], errors="coerce").mean()),
"mean_seed_portfolio_nav_mdd": float(pd.to_numeric(grp["seed_portfolio_nav_mdd"], errors="coerce").mean()),
"mean_seed_drawdown_duration_max": float(pd.to_numeric(grp["seed_drawdown_duration_max"], errors="coerce").mean()),
"mean_seed_drawdown_duration_mean": float(pd.to_numeric(grp["seed_drawdown_duration_mean"], errors="coerce").mean()),
"mean_best_performance_return": float(pd.to_numeric(grp["best_performance_return"], errors="coerce").mean()),
"mean_best_benchmark_performance_return": float(pd.to_numeric(grp["best_benchmark_performance_return"], errors="coerce").mean()),
"mean_best_excess_compounded_return": float(pd.to_numeric(grp["best_excess_compounded_return"], errors="coerce").mean()),
"mean_best_ir": float(pd.to_numeric(grp["best_ir"], errors="coerce").mean()),
"mean_best_ic": float(pd.to_numeric(grp["best_ic"], errors="coerce").mean()),
"mean_best_aer": float(pd.to_numeric(grp["best_aer"], errors="coerce").mean()),
"mean_best_sharpe": float(pd.to_numeric(grp["best_sharpe"], errors="coerce").mean()),
"mean_best_winrate": float(pd.to_numeric(grp["best_winrate"], errors="coerce").mean()),
"mean_best_mdd": float(pd.to_numeric(grp["best_mdd"], errors="coerce").mean()),
"mean_best_excess_mdd": float(pd.to_numeric(grp["best_excess_mdd"], errors="coerce").mean()),
"mean_best_portfolio_nav_mdd": float(pd.to_numeric(grp["best_portfolio_nav_mdd"], errors="coerce").mean()),
"mean_best_drawdown_duration_max": float(pd.to_numeric(grp["best_drawdown_duration_max"], errors="coerce").mean()),
"mean_best_drawdown_duration_mean": float(pd.to_numeric(grp["best_drawdown_duration_mean"], errors="coerce").mean()),
}
)
return pd.DataFrame(out_rows).sort_values("year").reset_index(drop=True)
def _candidate_for_detail(result: dict[str, Any], candidate_scope: str) -> dict[str, Any]:
candidate = {
"input_index": result.get("input_index"),
"source": result.get("source"),
"candidate_scope": candidate_scope,
"seed_name": result.get("seed_name"),
"seed_expr": result.get("seed_expr"),
"factor_name": result.get("factor_name"),
"factor_expr": result.get("factor_expr"),
"turn": result.get("turn"),
"call_index": result.get("call_index"),
"proposal_rank": result.get("proposal_rank"),
}
return candidate
def _result_match_key(row: dict[str, Any]) -> tuple[str, str, int | None, int | None, int | None]:
    """Identity key used to pair a summary result with its detail-run twin."""
    name = str(row.get("seed_name") or row.get("factor_name") or "")
    expr = str(row.get("factor_expr") or "")
    turn = _optional_int(row.get("turn"))
    call_index = _optional_int(row.get("call_index"))
    rank = _optional_int(row.get("proposal_rank"))
    return (name, expr, turn, call_index, rank)
def _merge_detail_results(
    results: list[dict[str, Any]],
    detail_results: list[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
    """Overlay detail-run payloads onto summary results matched by identity key.

    Keys identifying the candidate (candidate_scope, source, input_index) are
    never overwritten by the detail payload.
    """
    if not detail_results:
        return list(results)
    protected = {"candidate_scope", "source", "input_index"}
    by_key = {_result_match_key(row): row for row in detail_results}
    merged_rows: list[dict[str, Any]] = []
    for row in results:
        merged = dict(row)
        detail = by_key.get(_result_match_key(row))
        if detail is not None:
            merged.update({key: value for key, value in detail.items() if key not in protected})
        merged_rows.append(merged)
    return merged_rows
def _build_detail_rows(
    *,
    detail_results: list[dict[str, Any]],
    stock_contrib_topk: int,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten each detail result's nested logs into flat row lists for CSV export.

    Returns six row lists, in order: stock contributions, trade log, holding
    log, portfolio daily log, rebalance-day rows (the ``is_rebalance`` subset
    of the portfolio log), and signal-selection log.
    """
    stock_contrib_rows: list[dict[str, Any]] = []
    trade_log_rows: list[dict[str, Any]] = []
    holding_log_rows: list[dict[str, Any]] = []
    portfolio_log_rows: list[dict[str, Any]] = []
    rebalance_log_rows: list[dict[str, Any]] = []
    signal_selection_rows: list[dict[str, Any]] = []
    for result in detail_results:
        # Candidate identity/context shared by every flatten helper; built once
        # instead of repeating the same seven keyword arguments at each call site.
        ident = {
            "seed_name": str(result.get("seed_name") or ""),
            "candidate_scope": str(result.get("candidate_scope") or ""),
            "factor_name": str(result.get("factor_name") or ""),
            "factor_expr": str(result.get("factor_expr") or ""),
            "turn": result.get("turn"),
            "call_index": result.get("call_index"),
            "proposal_rank": result.get("proposal_rank"),
        }
        stock_contrib_rows.extend(
            _flatten_stock_contrib(
                stock_contrib=result.get("stock_contrib") or [],
                stock_contrib_topk=stock_contrib_topk,
                **ident,
            )
        )
        trade_log_rows.extend(
            _flatten_trade_log(trade_log=result.get("trade_log") or [], **ident)
        )
        holding_log_rows.extend(
            _flatten_holding_log(holding_log=result.get("holding_log") or [], **ident)
        )
        portfolio_rows = _flatten_portfolio_log(
            portfolio_log=result.get("portfolio_log") or [], **ident
        )
        portfolio_log_rows.extend(portfolio_rows)
        # The rebalance log is simply the rebalance-day slice of the portfolio log.
        rebalance_log_rows.extend(row for row in portfolio_rows if bool(row.get("is_rebalance")))
        signal_selection_rows.extend(
            _flatten_signal_selection_log(
                signal_selection_log=result.get("signal_selection_log") or [], **ident
            )
        )
    return (
        stock_contrib_rows,
        trade_log_rows,
        holding_log_rows,
        portfolio_log_rows,
        rebalance_log_rows,
        signal_selection_rows,
    )
def _build_outputs_from_results(
    *,
    results: list[dict[str, Any]],
    args: argparse.Namespace,
    start_date: str | None,
    end_date: str | None,
    elapsed_sec: float,
    detail_results: list[dict[str, Any]] | None = None,
) -> tuple[
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    pd.DataFrame,
    dict[str, Any],
]:
    """Assemble every artifact payload (summary/trial/yearly/detail rows plus aggregates).

    Detail fields, when supplied, are overlaid onto the scalar results first;
    the six detail row lists are empty unless ``detail_results`` is non-empty.
    """
    merged = _merge_detail_results(results, detail_results)
    # Index seed baselines by name (seed_name falling back to factor_name);
    # later rows win on key collisions, matching dict-comprehension semantics.
    baselines_by_name: dict[str, dict[str, Any]] = {}
    for row in merged:
        if row.get("candidate_scope") == "seed_baseline":
            baselines_by_name[str(row.get("seed_name") or row.get("factor_name") or "")] = row
    summary_rows = _build_summary_rows(merged, baselines_by_name, args.backtest_engine)
    baseline_rows = _build_baseline_rows(merged)
    trial_rows = _build_trial_rows(merged, baselines_by_name)
    summary_yearly_rows, trial_yearly_rows = _build_yearly_outputs(merged, baselines_by_name)
    if detail_results:
        detail_lists = _build_detail_rows(
            detail_results=detail_results,
            stock_contrib_topk=args.stock_contrib_topk,
        )
    else:
        detail_lists = ([], [], [], [], [], [])
    (
        stock_contrib_rows,
        trade_log_rows,
        holding_log_rows,
        portfolio_log_rows,
        rebalance_log_rows,
        signal_selection_rows,
    ) = detail_lists
    aggregate = _build_aggregate(
        summary_rows=summary_rows,
        trial_rows=trial_rows,
        start_date=start_date,
        end_date=end_date,
        elapsed_sec=elapsed_sec,
        args=args,
    )
    aggregate_yearly = _build_aggregate_yearly(summary_yearly_rows)
    return (
        summary_rows,
        trial_rows,
        summary_yearly_rows,
        trial_yearly_rows,
        stock_contrib_rows,
        trade_log_rows,
        holding_log_rows,
        portfolio_log_rows,
        rebalance_log_rows,
        signal_selection_rows,
        baseline_rows,
        aggregate_yearly,
        aggregate,
    )
def _save_outputs(
    output_dir: Path,
    summary_rows: list[dict[str, Any]],
    trial_rows: list[dict[str, Any]],
    summary_yearly_rows: list[dict[str, Any]],
    trial_yearly_rows: list[dict[str, Any]],
    stock_contrib_rows: list[dict[str, Any]],
    trade_log_rows: list[dict[str, Any]],
    holding_log_rows: list[dict[str, Any]],
    portfolio_log_rows: list[dict[str, Any]],
    rebalance_log_rows: list[dict[str, Any]],
    signal_selection_rows: list[dict[str, Any]],
    baseline_rows: list[dict[str, Any]],
    aggregate_yearly: pd.DataFrame,
    aggregate: dict[str, Any],
    run_metadata: dict[str, Any] | None = None,
    data_quality_report: dict[str, Any] | None = None,
) -> None:
    """Write all backtest artifacts (CSV tables and JSON summaries) into ``output_dir``.

    Creates the directory if needed. When ``run_metadata`` is provided, its
    values are prepended as leading columns on the scalar tables (summary,
    baselines, trials, yearly, aggregate_yearly); the detail tables are
    serialized through ``_rows_to_frame`` with fixed column lists.
    ``data_quality_report.json`` is only written when a report is supplied.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    def _with_run_metadata(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with run-metadata columns prepended in ``ordered_keys`` order."""
        if not run_metadata:
            return df
        out = df.copy()
        # Desired left-to-right order of the metadata columns in the output CSVs.
        ordered_keys = [
            "mode",
            "period",
            "backtest_engine",
            "top_k",
            "n_drop",
            "rebalance_mode",
            "custom_weight_mode",
            "rebalance_freq",
            "position_size",
            "max_pos_each_stock",
            "lot_size",
            "max_daily_volume_participation",
            "max_daily_amount_participation",
            "buy_fee",
            "sell_fee",
            "enforce_cash_limit",
            "score_transform",
            "score_clip",
            "universe_filter",
            "universe_top_n",
            "universe_lookback_days",
            "start_date",
            "end_date",
        ]
        # Iterate in reverse and insert at position 0 so the final column order
        # matches ordered_keys; keys absent from run_metadata are skipped, and
        # columns the frame already carries are never overwritten.
        for key in reversed(ordered_keys):
            if key in run_metadata:
                if key in out.columns:
                    continue
                out.insert(0, key, run_metadata.get(key))
        return out

    # Derived views computed from the detail logs before any file is written.
    rebalance_plan_df = _build_rebalance_plan_frame(
        trade_log_rows=trade_log_rows,
        holding_log_rows=holding_log_rows,
        portfolio_log_rows=portfolio_log_rows,
    )
    rebalance_window_df = _build_rebalance_window_frame(
        _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS)
    )
    seed_ranking_df = _build_alpha_cash_cost_ranking_frame(summary_rows, prefix="seed")
    best_ranking_df = _build_alpha_cash_cost_ranking_frame(summary_rows, prefix="best")
    # Scalar tables: carry the run-metadata prefix columns.
    _with_run_metadata(pd.DataFrame(summary_rows)).to_csv(output_dir / "summary.csv", index=False)
    _with_run_metadata(pd.DataFrame(baseline_rows)).to_csv(output_dir / "baselines.csv", index=False)
    _with_run_metadata(pd.DataFrame(trial_rows)).to_csv(output_dir / "trials.csv", index=False)
    _with_run_metadata(pd.DataFrame(summary_yearly_rows)).to_csv(output_dir / "summary_yearly.csv", index=False)
    _with_run_metadata(pd.DataFrame(trial_yearly_rows)).to_csv(output_dir / "trials_yearly.csv", index=False)
    _with_run_metadata(aggregate_yearly).to_csv(output_dir / "aggregate_yearly.csv", index=False)
    # Detail tables: fixed column lists keep CSV schemas stable even when empty.
    _rows_to_frame(stock_contrib_rows, STOCK_CONTRIB_COLUMNS).to_csv(output_dir / "stock_contrib.csv", index=False)
    _rows_to_frame(trade_log_rows, TRADE_LOG_COLUMNS).to_csv(output_dir / "trade_log.csv", index=False)
    _rows_to_frame(holding_log_rows, HOLDING_LOG_COLUMNS).to_csv(output_dir / "holdings_daily.csv", index=False)
    _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS).to_csv(output_dir / "portfolio_daily.csv", index=False)
    _rows_to_frame(rebalance_log_rows, REBALANCE_LOG_COLUMNS).to_csv(output_dir / "rebalance_log.csv", index=False)
    _rows_to_frame(signal_selection_rows, SIGNAL_SELECTION_COLUMNS).to_csv(output_dir / "signal_selection_daily.csv", index=False)
    rebalance_plan_df.to_csv(output_dir / "rebalance_plan.csv", index=False)
    rebalance_window_df.to_csv(output_dir / "rebalance_window_returns.csv", index=False)
    seed_ranking_df.to_csv(output_dir / "alpha_ranking_seed_cash_cost.csv", index=False)
    best_ranking_df.to_csv(output_dir / "alpha_ranking_best_cash_cost.csv", index=False)
    (output_dir / "aggregate.json").write_text(json.dumps(aggregate, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    if data_quality_report is not None:
        (output_dir / "data_quality_report.json").write_text(
            json.dumps(data_quality_report, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
def _build_robust_manifest(args: argparse.Namespace, jsonl_path: Path, data_path: str | None) -> dict[str, Any]:
return {
"mode": "alpha_robustness",
"jsonl": str(jsonl_path),
"period": args.period,
"start_date": args.start_date.strip() or None,
"end_date": args.end_date.strip() or None,
"backtest_engine": args.backtest_engine,
"rebalance_mode": args.rebalance_mode,
"rebalance_freq": int(args.rebalance_freq),
"top_k": int(args.top_k),
"n_drop": int(args.n_drop),
"position_size": float(args.position_size),
"max_pos_each_stock": float(args.max_pos_each_stock),
"lot_size": int(args.lot_size),
"max_daily_volume_participation": float(args.max_daily_volume_participation),
"max_daily_amount_participation": float(args.max_daily_amount_participation),
"custom_weight_mode": args.custom_weight_mode,
"redistribute_unfilled_cash": bool(args.redistribute_unfilled_cash),
"enforce_cash_limit": bool(args.enforce_cash_limit),
"buy_fee": float(args.buy_fee),
"sell_fee": float(args.sell_fee),
"score_transform": args.score_transform,
"score_clip": float(args.score_clip),
"universe_filter": args.universe_filter,
"universe_top_n": int(args.universe_top_n),
"universe_lookback_days": int(args.universe_lookback_days),
"sample_size": int(args.sample_size),
"sample_seed": int(args.sample_seed),
"data_path": data_path or "repo default",
}
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser for the standalone alpha robustness runner."""
    parser = argparse.ArgumentParser(description="Run standalone alpha robustness backtests on an isolated robust path")
    parser.add_argument("--jsonl", required=True, help="Path to seed/candidate JSONL file with {name, expr}")
    parser.add_argument("--period", choices=("train", "val", "test"), default="test")
    parser.add_argument(
        "--backtest-engine",
        choices=("custom", "qlib_original", "spec_shares_cash", "spec_return_based"),
        default="custom",
    )
    parser.add_argument(
        "--rebalance-mode",
        choices=("dropout", "sell_all", "target_weight"),
        default="dropout",
        help="Qlib rebalance behavior: legacy dropout, sell-all-then-buy, or alpha-score target-weight sync (ignored by spec engines)",
    )
    parser.add_argument("--top-k", type=int, default=10, help="Target holding count for custom/qlib engines")
    parser.add_argument("--n-drop", type=int, default=2, help="Legacy dropout cap for qlib_original; ignored by spec engines")
    parser.add_argument("--position-size", type=float, default=1.0, help="Fraction of equity to allocate to the portfolio")
    parser.add_argument("--max-pos-each-stock", type=float, default=1.0, help="Per-name weight cap; use 1.0 to effectively disable the old 20%% cap")
    parser.add_argument("--lot-size", type=int, default=100, help="Trading lot size used for share rounding")
    parser.add_argument("--max-daily-volume-participation", type=float, default=0.0, help="Buy-side market volume participation cap; 0 disables it")
    parser.add_argument("--max-daily-amount-participation", type=float, default=0.0, help="Buy-side market amount participation cap; 0 disables it")
    parser.add_argument("--rebalance-freq", type=int, default=5, help="Rebalance interval for the robust path")
    parser.add_argument("--buy-fee", type=float, default=0.0013, help="Buy fee for the robust path")
    parser.add_argument("--sell-fee", type=float, default=0.0013, help="Sell fee for the robust path")
    parser.add_argument(
        "--custom-weight-mode",
        default="equal",
        help="Custom engine weight mode: equal or alpha_score (aliases also accepted by core backtester)",
    )
    parser.add_argument(
        "--redistribute-unfilled-cash",
        action="store_true",
        help="Custom engine only: carry unfilled buy budget down the remaining ranks within top_k",
    )
    parser.add_argument(
        "--enforce-cash-limit",
        action="store_true",
        help="Custom engine only: clip buy orders by available cash instead of allowing negative cash",
    )
    parser.add_argument("--backtest-workers", type=int, default=1, help="Parallel worker count across seeds")
    parser.add_argument("--data-path", default="", help="Optional path to daily_pv.h5")
    parser.add_argument(
        "--score-transform",
        default="identity",
        help="Optional score transform for robustness testing: identity, rank, zscore, rank_zscore, signed, clip_zscore",
    )
    parser.add_argument("--score-clip", type=float, default=3.0, help="Clip threshold used by clip_zscore")
    parser.add_argument(
        "--universe-filter",
        default="none",
        help="Optional liquidity universe filter: none, top_amount, top_volume",
    )
    parser.add_argument("--universe-top-n", type=int, default=0, help="Keep top-N names for liquidity-derived universes; 0 disables")
    parser.add_argument("--universe-lookback-days", type=int, default=20, help="Rolling lookback for liquidity-derived universes")
    parser.add_argument(
        "--trade-guard-config",
        default="",
        help="Optional qlib_original trade guard config: none/null disables, 'vn' enables defaults, or pass a JSON object (ignored by spec engines)",
    )
    parser.add_argument("--start-date", default="", help="Optional explicit backtest start date (YYYY-MM-DD)")
    parser.add_argument("--end-date", default="", help="Optional explicit backtest end date (YYYY-MM-DD)")
    parser.add_argument("--label-forward-days", type=int, default=5)
    parser.add_argument("--sample-size", type=int, default=0, help="0 means all seeds")
    parser.add_argument("--sample-seed", type=int, default=42)
    parser.add_argument("--output-dir", default="", help="Defaults to /kaggle/working/aae_v2/jsonl_backtest_<stem> on Kaggle")
    parser.add_argument("--manifest-name", default="robust_manifest.json", help="Metadata snapshot written into the output dir")
    parser.add_argument("--save-every", type=int, default=10)
    parser.add_argument("--capture-detail-artifacts", action="store_true")
    parser.add_argument("--stock-contrib-topk", type=int, default=10)
    return parser


def _shared_eval_kwargs(
    *,
    args: argparse.Namespace,
    trade_guard_config: dict[str, Any] | None,
    data_path: str | None,
    start_date: str | None,
    end_date: str | None,
) -> dict[str, Any]:
    """Keyword arguments shared by every _evaluate_candidate_task call.

    ``candidate`` and ``capture_details`` are supplied per call site. Keeping
    this mapping in one place prevents drift between the scalar and detail
    passes (the original code repeated these 27 keywords at four call sites).
    """
    return {
        "period": args.period,
        "label_forward_days": args.label_forward_days,
        "data_path": data_path,
        "backtest_engine": args.backtest_engine,
        "top_k": args.top_k,
        "n_drop": args.n_drop,
        "trade_guard_config": trade_guard_config,
        "rebalance_mode": args.rebalance_mode,
        "custom_weight_mode": args.custom_weight_mode,
        "redistribute_unfilled_cash": bool(args.redistribute_unfilled_cash),
        "position_size": args.position_size,
        "max_pos_each_stock": args.max_pos_each_stock,
        "lot_size": args.lot_size,
        "max_daily_volume_participation": args.max_daily_volume_participation,
        "max_daily_amount_participation": args.max_daily_amount_participation,
        "enforce_cash_limit": bool(args.enforce_cash_limit),
        "rebalance_freq": args.rebalance_freq,
        "cost_buy": args.buy_fee,
        "cost_sell": args.sell_fee,
        "score_transform": args.score_transform,
        "score_clip": args.score_clip,
        "universe_filter": args.universe_filter,
        "universe_top_n": args.universe_top_n,
        "universe_lookback_days": args.universe_lookback_days,
        "start_date": start_date,
        "end_date": end_date,
    }


def _print_scalar_progress(idx: int, total: int, result: dict[str, Any]) -> None:
    """Emit the one-line progress summary for a finished scalar backtest."""
    print(
        f"[{idx}/{total}] seed={result.get('seed_name')} "
        f"scope={result.get('candidate_scope')} factor={result.get('factor_name')} "
        f"success={bool(result.get('success', False))} "
        f"ir={float(result.get('ir', 0.0) or 0.0):.4f} "
        f"ic={float(result.get('ic_mean', 0.0) or 0.0):.4f} "
        f"icir={float(result.get('icir', 0.0) or 0.0):.4f} "
        f"rank_icir={float(result.get('rank_icir', 0.0) or 0.0):.4f} "
        f"aer={float(result.get('annualized_return', 0.0) or 0.0):.4f} "
        f"mdd={float(result.get('mdd', 0.0) or 0.0):.4f}",
        flush=True,
    )


def _select_detail_candidates(
    results: list[dict[str, Any]],
    summary_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Pick detail-capture candidates: every seed baseline plus each seed's best-IR row.

    Best-IR rows are matched against the summary's (seed_name, proposal_rank,
    best_factor_expr) triple. The list is deduplicated on
    (seed_name, candidate_scope, factor_expr), keeping first-seen order.
    """
    baselines = {
        str(row.get("seed_name") or row.get("factor_name") or ""): row
        for row in results
        if row.get("candidate_scope") == "seed_baseline"
    }
    selected: list[dict[str, Any]] = []
    for baseline in sorted(baselines.values(), key=_sort_key):
        selected.append(_candidate_for_detail(baseline, "seed_baseline"))
    for summary in summary_rows:
        best_rank = summary.get("best_ir_proposal_rank")
        best_result = next(
            (
                row
                for row in results
                if row.get("seed_name") == summary.get("seed_name")
                and row.get("proposal_rank") == best_rank
                and row.get("factor_expr") == summary.get("best_factor_expr")
            ),
            None,
        )
        if best_result is not None:
            selected.append(_candidate_for_detail(best_result, "best_ir_candidate"))
    seen: set[tuple[str, str, str]] = set()
    unique: list[dict[str, Any]] = []
    for candidate in selected:
        key = (
            str(candidate.get("seed_name") or ""),
            str(candidate.get("candidate_scope") or ""),
            str(candidate.get("factor_expr") or ""),
        )
        if key in seen:
            continue
        seen.add(key)
        unique.append(candidate)
    return unique


def main() -> None:
    """Run scalar backtests for every JSONL candidate, optionally re-run selected
    candidates with detail capture, and write all artifacts to the output dir."""
    args = _build_arg_parser().parse_args()
    jsonl_path = Path(args.jsonl).expanduser().resolve()
    data_path = args.data_path.strip() or None
    trade_guard_config = _parse_trade_guard_config(args.trade_guard_config)
    start_date = args.start_date.strip() or None
    end_date = args.end_date.strip() or None
    output_dir = _build_output_dir(args.output_dir.strip() or None, jsonl_path)
    _ensure_backtest_imports()
    rows = _read_jsonl(jsonl_path)
    sampled_rows = _sample_rows(rows, sample_size=args.sample_size, sample_seed=args.sample_seed)
    candidates = [_candidate_from_jsonl_row(row, idx) for idx, row in enumerate(sampled_rows, start=1)]
    configure_periods(PERIOD_CONFIGS)
    loaded_df = load_data(data_path)
    # CLI flag wins over the environment-variable fallbacks.
    resolved_data_path = (
        data_path
        or os.environ.get("ALPHAEVO_DATA_PATH")
        or os.environ.get("AAE_DATA_PATH")
        or os.environ.get("DAILY_PV_PATH")
        or "repo default"
    )
    data_quality_report = _build_data_quality_report(loaded_df, resolved_data_path)
    n_baselines = sum(1 for candidate in candidates if candidate.get("candidate_scope") == "seed_baseline")
    n_trials = len(candidates) - n_baselines
    # Echo the effective configuration so Kaggle logs are self-describing.
    print(f"Loaded {len(rows)} rows from {jsonl_path}", flush=True)
    if args.sample_size and args.sample_size > 0:
        print(f"Sampled {len(sampled_rows)} rows with sample_seed={args.sample_seed}", flush=True)
    print(f"Candidates: total={len(candidates)} baselines={n_baselines} trials={n_trials}", flush=True)
    print(f"Period: {args.period}", flush=True)
    print(f"Backtest engine: {args.backtest_engine}", flush=True)
    print(f"Top K: {args.top_k}", flush=True)
    print(f"N drop: {args.n_drop}", flush=True)
    print(f"Rebalance mode: {args.rebalance_mode}", flush=True)
    print(f"Custom weight mode: {args.custom_weight_mode}", flush=True)
    print(f"Redistribute unfilled cash: {bool(args.redistribute_unfilled_cash)}", flush=True)
    print(f"Position size: {float(args.position_size):.4f}", flush=True)
    print(f"Max pos each stock: {float(args.max_pos_each_stock):.4f}", flush=True)
    print(f"Lot size: {int(args.lot_size)}", flush=True)
    print(f"Max daily volume participation: {float(args.max_daily_volume_participation):.6f}", flush=True)
    print(f"Max daily amount participation: {float(args.max_daily_amount_participation):.6f}", flush=True)
    print(f"Rebalance frequency: {int(args.rebalance_freq)}", flush=True)
    print(f"Buy fee: {float(args.buy_fee):.6f}", flush=True)
    print(f"Sell fee: {float(args.sell_fee):.6f}", flush=True)
    print(f"Enforce cash limit: {bool(args.enforce_cash_limit)}", flush=True)
    print(f"Score transform: {args.score_transform}", flush=True)
    print(f"Score clip: {float(args.score_clip):.4f}", flush=True)
    print(f"Universe filter: {args.universe_filter}", flush=True)
    print(f"Universe top N: {int(args.universe_top_n)}", flush=True)
    print(f"Universe lookback days: {int(args.universe_lookback_days)}", flush=True)
    print(f"Backtest workers: {max(args.backtest_workers, 1)}", flush=True)
    print(f"Trade guard config: {json.dumps(trade_guard_config, ensure_ascii=False) if trade_guard_config is not None else 'None'}", flush=True)
    print(f"Output dir: {output_dir}", flush=True)
    print(f"Data path: {resolved_data_path}", flush=True)
    if start_date or end_date:
        print(f"Explicit backtest range override: {start_date or 'AUTO'} -> {end_date or 'AUTO'}", flush=True)
    manifest = _build_robust_manifest(args, jsonl_path, resolved_data_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / str(args.manifest_name)).write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    eval_kwargs = _shared_eval_kwargs(
        args=args,
        trade_guard_config=trade_guard_config,
        data_path=data_path,
        start_date=start_date,
        end_date=end_date,
    )
    results: list[dict[str, Any]] = []
    t0 = time.time()

    def _save_checkpoint(detail_rows: list[dict[str, Any]] | None = None):
        """Rebuild artifacts from the results gathered so far, persist them, and return the artifact tuple."""
        outputs = _build_outputs_from_results(
            results=results,
            args=args,
            start_date=start_date,
            end_date=end_date,
            elapsed_sec=time.time() - t0,
            detail_results=detail_rows,
        )
        _save_outputs(output_dir, *outputs, run_metadata=manifest, data_quality_report=data_quality_report)
        return outputs

    workers = max(int(args.backtest_workers), 1)
    save_every = max(args.save_every, 1)
    if workers > 1:
        # Spawn context: workers re-import the module and reload data themselves.
        with ProcessPoolExecutor(max_workers=workers, mp_context=mp.get_context("spawn")) as executor:
            futures = [
                executor.submit(_evaluate_candidate_task, candidate=candidate, capture_details=False, **eval_kwargs)
                for candidate in candidates
            ]
            for idx, future in enumerate(futures, start=1):
                result = future.result()
                results.append(result)
                _print_scalar_progress(idx, len(candidates), result)
                if idx % save_every == 0:
                    _save_checkpoint()
    else:
        for idx, candidate in enumerate(candidates, start=1):
            result = _evaluate_candidate_task(candidate=candidate, capture_details=False, **eval_kwargs)
            results.append(result)
            _print_scalar_progress(idx, len(candidates), result)
            if idx % save_every == 0:
                _save_checkpoint()
    detail_results: list[dict[str, Any]] = []
    if args.capture_detail_artifacts:
        print("\nCapturing detail artifacts for seed baselines and best-IR candidates...", flush=True)
        # Persist scalar outputs first so a crash during detail capture loses nothing.
        outputs_no_detail = _save_checkpoint()
        print("Saved scalar backtest outputs before detail capture.", flush=True)
        summary_rows = outputs_no_detail[0]
        unique_detail_candidates = _select_detail_candidates(results, summary_rows)
        print(
            f"Detail candidates: {len(unique_detail_candidates)} "
            "(seed baselines + per-seed best IR candidates).",
            flush=True,
        )
        if workers > 1 and unique_detail_candidates:
            with ProcessPoolExecutor(max_workers=workers, mp_context=mp.get_context("spawn")) as executor:
                futures = [
                    executor.submit(_evaluate_candidate_task, candidate=candidate, capture_details=True, **eval_kwargs)
                    for candidate in unique_detail_candidates
                ]
                for idx, future in enumerate(futures, start=1):
                    detail_t0 = time.time()
                    detail = future.result()
                    detail_results.append(detail)
                    print(
                        f"[detail {idx}/{len(unique_detail_candidates)} DONE] seed={detail.get('seed_name')} "
                        f"scope={detail.get('candidate_scope')} success={bool(detail.get('success', False))} "
                        f"elapsed_wait={time.time() - detail_t0:.1f}s",
                        flush=True,
                    )
        else:
            for idx, candidate in enumerate(unique_detail_candidates, start=1):
                detail_t0 = time.time()
                print(
                    f"[detail {idx}/{len(unique_detail_candidates)} START] seed={candidate.get('seed_name')} "
                    f"scope={candidate.get('candidate_scope')} factor={candidate.get('factor_name')}",
                    flush=True,
                )
                detail = _evaluate_candidate_task(candidate=candidate, capture_details=True, **eval_kwargs)
                detail_results.append(detail)
                print(
                    f"[detail {idx}/{len(unique_detail_candidates)} DONE] seed={detail.get('seed_name')} "
                    f"scope={detail.get('candidate_scope')} success={bool(detail.get('success', False))} "
                    f"elapsed={time.time() - detail_t0:.1f}s",
                    flush=True,
                )
    # Final save includes detail overlays (an empty detail list is a no-op overlay).
    _save_checkpoint(detail_rows=detail_results)
    print("\nSaved files:", flush=True)
    for artifact in (
        "summary.csv",
        "trials.csv",
        "summary_yearly.csv",
        "trials_yearly.csv",
        "aggregate_yearly.csv",
        "stock_contrib.csv",
        "trade_log.csv",
        "holdings_daily.csv",
        "portfolio_daily.csv",
        "rebalance_log.csv",
        "signal_selection_daily.csv",
        "rebalance_plan.csv",
        "rebalance_window_returns.csv",
        "alpha_ranking_seed_cash_cost.csv",
        "alpha_ranking_best_cash_cost.csv",
        "data_quality_report.json",
        "aggregate.json",
    ):
        print(f"{artifact}: {output_dir / artifact}", flush=True)


if __name__ == "__main__":
    main()