#!/usr/bin/env python3
"""Run baseline backtests for every factor in a JSONL file.

This script is intended for lightweight Kaggle usage:
- provide a JSONL file with {"name": ..., "expr": ...} rows
- run standalone backtests without model inference
- save summary, yearly breakdown, and optional detail artifacts
"""

from __future__ import annotations

import argparse
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
import json
import multiprocessing as mp
import os
from pathlib import Path
import random
import re
import sys
import time
from typing import Any

import pandas as pd

# Make the repository root importable so `backtest.*` resolves when this file
# is run as a standalone script (three levels up from this file).
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Lazily-bound backtest entry points; populated by _ensure_backtest_imports()
# so that merely importing this module does not pull in the heavy backtest stack.
configure_periods = None
execute_expression = None
load_data = None

# Train/validation/test date windows used to configure the backtest periods.
PERIOD_CONFIGS = {
    "train": {"start": "2016-01-01", "end": "2020-12-31"},
    "val": {"start": "2021-01-01", "end": "2021-12-31"},
    "test": {"start": "2022-01-01", "end": "2026-12-31"},
}

# Year -> market-regime label; any year not listed is treated as "neutral"
# (see _market_regime).
YEAR_REGIMES = {
    2022: "bearish",
    2025: "bullish",
}

# Result keys holding bulky per-row detail; stripped by _strip_detail_keys
# when writing compact summaries.
DETAIL_KEYS = {"yearly_metrics", "trade_log", "stock_contrib", "holding_log", "portfolio_log", "signal_selection_log"}

# Provenance columns attached to every detail artifact so rows can be traced
# back to the seed/factor/turn that produced them.
DETAIL_CONTEXT_COLUMNS = [
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]

# Column orders for the various detail CSV/frame artifacts.
STOCK_CONTRIB_COLUMNS = [
    "year",
    "market_regime",
    "instrument",
    "contribution_return",
    "abs_contribution_return",
    "realized_pnl",
    "start_value",
    "end_value",
    "buy_trades",
    "sell_trades",
    "shares_bought",
    "shares_sold",
    "buy_cash_outflow",
    "sell_net_proceeds",
    "holding_days",
    "avg_shares_held",
    "ending_shares",
    "avg_market_value",
    "max_market_value",
    "market_volume_sum",
    "market_amount_sum",
    "rank",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]

TRADE_LOG_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "action",
    "instrument",
    "shares",
    "current_shares",
    "target_shares",
    "requested_shares",
    "filled_shares",
    "unfilled_shares",
    "fill_ratio",
    "price",
    "order_value",
    "filled_value",
    "redistributed_notional",
    "gross_notional",
    "net_proceeds",
    "cash_outflow",
    "transaction_cost",
    "realized_pnl",
    "days_held",
    "holdings_count_before",
    "holdings_count_after",
    "market_volume",
    "market_amount",
    "volume_participation",
    "amount_participation",
    "clip_reason",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]

HOLDING_LOG_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "instrument",
    "shares_held",
    "market_value",
    "close_price",
    "market_volume",
    "market_amount",
    "portfolio_value",
    "cash_eod",
    "weight",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]

PORTFOLIO_DAILY_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "portfolio_value",
    "cash_eod",
    "cash_weight",
    "n_held",
    "portfolio_return",
    "benchmark_return",
    "excess_return",
    "is_rebalance",
    "had_trade",
    "buy_trades",
    "sell_trades",
    "gross_turnover",
    "transaction_cost",
    "fill_ratio_mean",
    "fill_ratio_min",
    "volume_participation_max",
    "amount_participation_max",
    "seed_name",
    "candidate_scope",
    "factor_name",
    "factor_expr",
    "turn",
    "call_index",
    "proposal_rank",
]

# The rebalance log shares the daily portfolio schema.
REBALANCE_LOG_COLUMNS = PORTFOLIO_DAILY_COLUMNS

REBALANCE_PLAN_COLUMNS = [
    "date",
    "year",
    "market_regime",
    "portfolio_value",
    "cash_eod",
    "cash_weight",
    "invested_value_eod",
    "unallocated_cash_eod",
    "gross_turnover",
    "transaction_cost",
    "had_trade",
    "buy_trades",
    "sell_trades",
    "target_count_eod",
    "target_total_value_eod",
    "target_list_eod",
    "instrument",
    "target_rank_eod",
    "target_value_eod",
    "target_weight_eod",
    "shares_held_eod",
    "trade_actions",
    "current_shares_ref",
    "target_shares_ref",
    "requested_shares_total",
    "filled_shares_total",
    "unfilled_shares_total",
    "requested_notional_total",
    "filled_notional_total",
    "buy_requested_shares",
    "buy_filled_shares",
    "sell_requested_shares",
    "sell_filled_shares",
    "fill_ratio_mean",
    "clip_reasons",
    *DETAIL_CONTEXT_COLUMNS,
]

REBALANCE_WINDOW_RETURN_COLUMNS = [
    "window_index",
    "window_start_date",
    "window_end_date",
    "year",
    "market_regime",
    "n_days",
    "is_partial_window",
    "had_trade_any",
    "cash_weight_start",
    "cash_weight_end",
    "portfolio_return_compounded",
    "benchmark_return_compounded",
    "excess_compounded_return",
    "portfolio_return_sum",
    "benchmark_return_sum",
    "excess_return_sum",
    "mean_daily_portfolio_return",
    "mean_daily_benchmark_return",
    "mean_daily_excess_return",
    "cumulative_portfolio_return_to_end",
    "cumulative_benchmark_return_to_end",
    "cumulative_excess_compounded_return_to_end",
    "full_period_window_reconstructed_return",
    "full_period_window_reconstruction_error",
    *DETAIL_CONTEXT_COLUMNS,
]

SIGNAL_SELECTION_COLUMNS = [
    "signal_date",
    "trade_date",
    "instrument",
    "score",
    "trade_score_rank",
    "top5_by_score",
    "topk_by_score",
    "buy_gate",
    "force_exit",
    "defer_sell",
    "selected_eod",
    "eod_hold_rank",
    "shares_held_eod",
    "market_value_eod",
    "weight_eod",
    "had_trade",
    "trade_actions",
    "requested_shares_total",
    "filled_shares_total",
    "requested_notional_total",
    "filled_notional_total",
    "fill_ratio_mean",
    "clip_reason",
    *DETAIL_CONTEXT_COLUMNS,
]

ALPHA_CASH_COST_RANKING_COLUMNS = [
    "candidate_scope",
    "seed_name",
    "factor_name",
    "factor_expr",
    "ir",
    "performance_return",
    "benchmark_performance_return",
    "excess_compounded_return",
    "portfolio_nav_mdd",
    "turnover_ratio",
    "transaction_cost",
    "gross_turnover",
    "return_per_turnover",
    "cash_weight_mean",
    "cash_weight_median",
    "cash_weight_p95",
    "round2_redistributed_notional",
    "round2_buy_trade_count",
    "round2_rebalance_days",
    "rank_cash_weight_mean_asc",
    "rank_transaction_cost_asc",
    "rank_return_per_turnover_desc",
    "cash_cost_efficiency_rank",
]


def _parse_trade_guard_config(raw: str | None) -> dict[str, Any] | None:
    """Parse the --trade-guard-config CLI value.

    Accepts "none"/"null"/"off"/"false"/"0" (or empty) -> None (guard disabled),
    "vn"/"default"/"true"/"1" -> {} (guard enabled with defaults), or a JSON
    object literal -> that dict.

    Raises:
        ValueError: if the value parses to JSON but is not an object.
        json.JSONDecodeError: if the value is not one of the keywords and is
            not valid JSON.
    """
    value = (raw or "").strip()
    if not value or value.lower() in {"none", "null", "off", "false", "0"}:
        return None
    if value.lower() in {"vn", "default", "true", "1"}:
        return {}
    parsed = json.loads(value)
    if parsed is None:
        return None
    if not isinstance(parsed, dict):
        raise ValueError("--trade-guard-config must be none/null, 'vn', or a JSON object")
    return parsed


def _ensure_backtest_imports() -> None:
    """Bind the heavy backtest entry points into module globals on first use.

    Idempotent: returns immediately once all three globals are populated.
    """
    global configure_periods, execute_expression, load_data
    if configure_periods is not None and execute_expression is not None and load_data is not None:
        return
    from backtest.robust_factor_executor import configure_periods as _configure_periods
    from backtest.robust_factor_executor import execute_expression as _execute_expression
    from backtest.robust_factor_executor import load_data as _load_data
    configure_periods = _configure_periods
    execute_expression = _execute_expression
    load_data = _load_data


def _read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Read factor rows from a JSONL file (or a plain JSON list/object).

    A ".json" suffix or a leading "[" triggers whole-document JSON parsing;
    a dict payload may wrap its rows under a "rows" key. Otherwise the file
    is parsed line-by-line as JSONL, skipping blank lines.

    Raises:
        ValueError: for JSON payloads that are neither list nor dict.
    """
    text = path.read_text(encoding="utf-8").strip()
    if not text:
        return []
    if path.suffix.lower() == ".json" or text[:1] == "[":
        payload = json.loads(text)
        if isinstance(payload, list):
            return [row for row in payload if isinstance(row, dict)]
        if isinstance(payload, dict):
            rows = payload.get("rows")
            if isinstance(rows, list):
                return [row for row in rows if isinstance(row, dict)]
            return [payload]
        raise ValueError(f"Unsupported JSON payload in {path}")
    rows: list[dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
    return rows


def _group_rows_by_seed(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group rows by seed identity, preserving input order within each group.

    The seed name is the first non-empty of seed_name / root_seed_name /
    parent_seed_name / name, falling back to a positional "seed_<idx>" label.
    """
    grouped: dict[str, list[dict[str, Any]]] = {}
    for idx, row in enumerate(rows, start=1):
        seed_name = str(
            row.get("seed_name")
            or row.get("root_seed_name")
            or row.get("parent_seed_name")
            or row.get("name")
            or f"seed_{idx}"
        ).strip()
        grouped.setdefault(seed_name, []).append(row)
    return grouped


def _sample_rows(rows: list[dict[str, Any]], sample_size: int, sample_seed: int) -> list[dict[str, Any]]:
    """Deterministically sample rows, preferring seed-level sampling.

    When rows look "structured" (carry seed/factor metadata) and sample_size
    is smaller than the number of seed groups, whole seed groups are sampled
    so that related factors stay together; otherwise plain row sampling is
    used. sample_size <= 0 or >= len(rows) returns a copy of all rows.
    """
    if sample_size <= 0 or sample_size >= len(rows):
        return list(rows)
    grouped = _group_rows_by_seed(rows)
    # NOTE(review): this probes "expression" but the module docstring says
    # JSONL rows carry "expr" — confirm which key the input actually uses.
    structured_grouping = any(
        row.get("seed_name")
        or row.get("seed_version_name")
        or row.get("candidate_scope")
        or row.get("factor_name")
        or row.get("expression")
        for row in rows
    )
    if structured_grouping and 0 < sample_size < len(grouped):
        rng = random.Random(sample_seed)
        # Sort keys first so the RNG draw is reproducible across runs.
        selected_seed_names = set(rng.sample(sorted(grouped.keys()), sample_size))
        sampled_rows: list[dict[str, Any]] = []
        for row in rows:
            seed_name = str(
                row.get("seed_name")
                or row.get("root_seed_name")
                or row.get("parent_seed_name")
                or row.get("name")
                or ""
            ).strip()
            if seed_name in selected_seed_names:
                sampled_rows.append(row)
        return sampled_rows
    rng = random.Random(sample_seed)
    return rng.sample(rows, sample_size)


def _infer_turn_from_version_name(seed_name: str, seed_version_name: str) -> int | None:
    """Extract the turn number N from a "<seed_name>_vN" version name, else None."""
    version = str(seed_version_name or "").strip()
    seed = str(seed_name or "").strip()
    if not version or not seed or version == seed:
        return None
    match = re.fullmatch(rf"{re.escape(seed)}_v(\d+)", version)
    if match:
        return int(match.group(1))
    return None


def _build_output_dir(base_output_dir: str | None, jsonl_path: Path) -> Path:
    """Resolve the output directory: explicit flag > Kaggle working dir > repo data dir."""
    if base_output_dir:
        return Path(base_output_dir).expanduser()
    if Path("/kaggle/working").exists():
        return Path("/kaggle/working/aae_v2") / f"jsonl_backtest_{jsonl_path.stem}"
    return PROJECT_ROOT / "data" / f"jsonl_backtest_{jsonl_path.stem}"


def _build_data_quality_report(df: pd.DataFrame, data_path: str | None) -> dict[str, Any]:
    """Build a structural/data-health report for the loaded market data frame.

    Covers index health (uniqueness, monotonicity), date/instrument coverage
    per year, per-column missing and non-positive rates, and availability of
    the core "$open"/"$close"/... fields. Handles both a (datetime, instrument)
    MultiIndex and a plain datetime index.
    """
    index_names = list(df.index.names) if isinstance(df.index, pd.MultiIndex) else [str(df.index.name)]
    dates = (
        pd.DatetimeIndex(df.index.get_level_values("datetime")).sort_values()
        if isinstance(df.index, pd.MultiIndex) and "datetime" in df.index.names
        else pd.DatetimeIndex(df.index).sort_values()
    )
    instruments = (
        pd.Index(df.index.get_level_values("instrument").astype(str)).unique().sort_values()
        if isinstance(df.index, pd.MultiIndex) and "instrument" in df.index.names
        else pd.Index([], dtype=object)
    )
    coverage_by_year: list[dict[str, Any]] = []
    if len(dates):
        years = pd.Series(dates.year, index=range(len(dates)))
        if len(instruments):
            # NOTE(review): `years` is derived from the *sorted* dates while
            # `inst_series` keeps the frame's original row order; the positional
            # mask below assumes the two orders agree — confirm the frame is
            # already sorted by datetime, otherwise per-year instrument counts
            # may be misaligned.
            inst_series = pd.Series(df.index.get_level_values("instrument").astype(str), index=range(len(df)))
        else:
            inst_series = pd.Series([], dtype=str)
        for year in sorted(years.dropna().unique()):
            mask = years == year
            coverage_by_year.append(
                {
                    "year": int(year),
                    "n_rows": int(mask.sum()),
                    "n_days": int(pd.DatetimeIndex(dates[mask.to_numpy()]).nunique()),
                    "n_instruments": int(inst_series[mask].nunique()) if not inst_series.empty else 0,
                }
            )
    missing_rate_by_column = {
        str(col): round(float(pd.to_numeric(df[col], errors="coerce").isna().mean() if pd.api.types.is_numeric_dtype(df[col]) else df[col].isna().mean()), 6)
        for col in df.columns
    }
    nonpositive_rate_by_column: dict[str, float] = {}
    for col in df.columns:
        series = pd.to_numeric(df[col], errors="coerce")
        if series.notna().any():
            nonpositive_rate_by_column[str(col)] = round(float((series <= 0).mean()), 6)
    return {
        "data_path": data_path or "repo default",
        "index_names": index_names,
        "index_is_unique": bool(df.index.is_unique),
        "duplicate_index_rows": int(df.index.duplicated().sum()),
        "index_monotonic_increasing": bool(df.index.is_monotonic_increasing),
        "n_rows": int(len(df)),
        "n_columns": int(len(df.columns)),
        "n_days": int(dates.nunique()) if len(dates) else 0,
        "n_instruments": int(len(instruments)),
        "date_start": dates.min().strftime("%Y-%m-%d") if len(dates) else None,
        "date_end": dates.max().strftime("%Y-%m-%d") if len(dates) else None,
        "columns_present": list(map(str, df.columns)),
        "core_field_availability": {
            "open": "$open" in df.columns,
            "close": "$close" in df.columns,
            "high": "$high" in df.columns,
            "low": "$low" in df.columns,
            "volume": "$volume" in df.columns,
            "amount": "$amount" in df.columns,
            "bench_return": "$bench_return" in df.columns,
        },
        "missing_rate_by_column": missing_rate_by_column,
        "nonpositive_rate_by_column": nonpositive_rate_by_column,
        "coverage_by_year": coverage_by_year,
        "adjustment_status": "unknown_not_verified_from_h5",
        "notes": [
            "Report is structural/data-health oriented; it does not prove economic correctness.",
            "Adjustment status is not inferred from HDF content alone and should be verified from the data pipeline/vendor contract.",
        ],
    }


def _market_regime(year: int) -> str:
    """Map a calendar year to its regime label; unknown years are "neutral"."""
    return YEAR_REGIMES.get(int(year), "neutral")


def _is_missing(value: Any) -> bool:
    """True for None, whitespace-only strings, and pandas-NA-like scalars."""
    if value is None:
        return True
    if isinstance(value, str):
        return not value.strip()
    try:
        return bool(pd.isna(value))
    except Exception:
        # pd.isna can raise / return arrays for non-scalar input; treat as present.
        return False


def _clean_scalar(value: Any) -> Any:
    """Normalize a scalar: missing -> None, numpy scalars -> native Python via .item()."""
    if _is_missing(value):
        return None
    if hasattr(value, "item"):
        try:
            return value.item()
        except Exception:
            pass
    return value


def _optional_int(value: Any) -> int | None:
    """Best-effort int conversion (via float, so "3.0" works); None on failure."""
    value = _clean_scalar(value)
    if value is None:
        return None
    try:
        return int(float(value))
    except Exception:
        return None


