"""Robustness-only duplicate of the single-factor portfolio backtester. Strictly matches AlphaAgent's conf_vn_combined_kdd_ver.yaml: Strategy: ForceExitTopkDropoutStrategy - topk: 10 (absolute, not percentage) - n_drop: 2 - hold_thresh: 2 (T+2 settlement) - rebalance_mode="dropout" keeps legacy TopKDropout behavior - rebalance_mode="sell_all" sells all sellable holdings, then buys the new alpha top-k - rebalance_mode="target_weight" syncs holdings to alpha top-k target weights Execution: - Buy at OPEN price on rebalance dates - Sell at CLOSE price on rebalance dates - Optional trade guards are disabled by default and enabled only via trade_guard_config - Signal uses data up to T-1 Costs: - open_cost: 0.13% (buy) - close_cost: 0.13% (sell) Metrics (Qlib "sum" mode): - IR = mean(daily_excess_return_vs_benchmark) / std(daily_excess_return_vs_benchmark) * sqrt(252) - Sharpe = mean(daily_return - daily_risk_free_rate) / std(daily_return - daily_risk_free_rate) * sqrt(252) - annualized_return = mean daily excess return * 252 - performance_return = compounded portfolio return over the evaluated window - benchmark_performance_return = compounded benchmark return over the evaluated window - excess_compounded_return = performance_return - benchmark_performance_return - max_drawdown = (cumsum(excess_return) - cummax(cumsum(excess_return))).min() - portfolio_nav_mdd = ((cumprod(1 + portfolio_return) / cummax(cumprod(1 + portfolio_return))) - 1).min() - ann_scaler: 252 Additional factor metrics: - IC = mean daily cross-sectional Pearson correlation between factor_t and close-to-close forward return over the next `label_forward_days` - ICIR = IC / std(IC) - RankIC = mean daily cross-sectional Spearman correlation - RankICIR = RankIC / std(RankIC) """ from __future__ import annotations import contextlib import copy from collections import defaultdict import hashlib import io import json import logging import os from pathlib import Path import shutil import subprocess import sys import time from typing import Any import warnings import numpy as np import pandas as pd PROJECT_ROOT = Path(__file__).resolve().parent.parent _QLIB_CACHE_ROOT: Path | None = None _QLIB_RUNTIME_STATE: dict[str, str | None] = {"provider_uri": None} _PROVIDER_SIGNATURE_CACHE: dict[int, str] = {} _PRICE_WIDE_CACHE: dict[tuple[int, str], pd.DataFrame] = {} _AVAILABLE_DATES_CACHE: dict[int, pd.DatetimeIndex] = {} YEAR_REGIMES = { 2022: "bearish", 2025: "bullish", } DEFAULT_VN_TRADE_GUARD_CONFIG: dict[str, Any] = { "trend_ema_span": 20, "force_exit_trend_days": 3, "buy_chase_ctc_thresh": 0.05, "buy_chase_intraday_thresh": 0.04, "panic_drop_ctc_thresh": 0.05, "panic_drop_intraday_thresh": 0.04, "extreme_range_thresh": 0.10, "amount_ma_window": 20, "amount_buy_min_ratio": 1.0, "amount_force_exit_ratio": 0.5, } DEFAULT_QLIB_REBALANCE_MODE = "dropout" QLIB_REBALANCE_MODE_ALIASES = { "dropout": "dropout", "topk_dropout": "dropout", "original": "dropout", "sell_all": "sell_all", "sell_all_buy_topk": "sell_all", "full_liquidate": "sell_all", "full_rebalance": "sell_all", "target_weight": "target_weight", "target_weight_topk": "target_weight", "sync_topk": "target_weight", "weight_sync": "target_weight", } def _market_regime(year: int) -> str: return YEAR_REGIMES.get(int(year), "neutral") def _normalize_rebalance_mode(mode: str | None) -> str: key = (mode or DEFAULT_QLIB_REBALANCE_MODE).strip().lower() if key in ("", "none", "null"): key = DEFAULT_QLIB_REBALANCE_MODE try: return QLIB_REBALANCE_MODE_ALIASES[key] except KeyError as exc: choices = ", ".join(sorted(set(QLIB_REBALANCE_MODE_ALIASES.values()))) raise ValueError(f"Unsupported qlib rebalance_mode={mode!r}. Choose one of: {choices}") from exc def _normalize_trade_guard_config(config: dict[str, Any] | bool | None) -> dict[str, Any] | None: if config is None or config is False: return None if config is True: return dict(DEFAULT_VN_TRADE_GUARD_CONFIG) if not isinstance(config, dict): raise TypeError("trade_guard_config must be a dict, True, False, or None") normalized = dict(DEFAULT_VN_TRADE_GUARD_CONFIG) normalized.update(config) return normalized def _cfg_float(config: dict[str, Any], key: str) -> float | None: value = config.get(key) if value is None: return None return float(value) def _cfg_int(config: dict[str, Any], key: str) -> int | None: value = config.get(key) if value is None: return None return int(value) def _get_frame_value(frame: pd.DataFrame | None, date: pd.Timestamp, inst: str, default: float = np.nan) -> float: if frame is None: return float(default) try: value = frame.loc[date, inst] except (KeyError, TypeError): return float(default) if pd.isna(value): return float(default) return float(value) def _compute_daily_cross_sectional_ic_frame( factor_values: pd.Series, close_prices: pd.DataFrame, label_forward_days: int = 5, ) -> pd.DataFrame: """Compute daily cross-sectional IC / RankIC series on forward close returns.""" empty_ic_frame = pd.DataFrame( { "ic": pd.Series(dtype=float), "rank_ic": pd.Series(dtype=float), }, index=pd.DatetimeIndex([], name="date"), ) if isinstance(factor_values, pd.DataFrame): factor_values = factor_values.iloc[:, 0] factor_wide = factor_values.unstack("instrument").sort_index() close_wide = close_prices.sort_index() common_dates = factor_wide.index.intersection(close_wide.index) if len(common_dates) <= label_forward_days: return empty_ic_frame factor_wide = factor_wide.reindex(common_dates) close_wide = close_wide.reindex(common_dates) forward_returns = close_wide.shift(-label_forward_days) / close_wide - 1.0 rows: list[dict[str, Any]] = [] last_valid_idx = len(common_dates) - label_forward_days for idx in range(last_valid_idx): date = common_dates[idx] factor_row = factor_wide.iloc[idx] ret_row = forward_returns.iloc[idx] valid = factor_row.notna() & ret_row.notna() if valid.sum() < 2: continue x = factor_row[valid] y = ret_row[valid] if x.nunique(dropna=True) < 2 or y.nunique(dropna=True) < 2: continue ic = x.corr(y, method="pearson") rank_ic = x.corr(y, method="spearman") rows.append( { "date": date, "ic": float(ic) if pd.notna(ic) else np.nan, "rank_ic": float(rank_ic) if pd.notna(rank_ic) else np.nan, } ) if not rows: return empty_ic_frame return pd.DataFrame(rows).set_index("date").sort_index() def _summarize_ic_frame(ic_frame: pd.DataFrame) -> dict[str, Any]: if ic_frame is None or ic_frame.empty: return { "ic_mean": 0.0, "ic_std": 0.0, "icir": 0.0, "rank_ic_mean": 0.0, "rank_ic_std": 0.0, "rank_icir": 0.0, "n_ic_days": 0, } ic_values = ic_frame["ic"].dropna() rank_values = ic_frame["rank_ic"].dropna() ic_mean = float(ic_values.mean()) if len(ic_values) else 0.0 ic_std = float(ic_values.std(ddof=1)) if len(ic_values) > 1 else 0.0 rank_ic_mean = float(rank_values.mean()) if len(rank_values) else 0.0 rank_ic_std = float(rank_values.std(ddof=1)) if len(rank_values) > 1 else 0.0 return { "ic_mean": round(ic_mean, 6), "ic_std": round(ic_std, 6), "icir": round(ic_mean / (ic_std + 1e-12), 6) if len(ic_values) > 1 else 0.0, "rank_ic_mean": round(rank_ic_mean, 6), "rank_ic_std": round(rank_ic_std, 6), "rank_icir": round(rank_ic_mean / (rank_ic_std + 1e-12), 6) if len(rank_values) > 1 else 0.0, "n_ic_days": int(len(ic_values)), } def _summarize_return_frame( return_frame: pd.DataFrame, ann_scaler: int = 252, risk_free_rate_annual: float = 0.0, ) -> dict[str, Any]: def _drawdown_duration_stats(cumulative_excess: np.ndarray) -> tuple[int, float, float]: if len(cumulative_excess) == 0: return 0, 0.0, 0.0 running_max = np.maximum.accumulate(cumulative_excess) in_drawdown = cumulative_excess < running_max - 1e-12 durations: list[int] = [] current = 0 for flag in in_drawdown: if flag: current += 1 elif current > 0: durations.append(current) current = 0 if current > 0: durations.append(current) if not durations: return 0, 0.0, 0.0 return int(max(durations)), float(np.mean(durations)), float(np.median(durations)) if return_frame is None or return_frame.empty: return { "ir": 0.0, "annualized_return": 0.0, "annualized_volatility": 0.0, "sharpe": 0.0, "winrate": 0.0, "mdd": 0.0, "excess_mdd": 0.0, "portfolio_nav_mdd": 0.0, "drawdown_duration_max": 0, "drawdown_duration_mean": 0.0, "drawdown_duration_median": 0.0, "total_return": 0.0, "performance_return": 0.0, "benchmark_performance_return": 0.0, "excess_compounded_return": 0.0, "mean_daily_return": 0.0, "std_daily_return": 0.0, "n_days": 0, } portfolio_returns = return_frame["portfolio_return"].astype(float) benchmark_returns = return_frame["benchmark_return"].astype(float) excess_returns = portfolio_returns - benchmark_returns daily_rf = float(risk_free_rate_annual) / float(ann_scaler) risk_free_adjusted = portfolio_returns - daily_rf mean_excess = float(excess_returns.mean()) std_excess = float(excess_returns.std(ddof=1)) if len(excess_returns) > 1 else 1e-8 mean_rf = float(risk_free_adjusted.mean()) std_rf = float(risk_free_adjusted.std(ddof=1)) if len(risk_free_adjusted) > 1 else 1e-8 performance_return = float((1.0 + portfolio_returns).prod() - 1.0) benchmark_performance_return = float((1.0 + benchmark_returns).prod() - 1.0) excess_compounded_return = performance_return - benchmark_performance_return annualized_return = mean_excess * ann_scaler information_ratio = mean_excess / (std_excess + 1e-12) * np.sqrt(ann_scaler) sharpe_ratio = mean_rf / (std_rf + 1e-12) * np.sqrt(ann_scaler) winrate = float(np.mean(excess_returns > 0)) if len(excess_returns) > 0 else 0.0 cumulative_excess = np.cumsum(excess_returns.to_numpy(dtype=float)) if len(cumulative_excess): max_drawdown = float((cumulative_excess - np.maximum.accumulate(cumulative_excess)).min()) else: max_drawdown = 0.0 drawdown_duration_max, drawdown_duration_mean, drawdown_duration_median = _drawdown_duration_stats(cumulative_excess) nav = np.cumprod(1.0 + portfolio_returns.to_numpy(dtype=float)) if len(nav): nav_peak = np.maximum.accumulate(nav) portfolio_nav_mdd = float(np.min(nav / (nav_peak + 1e-12) - 1.0)) else: portfolio_nav_mdd = 0.0 annualized_volatility = std_excess * np.sqrt(ann_scaler) total_return = float(np.sum(portfolio_returns)) return { "ir": round(float(information_ratio), 6), "annualized_return": round(float(annualized_return), 6), "annualized_volatility": round(float(annualized_volatility), 6), "sharpe": round(float(sharpe_ratio), 6), "winrate": round(float(winrate), 6), "mdd": round(float(max_drawdown), 6), "excess_mdd": round(float(max_drawdown), 6), "portfolio_nav_mdd": round(float(portfolio_nav_mdd), 6), "drawdown_duration_max": int(drawdown_duration_max), "drawdown_duration_mean": round(float(drawdown_duration_mean), 4), "drawdown_duration_median": round(float(drawdown_duration_median), 4), "total_return": round(float(total_return), 6), "performance_return": round(float(performance_return), 6), "benchmark_performance_return": round(float(benchmark_performance_return), 6), "excess_compounded_return": round(float(excess_compounded_return), 6), "mean_daily_return": round(float(mean_excess), 8), "std_daily_return": round(float(std_excess), 8), "n_days": int(len(portfolio_returns)), } def _build_yearly_metrics( return_frame: pd.DataFrame, ic_frame: pd.DataFrame, yearly_trade_stats: dict[int, dict[str, float]], yearly_holding_stats: dict[int, dict[str, float]], ann_scaler: int, risk_free_rate_annual: float, ) -> dict[str, Any]: years: set[int] = set() if return_frame is not None and not return_frame.empty: years.update(int(year) for year in return_frame.index.year.unique()) if ic_frame is not None and not ic_frame.empty: years.update(int(year) for year in ic_frame.index.year.unique()) years.update(int(year) for year in yearly_trade_stats.keys()) years.update(int(year) for year in yearly_holding_stats.keys()) yearly_metrics: dict[str, Any] = {} for year in sorted(years): year_return_frame = return_frame[return_frame.index.year == year] if not return_frame.empty else pd.DataFrame() year_ic_frame = ic_frame[ic_frame.index.year == year] if not ic_frame.empty else pd.DataFrame() metrics = _summarize_return_frame( return_frame=year_return_frame, ann_scaler=ann_scaler, risk_free_rate_annual=risk_free_rate_annual, ) metrics.update(_summarize_ic_frame(year_ic_frame)) trade_stats = yearly_trade_stats.get(year, {}) holding_stats = yearly_holding_stats.get(year, {}) n_days = int(holding_stats.get("n_days", 0)) holdings_count_sum = float(holding_stats.get("holdings_count_sum", 0.0)) metrics.update( { "year": int(year), "market_regime": _market_regime(year), "avg_holdings_count": round(holdings_count_sum / n_days, 4) if n_days else 0.0, "max_holdings_count": int(holding_stats.get("max_holdings_count", 0)), "drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0), "drawdown_duration_mean": round(float(metrics.get("drawdown_duration_mean", 0.0) or 0.0), 4), "drawdown_duration_median": round(float(metrics.get("drawdown_duration_median", 0.0) or 0.0), 4), "buy_trades": int(trade_stats.get("buy_trades", 0)), "sell_trades": int(trade_stats.get("sell_trades", 0)), "shares_bought": round(float(trade_stats.get("shares_bought", 0.0)), 6), "shares_sold": round(float(trade_stats.get("shares_sold", 0.0)), 6), "buy_gross_notional": round(float(trade_stats.get("buy_gross_notional", 0.0)), 6), "sell_gross_notional": round(float(trade_stats.get("sell_gross_notional", 0.0)), 6), "buy_cash_outflow": round(float(trade_stats.get("buy_cash_outflow", 0.0)), 6), "sell_net_proceeds": round(float(trade_stats.get("sell_net_proceeds", 0.0)), 6), "buy_transaction_cost": round(float(trade_stats.get("buy_transaction_cost", 0.0)), 6), "sell_transaction_cost": round(float(trade_stats.get("sell_transaction_cost", 0.0)), 6), "transaction_cost": round(float(trade_stats.get("transaction_cost", 0.0)), 6), "gross_turnover": round(float(trade_stats.get("gross_turnover", 0.0)), 6), } ) yearly_metrics[str(year)] = metrics return yearly_metrics def _build_stock_contribution_summary( daily_holding_records: list[dict[str, Any]], trade_records: list[dict[str, Any]], ) -> list[dict[str, Any]]: if not daily_holding_records and not trade_records: return [] hold_df = pd.DataFrame(daily_holding_records) trade_df = pd.DataFrame(trade_records) if not hold_df.empty: hold_df["date"] = pd.to_datetime(hold_df["date"]) if not trade_df.empty: trade_df["date"] = pd.to_datetime(trade_df["date"]) years: set[int] = set() if not hold_df.empty: years.update(int(year) for year in hold_df["year"].dropna().unique()) if not trade_df.empty: years.update(int(year) for year in trade_df["year"].dropna().unique()) if not years: return [] holdings_by_date: dict[pd.Timestamp, pd.Series] = {} if not hold_df.empty: for date, grp in hold_df.groupby("date"): holdings_by_date[pd.Timestamp(date)] = grp.groupby("instrument")["market_value"].sum() snapshot_dates = sorted(holdings_by_date.keys()) rows: list[dict[str, Any]] = [] for year in sorted(years): year_hold = hold_df[hold_df["year"] == year] if not hold_df.empty else pd.DataFrame() year_trade = trade_df[trade_df["year"] == year] if not trade_df.empty else pd.DataFrame() instruments: set[str] = set() if not year_hold.empty: instruments.update(str(inst) for inst in year_hold["instrument"].unique()) if not year_trade.empty: instruments.update(str(inst) for inst in year_trade["instrument"].unique()) if not instruments: continue year_start = pd.Timestamp(f"{year}-01-01") prior_dates = [date for date in snapshot_dates if date < year_start] prior_snapshot = holdings_by_date[prior_dates[-1]] if prior_dates else pd.Series(dtype=float) if not year_hold.empty: year_end_date = pd.Timestamp(year_hold["date"].max()) end_snapshot = holdings_by_date.get(year_end_date, pd.Series(dtype=float)) else: end_snapshot = pd.Series(dtype=float) for instrument in sorted(instruments): hold_inst = year_hold[year_hold["instrument"] == instrument] if not year_hold.empty else pd.DataFrame() trade_inst = year_trade[year_trade["instrument"] == instrument] if not year_trade.empty else pd.DataFrame() if not trade_inst.empty: filled_mask = trade_inst.get("filled_shares", trade_inst.get("shares", 0.0)).astype(float) > 0 filled_trade_inst = trade_inst[filled_mask] buy_inst = filled_trade_inst[filled_trade_inst["action"] == "buy"] sell_inst = filled_trade_inst[filled_trade_inst["action"] == "sell"] else: buy_inst = pd.DataFrame() sell_inst = pd.DataFrame() start_value = float(prior_snapshot.get(instrument, 0.0)) if not prior_snapshot.empty else 0.0 end_value = float(end_snapshot.get(instrument, 0.0)) if not end_snapshot.empty else 0.0 buy_cash_outflow = float(buy_inst["cash_outflow"].sum()) if not buy_inst.empty else 0.0 sell_net_proceeds = float(sell_inst["net_proceeds"].sum()) if not sell_inst.empty else 0.0 realized_pnl = float(sell_inst["realized_pnl"].sum()) if not sell_inst.empty else 0.0 contribution_return = end_value + sell_net_proceeds - start_value - buy_cash_outflow rows.append( { "year": int(year), "market_regime": _market_regime(year), "instrument": instrument, "contribution_return": round(float(contribution_return), 6), "abs_contribution_return": round(abs(float(contribution_return)), 6), "realized_pnl": round(float(realized_pnl), 6), "start_value": round(float(start_value), 6), "end_value": round(float(end_value), 6), "buy_trades": int(len(buy_inst)), "sell_trades": int(len(sell_inst)), "shares_bought": round(float(buy_inst["shares"].sum()) if not buy_inst.empty else 0.0, 6), "shares_sold": round(float(sell_inst["shares"].sum()) if not sell_inst.empty else 0.0, 6), "buy_cash_outflow": round(float(buy_cash_outflow), 6), "sell_net_proceeds": round(float(sell_net_proceeds), 6), "holding_days": int(hold_inst["date"].nunique()) if not hold_inst.empty else 0, "avg_shares_held": round(float(hold_inst["shares_held"].mean()) if not hold_inst.empty else 0.0, 6), "ending_shares": round(float(hold_inst.sort_values("date")["shares_held"].iloc[-1]) if not hold_inst.empty else 0.0, 6), "avg_market_value": round(float(hold_inst["market_value"].mean()) if not hold_inst.empty else 0.0, 6), "max_market_value": round(float(hold_inst["market_value"].max()) if not hold_inst.empty else 0.0, 6), "market_volume_sum": round(float(hold_inst["market_volume"].sum()) if not hold_inst.empty else 0.0, 6), "market_amount_sum": round(float(hold_inst["market_amount"].sum()) if not hold_inst.empty else 0.0, 6), } ) if not rows: return rows contrib_df = pd.DataFrame(rows) contrib_df = contrib_df.sort_values(["year", "abs_contribution_return", "instrument"], ascending=[True, False, True]) contrib_df["rank"] = contrib_df.groupby("year").cumcount() + 1 return contrib_df.to_dict("records") def _round_down_lot(shares: float, lot_size: int) -> float: if not np.isfinite(shares) or shares <= 0: return 0.0 lot = max(int(lot_size), 1) return float(np.floor(float(shares) / float(lot)) * float(lot)) def _market_value_from_close( close_prices: pd.DataFrame, date: pd.Timestamp, position: dict[str, Any], instrument: str, ) -> float: close_px = _get_frame_value(close_prices, date, instrument, default=position.get("avg_cost_per_share", 0.0)) if close_px > 0: return float(position["shares"]) * float(close_px) return float(position["total_cost_basis"]) def _liquidity_caps( *, market_volume: float, market_amount: float, price: float, lot_size: int, max_daily_volume_participation: float, max_daily_amount_participation: float, ) -> tuple[float, float]: volume_cap = np.inf if np.isfinite(market_volume) and market_volume > 0 and max_daily_volume_participation > 0: volume_cap = _round_down_lot(float(market_volume) * float(max_daily_volume_participation), lot_size) amount_cap = np.inf if np.isfinite(market_amount) and market_amount > 0 and price > 0 and max_daily_amount_participation > 0: amount_cap = _round_down_lot( (float(market_amount) * float(max_daily_amount_participation)) / float(price), lot_size, ) return float(volume_cap), float(amount_cap) def _clip_reason_text(reasons: list[str]) -> str: deduped = [] for reason in reasons: if reason and reason not in deduped: deduped.append(reason) return ",".join(deduped) if deduped else "none" CUSTOM_WEIGHT_MODE_ALIASES = { "equal": "equal", "equal_weight": "equal", "weight_equal": "equal", "alpha_score": "alpha_score", "score": "alpha_score", "score_weight": "alpha_score", "alpha_weight": "alpha_score", } def _normalize_custom_weight_mode(mode: str | None) -> str: key = str(mode or "equal").strip().lower() if key in ("", "none", "null"): key = "equal" try: return CUSTOM_WEIGHT_MODE_ALIASES[key] except KeyError as exc: choices = ", ".join(sorted(set(CUSTOM_WEIGHT_MODE_ALIASES.values()))) raise ValueError(f"Unsupported custom_weight_mode={mode!r}. Choose one of: {choices}") from exc def _normalize_positive_weights( raw: pd.Series, *, total_weight: float, max_pos_each_stock: float, ) -> pd.Series: raw = raw.replace([np.inf, -np.inf], np.nan).dropna().astype(float) raw = raw[raw > 0] if raw.empty: return pd.Series(dtype=float) total_weight = max(float(total_weight), 0.0) if total_weight <= 0: return pd.Series(dtype=float) cap = max(float(max_pos_each_stock), 0.0) effective_total_weight = total_weight if cap > 0: effective_total_weight = min(effective_total_weight, float(len(raw)) * cap) if effective_total_weight <= 1e-12: return pd.Series(dtype=float) weights = raw / float(raw.sum()) * effective_total_weight if cap <= 0: return weights.reindex(raw.index).fillna(0.0) fixed: dict[str, float] = {} free = weights.copy() remaining = effective_total_weight for _ in range(len(weights)): over = free[free > cap] if over.empty: break for code in over.index: fixed[str(code)] = cap free = free.drop(index=over.index) remaining = max(effective_total_weight - float(sum(fixed.values())), 0.0) if free.empty or remaining <= 1e-12: free = pd.Series(dtype=float) break if float(free.sum()) <= 1e-12: free = pd.Series(dtype=float) break free = free / float(free.sum()) * remaining out = pd.Series(fixed, dtype=float) if not free.empty: out = pd.concat([out, free]) return out.reindex(raw.index).fillna(0.0) def _build_custom_target_weights( *, target_scores: pd.Series, target_names: list[str], total_weight: float, max_pos_each_stock: float, weight_mode: str, ) -> pd.Series: ordered_names = [str(code) for code in target_names] if not ordered_names: return pd.Series(dtype=float) normalized_mode = _normalize_custom_weight_mode(weight_mode) if normalized_mode == "equal": raw = pd.Series(1.0, index=ordered_names, dtype=float) else: ordered_scores = pd.to_numeric(target_scores.reindex(ordered_names), errors="coerce") if ordered_scores.isna().any(): raw = pd.Series(1.0, index=ordered_names, dtype=float) else: spread = float(ordered_scores.max() - ordered_scores.min()) if spread <= 1e-12: raw = pd.Series(1.0, index=ordered_names, dtype=float) else: # Shift scores into the positive domain so weights are stable # regardless of whether the factor's raw range is [-1, 1], [0, 1], # or something unbounded. raw = (ordered_scores - float(ordered_scores.min())) + spread * 0.05 weights = _normalize_positive_weights( raw, total_weight=total_weight, max_pos_each_stock=max_pos_each_stock, ) if weights.empty: return weights return weights.reindex(ordered_names).fillna(0.0) def _normalize_backtest_engine(engine: str | None) -> str: value = str(engine or "custom").strip().lower() aliases = { "custom": "custom", "qlib_custom": "custom", "qlib_customize": "custom", "qlib_customized": "custom", "customized": "custom", "original": "qlib_original", "qlib": "qlib_original", "qlib_original": "qlib_original", "spec": "spec_shares_cash", "spec_cash": "spec_shares_cash", "spec_shares_cash": "spec_shares_cash", "qlib_spec": "spec_shares_cash", "spec_return": "spec_return_based", "spec_return_based": "spec_return_based", "qlib_spec_return_based": "spec_return_based", } return aliases.get(value, value) def _resolve_writable_cache_root() -> Path: candidates: list[Path] = [] env_override = os.getenv("ALPHAEVO_QLIB_CACHE_ROOT") or os.getenv("QLIB_CACHE_ROOT") if env_override: candidates.append(Path(env_override).expanduser()) if Path("/kaggle/working").exists(): candidates.append(Path("/kaggle/working") / ".cache" / "qlib_vn") candidates.extend( [ PROJECT_ROOT / ".cache" / "qlib_vn", Path.home() / ".cache" / "alphaevo" / "qlib_vn", Path("/tmp") / "alphaevo_qlib_vn", ] ) tried: list[str] = [] for root in candidates: root = root.resolve() tried.append(str(root)) try: root.mkdir(parents=True, exist_ok=True) probe = root / ".write_probe" probe.write_text("ok", encoding="utf-8") probe.unlink(missing_ok=True) return root except OSError: continue raise RuntimeError( "Could not find a writable cache directory for qlib_original. Tried: " + ", ".join(tried) ) def _get_qlib_cache_root() -> Path: global _QLIB_CACHE_ROOT if _QLIB_CACHE_ROOT is None: _QLIB_CACHE_ROOT = _resolve_writable_cache_root() return _QLIB_CACHE_ROOT @contextlib.contextmanager def _provider_build_lock(lock_path: Path, timeout_sec: float = 900.0): lock_path.parent.mkdir(parents=True, exist_ok=True) start = time.monotonic() fd: int | None = None while fd is None: try: fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR) os.write(fd, f"{os.getpid()}\n".encode("utf-8")) except FileExistsError: try: age = time.time() - lock_path.stat().st_mtime if age > timeout_sec: lock_path.unlink(missing_ok=True) continue except FileNotFoundError: continue if time.monotonic() - start > timeout_sec: raise TimeoutError(f"Timed out waiting for Qlib provider cache lock: {lock_path}") time.sleep(0.25) try: yield finally: if fd is not None: os.close(fd) lock_path.unlink(missing_ok=True) @contextlib.contextmanager def _quiet_runtime_io() -> Any: stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() with contextlib.redirect_stdout(stdout_buffer), contextlib.redirect_stderr(stderr_buffer): yield stdout_buffer, stderr_buffer def _import_qlib_runtime() -> dict[str, Any]: try: with _quiet_runtime_io(): import qlib from qlib.backtest import CommonInfrastructure, backtest_loop, get_exchange from qlib.backtest.account import Account from qlib.backtest.decision import Order, OrderDir, TradeDecisionWO from qlib.backtest.executor import SimulatorExecutor from qlib.backtest.position import Position from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy from qlib.log import set_global_logger_level_cm except Exception as exc: # pragma: no cover - depends on optional runtime raise RuntimeError( "qlib_original requires the real Qlib package/runtime to be available in the environment" ) from exc class ForceExitTopkDropoutStrategy(TopkDropoutStrategy): """Top-k dropout with explicit rebalance cadence and VN-specific overlays. The wrapper stays close to Qlib's original TopkDropoutStrategy: - keep `topk`, `n_drop`, and `hold_thresh` - only rebalance every `rebalance_freq` bars - restrict new buys to names that pass the precomputed `__buy_gate__` - push `__force_exit__` names to the bottom of the sell ranking without bypassing `n_drop` - defer non-forced sells when `__defer_sell__` marks a panic-drop day - avoid rebuying recently sold names for a short cooldown window - hard-cap live stock count at `topk`; new buys only use pre-trade empty slots - optionally run full liquidation or target-weight top-k sync modes """ SCORE_COL = "score" BUY_GATE_COL = "__buy_gate__" FORCE_EXIT_COL = "__force_exit__" DEFER_SELL_COL = "__defer_sell__" def __init__( self, *, rebalance_freq: int = 5, cooldown_period: int = 10, rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, max_pos_each_stock: float = 1.0, target_weight_eps: float = 0.001, **kwargs, ): super().__init__(**kwargs) self.rebalance_freq = max(int(rebalance_freq), 1) self.cooldown_period = max(int(cooldown_period), 0) self.rebalance_mode = _normalize_rebalance_mode(rebalance_mode) self.max_pos_each_stock = max(float(max_pos_each_stock), 0.0) self.target_weight_eps = max(float(target_weight_eps), 0.0) self._sell_cooldown_until: dict[str, int] = {} def _score_to_target_weights(self, target_score: pd.Series) -> dict[str, float]: target_score = target_score.replace([np.inf, -np.inf], np.nan).dropna().astype(float) if target_score.empty: return {} spread = float(target_score.max() - target_score.min()) if spread <= 1e-12: raw = pd.Series(1.0, index=target_score.index, dtype=float) else: # Keep the weakest selected name non-zero while still letting alpha strength size positions. raw = (target_score - float(target_score.min())) + spread * 0.05 total_weight = max(min(float(self.risk_degree), 1.0), 0.0) if raw.sum() <= 1e-12 or total_weight <= 0: return {} weights = raw / float(raw.sum()) * total_weight cap = float(self.max_pos_each_stock) if cap > 0: fixed: dict[str, float] = {} free = weights.copy() remaining = total_weight for _ in range(len(weights)): over = free[free > cap] if over.empty: break for code in over.index: fixed[str(code)] = cap free = free.drop(index=over.index) remaining = max(total_weight - float(sum(fixed.values())), 0.0) if free.empty or remaining <= 1e-12: free = pd.Series(dtype=float) break free = free / float(free.sum()) * remaining weights = pd.Series(fixed, dtype=float) if not free.empty: weights = pd.concat([weights, free]) return {str(code): float(weight) for code, weight in weights.items() if float(weight) > 1e-12} def _deal_price(self, code, start_time, end_time, direction, fallback: float = np.nan) -> float: try: price = self.trade_exchange.get_deal_price( stock_id=code, start_time=start_time, end_time=end_time, direction=direction, ) except Exception: price = fallback if price is None or not np.isfinite(float(price)) or float(price) <= 0: return float(fallback) return float(price) def _round_trade_amount(self, code, amount: float, start_time, end_time) -> float: if amount <= 0: return 0.0 try: factor = self.trade_exchange.get_factor( stock_id=code, start_time=start_time, end_time=end_time, ) return float(self.trade_exchange.round_amount_by_trade_unit(amount, factor)) except Exception: return float(amount) @staticmethod def _stock_amount(position: Position, code) -> float: try: return float(position.get_stock_amount(code=code)) except Exception: return 0.0 @staticmethod def _stock_price(position: Position, code, default: float = np.nan) -> float: try: price = float(position.get_stock_price(code)) except Exception: price = float(default) return price if np.isfinite(price) and price > 0 else float(default) def _is_stock_tradable(self, code, start_time, end_time, direction) -> bool: try: return bool( self.trade_exchange.is_stock_tradable( stock_id=code, start_time=start_time, end_time=end_time, direction=None if self.forbid_all_trade_at_limit else direction, ) ) except Exception: return False def _can_sell(self, position: Position, code, time_per_step) -> bool: try: return bool(position.get_stock_count(code, bar=time_per_step) >= self.hold_thresh) except Exception: return False def _select_target_scores( self, score: pd.Series, buy_gate: pd.Series, force_exit: pd.Series, trade_start_time, trade_end_time, ) -> pd.Series: target_score = score.replace([np.inf, -np.inf], np.nan).dropna().astype(float) if target_score.empty: return target_score allowed = buy_gate.reindex(target_score.index).fillna(False).astype(bool) forced = force_exit.reindex(target_score.index).fillna(False).astype(bool) target_score = target_score[allowed & ~forced] if self._sell_cooldown_until: target_score = target_score[~target_score.index.isin(self._sell_cooldown_until.keys())] target_score = target_score.sort_values(ascending=False) selected = [] for code in target_score.index: if self.only_tradable and not self._is_stock_tradable( code, trade_start_time, trade_end_time, OrderDir.BUY, ): continue selected.append(code) if len(selected) >= int(self.topk): break return target_score.reindex(selected) def _portfolio_value( self, position: Position, stock_list: list[str], trade_start_time, trade_end_time, ) -> float: value = float(position.get_cash()) for code in stock_list: amount = self._stock_amount(position, code) if amount <= 0: continue fallback = self._stock_price(position, code) price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback) if np.isfinite(price) and price > 0: value += float(amount * price) return float(value) def _make_sell_order( self, position: Position, code, amount: float, trade_start_time, trade_end_time, ): amount = min(float(amount), self._stock_amount(position, code)) if amount <= 1e-12: return None if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.SELL): return None sell_order = Order( stock_id=code, amount=amount, start_time=trade_start_time, end_time=trade_end_time, direction=Order.SELL, ) return sell_order if self.trade_exchange.check_order(sell_order) else None def _make_buy_order( self, code, cash_value: float, trade_start_time, trade_end_time, ): if cash_value <= 1e-12: return None if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.BUY): return None buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY) if not np.isfinite(buy_price) or buy_price <= 0: return None buy_amount = self._round_trade_amount(code, float(cash_value) / float(buy_price), trade_start_time, trade_end_time) if buy_amount <= 1e-12: return None buy_order = Order( stock_id=code, amount=buy_amount, start_time=trade_start_time, end_time=trade_end_time, direction=Order.BUY, ) return buy_order if self.trade_exchange.check_order(buy_order) else None def _generate_sell_all_decision( self, score: pd.Series, buy_gate: pd.Series, force_exit: pd.Series, trade_start_time, trade_end_time, ): current_temp: Position = copy.deepcopy(self.trade_position) sell_order_list = [] buy_order_list = [] cash = float(current_temp.get_cash()) time_per_step = self.trade_calendar.get_freq() for code in list(current_temp.get_stock_list()): if not self._can_sell(current_temp, code, time_per_step): continue amount = self._stock_amount(current_temp, code) sell_order = self._make_sell_order(current_temp, code, amount, trade_start_time, trade_end_time) if sell_order is None: continue sell_order_list.append(sell_order) if self.cooldown_period > 0: self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( sell_order, position=current_temp, ) cash += float(trade_val - trade_cost) target_scores = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time) remaining = set(current_temp.get_stock_list()) available_slots = max(int(self.topk) - len(remaining), 0) buy_codes = [code for code in target_scores.index if code not in remaining][:available_slots] buy_value = cash * max(min(float(self.risk_degree), 1.0), 0.0) / len(buy_codes) if buy_codes else 0.0 for code in buy_codes: buy_order = self._make_buy_order(code, buy_value, trade_start_time, trade_end_time) if buy_order is None: continue buy_order_list.append(buy_order) trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( buy_order, position=current_temp, ) cash -= float(trade_val + trade_cost) return TradeDecisionWO(sell_order_list + buy_order_list, self) def _generate_target_weight_decision( self, score: pd.Series, buy_gate: pd.Series, force_exit: pd.Series, trade_start_time, trade_end_time, ): current_temp: Position = copy.deepcopy(self.trade_position) sell_order_list = [] buy_order_list = [] time_per_step = self.trade_calendar.get_freq() target_scores = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time) target_weights = self._score_to_target_weights(target_scores) if not target_weights: return TradeDecisionWO([], self) current_stock_list = list(current_temp.get_stock_list()) portfolio_value = self._portfolio_value(current_temp, current_stock_list, trade_start_time, trade_end_time) if portfolio_value <= 1e-12: return TradeDecisionWO([], self) eps_value = float(portfolio_value * self.target_weight_eps) for code in current_stock_list: amount = self._stock_amount(current_temp, code) if amount <= 0: continue fallback = self._stock_price(current_temp, code) sell_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback) if not np.isfinite(sell_price) or sell_price <= 0: continue current_value = float(amount * sell_price) target_value = float(target_weights.get(str(code), 0.0) * portfolio_value) sell_value = current_value - target_value if sell_value <= eps_value: continue if not self._can_sell(current_temp, code, time_per_step): continue sell_amount = self._round_trade_amount(code, sell_value / sell_price, trade_start_time, trade_end_time) sell_order = self._make_sell_order(current_temp, code, sell_amount, trade_start_time, trade_end_time) if sell_order is None: continue sell_order_list.append(sell_order) if str(code) not in target_weights and self.cooldown_period > 0: self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period self.trade_exchange.deal_order(sell_order, position=current_temp) current_stock_after_sell = set(current_temp.get_stock_list()) portfolio_value_after_sell = self._portfolio_value( current_temp, list(current_stock_after_sell), trade_start_time, trade_end_time, ) eps_value = float(portfolio_value_after_sell * self.target_weight_eps) cash = float(current_temp.get_cash()) available_new_slots = max(int(self.topk) - len(current_stock_after_sell), 0) for code in target_weights: fallback = self._stock_price(current_temp, code) buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY, fallback=fallback) if not np.isfinite(buy_price) or buy_price <= 0: continue current_amount = self._stock_amount(current_temp, code) current_value = float(current_amount * buy_price) target_value = float(target_weights[code] * portfolio_value_after_sell) buy_value = target_value - current_value if buy_value <= eps_value: continue if code not in current_stock_after_sell: if available_new_slots <= 0: continue available_new_slots -= 1 current_stock_after_sell.add(code) buy_order = self._make_buy_order(code, min(float(buy_value), cash), trade_start_time, trade_end_time) if buy_order is None: continue buy_order_list.append(buy_order) trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( buy_order, position=current_temp, ) cash -= float(trade_val + trade_cost) if cash <= 1e-12: break return TradeDecisionWO(sell_order_list + buy_order_list, self) @staticmethod def _split_signal_frame( pred_score: pd.Series | pd.DataFrame, ) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]: if isinstance(pred_score, pd.DataFrame): score = pred_score[ForceExitTopkDropoutStrategy.SCORE_COL] if ForceExitTopkDropoutStrategy.SCORE_COL in pred_score.columns else pred_score.iloc[:, 0] buy_gate = ( pred_score[ForceExitTopkDropoutStrategy.BUY_GATE_COL].astype(bool) if ForceExitTopkDropoutStrategy.BUY_GATE_COL in pred_score.columns else pd.Series(True, index=pred_score.index) ) force_exit = ( pred_score[ForceExitTopkDropoutStrategy.FORCE_EXIT_COL].astype(bool) if ForceExitTopkDropoutStrategy.FORCE_EXIT_COL in pred_score.columns else pd.Series(False, index=pred_score.index) ) defer_sell = ( pred_score[ForceExitTopkDropoutStrategy.DEFER_SELL_COL].astype(bool) if ForceExitTopkDropoutStrategy.DEFER_SELL_COL in pred_score.columns else pd.Series(False, index=pred_score.index) ) return ( score.astype(float), buy_gate.reindex(score.index).fillna(False), force_exit.reindex(score.index).fillna(False), defer_sell.reindex(score.index).fillna(False), ) score = pred_score.astype(float) return ( score, pd.Series(True, index=score.index), pd.Series(False, index=score.index), pd.Series(False, index=score.index), ) def generate_trade_decision(self, execute_result=None): trade_step = self.trade_calendar.get_trade_step() if self.cooldown_period > 0: self._sell_cooldown_until = { code: until for code, until in self._sell_cooldown_until.items() if until > trade_step } if trade_step % self.rebalance_freq != 0: return TradeDecisionWO([], self) trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step) pred_start_time, pred_end_time = self.trade_calendar.get_step_time(trade_step, shift=1) pred_score_raw = self.signal.get_signal(start_time=pred_start_time, end_time=pred_end_time) if pred_score_raw is None: return TradeDecisionWO([], self) score, buy_gate, force_exit, defer_sell = self._split_signal_frame(pred_score_raw) if self.rebalance_mode == "sell_all": return self._generate_sell_all_decision( score, buy_gate, force_exit, trade_start_time, trade_end_time, ) if self.rebalance_mode == "target_weight": return self._generate_target_weight_decision( score, buy_gate, force_exit, trade_start_time, trade_end_time, ) if self.only_tradable: def get_first_n(li, n, reverse=False): cur_n = 0 res = [] for si in reversed(li) if reverse else li: if self.trade_exchange.is_stock_tradable( stock_id=si, start_time=trade_start_time, end_time=trade_end_time, ): res.append(si) cur_n += 1 if cur_n >= n: break return res[::-1] if reverse else res def get_last_n(li, n): return get_first_n(li, n, reverse=True) def filter_stock(li): return [ si for si in li if self.trade_exchange.is_stock_tradable( stock_id=si, start_time=trade_start_time, end_time=trade_end_time, ) ] else: def get_first_n(li, n): return list(li)[:n] def get_last_n(li, n): return list(li)[-n:] def filter_stock(li): return list(li) current_temp: Position = copy.deepcopy(self.trade_position) sell_order_list = [] buy_order_list = [] cash = current_temp.get_cash() current_stock_list = current_temp.get_stock_list() rank_score = score.copy() force_exit_index = force_exit[force_exit].index.intersection(current_stock_list) if len(force_exit_index) > 0: rank_score.loc[force_exit_index] = -np.inf last = rank_score.reindex(current_stock_list).fillna(-np.inf).sort_values(ascending=False).index outsider_score = score[~score.index.isin(last)] outsider_score = outsider_score[buy_gate.reindex(outsider_score.index).fillna(False)] if self._sell_cooldown_until: outsider_score = outsider_score[~outsider_score.index.isin(self._sell_cooldown_until.keys())] outsider_score = outsider_score.sort_values(ascending=False) if self.method_buy == "top": today = get_first_n( outsider_score.index, self.n_drop + self.topk - len(last), ) elif self.method_buy == "random": topk_candi = get_first_n(outsider_score.index, self.topk) candi = list(filter(lambda x: x not in last, topk_candi)) n = self.n_drop + self.topk - len(last) try: today = np.random.choice(candi, n, replace=False) except ValueError: today = candi else: raise NotImplementedError(f"This type of input is not supported") comb = rank_score.reindex(last.union(pd.Index(today))).fillna(-np.inf).sort_values(ascending=False).index over_cap_count = max(len(current_stock_list) - int(self.topk), 0) hard_cap_sell = pd.Index(get_last_n(last, over_cap_count)) if over_cap_count > 0 else pd.Index([]) if self.method_sell == "bottom": sell = last[last.isin(get_last_n(comb, self.n_drop))] elif self.method_sell == "random": candi = filter_stock(last) try: sell = pd.Index(np.random.choice(candi, self.n_drop, replace=False) if len(last) else []) except ValueError: sell = candi else: raise NotImplementedError(f"This type of input is not supported") if len(hard_cap_sell) > 0: sell = pd.Index(list(dict.fromkeys(list(sell) + list(hard_cap_sell)))) filtered_sell = [] hard_cap_sell_set = set(hard_cap_sell) time_per_step = self.trade_calendar.get_freq() for code in sell: is_force_exit = bool(force_exit.reindex([code]).fillna(False).iloc[0]) should_defer = bool(defer_sell.reindex([code]).fillna(False).iloc[0]) is_hard_cap_exit = code in hard_cap_sell_set if should_defer and not is_force_exit and not is_hard_cap_exit: continue if ( not is_hard_cap_exit and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh ): continue filtered_sell.append(code) sell = pd.Index(filtered_sell) pre_trade_slots = max(int(self.topk) - len(current_stock_list), 0) buy = today[:pre_trade_slots] for code in current_stock_list: if not self.trade_exchange.is_stock_tradable( stock_id=code, start_time=trade_start_time, end_time=trade_end_time, direction=None if self.forbid_all_trade_at_limit else OrderDir.SELL, ): continue if code in sell: if ( code not in hard_cap_sell_set and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh ): continue sell_amount = current_temp.get_stock_amount(code=code) sell_order = Order( stock_id=code, amount=sell_amount, start_time=trade_start_time, end_time=trade_end_time, direction=Order.SELL, ) if self.trade_exchange.check_order(sell_order): sell_order_list.append(sell_order) if self.cooldown_period > 0: self._sell_cooldown_until[code] = trade_step + self.cooldown_period trade_val, trade_cost, trade_price = self.trade_exchange.deal_order( sell_order, position=current_temp, ) cash += trade_val - trade_cost value = cash * self.risk_degree / len(buy) if len(buy) > 0 else 0 for code in buy: if not self.trade_exchange.is_stock_tradable( stock_id=code, start_time=trade_start_time, end_time=trade_end_time, direction=None if self.forbid_all_trade_at_limit else OrderDir.BUY, ): continue buy_price = self.trade_exchange.get_deal_price( stock_id=code, start_time=trade_start_time, end_time=trade_end_time, direction=OrderDir.BUY, ) buy_amount = value / buy_price factor = self.trade_exchange.get_factor( stock_id=code, start_time=trade_start_time, end_time=trade_end_time, ) buy_amount = self.trade_exchange.round_amount_by_trade_unit(buy_amount, factor) buy_order = Order( stock_id=code, amount=buy_amount, start_time=trade_start_time, end_time=trade_end_time, direction=Order.BUY, ) buy_order_list.append(buy_order) return TradeDecisionWO(sell_order_list + buy_order_list, self) return { "qlib": qlib, "Account": Account, "CommonInfrastructure": CommonInfrastructure, "SimulatorExecutor": SimulatorExecutor, "TopkDropoutStrategy": TopkDropoutStrategy, "ForceExitTopkDropoutStrategy": ForceExitTopkDropoutStrategy, "backtest_loop": backtest_loop, "get_exchange": get_exchange, "set_global_logger_level_cm": set_global_logger_level_cm, } def _resolve_qlib_dump_script(qlib_module: Any) -> Path: env_override = os.getenv("ALPHAEVO_QLIB_DUMP_SCRIPT") if env_override: override_path = Path(env_override).expanduser().resolve() if override_path.exists(): return override_path qlib_file = Path(qlib_module.__file__).resolve() candidates = [ PROJECT_ROOT / "backtest" / "vendor" / "qlib_dump_bin.py", # bundled fallback for wheel installs qlib_file.parent.parent / "scripts" / "dump_bin.py", # source checkout layout qlib_file.parent / "scripts" / "dump_bin.py", # wheel layout with qlib/scripts qlib_file.parent.parent / "qlib" / "scripts" / "dump_bin.py", qlib_file.parents[2] / "scripts" / "dump_bin.py" if len(qlib_file.parents) > 2 else None, ] tried: list[str] = [] for candidate in candidates: if candidate is None: continue candidate = candidate.resolve() tried.append(str(candidate)) if candidate.exists(): return candidate for root in [qlib_file.parent, qlib_file.parent.parent]: try: for candidate in root.rglob("dump_bin.py"): candidate = candidate.resolve() tried.append(str(candidate)) if candidate.exists(): return candidate except OSError: continue raise RuntimeError( "Could not locate Qlib dump_bin.py. Tried: " + ", ".join(dict.fromkeys(tried)) ) def _provider_signature(price_df: pd.DataFrame) -> str: cache_key = id(price_df) cached = _PROVIDER_SIGNATURE_CACHE.get(cache_key) if cached is not None: return cached reset_df = price_df.reset_index() dates = pd.to_datetime(reset_df["datetime"]) summary = { "rows": int(len(reset_df)), "instruments": int(reset_df["instrument"].nunique()), "start": dates.min().strftime("%Y-%m-%d") if len(reset_df) else "NA", "end": dates.max().strftime("%Y-%m-%d") if len(reset_df) else "NA", "cols": sorted(str(col) for col in price_df.columns), "close_sum_head": round(float(pd.to_numeric(reset_df.get("$close"), errors="coerce").fillna(0.0).head(5000).sum()), 6), "volume_sum_head": round(float(pd.to_numeric(reset_df.get("$volume"), errors="coerce").fillna(0.0).head(5000).sum()), 6), } digest = hashlib.sha256(json.dumps(summary, sort_keys=True).encode("utf-8")).hexdigest() signature = digest[:16] _PROVIDER_SIGNATURE_CACHE[cache_key] = signature return signature def _get_price_wide_frame(price_df: pd.DataFrame, column: str) -> pd.DataFrame | None: if column not in price_df.columns: return None cache_key = (id(price_df), column) cached = _PRICE_WIDE_CACHE.get(cache_key) if cached is not None: return cached frame = price_df[column].unstack("instrument") _PRICE_WIDE_CACHE[cache_key] = frame return frame def _get_available_dates(price_df: pd.DataFrame) -> pd.DatetimeIndex: cache_key = id(price_df) cached = _AVAILABLE_DATES_CACHE.get(cache_key) if cached is not None: return cached dates = pd.DatetimeIndex(price_df.index.get_level_values("datetime").unique()).sort_values() _AVAILABLE_DATES_CACHE[cache_key] = dates return dates def _write_qlib_source_csvs(price_df: pd.DataFrame, source_dir: Path) -> None: source_dir.mkdir(parents=True, exist_ok=True) reset_df = price_df.reset_index().rename(columns={"instrument": "symbol", "datetime": "date"}).copy() reset_df["date"] = pd.to_datetime(reset_df["date"]) for raw_symbol, group in reset_df.groupby("symbol", sort=True): symbol = str(raw_symbol) group = group.sort_values("date").copy() close_series = pd.to_numeric(group.get("$close"), errors="coerce") change = close_series.pct_change(fill_method=None).replace([np.inf, -np.inf], np.nan).fillna(0.0) out_df = pd.DataFrame( { "symbol": symbol, "date": group["date"].dt.strftime("%Y-%m-%d"), "open": pd.to_numeric(group.get("$open"), errors="coerce"), "close": close_series, "volume": pd.to_numeric(group.get("$volume"), errors="coerce").fillna(0.0), "amount": pd.to_numeric(group.get("$amount"), errors="coerce").fillna(0.0), "factor": 1.0, "change": change, } ) out_df.to_csv(source_dir / f"{symbol}.csv", index=False) def _ensure_qlib_provider(price_df: pd.DataFrame) -> Path: runtime = _import_qlib_runtime() provider_key = _provider_signature(price_df) cache_root = _get_qlib_cache_root() / provider_key provider_dir = cache_root / "provider" source_dir = cache_root / "csv" ready_path = cache_root / "ready.json" lock_path = cache_root.with_suffix(".lock") if ready_path.exists() and provider_dir.exists(): return provider_dir with _provider_build_lock(lock_path): if ready_path.exists() and provider_dir.exists(): return provider_dir if cache_root.exists(): shutil.rmtree(cache_root) cache_root.mkdir(parents=True, exist_ok=True) _write_qlib_source_csvs(price_df, source_dir) dump_script = _resolve_qlib_dump_script(runtime["qlib"]) cmd = [ sys.executable, str(dump_script), "dump_all", f"--data_path={source_dir}", f"--qlib_dir={provider_dir}", "--freq=day", "--date_field_name=date", "--symbol_field_name=symbol", "--include_fields=open,close,volume,amount,factor,change", "--file_suffix=.csv", "--max_workers=8", ] proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: raise RuntimeError( "Failed to build Qlib provider cache: " + (proc.stderr.strip() or proc.stdout.strip())[:800] ) ready_path.write_text( json.dumps( { "provider_key": provider_key, "provider_dir": str(provider_dir), }, ensure_ascii=False, indent=2, ), encoding="utf-8", ) return provider_dir def _ensure_qlib_initialized(provider_dir: Path) -> dict[str, Any]: runtime = _import_qlib_runtime() provider_uri = str(provider_dir) if _QLIB_RUNTIME_STATE.get("provider_uri") != provider_uri: with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR): runtime["qlib"].init( provider_uri=provider_uri, kernels=1, joblib_backend="threading", expression_cache=None, dataset_cache=None, redis_port=-1, clear_mem_cache=False, logging_level=logging.ERROR, ) _QLIB_RUNTIME_STATE["provider_uri"] = provider_uri return runtime def _prepare_qlib_signal(factor_values: pd.Series) -> pd.Series: signal = factor_values.copy() if isinstance(signal, pd.DataFrame): signal = signal.iloc[:, 0] if "datetime" in signal.index.names and "instrument" in signal.index.names: signal = signal.reorder_levels(["datetime", "instrument"]).sort_index() else: signal = signal.sort_index() signal = signal.astype(float).dropna() signal.name = "score" return signal def _prepare_force_exit_qlib_signal( factor_values: pd.Series, price_df: pd.DataFrame, trade_guard_config: dict[str, Any], ) -> pd.DataFrame: score = _prepare_qlib_signal(factor_values) close_series = price_df["$close"].copy() if "datetime" in close_series.index.names and "instrument" in close_series.index.names: close_series = close_series.reorder_levels(["datetime", "instrument"]).sort_index() else: close_series = close_series.sort_index() close_series = pd.to_numeric(close_series, errors="coerce") buy_gate = pd.Series(True, index=score.index, dtype=bool) force_exit = pd.Series(False, index=score.index, dtype=bool) defer_sell = pd.Series(False, index=score.index, dtype=bool) trend_ema_span = _cfg_int(trade_guard_config, "trend_ema_span") force_exit_trend_days = _cfg_int(trade_guard_config, "force_exit_trend_days") if trend_ema_span is not None and trend_ema_span > 0: ema = close_series.groupby(level="instrument").transform( lambda x: x.ewm(span=trend_ema_span, adjust=False, min_periods=1).mean() ) trend_ok = (close_series > ema).reindex(score.index).fillna(False) buy_gate &= trend_ok if force_exit_trend_days is not None and force_exit_trend_days > 0: trend_bad = close_series < ema trend_bad_streak = trend_bad.groupby(level="instrument").transform( lambda x: x.astype(float).rolling(force_exit_trend_days, min_periods=force_exit_trend_days).sum() >= force_exit_trend_days ) force_exit |= trend_bad_streak.reindex(score.index).fillna(False) prev_close = close_series.groupby(level="instrument").shift(1) close_to_close_return = (close_series / prev_close - 1.0).replace([np.inf, -np.inf], np.nan) intraday_return = pd.Series(np.nan, index=close_series.index, dtype=float) if "$open" in price_df.columns: open_series = price_df["$open"].copy() if "datetime" in open_series.index.names and "instrument" in open_series.index.names: open_series = open_series.reorder_levels(["datetime", "instrument"]).sort_index() else: open_series = open_series.sort_index() open_series = pd.to_numeric(open_series, errors="coerce") intraday_return = (close_series / open_series - 1.0).replace([np.inf, -np.inf], np.nan) buy_chase = pd.Series(False, index=score.index, dtype=bool) buy_chase_ctc_thresh = _cfg_float(trade_guard_config, "buy_chase_ctc_thresh") if buy_chase_ctc_thresh is not None: buy_chase |= (close_to_close_return > abs(buy_chase_ctc_thresh)).reindex(score.index).fillna(False) buy_chase_intraday_thresh = _cfg_float(trade_guard_config, "buy_chase_intraday_thresh") if buy_chase_intraday_thresh is not None: buy_chase |= (intraday_return > abs(buy_chase_intraday_thresh)).reindex(score.index).fillna(False) panic_drop = pd.Series(False, index=score.index, dtype=bool) panic_drop_ctc_thresh = _cfg_float(trade_guard_config, "panic_drop_ctc_thresh") if panic_drop_ctc_thresh is not None: panic_drop |= (close_to_close_return < -abs(panic_drop_ctc_thresh)).reindex(score.index).fillna(False) panic_drop_intraday_thresh = _cfg_float(trade_guard_config, "panic_drop_intraday_thresh") if panic_drop_intraday_thresh is not None: panic_drop |= (intraday_return < -abs(panic_drop_intraday_thresh)).reindex(score.index).fillna(False) extreme_range = pd.Series(False, index=score.index) extreme_range_thresh = _cfg_float(trade_guard_config, "extreme_range_thresh") if extreme_range_thresh is not None and "$high" in price_df.columns and "$low" in price_df.columns: high_series = price_df["$high"].copy() low_series = price_df["$low"].copy() if "datetime" in high_series.index.names and "instrument" in high_series.index.names: high_series = high_series.reorder_levels(["datetime", "instrument"]).sort_index() low_series = low_series.reorder_levels(["datetime", "instrument"]).sort_index() else: high_series = high_series.sort_index() low_series = low_series.sort_index() high_series = pd.to_numeric(high_series, errors="coerce") low_series = pd.to_numeric(low_series, errors="coerce") day_range = ((high_series - low_series) / close_series).replace([np.inf, -np.inf], np.nan) extreme_range = (day_range > abs(extreme_range_thresh)).reindex(score.index).fillna(False) buy_gate &= ~(buy_chase | extreme_range) defer_sell = panic_drop & ~force_exit amount_ma_window = _cfg_int(trade_guard_config, "amount_ma_window") if amount_ma_window is not None and amount_ma_window > 0 and "$amount" in price_df.columns: amount_series = price_df["$amount"].copy() if "datetime" in amount_series.index.names and "instrument" in amount_series.index.names: amount_series = amount_series.reorder_levels(["datetime", "instrument"]).sort_index() else: amount_series = amount_series.sort_index() amount_series = pd.to_numeric(amount_series, errors="coerce") amount_ma20 = amount_series.groupby(level="instrument").transform( lambda x: x.rolling(amount_ma_window, min_periods=1).mean() ) amount_buy_min_ratio = _cfg_float(trade_guard_config, "amount_buy_min_ratio") if amount_buy_min_ratio is not None: amount_ok = (amount_series >= (float(amount_buy_min_ratio) * amount_ma20)).reindex(score.index).fillna(False) buy_gate &= amount_ok amount_force_exit_ratio = _cfg_float(trade_guard_config, "amount_force_exit_ratio") if amount_force_exit_ratio is not None: amount_weak = (amount_series < (float(amount_force_exit_ratio) * amount_ma20)).reindex(score.index).fillna(False) force_exit |= amount_weak defer_sell &= ~amount_weak strategy_signal = pd.DataFrame( { "score": score.astype(float), "__buy_gate__": buy_gate.astype(bool), "__force_exit__": force_exit.astype(bool), "__defer_sell__": defer_sell.astype(bool), }, index=score.index, ) return strategy_signal def build_signal_selection_log( *, factor_values: pd.Series, price_df: pd.DataFrame, top_k: int = 10, start_date: str | None = None, end_date: str | None = None, backtest_engine: str = "qlib_original", trade_guard_config: dict[str, Any] | bool | None = None, holding_log: list[dict[str, Any]] | None = None, trade_log: list[dict[str, Any]] | None = None, portfolio_log: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: normalized_engine = _normalize_backtest_engine(backtest_engine) normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config) if normalized_engine == "qlib_original" and normalized_trade_guard_config is not None: signal_payload = _prepare_force_exit_qlib_signal( factor_values=factor_values, price_df=price_df, trade_guard_config=normalized_trade_guard_config, ) else: score = _prepare_qlib_signal(factor_values) signal_payload = pd.DataFrame( { "score": score.astype(float), "__buy_gate__": True, "__force_exit__": False, "__defer_sell__": False, }, index=score.index, ) if signal_payload is None or len(signal_payload) == 0: return [] if isinstance(signal_payload, pd.Series): signal_frame = signal_payload.to_frame(name="score") else: signal_frame = signal_payload.copy() if "score" not in signal_frame.columns: signal_frame["score"] = pd.to_numeric(signal_frame.iloc[:, 0], errors="coerce") if "datetime" in signal_frame.index.names and "instrument" in signal_frame.index.names: signal_frame = signal_frame.reorder_levels(["datetime", "instrument"]).sort_index() else: signal_frame = signal_frame.sort_index() signal_frame = signal_frame.reset_index() signal_frame["datetime"] = pd.to_datetime(signal_frame["datetime"], errors="coerce").dt.normalize() signal_frame["instrument"] = signal_frame["instrument"].astype(str) signal_frame["score"] = pd.to_numeric(signal_frame["score"], errors="coerce") signal_frame = signal_frame.dropna(subset=["datetime", "instrument", "score"]) if signal_frame.empty: return [] available_dates = _get_available_dates(price_df) if len(available_dates) <= 1: return [] date_map = { pd.Timestamp(available_dates[i]).normalize(): pd.Timestamp(available_dates[i + 1]).normalize() for i in range(0, len(available_dates) - 1) } signal_frame["trade_date"] = signal_frame["datetime"].map(date_map) signal_frame = signal_frame.dropna(subset=["trade_date"]).copy() if signal_frame.empty: return [] if start_date: signal_frame = signal_frame[signal_frame["trade_date"] >= pd.Timestamp(start_date)] if end_date: signal_frame = signal_frame[signal_frame["trade_date"] <= pd.Timestamp(end_date)] if signal_frame.empty: return [] if portfolio_log: portfolio_df = pd.DataFrame(portfolio_log).copy() if not portfolio_df.empty and "date" in portfolio_df.columns: portfolio_df["date"] = pd.to_datetime(portfolio_df["date"], errors="coerce").dt.normalize() if "is_rebalance" in portfolio_df.columns: portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool) rebalance_dates = set(portfolio_df.loc[portfolio_df["is_rebalance"], "date"].dropna().tolist()) if rebalance_dates: signal_frame = signal_frame[signal_frame["trade_date"].isin(rebalance_dates)] if signal_frame.empty: return [] for col, default in ( ("__buy_gate__", True), ("__force_exit__", False), ("__defer_sell__", False), ): if col not in signal_frame.columns: signal_frame[col] = default signal_frame[col] = signal_frame[col].fillna(default).astype(bool) signal_frame["trade_score_rank"] = ( signal_frame.groupby("trade_date", dropna=False)["score"] .rank(method="first", ascending=False) .astype(int) ) debug_top_n = max(int(top_k), 5) signal_frame["top5_by_score"] = signal_frame["trade_score_rank"] <= 5 signal_frame["topk_by_score"] = signal_frame["trade_score_rank"] <= int(top_k) hold_summary = pd.DataFrame() if holding_log: hold_summary = pd.DataFrame(holding_log).copy() if not hold_summary.empty and {"date", "instrument"}.issubset(hold_summary.columns): hold_summary["date"] = pd.to_datetime(hold_summary["date"], errors="coerce").dt.normalize() hold_summary["instrument"] = hold_summary["instrument"].astype(str) for col in ["market_value", "weight", "shares_held"]: if col in hold_summary.columns: hold_summary[col] = pd.to_numeric(hold_summary[col], errors="coerce") hold_summary = ( hold_summary.groupby(["date", "instrument"], dropna=False) .agg( selected_eod=("instrument", "size"), shares_held_eod=("shares_held", "sum"), market_value_eod=("market_value", "sum"), weight_eod=("weight", "sum"), ) .reset_index() ) hold_summary["selected_eod"] = hold_summary["selected_eod"].fillna(0).astype(int) > 0 hold_summary["eod_hold_rank"] = ( hold_summary.groupby("date", dropna=False)["market_value_eod"] .rank(method="first", ascending=False) ) trade_summary = pd.DataFrame() if trade_log: trade_summary = pd.DataFrame(trade_log).copy() if not trade_summary.empty and {"date", "instrument"}.issubset(trade_summary.columns): trade_summary["date"] = pd.to_datetime(trade_summary["date"], errors="coerce").dt.normalize() trade_summary["instrument"] = trade_summary["instrument"].astype(str) for col in ["requested_shares", "filled_shares", "gross_notional", "filled_value", "fill_ratio"]: if col in trade_summary.columns: trade_summary[col] = pd.to_numeric(trade_summary[col], errors="coerce") trade_summary["requested_shares_norm"] = pd.to_numeric( trade_summary.get("requested_shares", trade_summary.get("filled_shares", trade_summary.get("shares", 0.0))), errors="coerce", ) trade_summary["filled_shares_norm"] = pd.to_numeric( trade_summary.get("filled_shares", trade_summary.get("shares", 0.0)), errors="coerce", ) trade_summary["requested_notional_norm"] = pd.to_numeric( trade_summary.get("order_value", trade_summary.get("gross_notional", trade_summary.get("filled_value", 0.0))), errors="coerce", ) trade_summary["filled_notional_norm"] = pd.to_numeric( trade_summary.get("filled_value", trade_summary.get("gross_notional", 0.0)), errors="coerce", ) trade_summary = ( trade_summary.groupby(["date", "instrument"], dropna=False) .agg( had_trade=("action", "size"), trade_actions=("action", lambda s: "|".join(dict.fromkeys(str(v) for v in s if str(v)))), requested_shares_total=("requested_shares_norm", "sum"), filled_shares_total=("filled_shares_norm", "sum"), requested_notional_total=("requested_notional_norm", "sum"), filled_notional_total=("filled_notional_norm", "sum"), fill_ratio_mean=("fill_ratio", "mean"), clip_reason=("clip_reason", lambda s: "|".join(sorted({str(v) for v in s if str(v)}))), ) .reset_index() ) trade_summary["had_trade"] = trade_summary["had_trade"].fillna(0).astype(int) > 0 merged = signal_frame.merge( hold_summary, left_on=["trade_date", "instrument"], right_on=["date", "instrument"], how="left", ) if "date" in merged.columns: merged = merged.drop(columns=["date"]) merged = merged.merge( trade_summary, left_on=["trade_date", "instrument"], right_on=["date", "instrument"], how="left", ) if "date" in merged.columns: merged = merged.drop(columns=["date"]) selected_source = merged["selected_eod"] if "selected_eod" in merged.columns else pd.Series(False, index=merged.index) had_trade_source = merged["had_trade"] if "had_trade" in merged.columns else pd.Series(False, index=merged.index) trade_actions_source = merged["trade_actions"] if "trade_actions" in merged.columns else pd.Series("", index=merged.index) merged["selected_eod"] = selected_source.where(selected_source.notna(), False).astype(bool) merged["had_trade"] = had_trade_source.where(had_trade_source.notna(), False).astype(bool) merged["trade_actions"] = trade_actions_source.where(trade_actions_source.notna(), "").astype(str) keep_mask = ( (merged["trade_score_rank"] <= debug_top_n) | merged["selected_eod"] | merged["had_trade"] ) merged = merged[keep_mask].copy() if merged.empty: return [] merged = merged.sort_values(["trade_date", "trade_score_rank", "instrument"], ascending=[True, True, True]) rows: list[dict[str, Any]] = [] for row in merged.to_dict("records"): rows.append( { "signal_date": pd.Timestamp(row["datetime"]).strftime("%Y-%m-%d"), "trade_date": pd.Timestamp(row["trade_date"]).strftime("%Y-%m-%d"), "instrument": str(row["instrument"]), "score": round(float(row.get("score", 0.0) or 0.0), 10), "trade_score_rank": int(row.get("trade_score_rank", 0) or 0), "top5_by_score": bool(row.get("top5_by_score", False)), "topk_by_score": bool(row.get("topk_by_score", False)), "buy_gate": bool(row.get("__buy_gate__", True)), "force_exit": bool(row.get("__force_exit__", False)), "defer_sell": bool(row.get("__defer_sell__", False)), "selected_eod": bool(row.get("selected_eod", False)), "eod_hold_rank": int(row["eod_hold_rank"]) if pd.notna(row.get("eod_hold_rank")) else None, "shares_held_eod": round(float(row.get("shares_held_eod", 0.0) or 0.0), 6), "market_value_eod": round(float(row.get("market_value_eod", 0.0) or 0.0), 6), "weight_eod": round(float(row.get("weight_eod", 0.0) or 0.0), 8), "had_trade": bool(row.get("had_trade", False)), "trade_actions": str(row.get("trade_actions", "") or ""), "requested_shares_total": round(float(row.get("requested_shares_total", 0.0) or 0.0), 6), "filled_shares_total": round(float(row.get("filled_shares_total", 0.0) or 0.0), 6), "requested_notional_total": round(float(row.get("requested_notional_total", 0.0) or 0.0), 6), "filled_notional_total": round(float(row.get("filled_notional_total", 0.0) or 0.0), 6), "fill_ratio_mean": round(float(row.get("fill_ratio_mean", 0.0) or 0.0), 6) if pd.notna(row.get("fill_ratio_mean")) else None, "clip_reason": str(row.get("clip_reason", "") or ""), } ) return rows def _extract_position_amounts(position: Any) -> dict[str, float]: if position is None: return {} return { str(instrument): float(position.get_stock_amount(instrument)) for instrument in position.get_stock_list() if float(position.get_stock_amount(instrument)) > 0 } def _build_holding_log_from_qlib_positions( positions_normal: dict[pd.Timestamp, Any], report_normal: pd.DataFrame, volume_frame: pd.DataFrame | None, amount_frame: pd.DataFrame | None, build_rows: bool = True, ) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]: rows: list[dict[str, Any]] = [] yearly_holding_stats: dict[int, dict[str, float]] = defaultdict( lambda: { "n_days": 0, "holdings_count_sum": 0.0, "max_holdings_count": 0, } ) pos_by_date = {pd.Timestamp(date): position for date, position in positions_normal.items()} for date in sorted(pos_by_date): position = pos_by_date[date] stock_list = [inst for inst in position.get_stock_list() if float(position.get_stock_amount(inst)) > 0] year = int(date.year) stats = yearly_holding_stats[year] stats["n_days"] += 1 stats["holdings_count_sum"] += float(len(stock_list)) stats["max_holdings_count"] = max(int(stats["max_holdings_count"]), len(stock_list)) if not build_rows: continue portfolio_value = float(report_normal.loc[date, "account"]) if date in report_normal.index else float(position.calculate_value()) cash_eod = float(report_normal.loc[date, "cash"]) if date in report_normal.index else float(position.get_cash(include_settle=True)) for instrument in stock_list: shares_held = float(position.get_stock_amount(instrument)) close_price = float(position.get_stock_price(instrument)) market_value = float(shares_held * close_price) rows.append( { "date": date.strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "instrument": instrument, "shares_held": round(float(shares_held), 6), "market_value": round(float(market_value), 6), "close_price": round(float(close_price), 6), "market_volume": round(_get_frame_value(volume_frame, date, instrument, default=0.0), 6), "market_amount": round(_get_frame_value(amount_frame, date, instrument, default=0.0), 6), "portfolio_value": round(float(portfolio_value), 6), "cash_eod": round(float(cash_eod), 6), "weight": round(float(market_value) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0, } ) return rows, yearly_holding_stats def _build_trade_log_from_qlib_positions( positions_normal: dict[pd.Timestamp, Any], open_prices: pd.DataFrame, close_prices: pd.DataFrame, volume_frame: pd.DataFrame | None, amount_frame: pd.DataFrame | None, cost_buy: float, cost_sell: float, build_rows: bool = True, ) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]: rows: list[dict[str, Any]] = [] yearly_trade_stats: dict[int, dict[str, float]] = defaultdict( lambda: { "buy_trades": 0, "sell_trades": 0, "shares_bought": 0.0, "shares_sold": 0.0, "buy_gross_notional": 0.0, "sell_gross_notional": 0.0, "buy_cash_outflow": 0.0, "sell_net_proceeds": 0.0, "buy_transaction_cost": 0.0, "sell_transaction_cost": 0.0, "transaction_cost": 0.0, "gross_turnover": 0.0, } ) pos_by_date = {pd.Timestamp(date): position for date, position in positions_normal.items()} prev_position = None prev_amounts: dict[str, float] = {} for date in sorted(pos_by_date): position = pos_by_date[date] current_amounts = _extract_position_amounts(position) holdings_count_before = int(sum(1 for value in prev_amounts.values() if value > 0)) holdings_count_after = int(sum(1 for value in current_amounts.values() if value > 0)) for instrument in sorted(set(prev_amounts) | set(current_amounts)): current_shares = float(prev_amounts.get(instrument, 0.0)) target_shares = float(current_amounts.get(instrument, 0.0)) delta_shares = target_shares - current_shares if abs(delta_shares) <= 1e-12: continue action = "buy" if delta_shares > 0 else "sell" filled_shares = abs(float(delta_shares)) deal_price = _get_frame_value(open_prices if action == "buy" else close_prices, date, instrument, default=np.nan) if not np.isfinite(deal_price) or deal_price <= 0: deal_price = _get_frame_value(close_prices, date, instrument, default=np.nan) if (not np.isfinite(deal_price) or deal_price <= 0) and action == "buy" and instrument in current_amounts: deal_price = float(position.get_stock_price(instrument)) if (not np.isfinite(deal_price) or deal_price <= 0) and action == "sell" and prev_position is not None and instrument in prev_amounts: deal_price = float(prev_position.get_stock_price(instrument)) if not np.isfinite(deal_price) or deal_price <= 0: continue order_value = float(filled_shares * deal_price) transaction_cost = float(order_value * (cost_buy if action == "buy" else cost_sell)) cash_outflow = float(order_value + transaction_cost) if action == "buy" else 0.0 net_proceeds = float(order_value - transaction_cost) if action == "sell" else 0.0 days_held = 0 if action == "sell" and prev_position is not None: try: days_held = int(prev_position.get_stock_count(instrument, "day")) except Exception: days_held = 0 year = int(date.year) if build_rows: market_volume = _get_frame_value(volume_frame, date, instrument, default=np.nan) market_amount = _get_frame_value(amount_frame, date, instrument, default=np.nan) rows.append( { "date": date.strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "action": action, "instrument": instrument, "shares": round(float(filled_shares), 6), "current_shares": round(float(current_shares), 6), "target_shares": round(float(target_shares), 6), "requested_shares": round(float(filled_shares), 6), "filled_shares": round(float(filled_shares), 6), "unfilled_shares": 0.0, "fill_ratio": 1.0, "price": round(float(deal_price), 6), "order_value": round(float(order_value), 6), "filled_value": round(float(order_value), 6), "gross_notional": round(float(order_value), 6), "net_proceeds": round(float(net_proceeds), 6), "cash_outflow": round(float(cash_outflow), 6), "transaction_cost": round(float(transaction_cost), 6), "realized_pnl": 0.0, "holdings_count_before": holdings_count_before, "holdings_count_after": holdings_count_after, "days_held": days_held, "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, "amount_participation": round(float(order_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, "clip_reason": "qlib_original", } ) stats = yearly_trade_stats[year] if action == "buy": stats["buy_trades"] += 1 stats["shares_bought"] += float(filled_shares) stats["buy_gross_notional"] += float(order_value) stats["buy_cash_outflow"] += float(cash_outflow) stats["buy_transaction_cost"] += float(transaction_cost) else: stats["sell_trades"] += 1 stats["shares_sold"] += float(filled_shares) stats["sell_gross_notional"] += float(order_value) stats["sell_net_proceeds"] += float(net_proceeds) stats["sell_transaction_cost"] += float(transaction_cost) stats["transaction_cost"] += float(transaction_cost) stats["gross_turnover"] += float(order_value) prev_position = position prev_amounts = current_amounts return rows, yearly_trade_stats def _build_portfolio_log_rows( *, dates: list[pd.Timestamp], portfolio_value: pd.Series, portfolio_return: pd.Series, benchmark_return: pd.Series, cash_series: pd.Series | None, holdings_count: dict[pd.Timestamp, int] | None, rebalance_freq: int, rebalance_flags: dict[pd.Timestamp, bool] | None = None, ) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] rebalance_step = max(int(rebalance_freq), 1) holdings_count = holdings_count or {} cash_series = cash_series if cash_series is not None else pd.Series(dtype=float) rebalance_flags = rebalance_flags or {} for idx, date in enumerate(dates): date_key = pd.Timestamp(date) portfolio_value_at_date = float(portfolio_value.get(date, np.nan)) portfolio_return_at_date = float(portfolio_return.get(date, 0.0) or 0.0) benchmark_return_at_date = float(benchmark_return.get(date, 0.0) or 0.0) cash_at_date = float(cash_series.get(date, np.nan)) if date in cash_series.index else np.nan is_rebalance = bool(rebalance_flags.get(date_key, idx % rebalance_step == 0)) rows.append( { "date": date_key.strftime("%Y-%m-%d"), "year": int(date_key.year), "market_regime": _market_regime(int(date_key.year)), "portfolio_value": round(float(portfolio_value_at_date), 6) if np.isfinite(portfolio_value_at_date) else None, "cash_eod": round(float(cash_at_date), 6) if np.isfinite(cash_at_date) else None, "cash_weight": round(float(cash_at_date / portfolio_value_at_date), 6) if np.isfinite(cash_at_date) and np.isfinite(portfolio_value_at_date) and abs(portfolio_value_at_date) > 1e-12 else None, "n_held": int(holdings_count.get(date_key, 0)), "portfolio_return": round(float(portfolio_return_at_date), 8), "benchmark_return": round(float(benchmark_return_at_date), 8), "excess_return": round(float(portfolio_return_at_date - benchmark_return_at_date), 8), "is_rebalance": is_rebalance, } ) return rows def _compute_portfolio_ir_qlib( factor_values: pd.Series, price_df: pd.DataFrame, bench_return: pd.Series | None = None, top_k: int = 10, n_drop: int = 2, rebalance_freq: int = 5, cost_buy: float = 0.0013, cost_sell: float = 0.0013, hold_thresh: int = 2, label_forward_days: int = 5, ann_scaler: int = 252, start_date: str | None = None, end_date: str | None = None, risk_free_rate_annual: float = 0.0, start_cash: float = 200_000_000.0, position_size: float = 1.0, max_pos_each_stock: float = 1.0, lot_size: int = 100, max_daily_volume_participation: float = 0.0, max_daily_amount_participation: float = 0.0, capture_details: bool = False, trade_guard_config: dict[str, Any] | bool | None = None, rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, ) -> dict: if isinstance(factor_values, pd.DataFrame): factor_values = factor_values.iloc[:, 0] open_prices = _get_price_wide_frame(price_df, "$open") close_prices = _get_price_wide_frame(price_df, "$close") volume_frame = _get_price_wide_frame(price_df, "$volume") if capture_details else None amount_frame = _get_price_wide_frame(price_df, "$amount") if capture_details else None if open_prices is None or close_prices is None: return _empty_result("Missing $open or $close in price_df") normalized_rebalance_mode = _normalize_rebalance_mode(rebalance_mode) normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config) if normalized_trade_guard_config is None: signal = _prepare_qlib_signal(factor_values) cooldown_period = 0 else: signal = _prepare_force_exit_qlib_signal(factor_values, price_df, normalized_trade_guard_config) if "cooldown_period" in normalized_trade_guard_config: cooldown_value = normalized_trade_guard_config.get("cooldown_period") cooldown_period = 0 if cooldown_value is None else max(int(cooldown_value), 0) else: cooldown_period = max(int(rebalance_freq) * 2, int(hold_thresh)) if start_date: signal = signal[signal.index.get_level_values("datetime") >= pd.Timestamp(start_date)] if end_date: signal = signal[signal.index.get_level_values("datetime") <= pd.Timestamp(end_date)] if signal.empty: return _empty_result("No valid signal values for qlib_original backtest") runtime = _ensure_qlib_initialized(_ensure_qlib_provider(price_df)) ForceExitTopkDropoutStrategy = runtime["ForceExitTopkDropoutStrategy"] SimulatorExecutor = runtime["SimulatorExecutor"] Account = runtime["Account"] CommonInfrastructure = runtime["CommonInfrastructure"] backtest_loop = runtime["backtest_loop"] get_exchange = runtime["get_exchange"] qlib_warnings: list[str] = [] effective_risk_degree = min(max(float(position_size), 0.0), 1.0) if max_pos_each_stock > 0 and top_k > 0: effective_risk_degree = min(effective_risk_degree, float(top_k) * float(max_pos_each_stock)) if max_daily_amount_participation > 0: qlib_warnings.append("max_daily_amount_participation is not natively supported by qlib_original and is ignored") if normalized_trade_guard_config is not None: qlib_warnings.append("qlib_original trade_guard_config enabled") if normalized_rebalance_mode != DEFAULT_QLIB_REBALANCE_MODE: qlib_warnings.append(f"qlib_original rebalance_mode={normalized_rebalance_mode}") strategy = ForceExitTopkDropoutStrategy( signal=signal, topk=top_k, n_drop=n_drop, rebalance_freq=rebalance_freq, cooldown_period=cooldown_period, rebalance_mode=normalized_rebalance_mode, max_pos_each_stock=max_pos_each_stock, hold_thresh=hold_thresh, risk_degree=effective_risk_degree, only_tradable=False, forbid_all_trade_at_limit=False, ) executor_obj = SimulatorExecutor( time_per_step="day", generate_portfolio_metrics=True, verbose=False, ) exchange_kwargs: dict[str, Any] = { "freq": "day", "deal_price": ("$open", "$close"), "limit_threshold": None, "open_cost": cost_buy, "close_cost": cost_sell, "min_cost": 0.0, "trade_unit": max(int(lot_size), 1), } if max_daily_volume_participation > 0: exchange_kwargs["volume_threshold"] = ("current", f"{float(max_daily_volume_participation)} * $volume") available_dates = _get_available_dates(price_df) if len(available_dates) < 2: return _empty_result("Qlib original backtest requires at least 2 trading dates") latest_safe_end = pd.Timestamp(available_dates[-2]) bt_start = pd.Timestamp(start_date) if start_date else pd.Timestamp(signal.index.get_level_values("datetime").min()) requested_end = pd.Timestamp(end_date) if end_date else pd.Timestamp(signal.index.get_level_values("datetime").max()) bt_end = min(requested_end, latest_safe_end) if bt_end < bt_start: return _empty_result("Qlib original backtest window is empty after calendar clipping") if bt_end < requested_end: qlib_warnings.append( f"qlib_original clipped end_date from {requested_end.strftime('%Y-%m-%d')} " f"to {bt_end.strftime('%Y-%m-%d')} because the provider has no future calendar bar" ) signal = signal[signal.index.get_level_values("datetime") <= bt_end] if signal.empty: return _empty_result("No valid signal values remain after qlib_original calendar clipping") benchmark_series = None if bench_return is not None: benchmark_series = bench_return.astype(float).sort_index() benchmark_series = benchmark_series[benchmark_series.index >= bt_start] benchmark_series = benchmark_series[benchmark_series.index <= bt_end] try: with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR): trade_account = Account( init_cash=max(float(start_cash), 0.0), position_dict={}, pos_type="Position", benchmark_config={"benchmark": None}, ) exchange_with_dates = dict(exchange_kwargs) exchange_with_dates.setdefault("start_time", bt_start) exchange_with_dates.setdefault("end_time", bt_end) trade_exchange = get_exchange(**exchange_with_dates) common_infra = CommonInfrastructure(trade_account=trade_account, trade_exchange=trade_exchange) strategy.reset_common_infra(common_infra) executor_obj.reset_common_infra(common_infra) with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Mean of empty slice", category=RuntimeWarning) portfolio_metric_dict, _indicator_dict = backtest_loop( start_time=bt_start, end_time=bt_end, trade_strategy=strategy, trade_executor=executor_obj, ) except Exception as exc: # pragma: no cover - depends on qlib runtime return _empty_result(f"Qlib original backtest failed: {type(exc).__name__}: {str(exc)[:500]}") if not portfolio_metric_dict: return _empty_result("Qlib original backtest returned no portfolio metrics") analysis_key = sorted(portfolio_metric_dict.keys())[0] report_normal, positions_normal = portfolio_metric_dict[analysis_key] if report_normal is None or report_normal.empty: return _empty_result("Qlib original backtest returned an empty report") report_normal = report_normal.sort_index() portfolio_returns_with_cost = report_normal["return"].astype(float) - report_normal["cost"].astype(float) if benchmark_series is not None: benchmark_returns = benchmark_series.reindex(report_normal.index).fillna(0.0).astype(float) else: benchmark_returns = report_normal.get("bench", pd.Series(index=report_normal.index, dtype=float)).astype(float).fillna(0.0) return_frame = pd.DataFrame( { "portfolio_value": report_normal["account"].astype(float), "portfolio_return": portfolio_returns_with_cost, "benchmark_return": benchmark_returns, }, index=report_normal.index, ) ic_frame = _compute_daily_cross_sectional_ic_frame( factor_values=factor_values, close_prices=close_prices, label_forward_days=label_forward_days, ) if start_date and not ic_frame.empty: ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)] if end_date and not ic_frame.empty: ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)] holding_log, yearly_holding_stats = _build_holding_log_from_qlib_positions( positions_normal=positions_normal, report_normal=report_normal, volume_frame=volume_frame, amount_frame=amount_frame, build_rows=capture_details, ) trade_log, yearly_trade_stats = _build_trade_log_from_qlib_positions( positions_normal=positions_normal, open_prices=open_prices, close_prices=close_prices, volume_frame=volume_frame, amount_frame=amount_frame, cost_buy=cost_buy, cost_sell=cost_sell, build_rows=capture_details, ) holdings_count_by_date = { pd.Timestamp(date): int( sum(1 for instrument in position.get_stock_list() if float(position.get_stock_amount(instrument)) > 0) ) for date, position in positions_normal.items() } portfolio_log = _build_portfolio_log_rows( dates=list(return_frame.index), portfolio_value=return_frame["portfolio_value"].astype(float), portfolio_return=return_frame["portfolio_return"].astype(float), benchmark_return=return_frame["benchmark_return"].astype(float), cash_series=report_normal["cash"].astype(float) if "cash" in report_normal.columns else None, holdings_count=holdings_count_by_date, rebalance_freq=rebalance_freq, ) total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values())) total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values())) result = _summarize_return_frame( return_frame=return_frame, ann_scaler=ann_scaler, risk_free_rate_annual=risk_free_rate_annual, ) result.update(_summarize_ic_frame(ic_frame)) result.update( { "success": True, "final_value": round(float(report_normal["account"].iloc[-1]), 6), "error": None, "qlib_warnings": qlib_warnings, "trade_guard_config": normalized_trade_guard_config, "rebalance_mode": normalized_rebalance_mode, "transaction_cost": round(total_transaction_cost, 6), "gross_turnover": round(total_gross_turnover, 6), "turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6), "yearly_metrics": _build_yearly_metrics( return_frame=return_frame, ic_frame=ic_frame, yearly_trade_stats=yearly_trade_stats, yearly_holding_stats=yearly_holding_stats, ann_scaler=ann_scaler, risk_free_rate_annual=risk_free_rate_annual, ), "trade_log": trade_log if capture_details else [], "stock_contrib": _build_stock_contribution_summary(holding_log, trade_log) if capture_details else [], "holding_log": holding_log if capture_details else [], "portfolio_log": portfolio_log if capture_details else [], } ) return result def _compute_portfolio_ir_custom( factor_values: pd.Series, price_df: pd.DataFrame, bench_return: pd.Series | None = None, top_k: int = 10, n_drop: int = 2, rebalance_freq: int = 5, cost_buy: float = 0.0013, cost_sell: float = 0.0013, hold_thresh: int = 2, label_forward_days: int = 5, ann_scaler: int = 252, start_date: str | None = None, end_date: str | None = None, risk_free_rate_annual: float = 0.0, start_cash: float = 200_000_000.0, position_size: float = 1.0, max_pos_each_stock: float = 1.0, lot_size: int = 100, max_daily_volume_participation: float = 0.0, max_daily_amount_participation: float = 0.0, capture_details: bool = False, custom_weight_mode: str = "equal", redistribute_unfilled_cash: bool = False, enforce_cash_limit: bool = False, ) -> dict: """Compute portfolio-based IR matching AlphaAgent's Qlib config.""" if isinstance(factor_values, pd.DataFrame): factor_values = factor_values.iloc[:, 0] all_dates = sorted(factor_values.index.get_level_values("datetime").unique()) if start_date: all_dates = [d for d in all_dates if d >= pd.Timestamp(start_date)] if end_date: all_dates = [d for d in all_dates if d <= pd.Timestamp(end_date)] if len(all_dates) < rebalance_freq + hold_thresh + 1: return _empty_result("Not enough dates for backtesting") open_prices = price_df["$open"].unstack("instrument") if "$open" in price_df.columns else None close_prices = price_df["$close"].unstack("instrument") if "$close" in price_df.columns else None volume_frame = price_df["$volume"].unstack("instrument") if "$volume" in price_df.columns else None amount_frame = price_df["$amount"].unstack("instrument") if "$amount" in price_df.columns else None if open_prices is None or close_prices is None: return _empty_result("Missing $open or $close in price_df") effective_freq = max(rebalance_freq, hold_thresh) rebalance_indices = list(range(0, len(all_dates), effective_freq)) rebalance_set = {all_dates[i] for i in rebalance_indices} lot_size = max(int(lot_size), 1) start_cash = max(float(start_cash), 0.0) position_size = min(max(float(position_size), 0.0), 1.0) max_pos_each_stock = min(max(float(max_pos_each_stock), 0.0), 1.0) max_daily_volume_participation = max(float(max_daily_volume_participation), 0.0) max_daily_amount_participation = max(float(max_daily_amount_participation), 0.0) normalized_custom_weight_mode = _normalize_custom_weight_mode(custom_weight_mode) redistribute_unfilled_cash = bool(redistribute_unfilled_cash) enforce_cash_limit = bool(enforce_cash_limit) holdings: dict[str, dict[str, Any]] = {} cash = float(start_cash) daily_portfolio_values: list[float] = [] daily_dates_out: list[pd.Timestamp] = [] daily_portfolio_records: list[dict[str, Any]] = [] daily_holding_records: list[dict[str, Any]] = [] trade_records: list[dict[str, Any]] = [] yearly_trade_stats: dict[int, dict[str, float]] = defaultdict( lambda: { "buy_trades": 0, "sell_trades": 0, "shares_bought": 0.0, "shares_sold": 0.0, "buy_gross_notional": 0.0, "sell_gross_notional": 0.0, "buy_cash_outflow": 0.0, "sell_net_proceeds": 0.0, "buy_transaction_cost": 0.0, "sell_transaction_cost": 0.0, "transaction_cost": 0.0, "gross_turnover": 0.0, } ) yearly_holding_stats: dict[int, dict[str, float]] = defaultdict( lambda: { "n_days": 0, "holdings_count_sum": 0.0, "max_holdings_count": 0, } ) rebalance_execution_count = 0 for day_idx, date in enumerate(all_dates): is_rebalance = date in rebalance_set rebalance_executed = False signal = pd.Series(dtype=float) if is_rebalance and day_idx > 0: prev_date = all_dates[day_idx - 1] try: signal = factor_values.xs(prev_date, level="datetime") except KeyError: signal = pd.Series(dtype=float) signal = signal.dropna().sort_values(ascending=False) if is_rebalance and day_idx > 0 and not signal.empty: rebalance_executed = True is_initial_rebalance = rebalance_execution_count == 0 effective_custom_weight_mode = "equal" if is_initial_rebalance else normalized_custom_weight_mode effective_redistribute_unfilled_cash = False if is_initial_rebalance else redistribute_unfilled_cash current_insts = set(holdings.keys()) locked_insts = { inst for inst, pos in holdings.items() if day_idx - int(pos["buy_day_idx"]) < hold_thresh } new_top = set(signal.nlargest(top_k).index.tolist()) eligible_current = list(current_insts - locked_insts) # Custom engine now rebalances directly to the current top-k target set: # any sellable holding that falls out of the target top-k becomes a drop # candidate, without an n_drop cap. dropout_candidates = {inst for inst in eligible_current if inst not in new_top} keep_insts = list(current_insts - dropout_candidates) keep_insts_set = {str(inst) for inst in keep_insts} equity = cash + sum( _market_value_from_close(close_prices, date, pos, inst) for inst, pos in holdings.items() ) max_position_value = equity * max_pos_each_stock if max_pos_each_stock >= 1.0 - 1e-12: baseline_slot_value = equity * position_size else: baseline_slot_value = min( max_position_value, (equity * position_size / float(top_k)) if top_k > 0 else 0.0, ) target_universe = list(keep_insts) for inst in signal.index.tolist(): if len(target_universe) >= top_k: break if inst in target_universe or inst in dropout_candidates: continue px = _get_frame_value(open_prices, date, inst, default=np.nan) if not np.isfinite(px) or px <= 0: px = _get_frame_value(close_prices, date, inst, default=np.nan) if not np.isfinite(px) or px <= 0: continue one_lot_cash = float(lot_size) * float(px) * (1.0 + cost_buy) if baseline_slot_value > 0 and one_lot_cash > baseline_slot_value: continue target_universe.append(inst) ranked_target_names = [str(inst) for inst in target_universe] target_shares_map: dict[str, float] = {} score_target_weights = pd.Series(dtype=float) if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash: target_count = max(len(ranked_target_names), 1) target_value_per_name = min( equity * position_size / float(target_count), max_position_value, ) for inst in set(ranked_target_names).union(current_insts): current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) if inst not in ranked_target_names or target_value_per_name <= 0: target_shares = 0.0 else: price_for_target = _get_frame_value(open_prices, date, inst, default=np.nan) if not np.isfinite(price_for_target) or price_for_target <= 0: price_for_target = _get_frame_value(close_prices, date, inst, default=np.nan) if not np.isfinite(price_for_target) or price_for_target <= 0: target_shares = current_shares else: target_shares = _round_down_lot(target_value_per_name / float(price_for_target), lot_size) if inst in keep_insts_set and target_shares < current_shares: target_shares = current_shares target_shares_map[inst] = float(target_shares) else: ranked_target_scores = signal.reindex(pd.Index(ranked_target_names)).dropna().sort_values(ascending=False) ranked_target_names = [str(inst) for inst in ranked_target_scores.index.tolist()] score_target_weights = _build_custom_target_weights( target_scores=ranked_target_scores, target_names=ranked_target_names, total_weight=1.0 if position_size > 0 else 0.0, max_pos_each_stock=max_pos_each_stock, weight_mode=effective_custom_weight_mode, ) for inst in set(ranked_target_names).union(current_insts): current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) # Score-weighted custom mode treats sell decisions as membership-driven: # if a name remains in the refreshed alpha basket, keep the current size # and let the normalized score decide how much incremental cash to add # after sells have completed. if str(inst) not in ranked_target_names: target_shares = 0.0 else: target_shares = current_shares if inst in keep_insts_set and target_shares < current_shares: target_shares = current_shares target_shares_map[inst] = float(target_shares) for inst in list(holdings.keys()): pos = holdings[inst] current_shares = float(pos["shares"]) target_shares = float(target_shares_map.get(inst, 0.0)) requested_shares = max(current_shares - target_shares, 0.0) if requested_shares <= 0: continue hold_count_before = len(holdings) days_held = int(day_idx - int(pos["buy_day_idx"])) sell_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0)) sell_px = sell_px if sell_px > 0 else float(pos.get("avg_cost_per_share", 0.0)) market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) # Custom engine uses sell signals as hard exits: once an instrument # falls out of the active alpha basket, liquidate the whole sellable # position on that rebalance date instead of clipping the order into # multiple partial sells across later rebalances. clip_reasons: list[str] = [] if not np.isfinite(sell_px) or sell_px <= 0: filled_shares = 0.0 clip_reasons.append("invalid_price") else: filled_shares = min(requested_shares, current_shares) order_value = requested_shares * sell_px filled_value = filled_shares * sell_px transaction_cost = filled_value * cost_sell net_proceeds = filled_value - transaction_cost realized_pnl = filled_value - transaction_cost - float(pos["avg_cost_per_share"]) * filled_shares if filled_shares > 0: cash += net_proceeds pos["shares"] = float(pos["shares"]) - filled_shares pos["total_cost_basis"] = max( float(pos["total_cost_basis"]) - float(pos["avg_cost_per_share"]) * filled_shares, 0.0, ) if pos["shares"] > 0: pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) else: del holdings[inst] year = int(pd.Timestamp(date).year) stats = yearly_trade_stats[year] stats["sell_trades"] += 1 stats["shares_sold"] += float(filled_shares) stats["sell_gross_notional"] += float(filled_value) stats["sell_net_proceeds"] += float(net_proceeds) stats["sell_transaction_cost"] += float(transaction_cost) stats["transaction_cost"] += float(transaction_cost) stats["gross_turnover"] += float(filled_value) if capture_details: year = int(pd.Timestamp(date).year) trade_records.append( { "date": pd.Timestamp(date).strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "action": "sell", "instrument": inst, "shares": round(float(filled_shares), 6), "current_shares": round(float(current_shares), 6), "target_shares": round(float(target_shares), 6), "requested_shares": round(float(requested_shares), 6), "filled_shares": round(float(filled_shares), 6), "unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6), "fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0, "price": round(float(sell_px), 6), "order_value": round(float(order_value), 6), "filled_value": round(float(filled_value), 6), "redistributed_notional": 0.0, "gross_notional": round(float(filled_value), 6), "net_proceeds": round(float(net_proceeds), 6), "cash_outflow": 0.0, "transaction_cost": round(float(transaction_cost), 6), "realized_pnl": round(float(realized_pnl), 6), "holdings_count_before": int(hold_count_before), "holdings_count_after": int(len(holdings)), "days_held": days_held, "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, "clip_reason": _clip_reason_text(clip_reasons), } ) score_buy_budget_by_inst: dict[str, float] = {} if not score_target_weights.empty: available_buy_budget = max(float(cash), 0.0) score_buy_budget_by_inst = { str(inst): float(weight) * available_buy_budget for inst, weight in score_target_weights.items() if float(weight) > 0 } carry_budget = 0.0 buy_filled_by_inst: dict[str, float] = {} buy_names = ranked_target_names if ranked_target_names else [] for inst in buy_names: current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) target_shares = float(target_shares_map.get(inst, 0.0)) reported_target_shares = float(target_shares) buy_px = _get_frame_value(open_prices, date, inst, default=np.nan) if not np.isfinite(buy_px) or buy_px <= 0: buy_px = _get_frame_value(close_prices, date, inst, default=np.nan) hold_count_before = len(holdings) year = int(pd.Timestamp(date).year) market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) clip_reasons: list[str] = [] filled_shares = 0.0 requested_shares = 0.0 order_value = 0.0 filled_value = 0.0 transaction_cost = 0.0 cash_outflow = 0.0 redistributed_notional = 0.0 if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash: requested_shares = max(target_shares - current_shares, 0.0) if requested_shares <= 0: continue order_value = requested_shares * float(buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0 if not np.isfinite(buy_px) or buy_px <= 0: clip_reasons.append("invalid_price") else: volume_cap, amount_cap = _liquidity_caps( market_volume=market_volume, market_amount=market_amount, price=buy_px, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, ) cash_cap = ( _round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size) if enforce_cash_limit else np.inf ) filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap) if volume_cap < requested_shares: clip_reasons.append("volume_limit") if amount_cap < requested_shares: clip_reasons.append("amount_limit") if enforce_cash_limit and cash_cap < requested_shares: clip_reasons.append("cash_limit") if filled_shares < requested_shares and filled_shares <= 0: clip_reasons.append("lot_rounding") filled_value = filled_shares * float(buy_px) transaction_cost = filled_value * cost_buy cash_outflow = filled_value + transaction_cost else: current_value = float(current_shares * buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0 base_requested_budget = float(score_buy_budget_by_inst.get(str(inst), 0.0)) requested_budget = float(base_requested_budget) carry_in_budget = 0.0 if effective_redistribute_unfilled_cash: carry_in_budget = float(carry_budget) requested_budget += carry_in_budget carry_budget = 0.0 if max_pos_each_stock > 0 and np.isfinite(buy_px) and buy_px > 0: per_name_value_cap = float(equity * max_pos_each_stock) budget_cap = max(per_name_value_cap - current_value, 0.0) carry_budget += max(requested_budget - budget_cap, 0.0) requested_budget = min(requested_budget, budget_cap) if requested_budget <= 1e-12 and carry_budget <= 1e-12: continue if not np.isfinite(buy_px) or buy_px <= 0: clip_reasons.append("invalid_price") if effective_redistribute_unfilled_cash: carry_budget += requested_budget else: requested_shares = _round_down_lot( requested_budget / (float(buy_px) * (1.0 + cost_buy)), lot_size, ) reported_target_shares = float(current_shares + requested_shares) order_value = requested_shares * float(buy_px) volume_cap, amount_cap = _liquidity_caps( market_volume=market_volume, market_amount=market_amount, price=buy_px, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, ) cash_cap = ( _round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size) if enforce_cash_limit else np.inf ) filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap) if volume_cap < requested_shares: clip_reasons.append("volume_limit") if amount_cap < requested_shares: clip_reasons.append("amount_limit") if enforce_cash_limit and cash_cap < requested_shares: clip_reasons.append("cash_limit") if requested_budget > 1e-12 and requested_shares <= 0: clip_reasons.append("lot_rounding") if filled_shares < requested_shares and filled_shares <= 0: clip_reasons.append("lot_rounding") filled_value = filled_shares * float(buy_px) transaction_cost = filled_value * cost_buy cash_outflow = filled_value + transaction_cost if carry_in_budget > 0 and filled_shares > 0: redistributed_notional = min(float(cash_outflow), float(carry_in_budget)) if effective_redistribute_unfilled_cash: carry_budget += max(requested_budget - cash_outflow, 0.0) if filled_shares > 0: cash -= cash_outflow buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares) if inst in holdings: pos = holdings[inst] pos["shares"] = float(pos["shares"]) + filled_shares pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) else: holdings[inst] = { "shares": float(filled_shares), "buy_date": pd.Timestamp(date), "buy_day_idx": int(day_idx), "avg_cost_per_share": float(cash_outflow / filled_shares), "total_cost_basis": float(cash_outflow), } stats = yearly_trade_stats[year] stats["buy_trades"] += 1 stats["shares_bought"] += float(filled_shares) stats["buy_gross_notional"] += float(filled_value) stats["buy_cash_outflow"] += float(cash_outflow) stats["buy_transaction_cost"] += float(transaction_cost) stats["transaction_cost"] += float(transaction_cost) stats["gross_turnover"] += float(filled_value) if capture_details: trade_records.append( { "date": pd.Timestamp(date).strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "action": "buy", "instrument": inst, "shares": round(float(filled_shares), 6), "current_shares": round(float(current_shares), 6), "target_shares": round(float(reported_target_shares), 6), "requested_shares": round(float(requested_shares), 6), "filled_shares": round(float(filled_shares), 6), "unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6), "fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0, "price": round(float(buy_px), 6) if np.isfinite(buy_px) else 0.0, "order_value": round(float(order_value), 6), "filled_value": round(float(filled_value), 6), "redistributed_notional": round(float(redistributed_notional), 6), "gross_notional": round(float(filled_value), 6), "net_proceeds": 0.0, "cash_outflow": round(float(cash_outflow), 6), "transaction_cost": round(float(transaction_cost), 6), "realized_pnl": 0.0, "holdings_count_before": int(hold_count_before), "holdings_count_after": int(len(holdings)), "days_held": 0, "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, "clip_reason": _clip_reason_text(clip_reasons), } ) if effective_redistribute_unfilled_cash and carry_budget > 1e-12 and buy_names: residual_budget = ( min(float(carry_budget), float(cash)) if enforce_cash_limit else float(carry_budget) ) for inst in buy_names: if residual_budget <= 1e-12: break buy_px = _get_frame_value(open_prices, date, inst, default=np.nan) if not np.isfinite(buy_px) or buy_px <= 0: buy_px = _get_frame_value(close_prices, date, inst, default=np.nan) if not np.isfinite(buy_px) or buy_px <= 0: continue one_lot_cash = float(lot_size) * float(buy_px) * (1.0 + cost_buy) if one_lot_cash <= 0 or residual_budget + 1e-12 < one_lot_cash or (enforce_cash_limit and cash + 1e-12 < one_lot_cash): continue current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) current_value = float(current_shares * float(buy_px)) extra_budget_cap = float(residual_budget) if max_pos_each_stock > 0: per_name_value_cap = float(equity * max_pos_each_stock) extra_budget_cap = min(extra_budget_cap, max(per_name_value_cap - current_value, 0.0)) if extra_budget_cap + 1e-12 < one_lot_cash: continue market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) volume_cap, amount_cap = _liquidity_caps( market_volume=market_volume, market_amount=market_amount, price=buy_px, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, ) used_shares_today = float(buy_filled_by_inst.get(inst, 0.0)) remaining_volume_cap = max(float(volume_cap) - used_shares_today, 0.0) remaining_amount_cap = max(float(amount_cap) - used_shares_today, 0.0) remaining_liquidity_cap = min(remaining_volume_cap, remaining_amount_cap) budget_cap_shares = _round_down_lot(extra_budget_cap / (float(buy_px) * (1.0 + cost_buy)), lot_size) cash_cap_shares = ( _round_down_lot(min(float(cash), float(residual_budget)) / (float(buy_px) * (1.0 + cost_buy)), lot_size) if enforce_cash_limit else budget_cap_shares ) filled_shares = min(remaining_liquidity_cap, budget_cap_shares, cash_cap_shares) if filled_shares <= 0: continue hold_count_before = len(holdings) year = int(pd.Timestamp(date).year) filled_value = filled_shares * float(buy_px) transaction_cost = filled_value * cost_buy cash_outflow = filled_value + transaction_cost cash -= cash_outflow residual_budget = max(float(residual_budget) - float(cash_outflow), 0.0) buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares) if inst in holdings: pos = holdings[inst] pos["shares"] = float(pos["shares"]) + filled_shares pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) else: holdings[inst] = { "shares": float(filled_shares), "buy_date": pd.Timestamp(date), "buy_day_idx": int(day_idx), "avg_cost_per_share": float(cash_outflow / filled_shares), "total_cost_basis": float(cash_outflow), } stats = yearly_trade_stats[year] stats["buy_trades"] += 1 stats["shares_bought"] += float(filled_shares) stats["buy_gross_notional"] += float(filled_value) stats["buy_cash_outflow"] += float(cash_outflow) stats["buy_transaction_cost"] += float(transaction_cost) stats["transaction_cost"] += float(transaction_cost) stats["gross_turnover"] += float(filled_value) if capture_details: trade_records.append( { "date": pd.Timestamp(date).strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "action": "buy", "instrument": inst, "shares": round(float(filled_shares), 6), "current_shares": round(float(current_shares), 6), "target_shares": round(float(target_shares_map.get(inst, 0.0)), 6), "requested_shares": round(float(filled_shares), 6), "filled_shares": round(float(filled_shares), 6), "unfilled_shares": 0.0, "fill_ratio": 1.0, "price": round(float(buy_px), 6), "order_value": round(float(filled_value), 6), "filled_value": round(float(filled_value), 6), "redistributed_notional": round(float(cash_outflow), 6), "gross_notional": round(float(filled_value), 6), "net_proceeds": 0.0, "cash_outflow": round(float(cash_outflow), 6), "transaction_cost": round(float(transaction_cost), 6), "realized_pnl": 0.0, "holdings_count_before": int(hold_count_before), "holdings_count_after": int(len(holdings)), "days_held": 0, "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, "clip_reason": "cash_sweep", } ) rebalance_execution_count += 1 year = int(pd.Timestamp(date).year) holding_stats = yearly_holding_stats[year] holding_stats["n_days"] += 1 holding_stats["holdings_count_sum"] += float(len(holdings)) holding_stats["max_holdings_count"] = max(int(holding_stats["max_holdings_count"]), len(holdings)) eod_rows: list[dict[str, Any]] = [] holdings_market_value = 0.0 for inst, pos in holdings.items(): close_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0)) if close_px > 0: market_value = float(pos["shares"]) * float(close_px) else: market_value = float(pos["total_cost_basis"]) holdings_market_value += float(market_value) if capture_details: eod_rows.append( { "date": pd.Timestamp(date).strftime("%Y-%m-%d"), "year": year, "market_regime": _market_regime(year), "instrument": inst, "shares_held": round(float(pos["shares"]), 6), "market_value": round(float(market_value), 6), "close_price": round(float(close_px), 6), "market_volume": round(_get_frame_value(volume_frame, date, inst, default=0.0), 6), "market_amount": round(_get_frame_value(amount_frame, date, inst, default=0.0), 6), } ) portfolio_value = float(cash + holdings_market_value) daily_portfolio_values.append(portfolio_value) daily_dates_out.append(pd.Timestamp(date)) daily_portfolio_records.append( { "Date": pd.Timestamp(date), "portfolio_value": float(portfolio_value), "cash": float(cash), "n_held": int(len(holdings)), "is_rebalance": bool(rebalance_executed), } ) if capture_details and eod_rows: for row in eod_rows: row["portfolio_value"] = round(float(portfolio_value), 6) row["cash_eod"] = round(float(cash), 6) row["weight"] = round(float(row["market_value"]) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0 daily_holding_records.append(row) if len(daily_portfolio_values) < 10: return _empty_result("Not enough daily returns") pv_series = pd.Series(daily_portfolio_values, index=pd.DatetimeIndex(daily_dates_out), name="portfolio_value") portfolio_returns = pv_series.pct_change().dropna() if len(portfolio_returns) < 2: return _empty_result("Not enough daily returns") benchmark_returns = ( bench_return.reindex(portfolio_returns.index).fillna(0.0) if bench_return is not None else pd.Series(0.0, index=portfolio_returns.index) ) return_frame = pd.DataFrame( { "portfolio_value": pv_series.reindex(portfolio_returns.index), "portfolio_return": portfolio_returns.astype(float), "benchmark_return": benchmark_returns.astype(float), }, index=portfolio_returns.index, ) portfolio_dates = list(pv_series.index) portfolio_returns_aligned = portfolio_returns.reindex(portfolio_dates).fillna(0.0).astype(float) benchmark_returns_aligned = benchmark_returns.reindex(portfolio_dates).fillna(0.0).astype(float) cash_series_aligned = pd.Series( { pd.Timestamp(row["Date"]): float(row["cash"]) for row in daily_portfolio_records } ) holdings_count_aligned = { pd.Timestamp(row["Date"]): int(row["n_held"]) for row in daily_portfolio_records } rebalance_flags_aligned = { pd.Timestamp(row["Date"]): bool(row.get("is_rebalance", False)) for row in daily_portfolio_records } portfolio_log = _build_portfolio_log_rows( dates=portfolio_dates, portfolio_value=pv_series.astype(float), portfolio_return=portfolio_returns_aligned, benchmark_return=benchmark_returns_aligned, cash_series=cash_series_aligned, holdings_count=holdings_count_aligned, rebalance_freq=rebalance_freq, rebalance_flags=rebalance_flags_aligned, ) ic_frame = _compute_daily_cross_sectional_ic_frame( factor_values=factor_values, close_prices=close_prices, label_forward_days=label_forward_days, ) if start_date and not ic_frame.empty: ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)] if end_date and not ic_frame.empty: ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)] result = _summarize_return_frame( return_frame=return_frame, ann_scaler=ann_scaler, risk_free_rate_annual=risk_free_rate_annual, ) result.update(_summarize_ic_frame(ic_frame)) total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values())) total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values())) result.update( { "success": True, "final_value": round(float(pv_series.iloc[-1]), 6), "error": None, "rebalance_mode": DEFAULT_QLIB_REBALANCE_MODE, "custom_weight_mode": normalized_custom_weight_mode, "redistribute_unfilled_cash": redistribute_unfilled_cash, "enforce_cash_limit": enforce_cash_limit, "transaction_cost": round(total_transaction_cost, 6), "gross_turnover": round(total_gross_turnover, 6), "turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6), "yearly_metrics": _build_yearly_metrics( return_frame=return_frame, ic_frame=ic_frame, yearly_trade_stats=yearly_trade_stats, yearly_holding_stats=yearly_holding_stats, ann_scaler=ann_scaler, risk_free_rate_annual=risk_free_rate_annual, ), "trade_log": trade_records if capture_details else [], "stock_contrib": _build_stock_contribution_summary(daily_holding_records, trade_records) if capture_details else [], "holding_log": daily_holding_records if capture_details else [], "portfolio_log": portfolio_log if capture_details else [], } ) return result def compute_portfolio_ir( factor_values: pd.Series, price_df: pd.DataFrame, bench_return: pd.Series | None = None, top_k: int = 10, n_drop: int = 2, rebalance_freq: int = 5, cost_buy: float = 0.0013, cost_sell: float = 0.0013, hold_thresh: int = 2, label_forward_days: int = 5, ann_scaler: int = 252, start_date: str | None = None, end_date: str | None = None, risk_free_rate_annual: float = 0.0, start_cash: float = 200_000_000.0, position_size: float = 1.0, max_pos_each_stock: float = 1.0, lot_size: int = 100, max_daily_volume_participation: float = 0.0, max_daily_amount_participation: float = 0.0, capture_details: bool = False, engine: str = "custom", trade_guard_config: dict[str, Any] | bool | None = None, rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, custom_weight_mode: str = "equal", redistribute_unfilled_cash: bool = False, enforce_cash_limit: bool = False, ) -> dict: normalized_engine = _normalize_backtest_engine(engine) if normalized_engine == "custom": return _compute_portfolio_ir_custom( factor_values=factor_values, price_df=price_df, bench_return=bench_return, top_k=top_k, n_drop=n_drop, rebalance_freq=rebalance_freq, cost_buy=cost_buy, cost_sell=cost_sell, hold_thresh=hold_thresh, label_forward_days=label_forward_days, ann_scaler=ann_scaler, start_date=start_date, end_date=end_date, risk_free_rate_annual=risk_free_rate_annual, start_cash=start_cash, position_size=position_size, max_pos_each_stock=max_pos_each_stock, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, capture_details=capture_details, custom_weight_mode=custom_weight_mode, redistribute_unfilled_cash=redistribute_unfilled_cash, enforce_cash_limit=enforce_cash_limit, ) if normalized_engine == "qlib_original": return _compute_portfolio_ir_qlib( factor_values=factor_values, price_df=price_df, bench_return=bench_return, top_k=top_k, n_drop=n_drop, rebalance_freq=rebalance_freq, cost_buy=cost_buy, cost_sell=cost_sell, hold_thresh=hold_thresh, label_forward_days=label_forward_days, ann_scaler=ann_scaler, start_date=start_date, end_date=end_date, risk_free_rate_annual=risk_free_rate_annual, start_cash=start_cash, position_size=position_size, max_pos_each_stock=max_pos_each_stock, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, capture_details=capture_details, trade_guard_config=trade_guard_config, rebalance_mode=rebalance_mode, ) if normalized_engine in {"spec_shares_cash", "spec_return_based"}: from backtest.spec_bridge_backtester import compute_portfolio_ir_spec_bridge return compute_portfolio_ir_spec_bridge( factor_values=factor_values, price_df=price_df, bench_return=bench_return, top_k=top_k, n_drop=n_drop, rebalance_freq=rebalance_freq, cost_buy=cost_buy, cost_sell=cost_sell, hold_thresh=hold_thresh, label_forward_days=label_forward_days, ann_scaler=ann_scaler, start_date=start_date, end_date=end_date, risk_free_rate_annual=risk_free_rate_annual, start_cash=start_cash, position_size=position_size, max_pos_each_stock=max_pos_each_stock, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, capture_details=capture_details, mode="shares_cash" if normalized_engine == "spec_shares_cash" else "return_based", trade_guard_config=trade_guard_config, rebalance_mode=rebalance_mode, ) return _empty_result(f"Unsupported backtest engine: {engine}") def _empty_result(error: str) -> dict: return { "success": False, "ir": 0.0, "ic_mean": 0.0, "ic_std": 0.0, "icir": 0.0, "rank_ic_mean": 0.0, "rank_ic_std": 0.0, "rank_icir": 0.0, "n_ic_days": 0, "annualized_return": 0.0, "annualized_volatility": 0.0, "sharpe": 0.0, "winrate": 0.0, "mdd": 0.0, "excess_mdd": 0.0, "portfolio_nav_mdd": 0.0, "drawdown_duration_max": 0, "drawdown_duration_mean": 0.0, "drawdown_duration_median": 0.0, "total_return": 0.0, "performance_return": 0.0, "benchmark_performance_return": 0.0, "excess_compounded_return": 0.0, "mean_daily_return": 0.0, "std_daily_return": 0.0, "n_days": 0, "final_value": 1.0, "yearly_metrics": {}, "trade_log": [], "stock_contrib": [], "holding_log": [], "portfolio_log": [], "error": error, }