def _metric_payload(result: dict[str, Any]) -> dict[str, Any]:
    """Normalize a raw backtest result dict into the canonical metric payload.

    Every numeric field is coerced with a 0.0/0 fallback (the `or` guard also
    maps explicit None to the default), detail logs default to empty
    containers, and run22-style alias keys (information_ratio, IC, ICIR, ...)
    are appended for cross-run comparison.
    """
    ir = float(result.get("ir", 0.0) or 0.0)
    ic_mean = float(result.get("ic_mean", 0.0) or 0.0)
    icir = float(result.get("icir", 0.0) or 0.0)
    rank_ic_mean = float(result.get("rank_ic_mean", 0.0) or 0.0)
    rank_icir = float(result.get("rank_icir", 0.0) or 0.0)
    annualized_return = float(result.get("annualized_return", 0.0) or 0.0)
    mdd = float(result.get("mdd", 0.0) or 0.0)
    benchmark_performance_return = float(result.get("benchmark_performance_return", 0.0) or 0.0)
    excess_compounded_return = float(result.get("excess_compounded_return", 0.0) or 0.0)
    portfolio_nav_mdd = float(result.get("portfolio_nav_mdd", 0.0) or 0.0)
    return {
        "success": bool(result.get("success", False)),
        "backtest_engine": str(result.get("backtest_engine", "")),
        "label_forward_days": int(result.get("label_forward_days", 0) or 0),
        "ir": ir,
        "ic_mean": ic_mean,
        "ic_std": float(result.get("ic_std", 0.0) or 0.0),
        "icir": icir,
        "rank_ic_mean": rank_ic_mean,
        "rank_ic_std": float(result.get("rank_ic_std", 0.0) or 0.0),
        "rank_icir": rank_icir,
        "aer": annualized_return,
        "annualized_return": annualized_return,
        "annualized_volatility": float(result.get("annualized_volatility", 0.0) or 0.0),
        "performance_return": float(result.get("performance_return", 0.0) or 0.0),
        "benchmark_performance_return": benchmark_performance_return,
        "excess_compounded_return": excess_compounded_return,
        "sharpe": float(result.get("sharpe", 0.0) or 0.0),
        "winrate": float(result.get("winrate", 0.0) or 0.0),
        "mdd": mdd,
        # excess_mdd falls back to the plain mdd when absent.
        "excess_mdd": float(result.get("excess_mdd", mdd) or 0.0),
        "portfolio_nav_mdd": portfolio_nav_mdd,
        "drawdown_duration_max": int(result.get("drawdown_duration_max", 0) or 0),
        "drawdown_duration_mean": float(result.get("drawdown_duration_mean", 0.0) or 0.0),
        "drawdown_duration_median": float(result.get("drawdown_duration_median", 0.0) or 0.0),
        "total_return": float(result.get("total_return", 0.0) or 0.0),
        "final_value": float(result.get("final_value", 0.0) or 0.0),
        "n_days": int(result.get("n_days", 0) or 0),
        "n_ic_days": int(result.get("n_ic_days", 0) or 0),
        "exec_time": float(result.get("exec_time", 0.0) or 0.0),
        "yearly_metrics": result.get("yearly_metrics") or {},
        "trade_log": result.get("trade_log") or [],
        "stock_contrib": result.get("stock_contrib") or [],
        "holding_log": result.get("holding_log") or [],
        "portfolio_log": result.get("portfolio_log") or [],
        "signal_selection_log": result.get("signal_selection_log") or [],
        "qlib_warnings": result.get("qlib_warnings") or [],
        "trade_guard_config": result.get("trade_guard_config"),
        "rebalance_mode": result.get("rebalance_mode", "dropout"),
        "transaction_cost": float(result.get("transaction_cost", 0.0) or 0.0),
        "gross_turnover": float(result.get("gross_turnover", 0.0) or 0.0),
        "turnover_ratio": float(result.get("turnover_ratio", 0.0) or 0.0),
        "error": result.get("error"),
        # run22-style aliases for easier comparison
        "information_ratio": ir,
        "IC": ic_mean,
        "ICIR": icir,
        "rank_ic": rank_ic_mean,
        "max_drawdown": mdd,
        "portfolio_nav_max_drawdown": portfolio_nav_mdd,
    }


def _to_float_or_none(value: Any) -> float | None:
    """Best-effort float conversion; None when missing or unconvertible."""
    value = _clean_scalar(value)
    if value is None:
        return None
    try:
        return float(value)
    except Exception:
        return None


def _series_stat(values: pd.Series, op: str) -> float | int | None:
    """Compute a named statistic over a series after numeric coercion.

    Supported ops: mean, median, min, max, p95, p05. Returns None when no
    numeric values remain after coercion.

    Raises:
        ValueError: for an unknown op.
    """
    clean = pd.to_numeric(values, errors="coerce").dropna()
    if clean.empty:
        return None
    if op == "mean":
        return float(clean.mean())
    if op == "median":
        return float(clean.median())
    if op == "min":
        return float(clean.min())
    if op == "max":
        return float(clean.max())
    if op == "p95":
        return float(clean.quantile(0.95))
    if op == "p05":
        return float(clean.quantile(0.05))
    raise ValueError(f"Unsupported stat op: {op}")


def _ensure_detail_context_columns(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `frame` with all DETAIL_CONTEXT_COLUMNS present and typed.

    String context columns become "" when missing; turn/call_index/proposal_rank
    become nullable Int64.
    """
    frame = frame.copy()
    for column in DETAIL_CONTEXT_COLUMNS:
        if column not in frame.columns:
            frame[column] = pd.NA
    for column in ["seed_name", "candidate_scope", "factor_name", "factor_expr"]:
        frame[column] = frame[column].where(frame[column].notna(), "").astype(str)
    for column in ["turn", "call_index", "proposal_rank"]:
        frame[column] = pd.to_numeric(frame[column], errors="coerce").astype("Int64")
    return frame


def _annotate_portfolio_log_with_trade_summary(
    portfolio_log: list[dict[str, Any]],
    trade_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Left-join per-date trade aggregates onto the daily portfolio log.

    Adds had_trade / buy_trades / sell_trades / gross_turnover /
    transaction_cost / fill-ratio and participation aggregates per date,
    then fills typed defaults so every record has the full column set.
    Returns the annotated rows as plain dicts; [] for an empty log.
    """
    if not portfolio_log:
        return []
    portfolio_df = pd.DataFrame(portfolio_log).copy()
    if portfolio_df.empty:
        return []
    portfolio_df["date"] = pd.to_datetime(portfolio_df["date"]).dt.strftime("%Y-%m-%d")
    if trade_log:
        trade_df = pd.DataFrame(trade_log).copy()
        if not trade_df.empty:
            trade_df["date"] = pd.to_datetime(trade_df["date"]).dt.strftime("%Y-%m-%d")
            # Guarantee the aggregation columns exist before coercion.
            for col in [
                "gross_notional",
                "transaction_cost",
                "fill_ratio",
                "volume_participation",
                "amount_participation",
            ]:
                if col not in trade_df.columns:
                    trade_df[col] = pd.NA
            for col in [
                "gross_notional",
                "transaction_cost",
                "fill_ratio",
                "volume_participation",
                "amount_participation",
            ]:
                if col in trade_df.columns:
                    trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
            action_series = trade_df["action"].astype(str) if "action" in trade_df.columns else pd.Series("", index=trade_df.index)
            trade_df["buy_flag"] = (action_series == "buy").astype(int)
            trade_df["sell_flag"] = (action_series == "sell").astype(int)
            trade_summary = (
                trade_df.groupby("date", dropna=False)
                .agg(
                    had_trade=("action", "size"),
                    buy_trades=("buy_flag", "sum"),
                    sell_trades=("sell_flag", "sum"),
                    gross_turnover=("gross_notional", "sum"),
                    transaction_cost=("transaction_cost", "sum"),
                    fill_ratio_mean=("fill_ratio", "mean"),
                    fill_ratio_min=("fill_ratio", "min"),
                    volume_participation_max=("volume_participation", "max"),
                    amount_participation_max=("amount_participation", "max"),
                )
                .reset_index()
            )
            # had_trade starts as a row count; convert to a boolean flag.
            trade_summary["had_trade"] = trade_summary["had_trade"].fillna(0).astype(int) > 0
            portfolio_df = portfolio_df.merge(trade_summary, on="date", how="left")
    # Fill typed defaults so downstream consumers can rely on the columns
    # regardless of whether any trades occurred.
    defaults = {
        "had_trade": False,
        "buy_trades": 0,
        "sell_trades": 0,
        "gross_turnover": 0.0,
        "transaction_cost": 0.0,
        "fill_ratio_mean": None,
        "fill_ratio_min": None,
        "volume_participation_max": None,
        "amount_participation_max": None,
    }
    for key, default in defaults.items():
        if key not in portfolio_df.columns:
            portfolio_df[key] = default
        else:
            # bool is checked before int: bool is a subclass of int in Python.
            if isinstance(default, bool):
                portfolio_df[key] = portfolio_df[key].where(portfolio_df[key].notna(), False).astype(bool)
            elif isinstance(default, int):
                portfolio_df[key] = pd.to_numeric(portfolio_df[key], errors="coerce").fillna(0).astype(int)
            elif default == 0.0:
                portfolio_df[key] = pd.to_numeric(portfolio_df[key], errors="coerce").fillna(0.0)
    return portfolio_df.to_dict("records")


def _enrich_result_diagnostics(payload: dict[str, Any]) -> dict[str, Any]:
    """Mutate and return `payload` with reconstruction/consistency diagnostics.

    Annotates the portfolio log with trade summaries, then adds:
    - performance_return reconstructed from daily returns, plus its error;
    - rebalance-window counts and compounded-return reconstruction;
    - cash-weight and rebalance-holdings statistics;
    - turnover / transaction-cost totals reconstructed from the trade log;
    - round-2 cash-redistribution counters;
    - return_per_turnover and cost_to_turnover ratios.
    Missing inputs yield None/0 placeholders so the key set is stable.
    """
    portfolio_log = _annotate_portfolio_log_with_trade_summary(
        payload.get("portfolio_log") or [],
        payload.get("trade_log") or [],
    )
    payload["portfolio_log"] = portfolio_log
    portfolio_df = pd.DataFrame(portfolio_log)
    trade_df = pd.DataFrame(payload.get("trade_log") or [])
    if not portfolio_df.empty:
        for col in [
            "portfolio_value",
            "cash_eod",
            "cash_weight",
            "n_held",
            "portfolio_return",
            "benchmark_return",
            "excess_return",
            "gross_turnover",
            "transaction_cost",
            "fill_ratio_mean",
            "fill_ratio_min",
        ]:
            if col in portfolio_df.columns:
                portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce")
        # Derive cash_weight when it is absent but its inputs exist; a zero
        # portfolio value is mapped to NA to avoid division by zero.
        if "cash_weight" not in portfolio_df.columns and {"cash_eod", "portfolio_value"}.issubset(portfolio_df.columns):
            portfolio_df["cash_weight"] = portfolio_df["cash_eod"] / portfolio_df["portfolio_value"].replace(0, pd.NA)
        if "is_rebalance" in portfolio_df.columns:
            portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool)
        else:
            portfolio_df["is_rebalance"] = False
        returns = pd.to_numeric(portfolio_df.get("portfolio_return"), errors="coerce").dropna()
        if not returns.empty:
            # Compound daily returns and compare against the reported figure.
            reconstructed = float((1.0 + returns).prod() - 1.0)
            payload["performance_return_reconstructed"] = reconstructed
            payload["performance_return_reconstruction_error"] = reconstructed - float(payload.get("performance_return", 0.0) or 0.0)
        window_df = _build_rebalance_window_frame(portfolio_df)
        payload["rebalance_window_count"] = int(len(window_df))
        if not window_df.empty:
            window_reconstructed = float((1.0 + pd.to_numeric(window_df["portfolio_return_compounded"], errors="coerce").fillna(0.0)).prod() - 1.0)
            payload["rebalance_window_return_reconstructed"] = window_reconstructed
            payload["rebalance_window_return_reconstruction_error"] = window_reconstructed - float(payload.get("performance_return", 0.0) or 0.0)
            payload["rebalance_window_days_mean"] = _series_stat(window_df["n_days"], "mean")
            payload["rebalance_window_days_median"] = _series_stat(window_df["n_days"], "median")
            payload["rebalance_window_days_max"] = _series_stat(window_df["n_days"], "max")
        else:
            payload["rebalance_window_return_reconstructed"] = None
            payload["rebalance_window_return_reconstruction_error"] = None
            payload["rebalance_window_days_mean"] = None
            payload["rebalance_window_days_median"] = None
            payload["rebalance_window_days_max"] = None
        cash_weight = pd.to_numeric(portfolio_df.get("cash_weight"), errors="coerce").dropna()
        payload["cash_weight_mean"] = _series_stat(cash_weight, "mean")
        payload["cash_weight_median"] = _series_stat(cash_weight, "median")
        payload["cash_weight_p95"] = _series_stat(cash_weight, "p95")
        rebalance_df = portfolio_df[portfolio_df["is_rebalance"]].copy()
        payload["rebalance_days"] = int(len(rebalance_df))
        if not rebalance_df.empty:
            n_held = pd.to_numeric(rebalance_df.get("n_held"), errors="coerce").dropna()
            payload["rebalance_holdings_min"] = _series_stat(n_held, "min")
            payload["rebalance_holdings_max"] = _series_stat(n_held, "max")
            payload["rebalance_holdings_mean"] = _series_stat(n_held, "mean")
            payload["rebalance_holdings_median"] = _series_stat(n_held, "median")
            payload["rebalance_days_with_trade"] = int(
                (rebalance_df["had_trade"] if "had_trade" in rebalance_df.columns else pd.Series(False, index=rebalance_df.index))
                .fillna(False)
                .astype(bool)
                .sum()
            )
        else:
            payload["rebalance_holdings_min"] = None
            payload["rebalance_holdings_max"] = None
            payload["rebalance_holdings_mean"] = None
            payload["rebalance_holdings_median"] = None
            payload["rebalance_days_with_trade"] = 0
    else:
        # No portfolio log at all: emit the full placeholder key set.
        payload["performance_return_reconstructed"] = None
        payload["performance_return_reconstruction_error"] = None
        payload["rebalance_window_count"] = 0
        payload["rebalance_window_return_reconstructed"] = None
        payload["rebalance_window_return_reconstruction_error"] = None
        payload["rebalance_window_days_mean"] = None
        payload["rebalance_window_days_median"] = None
        payload["rebalance_window_days_max"] = None
        payload["cash_weight_mean"] = None
        payload["cash_weight_median"] = None
        payload["cash_weight_p95"] = None
        payload["rebalance_days"] = 0
        payload["rebalance_holdings_min"] = None
        payload["rebalance_holdings_max"] = None
        payload["rebalance_holdings_mean"] = None
        payload["rebalance_holdings_median"] = None
        payload["rebalance_days_with_trade"] = 0
    if not trade_df.empty:
        for col in ["transaction_cost", "gross_notional", "filled_value", "fill_ratio"]:
            if col in trade_df.columns:
                trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
        # Prefer gross_notional for turnover; fall back to filled_value.
        gross_turnover_col = "gross_notional" if "gross_notional" in trade_df.columns else "filled_value"
        reconstructed_turnover = float(trade_df[gross_turnover_col].fillna(0.0).sum()) if gross_turnover_col in trade_df.columns else 0.0
        reconstructed_cost = float(trade_df["transaction_cost"].fillna(0.0).sum()) if "transaction_cost" in trade_df.columns else 0.0
        payload["gross_turnover_reconstructed"] = reconstructed_turnover
        payload["gross_turnover_reconstruction_error"] = reconstructed_turnover - float(payload.get("gross_turnover", 0.0) or 0.0)
        payload["transaction_cost_reconstructed"] = reconstructed_cost
        payload["transaction_cost_reconstruction_error"] = reconstructed_cost - float(payload.get("transaction_cost", 0.0) or 0.0)
        fill_ratio_source = trade_df["fill_ratio"] if "fill_ratio" in trade_df.columns else pd.Series(dtype=float)
        fill_ratio = pd.to_numeric(fill_ratio_source, errors="coerce").dropna()
        payload["fill_ratio_mean"] = _series_stat(fill_ratio, "mean")
        payload["fill_ratio_p05"] = _series_stat(fill_ratio, "p05")
        clip_reason_source = trade_df["clip_reason"] if "clip_reason" in trade_df.columns else pd.Series("", index=trade_df.index, dtype=object)
        clip_reason_series = clip_reason_source.fillna("").astype(str)
        redistributed_source = (
            trade_df["redistributed_notional"]
            if "redistributed_notional" in trade_df.columns
            else pd.Series(0.0, index=trade_df.index, dtype=float)
        )
        redistributed_series = pd.to_numeric(redistributed_source, errors="coerce").fillna(0.0)
        # A trade counts as "round 2" if it carries redistributed notional or
        # was tagged with the round2_cash_redistribution clip reason.
        round2_mask = redistributed_series.gt(0.0) | clip_reason_series.str.contains("round2_cash_redistribution", regex=False)
        payload["round2_buy_trade_count"] = int(round2_mask.sum())
        payload["round2_redistributed_notional"] = float(redistributed_series.sum()) if "redistributed_notional" in trade_df.columns else (
            float(trade_df.loc[round2_mask, gross_turnover_col].fillna(0.0).sum()) if gross_turnover_col in trade_df.columns else 0.0
        )
        payload["round2_rebalance_days"] = int(
            trade_df.loc[round2_mask, "date"].astype(str).nunique()
        ) if "date" in trade_df.columns else 0
    else:
        payload["gross_turnover_reconstructed"] = None
        payload["gross_turnover_reconstruction_error"] = None
        payload["transaction_cost_reconstructed"] = None
        payload["transaction_cost_reconstruction_error"] = None
        payload["fill_ratio_mean"] = None
        payload["fill_ratio_p05"] = None
        payload["round2_buy_trade_count"] = 0
        payload["round2_redistributed_notional"] = 0.0
        payload["round2_rebalance_days"] = 0
    turnover_ratio = _to_float_or_none(payload.get("turnover_ratio"))
    performance_return = _to_float_or_none(payload.get("performance_return"))
    if turnover_ratio is not None and abs(turnover_ratio) > 1e-12 and performance_return is not None:
        payload["return_per_turnover"] = float(performance_return / turnover_ratio)
    else:
        payload["return_per_turnover"] = None
    gross_turnover = _to_float_or_none(payload.get("gross_turnover"))
    transaction_cost = _to_float_or_none(payload.get("transaction_cost"))
    if gross_turnover is not None and gross_turnover > 1e-12 and transaction_cost is not None:
        payload["cost_to_turnover"] = float(transaction_cost / gross_turnover)
    else:
        payload["cost_to_turnover"] = None
    return payload


def _strip_detail_keys(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of `payload` without the bulky DETAIL_KEYS entries."""
    return {key: value for key, value in payload.items() if key not in DETAIL_KEYS}


def _rows_to_frame(rows: list[dict[str, Any]], columns: list[str] | None = None) -> pd.DataFrame:
    """Build a DataFrame from dict rows with `columns` first, extras appended.

    An empty `rows` yields an empty frame carrying just the requested columns.
    """
    if rows:
        frame = pd.DataFrame(rows)
        if columns:
            ordered_columns = list(columns) + [col for col in frame.columns if col not in columns]
            frame = frame.reindex(columns=ordered_columns)
        return frame
    return pd.DataFrame(columns=columns or [])


def _build_rebalance_window_frame(portfolio_df: pd.DataFrame) -> pd.DataFrame:
    """Split each context group's daily log into rebalance-to-rebalance windows.

    Within every DETAIL_CONTEXT_COLUMNS group (sorted by date), windows start
    at each is_rebalance day (a synthetic window at index 0 is added when the
    first day is not a rebalance). For each window the frame records compounded
    and summed portfolio/benchmark/excess returns, cumulative returns up to the
    window end, and a full-period return reconstructed from the window
    compounds together with its reconstruction error. Returns an empty frame
    with REBALANCE_WINDOW_RETURN_COLUMNS when there is nothing to do.
    """
    if portfolio_df is None or portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    frame = _ensure_detail_context_columns(portfolio_df)
    frame = frame.copy()
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date"]).sort_values(DETAIL_CONTEXT_COLUMNS + ["date"], na_position="last").reset_index(drop=True)
    if frame.empty:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    # Returns default to 0.0 when missing; cash_weight stays NA so statistics
    # over it are not polluted by fabricated zeros.
    for col in ["portfolio_return", "benchmark_return", "cash_weight"]:
        if col in frame.columns:
            frame[col] = pd.to_numeric(frame[col], errors="coerce").fillna(0.0 if col != "cash_weight" else pd.NA)
        else:
            frame[col] = 0.0 if col != "cash_weight" else pd.NA
    frame["excess_return"] = pd.to_numeric(frame.get("excess_return"), errors="coerce")
    frame["is_rebalance"] = frame.get("is_rebalance", False)
    frame["is_rebalance"] = frame["is_rebalance"].fillna(False).astype(bool)
    frame["had_trade"] = frame.get("had_trade", False)
    frame["had_trade"] = frame["had_trade"].fillna(False).astype(bool)
    rows: list[dict[str, Any]] = []
    group_cols = DETAIL_CONTEXT_COLUMNS
    for group_key, grp in frame.groupby(group_cols, dropna=False, sort=False):
        grp = grp.sort_values("date").reset_index(drop=True)
        if grp.empty:
            continue
        starts = grp.index[grp["is_rebalance"]].tolist()
        # Ensure the first day opens a window even without a rebalance flag.
        if not starts or starts[0] != 0:
            starts = [0] + starts
        starts = sorted(set(int(idx) for idx in starts))
        # Nominal window length, used to flag a short trailing window.
        expected_window_days = starts[1] - starts[0] if len(starts) >= 2 else None
        full_portfolio_return = float((1.0 + grp["portfolio_return"].astype(float)).prod() - 1.0)
        full_benchmark_return = float((1.0 + grp["benchmark_return"].astype(float)).prod() - 1.0)
        # NOTE(review): full_excess_compounded_return is computed but never
        # used below — confirm whether it was meant to be emitted.
        full_excess_compounded_return = full_portfolio_return - full_benchmark_return
        window_returns: list[float] = []
        context = {
            col: value
            for col, value in zip(group_cols, group_key if isinstance(group_key, tuple) else (group_key,))
        }
        for window_idx, start_idx in enumerate(starts, start=1):
            # Window spans from this rebalance up to the day before the next.
            end_idx = starts[window_idx] - 1 if window_idx < len(starts) else len(grp) - 1
            window = grp.iloc[start_idx : end_idx + 1].copy()
            if window.empty:
                continue
            portfolio_returns = window["portfolio_return"].astype(float)
            benchmark_returns = window["benchmark_return"].astype(float)
            excess_returns = portfolio_returns - benchmark_returns
            portfolio_comp = float((1.0 + portfolio_returns).prod() - 1.0)
            benchmark_comp = float((1.0 + benchmark_returns).prod() - 1.0)
            excess_comp = portfolio_comp - benchmark_comp
            window_returns.append(portfolio_comp)
            upto_end = grp.iloc[: end_idx + 1].copy()
            cumulative_portfolio = float((1.0 + upto_end["portfolio_return"].astype(float)).prod() - 1.0)
            cumulative_benchmark = float((1.0 + upto_end["benchmark_return"].astype(float)).prod() - 1.0)
            rows.append(
                {
                    "window_index": int(window_idx),
                    "window_start_date": window["date"].iloc[0].strftime("%Y-%m-%d"),
                    "window_end_date": window["date"].iloc[-1].strftime("%Y-%m-%d"),
                    "year": int(window["date"].iloc[0].year),
                    "market_regime": str(window.get("market_regime", pd.Series(["neutral"])).iloc[0]),
                    "n_days": int(len(window)),
                    "is_partial_window": bool(expected_window_days is not None and window_idx == len(starts) and len(window) < expected_window_days),
                    "had_trade_any": bool(window["had_trade"].any()),
                    "cash_weight_start": _to_float_or_none(window.get("cash_weight", pd.Series([None])).iloc[0]),
                    "cash_weight_end": _to_float_or_none(window.get("cash_weight", pd.Series([None])).iloc[-1]),
                    "portfolio_return_compounded": portfolio_comp,
                    "benchmark_return_compounded": benchmark_comp,
                    "excess_compounded_return": excess_comp,
                    "portfolio_return_sum": float(portfolio_returns.sum()),
                    "benchmark_return_sum": float(benchmark_returns.sum()),
                    "excess_return_sum": float(excess_returns.sum()),
                    "mean_daily_portfolio_return": float(portfolio_returns.mean()),
                    "mean_daily_benchmark_return": float(benchmark_returns.mean()),
                    "mean_daily_excess_return": float(excess_returns.mean()),
                    "cumulative_portfolio_return_to_end": cumulative_portfolio,
                    "cumulative_benchmark_return_to_end": cumulative_benchmark,
                    "cumulative_excess_compounded_return_to_end": cumulative_portfolio - cumulative_benchmark,
                    "full_period_window_reconstructed_return": None,  # filled after loop
                    "full_period_window_reconstruction_error": None,  # filled after loop
                    **context,
                }
            )
        full_window_reconstructed = float((1.0 + pd.Series(window_returns, dtype=float)).prod() - 1.0) if window_returns else 0.0
        full_window_error = full_window_reconstructed - full_portfolio_return
        # Back-fill the full-period reconstruction into this group's rows only.
        for idx in range(len(rows) - len(window_returns), len(rows)):
            rows[idx]["full_period_window_reconstructed_return"] = full_window_reconstructed
            rows[idx]["full_period_window_reconstruction_error"] = full_window_error
    if not rows:
        return pd.DataFrame(columns=REBALANCE_WINDOW_RETURN_COLUMNS)
    return _rows_to_frame(rows, REBALANCE_WINDOW_RETURN_COLUMNS)


def _build_rebalance_plan_frame(
    trade_log_rows: list[dict[str, Any]],
    holding_log_rows: list[dict[str, Any]],
    portfolio_log_rows: list[dict[str, Any]],
) -> pd.DataFrame:
    """Assemble a per-rebalance-day plan frame from trade/holding/portfolio logs.

    Filters the portfolio log down to rebalance days, joins end-of-day holdings
    (ranked by market value) and per-instrument trade aggregates onto those
    days, and returns rows in the REBALANCE_PLAN_COLUMNS schema.
    """
    portfolio_df = _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS)
    if portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_PLAN_COLUMNS)
    portfolio_df = _ensure_detail_context_columns(portfolio_df)
    portfolio_df["date"] = pd.to_datetime(portfolio_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
    portfolio_df["is_rebalance"] = portfolio_df.get("is_rebalance", False)
    portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool)
    portfolio_df = portfolio_df[portfolio_df["is_rebalance"]].copy()
    if portfolio_df.empty:
        return pd.DataFrame(columns=REBALANCE_PLAN_COLUMNS)
    for col in [
        "portfolio_value",
        "cash_eod",
        "cash_weight",
        "gross_turnover",
        "transaction_cost",
    ]:
        if col in portfolio_df.columns:
            portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce")
    had_trade_source = portfolio_df["had_trade"] if "had_trade" in portfolio_df.columns else pd.Series(False, index=portfolio_df.index)
    portfolio_df["had_trade"] = had_trade_source.fillna(False).astype(bool)
    for col in ["buy_trades", "sell_trades"]:
        if col in portfolio_df.columns:
            portfolio_df[col] = pd.to_numeric(portfolio_df[col], errors="coerce").fillna(0).astype(int)
        else:
            portfolio_df[col] = 0
    summary_cols = DETAIL_CONTEXT_COLUMNS + [
        "date",
        "year",
        "market_regime",
        "portfolio_value",
        "cash_eod",
        "cash_weight",
        "gross_turnover",
        "transaction_cost",
        "had_trade",
        "buy_trades",
        "sell_trades",
    ]
    # One summary row per context+date; keep="last" wins on duplicates.
    rebalance_summary = portfolio_df[summary_cols].drop_duplicates(subset=DETAIL_CONTEXT_COLUMNS + ["date"], keep="last").copy()
    rebalance_summary["invested_value_eod"] = rebalance_summary["portfolio_value"] - rebalance_summary["cash_eod"]
    rebalance_summary["unallocated_cash_eod"] = rebalance_summary["cash_eod"]
    holdings_df = _rows_to_frame(holding_log_rows, HOLDING_LOG_COLUMNS)
    if not holdings_df.empty:
        holdings_df = _ensure_detail_context_columns(holdings_df)
        holdings_df["date"] = pd.to_datetime(holdings_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
        # Inner join restricts holdings to rebalance days only.
        holdings_df = holdings_df.merge(rebalance_summary[DETAIL_CONTEXT_COLUMNS + ["date"]], on=DETAIL_CONTEXT_COLUMNS + ["date"], how="inner")
        for col in ["market_value", "weight", "shares_held"]:
            if col in holdings_df.columns:
                holdings_df[col] = pd.to_numeric(holdings_df[col], errors="coerce")
        hold_instrument = (
            holdings_df.groupby(DETAIL_CONTEXT_COLUMNS + ["date", "instrument"], dropna=False)
            .agg(
                target_value_eod=("market_value", "sum"),
                target_weight_eod=("weight", "sum"),
                shares_held_eod=("shares_held", "sum"),
            )
            .reset_index()
        )
        # Rank instruments within each context+date by descending EOD value,
        # breaking ties on instrument name ascending.
        hold_instrument = hold_instrument.sort_values(
            DETAIL_CONTEXT_COLUMNS + ["date", "target_value_eod", "instrument"],
            ascending=[True] * (len(DETAIL_CONTEXT_COLUMNS) + 1) + [False, True],
            na_position="last",
        )
        hold_instrument["target_rank_eod"] = (
            hold_instrument.groupby(DETAIL_CONTEXT_COLUMNS + ["date"], dropna=False).cumcount() + 1
        )
        hold_summary = (
            hold_instrument.groupby(DETAIL_CONTEXT_COLUMNS + ["date"], dropna=False)
            .agg(
                target_count_eod=("instrument", "nunique"),
                target_total_value_eod=("target_value_eod", "sum"),
                target_list_eod=("instrument", lambda s: json.dumps(list(map(str, s.tolist())), ensure_ascii=False)),
            )
            .reset_index()
        )
    else:
        hold_instrument = pd.DataFrame(columns=DETAIL_CONTEXT_COLUMNS + ["date", "instrument", "target_value_eod", "target_weight_eod", "shares_held_eod", "target_rank_eod"])
        hold_summary = pd.DataFrame(columns=DETAIL_CONTEXT_COLUMNS + ["date", "target_count_eod", "target_total_value_eod", "target_list_eod"])
    trade_df = _rows_to_frame(trade_log_rows, TRADE_LOG_COLUMNS)
    if not trade_df.empty:
        trade_df = _ensure_detail_context_columns(trade_df)
        trade_df["date"] = pd.to_datetime(trade_df["date"], errors="coerce").dt.strftime("%Y-%m-%d")
        trade_df = trade_df.merge(rebalance_summary[DETAIL_CONTEXT_COLUMNS + ["date"]], on=DETAIL_CONTEXT_COLUMNS + ["date"], how="inner")
        for col in [
            "shares",
            "filled_shares",
            "requested_shares",
            "unfilled_shares",
            "price",
            "order_value",
            "filled_value",
            "gross_notional",
            "current_shares",
            "target_shares",
            "fill_ratio",
        ]:
            if col in trade_df.columns:
                trade_df[col] = pd.to_numeric(trade_df[col], errors="coerce")
        # Normalized share columns: prefer explicit fields, then fall back
        # through filled_shares -> shares so older logs still aggregate.
        trade_df["requested_shares_norm"] = pd.to_numeric(
            trade_df.get("requested_shares", trade_df.get("filled_shares", trade_df.get("shares", 0.0))),
            errors="coerce",
        ).fillna(pd.to_numeric(trade_df.get("filled_shares", trade_df.get("shares", 0.0)), errors="coerce"))
        trade_df["filled_shares_norm"] = pd.to_numeric(
            trade_df.get("filled_shares", trade_df.get("shares", 0.0)),
            errors="coerce",
        ).fillna(pd.to_numeric(trade_df.get("shares", 0.0), errors="coerce"))
        trade_df["unfilled_shares_norm"] = pd.to_numeric(
            trade_df.get("unfilled_shares"),
            errors="coerce",
        ).fillna((trade_df["requested_shares_norm"] - trade_df["filled_shares_norm"]).clip(lower=0.0))
        order_value = pd.to_numeric(trade_df.get("order_value"), errors="coerce")
        gross_notional = pd.to_numeric(trade_df.get("gross_notional", trade_df.get("filled_value")), errors="coerce")
        price = pd.to_numeric(trade_df.get("price"), errors="coerce")
        trade_df["requested_notional_norm"] = order_value.fillna(trade_df["requested_shares_norm"] * price).fillna(gross_notional)
        trade_df["filled_notional_norm"] = pd.to_numeric(trade_df.get("filled_value"),
errors="coerce").fillna(gross_notional).fillna(trade_df["filled_shares_norm"] * price) action_series = trade_df["action"].astype(str) if "action" in trade_df.columns else pd.Series("", index=trade_df.index) trade_df["buy_requested_shares"] = trade_df["requested_shares_norm"].where(action_series == "buy", 0.0) trade_df["buy_filled_shares"] = trade_df["filled_shares_norm"].where(action_series == "buy", 0.0) trade_df["sell_requested_shares"] = trade_df["requested_shares_norm"].where(action_series == "sell", 0.0) trade_df["sell_filled_shares"] = trade_df["filled_shares_norm"].where(action_series == "sell", 0.0) trade_df["clip_reason_text"] = trade_df.get("clip_reason", "").fillna("").astype(str) def _first_valid(values: pd.Series) -> Any: cleaned = values.dropna() return cleaned.iloc[0] if not cleaned.empty else None trade_instrument = ( trade_df.groupby(DETAIL_CONTEXT_COLUMNS + ["date", "instrument"], dropna=False) .agg( trade_actions=("action", lambda s: "|".join(dict.fromkeys(str(value) for value in s if str(value)))), current_shares_ref=("current_shares", _first_valid), target_shares_ref=("target_shares", _first_valid), requested_shares_total=("requested_shares_norm", "sum"), filled_shares_total=("filled_shares_norm", "sum"), unfilled_shares_total=("unfilled_shares_norm", "sum"), requested_notional_total=("requested_notional_norm", "sum"), filled_notional_total=("filled_notional_norm", "sum"), buy_requested_shares=("buy_requested_shares", "sum"), buy_filled_shares=("buy_filled_shares", "sum"), sell_requested_shares=("sell_requested_shares", "sum"), sell_filled_shares=("sell_filled_shares", "sum"), fill_ratio_mean=("fill_ratio", "mean"), clip_reasons=("clip_reason_text", lambda s: "|".join(sorted({value for value in s if value}))), ) .reset_index() ) else: trade_instrument = pd.DataFrame( columns=DETAIL_CONTEXT_COLUMNS + [ "date", "instrument", "trade_actions", "current_shares_ref", "target_shares_ref", "requested_shares_total", "filled_shares_total", 
"unfilled_shares_total", "requested_notional_total", "filled_notional_total", "buy_requested_shares", "buy_filled_shares", "sell_requested_shares", "sell_filled_shares", "fill_ratio_mean", "clip_reasons", ] ) plan_instrument = hold_instrument.merge( trade_instrument, on=DETAIL_CONTEXT_COLUMNS + ["date", "instrument"], how="outer", ) plan_df = rebalance_summary.merge(hold_summary, on=DETAIL_CONTEXT_COLUMNS + ["date"], how="left") plan_df = plan_df.merge(plan_instrument, on=DETAIL_CONTEXT_COLUMNS + ["date"], how="left") plan_df["target_count_eod"] = pd.to_numeric(plan_df.get("target_count_eod"), errors="coerce").fillna(0).astype(int) plan_df["target_total_value_eod"] = pd.to_numeric(plan_df.get("target_total_value_eod"), errors="coerce") plan_df["target_list_eod"] = plan_df.get("target_list_eod").fillna("[]") numeric_fill_defaults = { "target_rank_eod": None, "target_value_eod": 0.0, "target_weight_eod": 0.0, "shares_held_eod": 0.0, "current_shares_ref": None, "target_shares_ref": None, "requested_shares_total": 0.0, "filled_shares_total": 0.0, "unfilled_shares_total": 0.0, "requested_notional_total": 0.0, "filled_notional_total": 0.0, "buy_requested_shares": 0.0, "buy_filled_shares": 0.0, "sell_requested_shares": 0.0, "sell_filled_shares": 0.0, "fill_ratio_mean": None, } for col, default in numeric_fill_defaults.items(): if col not in plan_df.columns: plan_df[col] = default elif default is not None: plan_df[col] = pd.to_numeric(plan_df[col], errors="coerce").fillna(default) else: plan_df[col] = pd.to_numeric(plan_df[col], errors="coerce") if "trade_actions" not in plan_df.columns: plan_df["trade_actions"] = "" else: plan_df["trade_actions"] = plan_df["trade_actions"].fillna("") if "clip_reasons" not in plan_df.columns: plan_df["clip_reasons"] = "" else: plan_df["clip_reasons"] = plan_df["clip_reasons"].fillna("") return _rows_to_frame(plan_df.to_dict("records"), REBALANCE_PLAN_COLUMNS) def _candidate_from_jsonl_row(row: dict[str, Any], idx: int) -> dict[str, Any]: 
    """Normalize one JSONL input row into a candidate dict.

    Resolves the factor expression, seed/factor names, and candidate scope
    ("seed_baseline" vs "trial") from several accepted key aliases, and copies
    any original_* metrics carried on the row.
    """
    # Accept several aliases for the expression; empty string when none present.
    expr = str(row.get("expr") or row.get("factor_expr") or row.get("expression") or "").strip()
    source = str(row.get("source") or "input").strip() or "input"
    seed_version_name = str(
        row.get("seed_version_name")
        or row.get("factor_name")
        or row.get("name")
        or ""
    ).strip()
    seed_name_value = str(
        row.get("seed_name")
        or row.get("root_seed_name")
        or row.get("parent_seed_name")
        or row.get("name")
        or f"seed_{idx}"
    ).strip()
    # Scope resolution order: explicit scope > baseline-ish source > name equality
    # (version name equal to seed name means the baseline itself) > missing seed_name.
    raw_scope = str(row.get("candidate_scope") or "").strip()
    if raw_scope:
        candidate_scope = raw_scope
    elif source in {"summary_seed", "seed_baseline"}:
        candidate_scope = "seed_baseline"
    elif seed_version_name and seed_version_name == seed_name_value:
        candidate_scope = "seed_baseline"
    elif seed_version_name and seed_version_name != seed_name_value:
        candidate_scope = "trial"
    elif not row.get("seed_name"):
        candidate_scope = "seed_baseline"
    else:
        candidate_scope = "trial"
    root_seed_name = seed_name_value
    factor_name = str(seed_version_name or root_seed_name or f"factor_{idx}")
    # For a baseline candidate the factor expression doubles as the seed expression.
    seed_expr = str(row.get("seed_expr") or (expr if candidate_scope == "seed_baseline" else "")).strip()
    inferred_turn = _infer_turn_from_version_name(root_seed_name, factor_name)
    candidate = {
        "input_index": idx,
        "source": source,
        "candidate_scope": candidate_scope,
        "seed_name": root_seed_name,
        "seed_expr": seed_expr,
        "factor_name": factor_name,
        "factor_expr": expr,
        # Explicit "is not None" checks (not truthiness) so 0 is a valid turn/index.
        "turn": _optional_int(
            row.get("turn") if row.get("turn") is not None else (row.get("best_ir_turn") if row.get("best_ir_turn") is not None else inferred_turn)
        ),
        "call_index": _optional_int(row.get("call_index") if row.get("call_index") is not None else row.get("best_ir_call_index")),
        "proposal_rank": _optional_int(row.get("proposal_rank") if row.get("proposal_rank") is not None else row.get("best_ir_proposal_rank")),
    }
    # Carry known metrics over under an original_* prefix so downstream code can
    # compare replayed results against the values recorded in the input file.
    original_key_map = {
        "success": "original_success",
        "backtest_engine": "original_backtest_engine",
        "performance_return": "original_performance_return",
        "benchmark_performance_return": "original_benchmark_performance_return",
        "excess_compounded_return": "original_excess_compounded_return",
        "ir": "original_ir",
        "ic_mean": "original_ic_mean",
        "icir": "original_icir",
        "rank_ic_mean": "original_rank_ic_mean",
        "rank_icir": "original_rank_icir",
        "aer": "original_aer",
        "annualized_return": "original_annualized_return",
        "annualized_volatility": "original_annualized_volatility",
        "sharpe": "original_sharpe",
        "winrate": "original_winrate",
        "mdd": "original_mdd",
        "excess_mdd": "original_excess_mdd",
        "portfolio_nav_mdd": "original_portfolio_nav_mdd",
        "total_return": "original_total_return",
    }
    for src_key, dst_key in original_key_map.items():
        if src_key in row:
            candidate[dst_key] = _clean_scalar(row.get(src_key))
    # Keys already carrying the original_ prefix pass through unchanged (cleaned).
    for key, value in row.items():
        if str(key).startswith("original_"):
            candidate[str(key)] = _clean_scalar(value)
    return candidate


def _empty_baseline(seed_name: str, seed_expr: str, backtest_engine: str) -> dict[str, Any]:
    """Return a failed-placeholder baseline payload for a seed with no baseline run."""
    payload = _metric_payload({"success": False, "backtest_engine": backtest_engine})
    payload.update(
        {
            "source": "missing_baseline",
            "candidate_scope": "seed_baseline",
            "seed_name": seed_name,
            "seed_expr": seed_expr,
            "factor_name": seed_name,
            "factor_expr": seed_expr,
            "turn": None,
            "call_index": None,
            "proposal_rank": None,
        }
    )
    return payload


def _sort_key(row: dict[str, Any]) -> tuple[int, int, int, int]:
    """Stable ordering key: input order, then proposal rank, turn, call index.

    Missing values sort last via the 10**9 sentinel.
    NOTE(review): `or 10**9` also maps a legitimate 0 to the sentinel — if
    input_index (or turn) can be 0, such rows sort as missing; confirm whether
    these fields are 1-based before relying on the order.
    """
    return (
        int(row.get("input_index") or 10**9),
        int(row.get("proposal_rank") or 10**9),
        int(row.get("turn") or 10**9),
        int(row.get("call_index") or 10**9),
    )


# Worker entry point run per candidate (keyword-only so ProcessPoolExecutor call
# sites stay explicit); signature continues below.
def _evaluate_candidate_task(
    *,
    candidate: dict[str, Any],
    period: str,
    label_forward_days: int,
    data_path: str | None,
    backtest_engine: str,
    top_k: int,
    n_drop: int,
    trade_guard_config: dict[str, Any] | None,
    rebalance_mode: str,
    custom_weight_mode: str,
    redistribute_unfilled_cash: bool,
    position_size: float,
    max_pos_each_stock: float,
    lot_size: int,
    max_daily_volume_participation: float,
    max_daily_amount_participation: float,
    enforce_cash_limit: bool,
    rebalance_freq: int,
    cost_buy: float,
    cost_sell: float,
    score_transform: str,
    score_clip: float,
    universe_filter: str,
    universe_top_n: int,
    universe_lookback_days: int,
    start_date: str | None,
    end_date: str | None,
    capture_details: bool,
) -> dict[str, Any]:
    """Run one standalone backtest for a candidate and return its metric payload.

    Never raises: any failure from execute_expression is converted into a
    success=False payload that echoes the run configuration plus the error text.
    The candidate's own keys are merged into the payload last.
    """
    # Lazy import + period configuration must run in the worker process too.
    _ensure_backtest_imports()
    configure_periods(PERIOD_CONFIGS)
    try:
        result = execute_expression(
            str(candidate.get("factor_expr", "")),
            data_path=data_path,
            period=period,
            start_date=start_date,
            end_date=end_date,
            label_forward_days=label_forward_days,
            backtest_engine=backtest_engine,
            top_k=top_k,
            n_drop=n_drop,
            position_size=position_size,
            max_pos_each_stock=max_pos_each_stock,
            lot_size=lot_size,
            max_daily_volume_participation=max_daily_volume_participation,
            max_daily_amount_participation=max_daily_amount_participation,
            rebalance_freq=rebalance_freq,
            cost_buy=cost_buy,
            cost_sell=cost_sell,
            capture_details=capture_details,
            trade_guard_config=trade_guard_config,
            rebalance_mode=rebalance_mode,
            custom_weight_mode=custom_weight_mode,
            redistribute_unfilled_cash=redistribute_unfilled_cash,
            enforce_cash_limit=enforce_cash_limit,
            score_transform=score_transform,
            score_clip=score_clip,
            universe_filter=universe_filter,
            universe_top_n=universe_top_n,
            universe_lookback_days=universe_lookback_days,
        )
        payload = _metric_payload(result)
    except Exception as exc:
        # Failure payload still records the full run configuration for auditability.
        payload = _metric_payload(
            {
                "success": False,
                "backtest_engine": backtest_engine,
                "top_k": int(top_k),
                "n_drop": int(n_drop),
                "position_size": float(position_size),
                "max_pos_each_stock": float(max_pos_each_stock),
                "lot_size": int(lot_size),
                "max_daily_volume_participation": float(max_daily_volume_participation),
                "max_daily_amount_participation": float(max_daily_amount_participation),
                "rebalance_freq": int(rebalance_freq),
                "cost_buy": float(cost_buy),
                "cost_sell": float(cost_sell),
                "custom_weight_mode": custom_weight_mode,
                "redistribute_unfilled_cash": bool(redistribute_unfilled_cash),
                "enforce_cash_limit": bool(enforce_cash_limit),
                "score_transform": score_transform,
                "score_clip": float(score_clip),
                "universe_filter": universe_filter,
                "universe_top_n": int(universe_top_n),
                "universe_lookback_days": int(universe_lookback_days),
                "label_forward_days": label_forward_days,
                "error": f"backtest_error: {exc}",
            }
        )
    payload = _enrich_result_diagnostics(payload)
    # Candidate identity keys win over anything the backtest produced.
    payload.update(candidate)
    return payload


def _flatten_yearly_metrics(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    success: bool,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    yearly_metrics: dict[str, Any],
) -> list[dict[str, Any]]:
    """Flatten a {year -> metrics} mapping into one row per year, sorted by year.

    Missing or falsy metric values are coerced to 0/0.0 so the output columns
    are always numeric.
    """
    rows: list[dict[str, Any]] = []
    for year_key in sorted(yearly_metrics.keys(), key=lambda x: int(x)):
        metrics = yearly_metrics[year_key]
        year = int(year_key)
        rows.append(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "success": bool(success),
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
                "year": year,
                # Fall back to the static YEAR_REGIMES-based lookup when not logged.
                "market_regime": metrics.get("market_regime", _market_regime(year)),
                "performance_return": float(metrics.get("performance_return", 0.0) or 0.0),
                "benchmark_performance_return": float(metrics.get("benchmark_performance_return", 0.0) or 0.0),
                "excess_compounded_return": float(metrics.get("excess_compounded_return", 0.0) or 0.0),
                "ir": float(metrics.get("ir", 0.0) or 0.0),
                "ic_mean": float(metrics.get("ic_mean", 0.0) or 0.0),
                "icir": float(metrics.get("icir", 0.0) or 0.0),
                "rank_ic_mean": float(metrics.get("rank_ic_mean", 0.0) or 0.0),
                "rank_icir": float(metrics.get("rank_icir", 0.0) or 0.0),
                # "aer" is an alias of annualized_return in this schema.
                "aer": float(metrics.get("annualized_return", 0.0) or 0.0),
                "annualized_return": float(metrics.get("annualized_return", 0.0) or 0.0),
                "sharpe": float(metrics.get("sharpe", 0.0) or 0.0),
                "winrate": float(metrics.get("winrate", 0.0) or 0.0),
                "mdd": float(metrics.get("mdd", 0.0) or 0.0),
                # excess_mdd falls back to mdd when not reported separately.
                "excess_mdd": float(metrics.get("excess_mdd", metrics.get("mdd", 0.0)) or 0.0),
                "portfolio_nav_mdd": float(metrics.get("portfolio_nav_mdd", 0.0) or 0.0),
                "drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0),
                "drawdown_duration_mean": float(metrics.get("drawdown_duration_mean", 0.0) or 0.0),
                "drawdown_duration_median": float(metrics.get("drawdown_duration_median", 0.0) or 0.0),
                "annualized_volatility": float(metrics.get("annualized_volatility", 0.0) or 0.0),
                "n_days": int(metrics.get("n_days", 0) or 0),
                "n_ic_days": int(metrics.get("n_ic_days", 0) or 0),
                "avg_holdings_count": float(metrics.get("avg_holdings_count", 0.0) or 0.0),
                "max_holdings_count": int(metrics.get("max_holdings_count", 0) or 0),
                "buy_trades": int(metrics.get("buy_trades", 0) or 0),
                "sell_trades": int(metrics.get("sell_trades", 0) or 0),
                "shares_bought": float(metrics.get("shares_bought", 0.0) or 0.0),
                "shares_sold": float(metrics.get("shares_sold", 0.0) or 0.0),
                "buy_gross_notional": float(metrics.get("buy_gross_notional", 0.0) or 0.0),
                "sell_gross_notional": float(metrics.get("sell_gross_notional", 0.0) or 0.0),
                "buy_cash_outflow": float(metrics.get("buy_cash_outflow", 0.0) or 0.0),
                "sell_net_proceeds": float(metrics.get("sell_net_proceeds", 0.0) or 0.0),
                "buy_transaction_cost": float(metrics.get("buy_transaction_cost", 0.0) or 0.0),
                "sell_transaction_cost": float(metrics.get("sell_transaction_cost", 0.0) or 0.0),
                "transaction_cost": float(metrics.get("transaction_cost", 0.0) or 0.0),
                "gross_turnover": float(metrics.get("gross_turnover", 0.0) or 0.0),
            }
        )
    return rows


def _build_trial_rows(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
    """Build one output row per trial (non-baseline) result, annotated with seed metrics.

    Each trial row carries its seed's baseline metrics under seed_* keys so the
    trial can be compared against its baseline without a join.
    """
    rows: list[dict[str, Any]] = []
    for result in sorted(results, key=_sort_key):
        if result.get("candidate_scope") == "seed_baseline":
            continue
        seed_name = str(result.get("seed_name") or result.get("factor_name") or "")
        # Fall back to a failed-placeholder baseline when the seed has none.
        baseline = baselines.get(seed_name) or _empty_baseline(seed_name, str(result.get("seed_expr") or ""), str(result.get("backtest_engine") or ""))
        row = _strip_detail_keys(result)
        row["seed_name"] = seed_name
        row["seed_expr"] = baseline.get("factor_expr") or result.get("seed_expr") or ""
        row["seed_ir"] = baseline.get("ir")
        row["seed_ic"] = baseline.get("ic_mean")
        row["seed_aer"] = baseline.get("aer")
        row["seed_sharpe"] = baseline.get("sharpe")
        row["seed_winrate"] = baseline.get("winrate")
        row["seed_mdd"] = baseline.get("mdd")
        row["seed_excess_mdd"] = baseline.get("excess_mdd")
        row["seed_portfolio_nav_mdd"] = baseline.get("portfolio_nav_mdd")
        row["seed_performance_return"] = baseline.get("performance_return")
        row["seed_benchmark_performance_return"] = baseline.get("benchmark_performance_return")
        row["seed_excess_compounded_return"] = baseline.get("excess_compounded_return")
        rows.append(row)
    return rows


def _build_baseline_rows(results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build one output row per seed-baseline result, with warnings serialized to JSON."""
    rows: list[dict[str, Any]] = []
    for result in sorted(results, key=_sort_key):
        if result.get("candidate_scope") != "seed_baseline":
            continue
        row = _strip_detail_keys(result)
        row["seed_error"] = result.get("error")
        row["qlib_warnings_json"] = json.dumps(result.get("qlib_warnings") or [], ensure_ascii=False)
        rows.append(row)
    return rows


def _build_summary_rows(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]], backtest_engine: str) -> list[dict[str, Any]]:
    """Aggregate all results into one summary row per seed.

    Per seed, selects the best trial by IR (and per-metric bests for IC, AER,
    Sharpe, winrate, mdd), records the per-turn best-IR path, and computes
    improvement deltas against the seed baseline.
    """
    by_seed: dict[str, list[dict[str, Any]]] = {}
    for result in results:
        by_seed.setdefault(str(result.get("seed_name") or result.get("factor_name") or ""), []).append(result)
    summary_rows: list[dict[str, Any]] = []
    for seed_name in sorted(by_seed.keys()):
        seed_results = sorted(by_seed[seed_name], key=_sort_key)
        baseline = baselines.get(seed_name)
        if baseline is None:
            seed_expr = str(seed_results[0].get("seed_expr") or seed_results[0].get("factor_expr") or "")
            baseline = _empty_baseline(seed_name, seed_expr, backtest_engine)
        trials = [row for row in seed_results if row.get("candidate_scope") != "seed_baseline"]
        valid = [row for row in trials if row.get("success")]
        best = max(valid, key=lambda row: float(row.get("ir", float("-inf")))) if valid else None
        best_ic = max(valid, key=lambda row: float(row.get("ic_mean", float("-inf")))) if valid else None
        best_aer = max(valid, key=lambda row: float(row.get("aer", float("-inf")))) if valid else None
        best_sharpe = max(valid, key=lambda row: float(row.get("sharpe", float("-inf")))) if valid else None
        best_winrate = max(valid, key=lambda row: float(row.get("winrate", float("-inf")))) if valid else None
        # NOTE(review): "best" mdd is selected with max(); this is only correct if
        # mdd is stored so that larger is better (e.g. drawdowns as negative
        # numbers) — confirm the sign convention in _metric_payload.
        best_mdd = max(valid, key=lambda row: float(row.get("mdd", float("-inf")))) if valid else None
        turn_values = sorted({int(row["turn"]) for row in trials if row.get("turn") is not None})
        # Per-turn best-IR trajectory across the seed's trials.
        turn_records: list[dict[str, Any]] = []
        for turn in turn_values:
            turn_rows = [row for row in trials if row.get("turn") == turn]
            turn_valid = [row for row in turn_rows if row.get("success")]
            turn_best = max(turn_valid, key=lambda row: float(row.get("ir", float("-inf")))) if turn_valid else None
            turn_records.append(
                {
                    "turn": turn,
                    "n_calls": len(turn_rows),
                    "n_valid": len(turn_valid),
                    "best_ir": turn_best.get("ir") if turn_best else None,
                    "best_expr": turn_best.get("factor_expr") if turn_best else None,
                }
            )
        ir_path = [record["best_ir"] for record in turn_records]
        expr_path = [record["best_expr"] for record in turn_records]
        calls_per_turn = [record["n_calls"] for record in turn_records]
        valid_per_turn = [record["n_valid"] for record in turn_records]
        seed_ir = float(baseline.get("ir", 0.0) or 0.0)
        # "Paper" baseline clamps negative seed IR to 0 for beat-seed comparisons.
        paper_baseline = max(0.0, seed_ir)
        summary_rows.append(
            {
                "seed_name": seed_name,
                "seed_expr": baseline.get("factor_expr") or baseline.get("seed_expr") or "",
                "seed_success": bool(baseline.get("success", False)),
                "seed_error": baseline.get("error"),
                "seed_qlib_warnings": json.dumps(baseline.get("qlib_warnings") or [], ensure_ascii=False),
                "seed_rebalance_mode": baseline.get("rebalance_mode"),
                "seed_performance_return": float(baseline.get("performance_return", 0.0) or 0.0),
                "seed_benchmark_performance_return": float(baseline.get("benchmark_performance_return", 0.0) or 0.0),
                "seed_excess_compounded_return": float(baseline.get("excess_compounded_return", 0.0) or 0.0),
                "seed_ir": seed_ir,
                "seed_ic": float(baseline.get("ic_mean", 0.0) or 0.0),
                "seed_aer": float(baseline.get("aer", 0.0) or 0.0),
                "seed_sharpe": float(baseline.get("sharpe", 0.0) or 0.0),
                "seed_winrate": float(baseline.get("winrate", 0.0) or 0.0),
                "seed_mdd": float(baseline.get("mdd", 0.0) or 0.0),
                # excess_mdd falls back to plain mdd when not reported separately.
                "seed_excess_mdd": float(baseline.get("excess_mdd", baseline.get("mdd", 0.0)) or 0.0),
                "seed_portfolio_nav_mdd": float(baseline.get("portfolio_nav_mdd", 0.0) or 0.0),
                "seed_drawdown_duration_max": int(baseline.get("drawdown_duration_max", 0) or 0),
                "seed_drawdown_duration_mean": float(baseline.get("drawdown_duration_mean", 0.0) or 0.0),
                "seed_transaction_cost": float(baseline.get("transaction_cost", 0.0) or 0.0),
                "seed_gross_turnover": float(baseline.get("gross_turnover", 0.0) or 0.0),
                "seed_turnover_ratio": float(baseline.get("turnover_ratio", 0.0) or 0.0),
                # Diagnostics below are passed through as-is (may be None).
                "seed_return_per_turnover": baseline.get("return_per_turnover"),
                "seed_cash_weight_mean": baseline.get("cash_weight_mean"),
                "seed_cash_weight_median": baseline.get("cash_weight_median"),
                "seed_cash_weight_p95": baseline.get("cash_weight_p95"),
                "seed_round2_redistributed_notional": baseline.get("round2_redistributed_notional"),
                "seed_round2_buy_trade_count": baseline.get("round2_buy_trade_count"),
                "seed_round2_rebalance_days": baseline.get("round2_rebalance_days"),
                "seed_rebalance_holdings_mean": baseline.get("rebalance_holdings_mean"),
                "seed_rebalance_holdings_median": baseline.get("rebalance_holdings_median"),
                "seed_rebalance_holdings_min": baseline.get("rebalance_holdings_min"),
                "seed_rebalance_holdings_max": baseline.get("rebalance_holdings_max"),
                "seed_rebalance_window_count": baseline.get("rebalance_window_count"),
                "seed_rebalance_window_days_mean": baseline.get("rebalance_window_days_mean"),
                "seed_rebalance_window_days_median": baseline.get("rebalance_window_days_median"),
                "seed_rebalance_window_days_max": baseline.get("rebalance_window_days_max"),
                "seed_rebalance_window_return_reconstruction_error": baseline.get("rebalance_window_return_reconstruction_error"),
                "seed_performance_return_reconstruction_error": baseline.get("performance_return_reconstruction_error"),
                "seed_transaction_cost_reconstruction_error": baseline.get("transaction_cost_reconstruction_error"),
                "seed_gross_turnover_reconstruction_error": baseline.get("gross_turnover_reconstruction_error"),
                "n_calls": len(trials),
                "n_valid": len(valid),
                # best_* columns come from the best-by-IR trial except where a
                # dedicated per-metric best (best_ic, best_aer, ...) is used.
                "best_performance_return": best.get("performance_return") if best else None,
                "best_benchmark_performance_return": best.get("benchmark_performance_return") if best else None,
                "best_excess_compounded_return": best.get("excess_compounded_return") if best else None,
                "best_ir": best.get("ir") if best else None,
                "best_ic": best_ic.get("ic_mean") if best_ic else None,
                "best_aer": best_aer.get("aer") if best_aer else None,
                "best_sharpe": best_sharpe.get("sharpe") if best_sharpe else None,
                "best_winrate": best_winrate.get("winrate") if best_winrate else None,
                "best_mdd": best_mdd.get("mdd") if best_mdd else None,
                "best_excess_mdd": best.get("excess_mdd") if best else None,
                "best_portfolio_nav_mdd": best.get("portfolio_nav_mdd") if best else None,
                "best_drawdown_duration_max": best.get("drawdown_duration_max") if best else None,
                "best_drawdown_duration_mean": best.get("drawdown_duration_mean") if best else None,
                "best_transaction_cost": best.get("transaction_cost") if best else None,
                "best_gross_turnover": best.get("gross_turnover") if best else None,
                "best_turnover_ratio": best.get("turnover_ratio") if best else None,
                "best_return_per_turnover": best.get("return_per_turnover") if best else None,
                "best_cash_weight_mean": best.get("cash_weight_mean") if best else None,
                "best_cash_weight_median": best.get("cash_weight_median") if best else None,
                "best_cash_weight_p95": best.get("cash_weight_p95") if best else None,
                "best_round2_redistributed_notional": best.get("round2_redistributed_notional") if best else None,
                "best_round2_buy_trade_count": best.get("round2_buy_trade_count") if best else None,
                "best_round2_rebalance_days": best.get("round2_rebalance_days") if best else None,
                "best_rebalance_holdings_mean": best.get("rebalance_holdings_mean") if best else None,
                "best_rebalance_holdings_median": best.get("rebalance_holdings_median") if best else None,
                "best_rebalance_holdings_min": best.get("rebalance_holdings_min") if best else None,
                "best_rebalance_holdings_max": best.get("rebalance_holdings_max") if best else None,
                "best_rebalance_window_count": best.get("rebalance_window_count") if best else None,
                "best_rebalance_window_days_mean": best.get("rebalance_window_days_mean") if best else None,
                "best_rebalance_window_days_median": best.get("rebalance_window_days_median") if best else None,
                "best_rebalance_window_days_max": best.get("rebalance_window_days_max") if best else None,
                "best_rebalance_window_return_reconstruction_error": best.get("rebalance_window_return_reconstruction_error") if best else None,
                "best_performance_return_reconstruction_error": best.get("performance_return_reconstruction_error") if best else None,
                "best_transaction_cost_reconstruction_error": best.get("transaction_cost_reconstruction_error") if best else None,
                "best_gross_turnover_reconstruction_error": best.get("gross_turnover_reconstruction_error") if best else None,
                "best_factor_name": best.get("factor_name") if best else None,
                "best_factor_expr": best.get("factor_expr") if best else None,
                "best_ir_turn": best.get("turn") if best else None,
                "best_ir_call_index": best.get("call_index") if best else None,
                "best_ir_proposal_rank": best.get("proposal_rank") if best else None,
                # Fixed marker: this script replays backtests, it does not search.
                "stop_reason": "backtest_only_replay",
                "turns_executed": len(turn_records),
                # Path columns are JSON-encoded lists (one element per turn).
                "ir_path": json.dumps(ir_path, ensure_ascii=False),
                "expr_path": json.dumps(expr_path, ensure_ascii=False),
                "calls_per_turn": json.dumps(calls_per_turn, ensure_ascii=False),
                "valid_per_turn": json.dumps(valid_per_turn, ensure_ascii=False),
                # Improvements are measured against the clamped "paper" baseline
                # for IR, and against raw baseline metrics otherwise.
                "beat_seed_paper_ir": bool(best and float(best.get("ir", 0.0) or 0.0) > paper_baseline),
                "ir_improvement_over_paper_seed": (float(best.get("ir", 0.0) or 0.0) - paper_baseline) if best else None,
                "ic_improvement_over_seed": (float(best_ic.get("ic_mean", 0.0) or 0.0) - float(baseline.get("ic_mean", 0.0) or 0.0)) if best_ic else None,
                "aer_improvement_over_seed": (float(best_aer.get("aer", 0.0) or 0.0) - float(baseline.get("aer", 0.0) or 0.0)) if best_aer else None,
                "sharpe_improvement_over_seed": (float(best_sharpe.get("sharpe", 0.0) or 0.0) - float(baseline.get("sharpe", 0.0) or 0.0)) if best_sharpe else None,
                "winrate_improvement_over_seed": (float(best_winrate.get("winrate", 0.0) or 0.0) - float(baseline.get("winrate", 0.0) or 0.0)) if best_winrate else None,
                "mdd_improvement_over_seed": (float(best_mdd.get("mdd", 0.0) or 0.0) - float(baseline.get("mdd", 0.0) or 0.0)) if best_mdd else None,
                "excess_compounded_return_improvement_over_seed": (float(best.get("excess_compounded_return", 0.0) or 0.0) - float(baseline.get("excess_compounded_return", 0.0) or 0.0)) if best else None,
                "portfolio_nav_mdd_improvement_over_seed": (float(best.get("portfolio_nav_mdd", 0.0) or 0.0) - float(baseline.get("portfolio_nav_mdd", 0.0) or 0.0)) if best else None,
                # trade_guard deltas compare replayed IR against the original_*
                # IR recorded on the input rows (None when unavailable).
                "original_seed_ir": baseline.get("original_ir"),
                "original_best_ir": best.get("original_ir") if best else None,
                "trade_guard_ir_delta_seed": seed_ir - float(baseline.get("original_ir", 0.0) or 0.0) if baseline.get("original_ir") is not None else None,
                "trade_guard_ir_delta_best": (float(best.get("ir", 0.0) or 0.0) - float(best.get("original_ir", 0.0) or 0.0)) if best and best.get("original_ir") is not None else None,
            }
        )
    return summary_rows


def _build_summary_yearly_rows(
    *,
    seed_name: str,
    seed_expr: str,
    seed_metrics: dict[str, Any],
    seed_trials: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Build per-year summary rows comparing a seed's trials against its baseline.

    For every year seen in either the seed's or any trial's yearly metrics,
    computes beat-seed/valid rates and pass@3 / pass@5 flags, plus the best
    trial by that year's IR.
    """
    seed_yearly = seed_metrics.get("yearly_metrics") or {}
    # Union of years covered by the baseline and by any trial.
    year_keys: set[str] = set(seed_yearly.keys())
    for trial in seed_trials:
        year_keys.update((trial.get("yearly_metrics") or {}).keys())
    rows: list[dict[str, Any]] = []
    for year_key in sorted(year_keys, key=lambda x: int(x)):
        year = int(year_key)
        regime = _market_regime(year)
        seed_year = seed_yearly.get(year_key) or {}
        # Trials that have (truthy) metrics for this year, paired with them.
        year_trials = [
            (trial, (trial.get("yearly_metrics") or {}).get(year_key))
            for trial in seed_trials
            if (trial.get("yearly_metrics") or {}).get(year_key)
        ]
        valid_year_trials = [(trial, metrics) for trial, metrics in year_trials if trial.get("success")]
        best_pair = max(valid_year_trials, key=lambda item: float(item[1].get("ir", float("-inf")))) if valid_year_trials else None
        baseline_ir = float(seed_year.get("ir", 0.0) or 0.0)
        beat_flags = [
            bool(trial.get("success")) and float(metrics.get("ir", float("-inf"))) > baseline_ir
            for trial, metrics in year_trials
        ]
        # NOTE(review): pass@k iterates year_trials without the success filter
        # used by beat_flags — an unsuccessful trial with a high logged IR could
        # flip pass@k; confirm this is intended.
        pass_at_3 = any(
            float(metrics.get("ir", float("-inf"))) > baseline_ir
            for trial, metrics in year_trials
            if int(trial.get("proposal_rank", 10**9) or 10**9) <= 3
        )
        pass_at_5 = any(
            float(metrics.get("ir", float("-inf"))) > baseline_ir
            for trial, metrics in year_trials
            if int(trial.get("proposal_rank", 10**9) or 10**9) <= 5
        )
        valid_rate = float(len(valid_year_trials) / len(year_trials)) if year_trials else 0.0
        beat_seed_rate = float(sum(beat_flags) / len(year_trials)) if year_trials else 0.0
        row = {
            "seed_name": seed_name,
            "seed_expr": seed_expr,
            "year": year,
            "market_regime": regime,
            "seed_performance_return": float(seed_year.get("performance_return", 0.0) or 0.0),
            "seed_benchmark_performance_return": float(seed_year.get("benchmark_performance_return", 0.0) or 0.0),
            "seed_excess_compounded_return": float(seed_year.get("excess_compounded_return", 0.0) or 0.0),
            "seed_ir": baseline_ir,
            "seed_ic": float(seed_year.get("ic_mean", 0.0) or 0.0),
            "seed_icir": float(seed_year.get("icir", 0.0) or 0.0),
            "seed_aer": float(seed_year.get("annualized_return", 0.0) or 0.0),
            "seed_sharpe": float(seed_year.get("sharpe", 0.0) or 0.0),
            # (continuation) seed-side risk/quality metrics for this seed-year row.
            "seed_winrate": float(seed_year.get("winrate", 0.0) or 0.0),
            "seed_mdd": float(seed_year.get("mdd", 0.0) or 0.0),
            # excess_mdd falls back to plain mdd when the engine did not report it.
            "seed_excess_mdd": float(seed_year.get("excess_mdd", seed_year.get("mdd", 0.0)) or 0.0),
            "seed_portfolio_nav_mdd": float(seed_year.get("portfolio_nav_mdd", 0.0) or 0.0),
            "seed_drawdown_duration_max": int(seed_year.get("drawdown_duration_max", 0) or 0),
            "seed_drawdown_duration_mean": float(seed_year.get("drawdown_duration_mean", 0.0) or 0.0),
            "seed_avg_holdings_count": float(seed_year.get("avg_holdings_count", 0.0) or 0.0),
            "seed_max_holdings_count": int(seed_year.get("max_holdings_count", 0) or 0),
            # Call/valid/win bookkeeping for this seed-year bucket.
            "n_calls": len(year_trials),
            "n_valid": len(valid_year_trials),
            "n_wins": int(sum(beat_flags)),
            # Paper definition: VR is Valid Ratio, not beat-seed rate.
            "vr": valid_rate,
            "valid_rate": valid_rate,
            "beat_seed_rate": beat_seed_rate,
            "pass_at_3": bool(pass_at_3),
            "pass_at_5": bool(pass_at_5),
        }
        if best_pair:
            # A best trial was found for this year: surface its identity and metrics.
            trial, metrics = best_pair
            row.update(
                {
                    "best_factor_name": trial.get("factor_name"),
                    "best_factor_expr": trial.get("factor_expr"),
                    "best_turn": trial.get("turn"),
                    "best_call_index": trial.get("call_index"),
                    "best_proposal_rank": trial.get("proposal_rank"),
                    "best_performance_return": float(metrics.get("performance_return", 0.0) or 0.0),
                    "best_benchmark_performance_return": float(metrics.get("benchmark_performance_return", 0.0) or 0.0),
                    "best_excess_compounded_return": float(metrics.get("excess_compounded_return", 0.0) or 0.0),
                    "best_ir": float(metrics.get("ir", 0.0) or 0.0),
                    # NOTE: "best_ic" is sourced from "ic_mean" in the metrics dict.
                    "best_ic": float(metrics.get("ic_mean", 0.0) or 0.0),
                    "best_icir": float(metrics.get("icir", 0.0) or 0.0),
                    "best_aer": float(metrics.get("annualized_return", 0.0) or 0.0),
                    "best_sharpe": float(metrics.get("sharpe", 0.0) or 0.0),
                    "best_winrate": float(metrics.get("winrate", 0.0) or 0.0),
                    "best_mdd": float(metrics.get("mdd", 0.0) or 0.0),
                    # Same fallback as the seed side: excess_mdd defaults to mdd.
                    "best_excess_mdd": float(metrics.get("excess_mdd", metrics.get("mdd", 0.0)) or 0.0),
                    "best_portfolio_nav_mdd": float(metrics.get("portfolio_nav_mdd", 0.0) or 0.0),
                    "best_drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0),
                    "best_drawdown_duration_mean": float(metrics.get("drawdown_duration_mean", 0.0) or 0.0),
                    "best_avg_holdings_count": float(metrics.get("avg_holdings_count", 0.0) or 0.0),
                    "best_max_holdings_count": int(metrics.get("max_holdings_count", 0) or 0),
                }
            )
        else:
            # No valid trial this year: emit explicit None placeholders so the
            # output schema stays identical across all seed/year rows.
            row.update(
                {
                    "best_factor_name": None,
                    "best_factor_expr": None,
                    "best_turn": None,
                    "best_call_index": None,
                    "best_proposal_rank": None,
                    "best_performance_return": None,
                    "best_benchmark_performance_return": None,
                    "best_excess_compounded_return": None,
                    "best_ir": None,
                    "best_ic": None,
                    "best_icir": None,
                    "best_aer": None,
                    "best_sharpe": None,
                    "best_winrate": None,
                    "best_mdd": None,
                    "best_excess_mdd": None,
                    "best_portfolio_nav_mdd": None,
                    "best_drawdown_duration_max": None,
                    "best_drawdown_duration_mean": None,
                    "best_avg_holdings_count": None,
                    "best_max_holdings_count": None,
                }
            )
        rows.append(row)
    return rows


def _build_alpha_cash_cost_ranking_frame(summary_rows: list[dict[str, Any]], prefix: str) -> pd.DataFrame:
    """Build a cash/cost-efficiency ranking frame for either the seed or best metrics.

    ``prefix`` selects which metric columns of the summary rows are read
    ("seed" or "best"); any other value raises ValueError.
    """
    if prefix not in {"seed", "best"}:
        raise ValueError(f"Unsupported ranking prefix: {prefix}")
    if not summary_rows:
        # Keep the output schema stable even with no input.
        return pd.DataFrame(columns=ALPHA_CASH_COST_RANKING_COLUMNS)
    summary_df = pd.DataFrame(summary_rows).copy()
    # Seed rows identify the factor by the seed itself; best rows by the winner.
    factor_name_col = "seed_name" if prefix == "seed" else "best_factor_name"
    factor_expr_col = "seed_expr" if prefix == "seed" else "best_factor_expr"
    scope_label = "seed_baseline" if prefix == "seed" else "best_ir_candidate"
    frame = pd.DataFrame(
        {
            "candidate_scope": scope_label,
            "seed_name": summary_df.get("seed_name"),
            "factor_name": summary_df.get(factor_name_col),
            "factor_expr": summary_df.get(factor_expr_col),
            "ir": summary_df.get(f"{prefix}_ir"),
            "performance_return": summary_df.get(f"{prefix}_performance_return"),
            "benchmark_performance_return": summary_df.get(f"{prefix}_benchmark_performance_return"),
            "excess_compounded_return": summary_df.get(f"{prefix}_excess_compounded_return"),
            # (continuation) remaining metric columns pulled by prefix.
            "portfolio_nav_mdd": summary_df.get(f"{prefix}_portfolio_nav_mdd"),
            "turnover_ratio": summary_df.get(f"{prefix}_turnover_ratio"),
            "transaction_cost": summary_df.get(f"{prefix}_transaction_cost"),
            "gross_turnover": summary_df.get(f"{prefix}_gross_turnover"),
            "return_per_turnover": summary_df.get(f"{prefix}_return_per_turnover"),
            "cash_weight_mean": summary_df.get(f"{prefix}_cash_weight_mean"),
            "cash_weight_median": summary_df.get(f"{prefix}_cash_weight_median"),
            "cash_weight_p95": summary_df.get(f"{prefix}_cash_weight_p95"),
            "round2_redistributed_notional": summary_df.get(f"{prefix}_round2_redistributed_notional"),
            "round2_buy_trade_count": summary_df.get(f"{prefix}_round2_buy_trade_count"),
            "round2_rebalance_days": summary_df.get(f"{prefix}_round2_rebalance_days"),
        }
    )
    if prefix == "best":
        # Drop seeds that produced no usable best candidate at all.
        frame = frame[
            frame["factor_expr"].notna()
            | frame["ir"].notna()
            | frame["performance_return"].notna()
        ].copy()
    if frame.empty:
        return pd.DataFrame(columns=ALPHA_CASH_COST_RANKING_COLUMNS)
    numeric_cols = [
        "ir",
        "performance_return",
        "benchmark_performance_return",
        "excess_compounded_return",
        "portfolio_nav_mdd",
        "turnover_ratio",
        "transaction_cost",
        "gross_turnover",
        "return_per_turnover",
        "cash_weight_mean",
        "cash_weight_median",
        "cash_weight_p95",
        "round2_redistributed_notional",
        "round2_buy_trade_count",
        "round2_rebalance_days",
    ]
    for col in numeric_cols:
        frame[col] = pd.to_numeric(frame[col], errors="coerce")
    # Component ranks: lower cash drag and cost are better (ascending), higher
    # return-per-turnover is better (descending); NaNs sink to the bottom.
    frame["rank_cash_weight_mean_asc"] = frame["cash_weight_mean"].rank(method="min", ascending=True, na_option="bottom")
    frame["rank_transaction_cost_asc"] = frame["transaction_cost"].rank(method="min", ascending=True, na_option="bottom")
    frame["rank_return_per_turnover_desc"] = frame["return_per_turnover"].rank(method="min", ascending=False, na_option="bottom")
    # Composite score = unweighted mean of the three component ranks.
    frame["cash_cost_efficiency_rank"] = frame[
        ["rank_cash_weight_mean_asc", "rank_transaction_cost_asc", "rank_return_per_turnover_desc"]
    ].mean(axis=1)
    frame = frame.sort_values(
        ["cash_cost_efficiency_rank",
         "return_per_turnover", "ir", "seed_name"],
        ascending=[True, False, False, True],
        na_position="last",
    ).reset_index(drop=True)
    return _rows_to_frame(frame.to_dict("records"), ALPHA_CASH_COST_RANKING_COLUMNS)


def _build_yearly_outputs(results: list[dict[str, Any]], baselines: dict[str, dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Group results by seed and produce (summary_yearly_rows, trial_yearly_rows)."""
    by_seed: dict[str, list[dict[str, Any]]] = {}
    for result in results:
        # Seed identity falls back from seed_name to factor_name to "".
        by_seed.setdefault(str(result.get("seed_name") or result.get("factor_name") or ""), []).append(result)
    summary_yearly_rows: list[dict[str, Any]] = []
    trial_yearly_rows: list[dict[str, Any]] = []
    for seed_name in sorted(by_seed.keys()):
        # Baseline rows are excluded from the per-trial listing.
        seed_trials = [row for row in by_seed[seed_name] if row.get("candidate_scope") != "seed_baseline"]
        baseline = baselines.get(seed_name)
        if baseline is None:
            # Synthesize an empty baseline so downstream rows still render.
            baseline = _empty_baseline(seed_name, str(seed_trials[0].get("seed_expr") or "") if seed_trials else "", "")
        for row in sorted(seed_trials, key=_sort_key):
            trial_yearly_rows.extend(
                _flatten_yearly_metrics(
                    seed_name=seed_name,
                    candidate_scope=str(row.get("candidate_scope") or "trial"),
                    factor_name=str(row.get("factor_name") or ""),
                    factor_expr=str(row.get("factor_expr") or ""),
                    success=bool(row.get("success")),
                    turn=row.get("turn"),
                    call_index=row.get("call_index"),
                    proposal_rank=row.get("proposal_rank"),
                    yearly_metrics=row.get("yearly_metrics") or {},
                )
            )
        summary_yearly_rows.extend(
            _build_summary_yearly_rows(
                seed_name=seed_name,
                seed_expr=str(baseline.get("factor_expr") or baseline.get("seed_expr") or ""),
                seed_metrics=baseline,
                seed_trials=seed_trials,
            )
        )
    return summary_yearly_rows, trial_yearly_rows


def _flatten_stock_contrib(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    stock_contrib: list[dict[str, Any]],
    stock_contrib_topk: int,
) -> list[dict[str, Any]]:
    """Rank per-stock contributions within each year and tag identity columns.

    When ``stock_contrib_topk`` > 0 only the top-k rows per year (by absolute
    contribution) are kept; 0 or negative keeps everything.
    """
    if not stock_contrib:
        return []
    contrib_df = pd.DataFrame(stock_contrib)
    if contrib_df.empty:
        return []
    # Sort by year, then largest absolute contribution, with instrument as tiebreak.
    contrib_df = contrib_df.sort_values(["year", "abs_contribution_return", "instrument"], ascending=[True, False, True])
    # 1-based rank within each year, following the sort order above.
    contrib_df["rank"] = contrib_df.groupby("year").cumcount() + 1
    if stock_contrib_topk > 0:
        contrib_df = contrib_df[contrib_df["rank"] <= stock_contrib_topk]
    rows: list[dict[str, Any]] = []
    for record in contrib_df.to_dict("records"):
        record = dict(record)
        record.update(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
            }
        )
        rows.append(record)
    return rows


# NOTE(review): the four _flatten_*_log helpers below are structurally identical
# (copy each record and stamp the same identity columns). A shared helper would
# remove the duplication; kept as-is here to avoid behavior risk.
def _flatten_trade_log(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    trade_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Copy trade-log records and stamp seed/candidate identity columns."""
    rows: list[dict[str, Any]] = []
    for record in trade_log or []:
        row = dict(record)
        row.update(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
            }
        )
        rows.append(row)
    return rows


def _flatten_holding_log(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    holding_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Copy holding-log records and stamp seed/candidate identity columns."""
    rows: list[dict[str, Any]] = []
    for record in holding_log or []:
        row = dict(record)
        row.update(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
            }
        )
        rows.append(row)
    return rows


def _flatten_portfolio_log(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    portfolio_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Copy portfolio-log records and stamp seed/candidate identity columns."""
    rows: list[dict[str, Any]] = []
    for record in portfolio_log or []:
        row = dict(record)
        row.update(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
            }
        )
        rows.append(row)
    return rows


def _flatten_signal_selection_log(
    *,
    seed_name: str,
    candidate_scope: str,
    factor_name: str,
    factor_expr: str,
    turn: int | None,
    call_index: int | None,
    proposal_rank: int | None,
    signal_selection_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Copy signal-selection records and stamp seed/candidate identity columns."""
    rows: list[dict[str, Any]] = []
    for record in signal_selection_log or []:
        row = dict(record)
        row.update(
            {
                "seed_name": seed_name,
                "candidate_scope": candidate_scope,
                "factor_name": factor_name,
                "factor_expr": factor_expr,
                "turn": turn,
                "call_index": call_index,
                "proposal_rank": proposal_rank,
            }
        )
        rows.append(row)
    return rows


def _compute_pass_metrics(summary_rows: list[dict[str, Any]], trial_rows: list[dict[str, Any]]) -> dict[str, float]:
    """Compute valid-ratio, beat-seed and pass@k metrics over all trials."""
    if not summary_rows or not trial_rows:
        # No data at all: return an all-zero metric dict with the full schema.
        return {
            "vr_global": 0.0,
            "vr_seed_mean": 0.0,
            "valid_rate_global": 0.0,
            "valid_rate_seed_mean": 0.0,
            "beat_seed_rate_global": 0.0,
            "beat_seed_rate_seed_mean": 0.0,
            "pass_at_3": 0.0,
            "pass_at_5": 0.0,
        }
    summary_df = pd.DataFrame(summary_rows)
    trials_df = pd.DataFrame(trial_rows)
    if summary_df.empty or trials_df.empty:
        return {
            "vr_global": 0.0,
            "vr_seed_mean": 0.0,
            "valid_rate_global": 0.0,
            "valid_rate_seed_mean": 0.0,
            "beat_seed_rate_global": 0.0,
            "beat_seed_rate_seed_mean": 0.0,
            "pass_at_3": 0.0,
            "pass_at_5": 0.0,
        }
    trials_df = trials_df.copy()
    # Missing proposal ranks are pushed past any realistic k for pass@k.
    trials_df["proposal_rank"] = pd.to_numeric(trials_df["proposal_rank"], errors="coerce").fillna(10**9)
    trials_df["seed_ir"] = pd.to_numeric(trials_df["seed_ir"], errors="coerce").fillna(0.0)
    # Missing IRs become -inf so they can never "beat" the seed.
    trials_df["ir"] = pd.to_numeric(trials_df["ir"], errors="coerce").fillna(float("-inf"))
    trials_df["success"] = trials_df["success"].astype(bool)
    # A trial beats the seed only if it ran successfully AND has strictly higher raw IR.
    trials_df["beat_seed_raw_ir"] = trials_df["success"] & (trials_df["ir"] > trials_df["seed_ir"])
    def pass_at_k(k: int) -> float:
        """Fraction of seeds with at least one beating trial among the first k proposals."""
        passed = trials_df[trials_df["proposal_rank"] <= k].groupby("seed_name")["beat_seed_raw_ir"].any()
        # Reindex over all seeds so seeds with no ranked trials count as failures.
        return float(passed.reindex(summary_df["seed_name"], fill_value=False).mean())

    # Seeds with zero calls become NA so they contribute 0 after fillna below.
    n_calls = pd.to_numeric(summary_df["n_calls"], errors="coerce").replace(0, pd.NA)
    n_valid = pd.to_numeric(summary_df["n_valid"], errors="coerce")
    valid_rate_global = float(trials_df["success"].mean())
    valid_rate_seed_mean = float((n_valid / n_calls).fillna(0.0).mean())
    beat_seed_rate_global = float(trials_df["beat_seed_raw_ir"].mean())
    beat_seed_rate_seed_mean = float(
        trials_df.groupby("seed_name")["beat_seed_raw_ir"]
        .mean()
        .reindex(summary_df["seed_name"], fill_value=0.0)
        .mean()
    )
    return {
        # Paper definition: VR is Valid Ratio, i.e. executable generated alphas.
        "vr_global": valid_rate_global,
        "vr_seed_mean": valid_rate_seed_mean,
        "valid_rate_global": valid_rate_global,
        "valid_rate_seed_mean": valid_rate_seed_mean,
        "beat_seed_rate_global": beat_seed_rate_global,
        "beat_seed_rate_seed_mean": beat_seed_rate_seed_mean,
        "pass_at_3": pass_at_k(3),
        "pass_at_5": pass_at_k(5),
    }


def _series_mean(rows: list[dict[str, Any]], key: str) -> float | None:
    """Mean of ``key`` across rows, coercing to numeric; None when nothing numeric."""
    values = pd.to_numeric(pd.Series([row.get(key) for row in rows]), errors="coerce").dropna()
    return float(values.mean()) if not values.empty else None


def _build_aggregate(
    *,
    summary_rows: list[dict[str, Any]],
    trial_rows: list[dict[str, Any]],
    start_date: str | None,
    end_date: str | None,
    elapsed_sec: float,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Build the run-level aggregate dict (metrics means + run configuration)."""
    # "best_*" means are computed only over seeds that produced a best candidate.
    valid_summaries = [row for row in summary_rows if row.get("best_ir") is not None]
    pass_metrics = _compute_pass_metrics(summary_rows, trial_rows)
    return {
        "n_seeds": len(summary_rows),
        "n_valid_seed_runs": len([row for row in summary_rows if row.get("seed_success")]),
        "beat_rate_ir": float(sum(1 for row in summary_rows if row.get("beat_seed_paper_ir")) / max(len(summary_rows), 1)),
        "vr_global": pass_metrics["vr_global"],
        "vr_seed_mean": pass_metrics["vr_seed_mean"],
        "valid_rate_global": pass_metrics["valid_rate_global"],
        "valid_rate_seed_mean": pass_metrics["valid_rate_seed_mean"],
        "beat_seed_rate_global": pass_metrics["beat_seed_rate_global"],
        "beat_seed_rate_seed_mean": pass_metrics["beat_seed_rate_seed_mean"],
        "pass_at_3": pass_metrics["pass_at_3"],
        "pass_at_5": pass_metrics["pass_at_5"],
        # --- seed-baseline metric means over all seeds ---
        "mean_seed_performance_return": _series_mean(summary_rows, "seed_performance_return"),
        "mean_seed_benchmark_performance_return": _series_mean(summary_rows, "seed_benchmark_performance_return"),
        "mean_seed_excess_compounded_return": _series_mean(summary_rows, "seed_excess_compounded_return"),
        "mean_seed_ir": _series_mean(summary_rows, "seed_ir"),
        "mean_seed_ic": _series_mean(summary_rows, "seed_ic"),
        "mean_seed_aer": _series_mean(summary_rows, "seed_aer"),
        "mean_seed_sharpe": _series_mean(summary_rows, "seed_sharpe"),
        "mean_seed_winrate": _series_mean(summary_rows, "seed_winrate"),
        "mean_seed_mdd": _series_mean(summary_rows, "seed_mdd"),
        "mean_seed_excess_mdd": _series_mean(summary_rows, "seed_excess_mdd"),
        "mean_seed_portfolio_nav_mdd": _series_mean(summary_rows, "seed_portfolio_nav_mdd"),
        "mean_seed_drawdown_duration_max": _series_mean(summary_rows, "seed_drawdown_duration_max"),
        "mean_seed_drawdown_duration_mean": _series_mean(summary_rows, "seed_drawdown_duration_mean"),
        "mean_seed_transaction_cost": _series_mean(summary_rows, "seed_transaction_cost"),
        "mean_seed_gross_turnover": _series_mean(summary_rows, "seed_gross_turnover"),
        "mean_seed_turnover_ratio": _series_mean(summary_rows, "seed_turnover_ratio"),
        "mean_seed_return_per_turnover": _series_mean(summary_rows, "seed_return_per_turnover"),
        "mean_seed_cash_weight_mean": _series_mean(summary_rows, "seed_cash_weight_mean"),
        "mean_seed_cash_weight_median": _series_mean(summary_rows, "seed_cash_weight_median"),
        "mean_seed_cash_weight_p95": _series_mean(summary_rows, "seed_cash_weight_p95"),
        "mean_seed_round2_redistributed_notional": _series_mean(summary_rows, "seed_round2_redistributed_notional"),
        "mean_seed_round2_buy_trade_count": _series_mean(summary_rows, "seed_round2_buy_trade_count"),
        "mean_seed_round2_rebalance_days": _series_mean(summary_rows, "seed_round2_rebalance_days"),
        "mean_seed_rebalance_holdings_mean": _series_mean(summary_rows, "seed_rebalance_holdings_mean"),
        "mean_seed_rebalance_holdings_median": _series_mean(summary_rows, "seed_rebalance_holdings_median"),
        "mean_seed_rebalance_window_count": _series_mean(summary_rows, "seed_rebalance_window_count"),
        "mean_seed_rebalance_window_days_mean": _series_mean(summary_rows, "seed_rebalance_window_days_mean"),
        "mean_seed_rebalance_window_days_median": _series_mean(summary_rows, "seed_rebalance_window_days_median"),
        "mean_seed_rebalance_window_days_max": _series_mean(summary_rows, "seed_rebalance_window_days_max"),
        "mean_seed_rebalance_window_return_reconstruction_error": _series_mean(summary_rows, "seed_rebalance_window_return_reconstruction_error"),
        "mean_seed_perf_return_reconstruction_error": _series_mean(summary_rows, "seed_performance_return_reconstruction_error"),
        "mean_seed_transaction_cost_reconstruction_error": _series_mean(summary_rows, "seed_transaction_cost_reconstruction_error"),
        "mean_seed_gross_turnover_reconstruction_error": _series_mean(summary_rows, "seed_gross_turnover_reconstruction_error"),
        # --- best-candidate metric means over seeds with a valid best ---
        "mean_best_performance_return": _series_mean(valid_summaries, "best_performance_return"),
        "mean_best_benchmark_performance_return": _series_mean(valid_summaries, "best_benchmark_performance_return"),
        "mean_best_excess_compounded_return": _series_mean(valid_summaries, "best_excess_compounded_return"),
        "mean_best_ir": _series_mean(valid_summaries, "best_ir"),
        "mean_best_ic": _series_mean(valid_summaries, "best_ic"),
        "mean_best_aer": _series_mean(valid_summaries, "best_aer"),
        "mean_best_sharpe": _series_mean(valid_summaries, "best_sharpe"),
        "mean_best_winrate": _series_mean(valid_summaries, "best_winrate"),
        "mean_best_mdd": _series_mean(valid_summaries, "best_mdd"),
        "mean_best_excess_mdd": _series_mean(valid_summaries, "best_excess_mdd"),
        "mean_best_portfolio_nav_mdd": _series_mean(valid_summaries, "best_portfolio_nav_mdd"),
        "mean_best_drawdown_duration_max": _series_mean(valid_summaries, "best_drawdown_duration_max"),
        "mean_best_drawdown_duration_mean": _series_mean(valid_summaries, "best_drawdown_duration_mean"),
        "mean_best_transaction_cost": _series_mean(valid_summaries, "best_transaction_cost"),
        "mean_best_gross_turnover": _series_mean(valid_summaries, "best_gross_turnover"),
        "mean_best_turnover_ratio": _series_mean(valid_summaries, "best_turnover_ratio"),
        "mean_best_return_per_turnover": _series_mean(valid_summaries, "best_return_per_turnover"),
        "mean_best_cash_weight_mean": _series_mean(valid_summaries, "best_cash_weight_mean"),
        "mean_best_cash_weight_median": _series_mean(valid_summaries, "best_cash_weight_median"),
        "mean_best_cash_weight_p95": _series_mean(valid_summaries, "best_cash_weight_p95"),
        "mean_best_round2_redistributed_notional": _series_mean(valid_summaries, "best_round2_redistributed_notional"),
        "mean_best_round2_buy_trade_count": _series_mean(valid_summaries, "best_round2_buy_trade_count"),
        "mean_best_round2_rebalance_days": _series_mean(valid_summaries, "best_round2_rebalance_days"),
        "mean_best_rebalance_holdings_mean": _series_mean(valid_summaries, "best_rebalance_holdings_mean"),
        "mean_best_rebalance_holdings_median": _series_mean(valid_summaries, "best_rebalance_holdings_median"),
        "mean_best_rebalance_window_count": _series_mean(valid_summaries, "best_rebalance_window_count"),
        "mean_best_rebalance_window_days_mean": _series_mean(valid_summaries, "best_rebalance_window_days_mean"),
        "mean_best_rebalance_window_days_median": _series_mean(valid_summaries, "best_rebalance_window_days_median"),
        "mean_best_rebalance_window_days_max": _series_mean(valid_summaries, "best_rebalance_window_days_max"),
        "mean_best_rebalance_window_return_reconstruction_error": _series_mean(valid_summaries, "best_rebalance_window_return_reconstruction_error"),
        "mean_best_perf_return_reconstruction_error": _series_mean(valid_summaries, "best_performance_return_reconstruction_error"),
        "mean_best_transaction_cost_reconstruction_error": _series_mean(valid_summaries, "best_transaction_cost_reconstruction_error"),
        "mean_best_gross_turnover_reconstruction_error": _series_mean(valid_summaries, "best_gross_turnover_reconstruction_error"),
        # --- improvement-over-seed means ---
        "mean_ir_improvement": _series_mean(valid_summaries, "ir_improvement_over_paper_seed"),
        "mean_ic_improvement": _series_mean(valid_summaries, "ic_improvement_over_seed"),
        "mean_aer_improvement": _series_mean(valid_summaries, "aer_improvement_over_seed"),
        "mean_sharpe_improvement": _series_mean(valid_summaries, "sharpe_improvement_over_seed"),
        "mean_winrate_improvement": _series_mean(valid_summaries, "winrate_improvement_over_seed"),
        "mean_mdd_improvement": _series_mean(valid_summaries, "mdd_improvement_over_seed"),
        "mean_excess_compounded_return_improvement": _series_mean(valid_summaries, "excess_compounded_return_improvement_over_seed"),
        "mean_portfolio_nav_mdd_improvement": _series_mean(valid_summaries, "portfolio_nav_mdd_improvement_over_seed"),
        "mean_calls": _series_mean(summary_rows, "n_calls"),
        "mean_valid_calls": _series_mean(summary_rows, "n_valid"),
        "mean_trade_guard_ir_delta_seed": _series_mean(summary_rows, "trade_guard_ir_delta_seed"),
        "mean_trade_guard_ir_delta_best": _series_mean(summary_rows, "trade_guard_ir_delta_best"),
        # --- run configuration echoed into the aggregate for traceability ---
        "backtest_start_date": start_date,
        "backtest_end_date": end_date,
        "period": args.period,
        "backtest_engine": args.backtest_engine,
        "backtest_workers": int(max(args.backtest_workers, 1)),
        "top_k": int(getattr(args, "top_k", 10)),
        "n_drop": int(getattr(args, "n_drop", 2)),
        "position_size": float(getattr(args, "position_size", 1.0)),
        "max_pos_each_stock": float(getattr(args, "max_pos_each_stock", 1.0)),
        "lot_size": int(getattr(args, "lot_size", 100)),
        "max_daily_volume_participation": float(getattr(args,
            "max_daily_volume_participation", 0.0)),
        "max_daily_amount_participation": float(getattr(args, "max_daily_amount_participation", 0.0)),
        "rebalance_freq": int(getattr(args, "rebalance_freq", 5)),
        # NOTE: aggregate keys use cost_buy/cost_sell while the CLI flags are buy_fee/sell_fee.
        "cost_buy": float(getattr(args, "buy_fee", 0.0013)),
        "cost_sell": float(getattr(args, "sell_fee", 0.0013)),
        "rebalance_mode": getattr(args, "rebalance_mode", "dropout"),
        "custom_weight_mode": getattr(args, "custom_weight_mode", "equal"),
        "redistribute_unfilled_cash": bool(getattr(args, "redistribute_unfilled_cash", False)),
        "enforce_cash_limit": bool(getattr(args, "enforce_cash_limit", False)),
        "score_transform": getattr(args, "score_transform", "identity"),
        "score_clip": float(getattr(args, "score_clip", 3.0)),
        "universe_filter": getattr(args, "universe_filter", "none"),
        "universe_top_n": int(getattr(args, "universe_top_n", 0)),
        "universe_lookback_days": int(getattr(args, "universe_lookback_days", 20)),
        "trade_guard_config": _parse_trade_guard_config(getattr(args, "trade_guard_config", "")),
        # Data path resolution order: CLI arg, then env vars, then repo default.
        "data_path": args.data_path or os.environ.get("ALPHAEVO_DATA_PATH") or os.environ.get("AAE_DATA_PATH") or os.environ.get("DAILY_PV_PATH") or "repo default",
        "jsonl": str(args.jsonl),
        "elapsed_sec": round(float(elapsed_sec), 1),
    }


def _build_aggregate_yearly(summary_yearly_rows: list[dict[str, Any]]) -> pd.DataFrame:
    """Aggregate the per-seed yearly summary rows into one row per year."""
    if not summary_yearly_rows:
        return pd.DataFrame()
    df = pd.DataFrame(summary_yearly_rows)
    out_rows: list[dict[str, Any]] = []
    for year, grp in df.groupby("year"):
        n_calls = pd.to_numeric(grp["n_calls"], errors="coerce").fillna(0)
        n_valid = pd.to_numeric(grp["n_valid"], errors="coerce").fillna(0)
        # Older rows may lack n_wins / valid_rate / beat_seed_rate; default to 0.
        n_wins = pd.to_numeric(grp.get("n_wins", pd.Series(0, index=grp.index)), errors="coerce").fillna(0)
        valid_rate_seed = pd.to_numeric(
            grp.get("valid_rate", grp.get("vr", pd.Series(0, index=grp.index))),
            errors="coerce",
        ).fillna(0)
        beat_seed_rate_seed = pd.to_numeric(
            grp.get("beat_seed_rate", pd.Series(0, index=grp.index)),
            errors="coerce",
        ).fillna(0)
        out_rows.append(
            {
                "year": int(year),
                # All rows of a year share one regime; take the first.
                "market_regime": grp["market_regime"].iloc[0],
                "n_seeds": int(len(grp)),
                "n_calls": int(n_calls.sum()),
                "n_valid": int(n_valid.sum()),
                "n_wins": int(n_wins.sum()),
                "vr_seed_mean": float(valid_rate_seed.mean()),
                # Global rates are pooled across seeds; max(...,1) avoids div-by-zero.
                "vr_global": float(n_valid.sum() / max(n_calls.sum(), 1)),
                "valid_rate_seed_mean": float(valid_rate_seed.mean()),
                "valid_rate_global": float(n_valid.sum() / max(n_calls.sum(), 1)),
                "beat_seed_rate_seed_mean": float(beat_seed_rate_seed.mean()),
                "beat_seed_rate_global": float(n_wins.sum() / max(n_calls.sum(), 1)),
                "pass_at_3": float(grp["pass_at_3"].astype(bool).mean()),
                "pass_at_5": float(grp["pass_at_5"].astype(bool).mean()),
                "mean_seed_performance_return": float(pd.to_numeric(grp["seed_performance_return"], errors="coerce").mean()),
                "mean_seed_benchmark_performance_return": float(pd.to_numeric(grp["seed_benchmark_performance_return"], errors="coerce").mean()),
                "mean_seed_excess_compounded_return": float(pd.to_numeric(grp["seed_excess_compounded_return"], errors="coerce").mean()),
                "mean_seed_ir": float(pd.to_numeric(grp["seed_ir"], errors="coerce").mean()),
                "mean_seed_ic": float(pd.to_numeric(grp["seed_ic"], errors="coerce").mean()),
                "mean_seed_aer": float(pd.to_numeric(grp["seed_aer"], errors="coerce").mean()),
                "mean_seed_sharpe": float(pd.to_numeric(grp["seed_sharpe"], errors="coerce").mean()),
                "mean_seed_winrate": float(pd.to_numeric(grp["seed_winrate"], errors="coerce").mean()),
                "mean_seed_mdd": float(pd.to_numeric(grp["seed_mdd"], errors="coerce").mean()),
                "mean_seed_excess_mdd": float(pd.to_numeric(grp["seed_excess_mdd"], errors="coerce").mean()),
                "mean_seed_portfolio_nav_mdd": float(pd.to_numeric(grp["seed_portfolio_nav_mdd"], errors="coerce").mean()),
                "mean_seed_drawdown_duration_max": float(pd.to_numeric(grp["seed_drawdown_duration_max"], errors="coerce").mean()),
                "mean_seed_drawdown_duration_mean": float(pd.to_numeric(grp["seed_drawdown_duration_mean"], errors="coerce").mean()),
                "mean_best_performance_return": float(pd.to_numeric(grp["best_performance_return"], errors="coerce").mean()),
                "mean_best_benchmark_performance_return": float(pd.to_numeric(grp["best_benchmark_performance_return"], errors="coerce").mean()),
                "mean_best_excess_compounded_return": float(pd.to_numeric(grp["best_excess_compounded_return"], errors="coerce").mean()),
                "mean_best_ir": float(pd.to_numeric(grp["best_ir"], errors="coerce").mean()),
                "mean_best_ic": float(pd.to_numeric(grp["best_ic"], errors="coerce").mean()),
                "mean_best_aer": float(pd.to_numeric(grp["best_aer"], errors="coerce").mean()),
                "mean_best_sharpe": float(pd.to_numeric(grp["best_sharpe"], errors="coerce").mean()),
                "mean_best_winrate": float(pd.to_numeric(grp["best_winrate"], errors="coerce").mean()),
                "mean_best_mdd": float(pd.to_numeric(grp["best_mdd"], errors="coerce").mean()),
                "mean_best_excess_mdd": float(pd.to_numeric(grp["best_excess_mdd"], errors="coerce").mean()),
                "mean_best_portfolio_nav_mdd": float(pd.to_numeric(grp["best_portfolio_nav_mdd"], errors="coerce").mean()),
                "mean_best_drawdown_duration_max": float(pd.to_numeric(grp["best_drawdown_duration_max"], errors="coerce").mean()),
                "mean_best_drawdown_duration_mean": float(pd.to_numeric(grp["best_drawdown_duration_mean"], errors="coerce").mean()),
            }
        )
    return pd.DataFrame(out_rows).sort_values("year").reset_index(drop=True)


def _candidate_for_detail(result: dict[str, Any], candidate_scope: str) -> dict[str, Any]:
    """Project a result row down to the identity fields used for detail reruns."""
    candidate = {
        "input_index": result.get("input_index"),
        "source": result.get("source"),
        "candidate_scope": candidate_scope,
        "seed_name": result.get("seed_name"),
        "seed_expr": result.get("seed_expr"),
        "factor_name": result.get("factor_name"),
        "factor_expr": result.get("factor_expr"),
        "turn": result.get("turn"),
        "call_index": result.get("call_index"),
        "proposal_rank": result.get("proposal_rank"),
    }
    return candidate


def _result_match_key(row: dict[str, Any]) -> tuple[str, str, int | None, int | None, int | None]:
    """Identity key used to join detail-run rows back onto primary results."""
    return (
        str(row.get("seed_name") or row.get("factor_name")
            or ""),
        str(row.get("factor_expr") or ""),
        _optional_int(row.get("turn")),
        _optional_int(row.get("call_index")),
        _optional_int(row.get("proposal_rank")),
    )


def _merge_detail_results(
    results: list[dict[str, Any]],
    detail_results: list[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
    """Overlay detail-run fields onto matching primary results.

    Matching is by :func:`_result_match_key`; the identity fields in
    ``protected_keys`` are never overwritten by the detail row.
    """
    if not detail_results:
        # Nothing to merge: return a shallow copy of the primary list.
        return list(results)
    detail_lookup = {_result_match_key(row): row for row in detail_results}
    merged_rows: list[dict[str, Any]] = []
    protected_keys = {"candidate_scope", "source", "input_index"}
    for row in results:
        merged = dict(row)
        detail = detail_lookup.get(_result_match_key(row))
        if detail is not None:
            for key, value in detail.items():
                if key in protected_keys:
                    continue
                merged[key] = value
        merged_rows.append(merged)
    return merged_rows


def _build_detail_rows(
    *,
    detail_results: list[dict[str, Any]],
    stock_contrib_topk: int,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten all per-result detail artifacts into six row lists.

    Returns (stock_contrib, trade_log, holding_log, portfolio_log,
    rebalance_log, signal_selection) rows; the rebalance log is the subset of
    portfolio rows flagged ``is_rebalance``.
    """
    stock_contrib_rows: list[dict[str, Any]] = []
    trade_log_rows: list[dict[str, Any]] = []
    holding_log_rows: list[dict[str, Any]] = []
    portfolio_log_rows: list[dict[str, Any]] = []
    rebalance_log_rows: list[dict[str, Any]] = []
    signal_selection_rows: list[dict[str, Any]] = []
    for result in detail_results:
        seed_name = str(result.get("seed_name") or "")
        candidate_scope = str(result.get("candidate_scope") or "")
        factor_name = str(result.get("factor_name") or "")
        factor_expr = str(result.get("factor_expr") or "")
        stock_contrib_rows.extend(
            _flatten_stock_contrib(
                seed_name=seed_name,
                candidate_scope=candidate_scope,
                factor_name=factor_name,
                factor_expr=factor_expr,
                turn=result.get("turn"),
                call_index=result.get("call_index"),
                proposal_rank=result.get("proposal_rank"),
                stock_contrib=result.get("stock_contrib") or [],
                stock_contrib_topk=stock_contrib_topk,
            )
        )
        trade_log_rows.extend(
            _flatten_trade_log(
                seed_name=seed_name,
                candidate_scope=candidate_scope,
                factor_name=factor_name,
                factor_expr=factor_expr,
                turn=result.get("turn"),
                call_index=result.get("call_index"),
                proposal_rank=result.get("proposal_rank"),
                trade_log=result.get("trade_log") or [],
            )
        )
        holding_log_rows.extend(
            _flatten_holding_log(
                seed_name=seed_name,
                candidate_scope=candidate_scope,
                factor_name=factor_name,
                factor_expr=factor_expr,
                turn=result.get("turn"),
                call_index=result.get("call_index"),
                proposal_rank=result.get("proposal_rank"),
                holding_log=result.get("holding_log") or [],
            )
        )
        portfolio_rows = _flatten_portfolio_log(
            seed_name=seed_name,
            candidate_scope=candidate_scope,
            factor_name=factor_name,
            factor_expr=factor_expr,
            turn=result.get("turn"),
            call_index=result.get("call_index"),
            proposal_rank=result.get("proposal_rank"),
            portfolio_log=result.get("portfolio_log") or [],
        )
        portfolio_log_rows.extend(portfolio_rows)
        # Rebalance log = portfolio days explicitly flagged as rebalances.
        rebalance_log_rows.extend([row for row in portfolio_rows if bool(row.get("is_rebalance"))])
        signal_selection_rows.extend(
            _flatten_signal_selection_log(
                seed_name=seed_name,
                candidate_scope=candidate_scope,
                factor_name=factor_name,
                factor_expr=factor_expr,
                turn=result.get("turn"),
                call_index=result.get("call_index"),
                proposal_rank=result.get("proposal_rank"),
                signal_selection_log=result.get("signal_selection_log") or [],
            )
        )
    return stock_contrib_rows, trade_log_rows, holding_log_rows, portfolio_log_rows, rebalance_log_rows, signal_selection_rows


def _build_outputs_from_results(
    *,
    results: list[dict[str, Any]],
    args: argparse.Namespace,
    start_date: str | None,
    end_date: str | None,
    elapsed_sec: float,
    detail_results: list[dict[str, Any]] | None = None,
) -> tuple[
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    list[dict[str, Any]],
    pd.DataFrame,
    dict[str, Any],
]:
    """Assemble every output table and the aggregate dict from raw results."""
    effective_results = _merge_detail_results(results, detail_results)
    # Index seed-baseline rows by seed identity for downstream lookups.
    baselines = {
        str(row.get("seed_name") or row.get("factor_name") or ""): row
        for row
        in effective_results
        if row.get("candidate_scope") == "seed_baseline"
    }
    summary_rows = _build_summary_rows(effective_results, baselines, args.backtest_engine)
    baseline_rows = _build_baseline_rows(effective_results)
    trial_rows = _build_trial_rows(effective_results, baselines)
    summary_yearly_rows, trial_yearly_rows = _build_yearly_outputs(effective_results, baselines)
    if detail_results:
        stock_contrib_rows, trade_log_rows, holding_log_rows, portfolio_log_rows, rebalance_log_rows, signal_selection_rows = _build_detail_rows(
            detail_results=detail_results,
            stock_contrib_topk=args.stock_contrib_topk,
        )
    else:
        # No detail pass: all detail tables are empty.
        stock_contrib_rows, trade_log_rows, holding_log_rows, portfolio_log_rows, rebalance_log_rows, signal_selection_rows = [], [], [], [], [], []
    aggregate = _build_aggregate(
        summary_rows=summary_rows,
        trial_rows=trial_rows,
        start_date=start_date,
        end_date=end_date,
        elapsed_sec=elapsed_sec,
        args=args,
    )
    aggregate_yearly = _build_aggregate_yearly(summary_yearly_rows)
    return (
        summary_rows,
        trial_rows,
        summary_yearly_rows,
        trial_yearly_rows,
        stock_contrib_rows,
        trade_log_rows,
        holding_log_rows,
        portfolio_log_rows,
        rebalance_log_rows,
        signal_selection_rows,
        baseline_rows,
        aggregate_yearly,
        aggregate,
    )


def _save_outputs(
    output_dir: Path,
    summary_rows: list[dict[str, Any]],
    trial_rows: list[dict[str, Any]],
    summary_yearly_rows: list[dict[str, Any]],
    trial_yearly_rows: list[dict[str, Any]],
    stock_contrib_rows: list[dict[str, Any]],
    trade_log_rows: list[dict[str, Any]],
    holding_log_rows: list[dict[str, Any]],
    portfolio_log_rows: list[dict[str, Any]],
    rebalance_log_rows: list[dict[str, Any]],
    signal_selection_rows: list[dict[str, Any]],
    baseline_rows: list[dict[str, Any]],
    aggregate_yearly: pd.DataFrame,
    aggregate: dict[str, Any],
    run_metadata: dict[str, Any] | None = None,
    data_quality_report: dict[str, Any] | None = None,
) -> None:
    """Write all report tables and JSON artifacts under ``output_dir``."""
    output_dir.mkdir(parents=True, exist_ok=True)

    def _with_run_metadata(df: pd.DataFrame) -> pd.DataFrame:
        """Prepend run-config columns (in a fixed order) to a frame, if provided."""
        if not run_metadata:
            return df
        out = df.copy()
        ordered_keys = [
            "mode",
            "period",
            "backtest_engine",
            "top_k",
            "n_drop",
            "rebalance_mode",
            "custom_weight_mode",
            "rebalance_freq",
            "position_size",
            "max_pos_each_stock",
            "lot_size",
            "max_daily_volume_participation",
            "max_daily_amount_participation",
            "buy_fee",
            "sell_fee",
            "enforce_cash_limit",
            "score_transform",
            "score_clip",
            "universe_filter",
            "universe_top_n",
            "universe_lookback_days",
            "start_date",
            "end_date",
        ]
        # Insert in reverse so the final column order matches ordered_keys.
        for key in reversed(ordered_keys):
            if key in run_metadata:
                if key in out.columns:
                    continue  # never clobber a column the frame already has
                out.insert(0, key, run_metadata.get(key))
        return out

    rebalance_plan_df = _build_rebalance_plan_frame(
        trade_log_rows=trade_log_rows,
        holding_log_rows=holding_log_rows,
        portfolio_log_rows=portfolio_log_rows,
    )
    rebalance_window_df = _build_rebalance_window_frame(
        _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS)
    )
    seed_ranking_df = _build_alpha_cash_cost_ranking_frame(summary_rows, prefix="seed")
    best_ranking_df = _build_alpha_cash_cost_ranking_frame(summary_rows, prefix="best")
    # Summary-level tables carry the run-metadata columns; detail tables use fixed schemas.
    _with_run_metadata(pd.DataFrame(summary_rows)).to_csv(output_dir / "summary.csv", index=False)
    _with_run_metadata(pd.DataFrame(baseline_rows)).to_csv(output_dir / "baselines.csv", index=False)
    _with_run_metadata(pd.DataFrame(trial_rows)).to_csv(output_dir / "trials.csv", index=False)
    _with_run_metadata(pd.DataFrame(summary_yearly_rows)).to_csv(output_dir / "summary_yearly.csv", index=False)
    _with_run_metadata(pd.DataFrame(trial_yearly_rows)).to_csv(output_dir / "trials_yearly.csv", index=False)
    _with_run_metadata(aggregate_yearly).to_csv(output_dir / "aggregate_yearly.csv", index=False)
    _rows_to_frame(stock_contrib_rows, STOCK_CONTRIB_COLUMNS).to_csv(output_dir / "stock_contrib.csv", index=False)
    _rows_to_frame(trade_log_rows, TRADE_LOG_COLUMNS).to_csv(output_dir / "trade_log.csv", index=False)
    _rows_to_frame(holding_log_rows, HOLDING_LOG_COLUMNS).to_csv(output_dir / "holdings_daily.csv", index=False)
    _rows_to_frame(portfolio_log_rows, PORTFOLIO_DAILY_COLUMNS).to_csv(output_dir / "portfolio_daily.csv", index=False)
    _rows_to_frame(rebalance_log_rows, REBALANCE_LOG_COLUMNS).to_csv(output_dir / "rebalance_log.csv", index=False)
    _rows_to_frame(signal_selection_rows, SIGNAL_SELECTION_COLUMNS).to_csv(output_dir / "signal_selection_daily.csv", index=False)
    rebalance_plan_df.to_csv(output_dir / "rebalance_plan.csv", index=False)
    rebalance_window_df.to_csv(output_dir / "rebalance_window_returns.csv", index=False)
    seed_ranking_df.to_csv(output_dir / "alpha_ranking_seed_cash_cost.csv", index=False)
    best_ranking_df.to_csv(output_dir / "alpha_ranking_best_cash_cost.csv", index=False)
    (output_dir / "aggregate.json").write_text(json.dumps(aggregate, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    if data_quality_report is not None:
        (output_dir / "data_quality_report.json").write_text(
            json.dumps(data_quality_report, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )


def _build_robust_manifest(args: argparse.Namespace, jsonl_path: Path, data_path: str | None) -> dict[str, Any]:
    """Snapshot the CLI configuration of a robustness run as a manifest dict."""
    return {
        "mode": "alpha_robustness",
        "jsonl": str(jsonl_path),
        "period": args.period,
        # Blank CLI strings are normalized to None.
        "start_date": args.start_date.strip() or None,
        "end_date": args.end_date.strip() or None,
        "backtest_engine": args.backtest_engine,
        "rebalance_mode": args.rebalance_mode,
        "rebalance_freq": int(args.rebalance_freq),
        "top_k": int(args.top_k),
        "n_drop": int(args.n_drop),
        "position_size": float(args.position_size),
        "max_pos_each_stock": float(args.max_pos_each_stock),
        "lot_size": int(args.lot_size),
        "max_daily_volume_participation": float(args.max_daily_volume_participation),
        "max_daily_amount_participation": float(args.max_daily_amount_participation),
        "custom_weight_mode": args.custom_weight_mode,
        "redistribute_unfilled_cash": bool(args.redistribute_unfilled_cash),
        "enforce_cash_limit": bool(args.enforce_cash_limit),
        "buy_fee": float(args.buy_fee),
        "sell_fee": float(args.sell_fee),
        "score_transform": args.score_transform,
        "score_clip":
float(args.score_clip), "universe_filter": args.universe_filter, "universe_top_n": int(args.universe_top_n), "universe_lookback_days": int(args.universe_lookback_days), "sample_size": int(args.sample_size), "sample_seed": int(args.sample_seed), "data_path": data_path or "repo default", } def main() -> None: parser = argparse.ArgumentParser(description="Run standalone alpha robustness backtests on an isolated robust path") parser.add_argument("--jsonl", required=True, help="Path to seed/candidate JSONL file with {name, expr}") parser.add_argument("--period", choices=("train", "val", "test"), default="test") parser.add_argument( "--backtest-engine", choices=("custom", "qlib_original", "spec_shares_cash", "spec_return_based"), default="custom", ) parser.add_argument( "--rebalance-mode", choices=("dropout", "sell_all", "target_weight"), default="dropout", help="Qlib rebalance behavior: legacy dropout, sell-all-then-buy, or alpha-score target-weight sync (ignored by spec engines)", ) parser.add_argument("--top-k", type=int, default=10, help="Target holding count for custom/qlib engines") parser.add_argument("--n-drop", type=int, default=2, help="Legacy dropout cap for qlib_original; ignored by spec engines") parser.add_argument("--position-size", type=float, default=1.0, help="Fraction of equity to allocate to the portfolio") parser.add_argument("--max-pos-each-stock", type=float, default=1.0, help="Per-name weight cap; use 1.0 to effectively disable the old 20%% cap") parser.add_argument("--lot-size", type=int, default=100, help="Trading lot size used for share rounding") parser.add_argument("--max-daily-volume-participation", type=float, default=0.0, help="Buy-side market volume participation cap; 0 disables it") parser.add_argument("--max-daily-amount-participation", type=float, default=0.0, help="Buy-side market amount participation cap; 0 disables it") parser.add_argument("--rebalance-freq", type=int, default=5, help="Rebalance interval for the robust path") 
parser.add_argument("--buy-fee", type=float, default=0.0013, help="Buy fee for the robust path") parser.add_argument("--sell-fee", type=float, default=0.0013, help="Sell fee for the robust path") parser.add_argument( "--custom-weight-mode", default="equal", help="Custom engine weight mode: equal or alpha_score (aliases also accepted by core backtester)", ) parser.add_argument( "--redistribute-unfilled-cash", action="store_true", help="Custom engine only: carry unfilled buy budget down the remaining ranks within top_k", ) parser.add_argument( "--enforce-cash-limit", action="store_true", help="Custom engine only: clip buy orders by available cash instead of allowing negative cash", ) parser.add_argument("--backtest-workers", type=int, default=1, help="Parallel worker count across seeds") parser.add_argument("--data-path", default="", help="Optional path to daily_pv.h5") parser.add_argument( "--score-transform", default="identity", help="Optional score transform for robustness testing: identity, rank, zscore, rank_zscore, signed, clip_zscore", ) parser.add_argument("--score-clip", type=float, default=3.0, help="Clip threshold used by clip_zscore") parser.add_argument( "--universe-filter", default="none", help="Optional liquidity universe filter: none, top_amount, top_volume", ) parser.add_argument("--universe-top-n", type=int, default=0, help="Keep top-N names for liquidity-derived universes; 0 disables") parser.add_argument("--universe-lookback-days", type=int, default=20, help="Rolling lookback for liquidity-derived universes") parser.add_argument( "--trade-guard-config", default="", help="Optional qlib_original trade guard config: none/null disables, 'vn' enables defaults, or pass a JSON object (ignored by spec engines)", ) parser.add_argument("--start-date", default="", help="Optional explicit backtest start date (YYYY-MM-DD)") parser.add_argument("--end-date", default="", help="Optional explicit backtest end date (YYYY-MM-DD)") 
parser.add_argument("--label-forward-days", type=int, default=5) parser.add_argument("--sample-size", type=int, default=0, help="0 means all seeds") parser.add_argument("--sample-seed", type=int, default=42) parser.add_argument("--output-dir", default="", help="Defaults to /kaggle/working/aae_v2/jsonl_backtest_ on Kaggle") parser.add_argument("--manifest-name", default="robust_manifest.json", help="Metadata snapshot written into the output dir") parser.add_argument("--save-every", type=int, default=10) parser.add_argument("--capture-detail-artifacts", action="store_true") parser.add_argument("--stock-contrib-topk", type=int, default=10) args = parser.parse_args() jsonl_path = Path(args.jsonl).expanduser().resolve() data_path = args.data_path.strip() or None trade_guard_config = _parse_trade_guard_config(args.trade_guard_config) start_date = args.start_date.strip() or None end_date = args.end_date.strip() or None output_dir = _build_output_dir(args.output_dir.strip() or None, jsonl_path) _ensure_backtest_imports() rows = _read_jsonl(jsonl_path) sampled_rows = _sample_rows(rows, sample_size=args.sample_size, sample_seed=args.sample_seed) candidates = [_candidate_from_jsonl_row(row, idx) for idx, row in enumerate(sampled_rows, start=1)] configure_periods(PERIOD_CONFIGS) loaded_df = load_data(data_path) resolved_data_path = data_path or os.environ.get("ALPHAEVO_DATA_PATH") or os.environ.get("AAE_DATA_PATH") or os.environ.get("DAILY_PV_PATH") or "repo default" data_quality_report = _build_data_quality_report(loaded_df, resolved_data_path) n_baselines = sum(1 for candidate in candidates if candidate.get("candidate_scope") == "seed_baseline") n_trials = len(candidates) - n_baselines print(f"Loaded {len(rows)} rows from {jsonl_path}", flush=True) if args.sample_size and args.sample_size > 0: print(f"Sampled {len(sampled_rows)} rows with sample_seed={args.sample_seed}", flush=True) print(f"Candidates: total={len(candidates)} baselines={n_baselines} trials={n_trials}", 
flush=True) print(f"Period: {args.period}", flush=True) print(f"Backtest engine: {args.backtest_engine}", flush=True) print(f"Top K: {args.top_k}", flush=True) print(f"N drop: {args.n_drop}", flush=True) print(f"Rebalance mode: {args.rebalance_mode}", flush=True) print(f"Custom weight mode: {args.custom_weight_mode}", flush=True) print(f"Redistribute unfilled cash: {bool(args.redistribute_unfilled_cash)}", flush=True) print(f"Position size: {float(args.position_size):.4f}", flush=True) print(f"Max pos each stock: {float(args.max_pos_each_stock):.4f}", flush=True) print(f"Lot size: {int(args.lot_size)}", flush=True) print(f"Max daily volume participation: {float(args.max_daily_volume_participation):.6f}", flush=True) print(f"Max daily amount participation: {float(args.max_daily_amount_participation):.6f}", flush=True) print(f"Rebalance frequency: {int(args.rebalance_freq)}", flush=True) print(f"Buy fee: {float(args.buy_fee):.6f}", flush=True) print(f"Sell fee: {float(args.sell_fee):.6f}", flush=True) print(f"Enforce cash limit: {bool(args.enforce_cash_limit)}", flush=True) print(f"Score transform: {args.score_transform}", flush=True) print(f"Score clip: {float(args.score_clip):.4f}", flush=True) print(f"Universe filter: {args.universe_filter}", flush=True) print(f"Universe top N: {int(args.universe_top_n)}", flush=True) print(f"Universe lookback days: {int(args.universe_lookback_days)}", flush=True) print(f"Backtest workers: {max(args.backtest_workers, 1)}", flush=True) print(f"Trade guard config: {json.dumps(trade_guard_config, ensure_ascii=False) if trade_guard_config is not None else 'None'}", flush=True) print(f"Output dir: {output_dir}", flush=True) print( "Data path: " f"{resolved_data_path}", flush=True, ) if start_date or end_date: print(f"Explicit backtest range override: {start_date or 'AUTO'} -> {end_date or 'AUTO'}", flush=True) manifest = _build_robust_manifest(args, jsonl_path, resolved_data_path) output_dir.mkdir(parents=True, exist_ok=True) 
(output_dir / str(args.manifest_name)).write_text( json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) results: list[dict[str, Any]] = [] t0 = time.time() workers = max(int(args.backtest_workers), 1) if workers > 1: with ProcessPoolExecutor(max_workers=workers, mp_context=mp.get_context("spawn")) as executor: futures = [ executor.submit( _evaluate_candidate_task, candidate=candidate, period=args.period, label_forward_days=args.label_forward_days, data_path=data_path, backtest_engine=args.backtest_engine, top_k=args.top_k, n_drop=args.n_drop, trade_guard_config=trade_guard_config, rebalance_mode=args.rebalance_mode, custom_weight_mode=args.custom_weight_mode, redistribute_unfilled_cash=bool(args.redistribute_unfilled_cash), position_size=args.position_size, max_pos_each_stock=args.max_pos_each_stock, lot_size=args.lot_size, max_daily_volume_participation=args.max_daily_volume_participation, max_daily_amount_participation=args.max_daily_amount_participation, enforce_cash_limit=bool(args.enforce_cash_limit), rebalance_freq=args.rebalance_freq, cost_buy=args.buy_fee, cost_sell=args.sell_fee, score_transform=args.score_transform, score_clip=args.score_clip, universe_filter=args.universe_filter, universe_top_n=args.universe_top_n, universe_lookback_days=args.universe_lookback_days, start_date=start_date, end_date=end_date, capture_details=False, ) for candidate in candidates ] for idx, future in enumerate(futures, start=1): result = future.result() results.append(result) print( f"[{idx}/{len(candidates)}] seed={result.get('seed_name')} " f"scope={result.get('candidate_scope')} factor={result.get('factor_name')} " f"success={bool(result.get('success', False))} " f"ir={float(result.get('ir', 0.0) or 0.0):.4f} " f"ic={float(result.get('ic_mean', 0.0) or 0.0):.4f} " f"icir={float(result.get('icir', 0.0) or 0.0):.4f} " f"rank_icir={float(result.get('rank_icir', 0.0) or 0.0):.4f} " f"aer={float(result.get('annualized_return', 0.0) or 0.0):.4f} " 
f"mdd={float(result.get('mdd', 0.0) or 0.0):.4f}", flush=True, ) if idx % max(args.save_every, 1) == 0: outputs = _build_outputs_from_results( results=results, args=args, start_date=start_date, end_date=end_date, elapsed_sec=time.time() - t0, ) _save_outputs(output_dir, *outputs, run_metadata=manifest, data_quality_report=data_quality_report) else: for idx, candidate in enumerate(candidates, start=1): result = _evaluate_candidate_task( candidate=candidate, period=args.period, label_forward_days=args.label_forward_days, data_path=data_path, backtest_engine=args.backtest_engine, top_k=args.top_k, n_drop=args.n_drop, trade_guard_config=trade_guard_config, rebalance_mode=args.rebalance_mode, custom_weight_mode=args.custom_weight_mode, redistribute_unfilled_cash=bool(args.redistribute_unfilled_cash), position_size=args.position_size, max_pos_each_stock=args.max_pos_each_stock, lot_size=args.lot_size, max_daily_volume_participation=args.max_daily_volume_participation, max_daily_amount_participation=args.max_daily_amount_participation, enforce_cash_limit=bool(args.enforce_cash_limit), rebalance_freq=args.rebalance_freq, cost_buy=args.buy_fee, cost_sell=args.sell_fee, score_transform=args.score_transform, score_clip=args.score_clip, universe_filter=args.universe_filter, universe_top_n=args.universe_top_n, universe_lookback_days=args.universe_lookback_days, start_date=start_date, end_date=end_date, capture_details=False, ) results.append(result) print( f"[{idx}/{len(candidates)}] seed={result.get('seed_name')} " f"scope={result.get('candidate_scope')} factor={result.get('factor_name')} " f"success={bool(result.get('success', False))} " f"ir={float(result.get('ir', 0.0) or 0.0):.4f} " f"ic={float(result.get('ic_mean', 0.0) or 0.0):.4f} " f"icir={float(result.get('icir', 0.0) or 0.0):.4f} " f"rank_icir={float(result.get('rank_icir', 0.0) or 0.0):.4f} " f"aer={float(result.get('annualized_return', 0.0) or 0.0):.4f} " f"mdd={float(result.get('mdd', 0.0) or 0.0):.4f}", 
flush=True, ) if idx % max(args.save_every, 1) == 0: outputs = _build_outputs_from_results( results=results, args=args, start_date=start_date, end_date=end_date, elapsed_sec=time.time() - t0, ) _save_outputs(output_dir, *outputs, run_metadata=manifest, data_quality_report=data_quality_report) detail_results: list[dict[str, Any]] = [] if args.capture_detail_artifacts: print("\nCapturing detail artifacts for seed baselines and best-IR candidates...", flush=True) outputs_no_detail = _build_outputs_from_results( results=results, args=args, start_date=start_date, end_date=end_date, elapsed_sec=time.time() - t0, ) _save_outputs(output_dir, *outputs_no_detail, run_metadata=manifest, data_quality_report=data_quality_report) print("Saved scalar backtest outputs before detail capture.", flush=True) summary_rows = outputs_no_detail[0] baselines = { str(row.get("seed_name") or row.get("factor_name") or ""): row for row in results if row.get("candidate_scope") == "seed_baseline" } detail_candidates: list[dict[str, Any]] = [] for baseline in sorted(baselines.values(), key=_sort_key): detail_candidates.append(_candidate_for_detail(baseline, "seed_baseline")) for summary in summary_rows: best_rank = summary.get("best_ir_proposal_rank") best_result = next( ( row for row in results if row.get("seed_name") == summary.get("seed_name") and row.get("proposal_rank") == best_rank and row.get("factor_expr") == summary.get("best_factor_expr") ), None, ) if best_result is not None: detail_candidates.append(_candidate_for_detail(best_result, "best_ir_candidate")) seen_detail_keys: set[tuple[str, str, str]] = set() unique_detail_candidates: list[dict[str, Any]] = [] for candidate in detail_candidates: key = ( str(candidate.get("seed_name") or ""), str(candidate.get("candidate_scope") or ""), str(candidate.get("factor_expr") or ""), ) if key in seen_detail_keys: continue seen_detail_keys.add(key) unique_detail_candidates.append(candidate) print( f"Detail candidates: 
{len(unique_detail_candidates)} " "(seed baselines + per-seed best IR candidates).", flush=True, ) if workers > 1 and unique_detail_candidates: with ProcessPoolExecutor(max_workers=workers, mp_context=mp.get_context("spawn")) as executor: futures = [ executor.submit( _evaluate_candidate_task, candidate=candidate, period=args.period, label_forward_days=args.label_forward_days, data_path=data_path, backtest_engine=args.backtest_engine, top_k=args.top_k, n_drop=args.n_drop, trade_guard_config=trade_guard_config, rebalance_mode=args.rebalance_mode, custom_weight_mode=args.custom_weight_mode, redistribute_unfilled_cash=bool(args.redistribute_unfilled_cash), position_size=args.position_size, max_pos_each_stock=args.max_pos_each_stock, lot_size=args.lot_size, max_daily_volume_participation=args.max_daily_volume_participation, max_daily_amount_participation=args.max_daily_amount_participation, enforce_cash_limit=bool(args.enforce_cash_limit), rebalance_freq=args.rebalance_freq, cost_buy=args.buy_fee, cost_sell=args.sell_fee, score_transform=args.score_transform, score_clip=args.score_clip, universe_filter=args.universe_filter, universe_top_n=args.universe_top_n, universe_lookback_days=args.universe_lookback_days, start_date=start_date, end_date=end_date, capture_details=True, ) for candidate in unique_detail_candidates ] for idx, future in enumerate(futures, start=1): detail_t0 = time.time() detail = future.result() detail_results.append(detail) print( f"[detail {idx}/{len(unique_detail_candidates)} DONE] seed={detail.get('seed_name')} " f"scope={detail.get('candidate_scope')} success={bool(detail.get('success', False))} " f"elapsed_wait={time.time() - detail_t0:.1f}s", flush=True, ) else: for idx, candidate in enumerate(unique_detail_candidates, start=1): detail_t0 = time.time() print( f"[detail {idx}/{len(unique_detail_candidates)} START] seed={candidate.get('seed_name')} " f"scope={candidate.get('candidate_scope')} factor={candidate.get('factor_name')}", flush=True, ) 
detail = _evaluate_candidate_task( candidate=candidate, period=args.period, label_forward_days=args.label_forward_days, data_path=data_path, backtest_engine=args.backtest_engine, top_k=args.top_k, n_drop=args.n_drop, trade_guard_config=trade_guard_config, rebalance_mode=args.rebalance_mode, custom_weight_mode=args.custom_weight_mode, redistribute_unfilled_cash=bool(args.redistribute_unfilled_cash), position_size=args.position_size, max_pos_each_stock=args.max_pos_each_stock, lot_size=args.lot_size, max_daily_volume_participation=args.max_daily_volume_participation, max_daily_amount_participation=args.max_daily_amount_participation, enforce_cash_limit=bool(args.enforce_cash_limit), rebalance_freq=args.rebalance_freq, cost_buy=args.buy_fee, cost_sell=args.sell_fee, score_transform=args.score_transform, score_clip=args.score_clip, universe_filter=args.universe_filter, universe_top_n=args.universe_top_n, universe_lookback_days=args.universe_lookback_days, start_date=start_date, end_date=end_date, capture_details=True, ) detail_results.append(detail) print( f"[detail {idx}/{len(unique_detail_candidates)} DONE] seed={detail.get('seed_name')} " f"scope={detail.get('candidate_scope')} success={bool(detail.get('success', False))} " f"elapsed={time.time() - detail_t0:.1f}s", flush=True, ) outputs = _build_outputs_from_results( results=results, args=args, start_date=start_date, end_date=end_date, elapsed_sec=time.time() - t0, detail_results=detail_results, ) _save_outputs(output_dir, *outputs, run_metadata=manifest, data_quality_report=data_quality_report) print("\nSaved files:", flush=True) print(f"summary.csv: {output_dir / 'summary.csv'}", flush=True) print(f"trials.csv: {output_dir / 'trials.csv'}", flush=True) print(f"summary_yearly.csv: {output_dir / 'summary_yearly.csv'}", flush=True) print(f"trials_yearly.csv: {output_dir / 'trials_yearly.csv'}", flush=True) print(f"aggregate_yearly.csv: {output_dir / 'aggregate_yearly.csv'}", flush=True) print(f"stock_contrib.csv: 
{output_dir / 'stock_contrib.csv'}", flush=True) print(f"trade_log.csv: {output_dir / 'trade_log.csv'}", flush=True) print(f"holdings_daily.csv: {output_dir / 'holdings_daily.csv'}", flush=True) print(f"portfolio_daily.csv: {output_dir / 'portfolio_daily.csv'}", flush=True) print(f"rebalance_log.csv: {output_dir / 'rebalance_log.csv'}", flush=True) print(f"signal_selection_daily.csv: {output_dir / 'signal_selection_daily.csv'}", flush=True) print(f"rebalance_plan.csv: {output_dir / 'rebalance_plan.csv'}", flush=True) print(f"rebalance_window_returns.csv: {output_dir / 'rebalance_window_returns.csv'}", flush=True) print(f"alpha_ranking_seed_cash_cost.csv: {output_dir / 'alpha_ranking_seed_cash_cost.csv'}", flush=True) print(f"alpha_ranking_best_cash_cost.csv: {output_dir / 'alpha_ranking_best_cash_cost.csv'}", flush=True) print(f"data_quality_report.json: {output_dir / 'data_quality_report.json'}", flush=True) print(f"aggregate.json: {output_dir / 'aggregate.json'}", flush=True) if __name__ == "__main__": main()