robust-AAE / runtime /backtest /robust_qlib_backtester.py
PuLam's picture
Add standalone alpha robustness matrix bundle
79e6483 verified
"""Robustness-only duplicate of the single-factor portfolio backtester.
Strictly matches AlphaAgent's conf_vn_combined_kdd_ver.yaml:
Strategy: ForceExitTopkDropoutStrategy
- topk: 10 (absolute, not percentage)
- n_drop: 2
- hold_thresh: 2 (T+2 settlement)
- rebalance_mode="dropout" keeps legacy TopKDropout behavior
- rebalance_mode="sell_all" sells all sellable holdings, then buys the new alpha top-k
- rebalance_mode="target_weight" syncs holdings to alpha top-k target weights
Execution:
- Buy at OPEN price on rebalance dates
- Sell at CLOSE price on rebalance dates
- Optional trade guards are disabled by default and enabled only via trade_guard_config
- Signal uses data up to T-1
Costs:
- open_cost: 0.13% (buy)
- close_cost: 0.13% (sell)
Metrics (Qlib "sum" mode):
- IR = mean(daily_excess_return_vs_benchmark) / std(daily_excess_return_vs_benchmark) * sqrt(252)
- Sharpe = mean(daily_return - daily_risk_free_rate) / std(daily_return - daily_risk_free_rate) * sqrt(252)
- annualized_return = mean daily excess return * 252
- performance_return = compounded portfolio return over the evaluated window
- benchmark_performance_return = compounded benchmark return over the evaluated window
- excess_compounded_return = performance_return - benchmark_performance_return
- max_drawdown = (cumsum(excess_return) - cummax(cumsum(excess_return))).min()
- portfolio_nav_mdd = ((cumprod(1 + portfolio_return) / cummax(cumprod(1 + portfolio_return))) - 1).min()
- ann_scaler: 252
Additional factor metrics:
- IC = mean daily cross-sectional Pearson correlation between factor_t and
close-to-close forward return over the next `label_forward_days`
- ICIR = IC / std(IC)
- RankIC = mean daily cross-sectional Spearman correlation
- RankICIR = RankIC / std(RankIC)
"""
from __future__ import annotations
import contextlib
import copy
from collections import defaultdict
import hashlib
import io
import json
import logging
import os
from pathlib import Path
import shutil
import subprocess
import sys
import time
from typing import Any
import warnings
import numpy as np
import pandas as pd
# Directory two levels above this file (the parent of the directory that
# contains this module).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Writable cache directory, resolved lazily on first use by
# _get_qlib_cache_root() and then reused for the life of the process.
_QLIB_CACHE_ROOT: Path | None = None
# Process-wide record of the active Qlib provider URI.
# NOTE(review): the code that writes this entry is not visible in this chunk;
# presumably it is set when Qlib is initialised — confirm.
_QLIB_RUNTIME_STATE: dict[str, str | None] = {"provider_uri": None}
# In-memory memoisation tables.
# NOTE(review): the int keys look like id(...) of a source object and the str
# in _PRICE_WIDE_CACHE like a field name — confirm against the code that fills
# these caches (not visible in this chunk).
_PROVIDER_SIGNATURE_CACHE: dict[int, str] = {}
_PRICE_WIDE_CACHE: dict[tuple[int, str], pd.DataFrame] = {}
_AVAILABLE_DATES_CACHE: dict[int, pd.DatetimeIndex] = {}
# Market-regime label per calendar year. _market_regime() returns "neutral"
# for any year that is not listed here.
YEAR_REGIMES = {
    2022: "bearish",
    2025: "bullish",
}
# Default thresholds for the optional trade guards.
# _normalize_trade_guard_config() returns a copy of this dict when the caller
# passes True, and uses it as the base that a caller-supplied dict overrides.
# NOTE(review): the code that consumes the individual keys is not visible in
# this chunk; per-key meanings are only implied by their names — confirm
# before changing any value.
DEFAULT_VN_TRADE_GUARD_CONFIG: dict[str, Any] = {
    "trend_ema_span": 20,
    "force_exit_trend_days": 3,
    "buy_chase_ctc_thresh": 0.05,
    "buy_chase_intraday_thresh": 0.04,
    "panic_drop_ctc_thresh": 0.05,
    "panic_drop_intraday_thresh": 0.04,
    "extreme_range_thresh": 0.10,
    "amount_ma_window": 20,
    "amount_buy_min_ratio": 1.0,
    "amount_force_exit_ratio": 0.5,
}
# Rebalance mode used by _normalize_rebalance_mode() when the caller passes
# None or an empty / "none" / "null" string.
DEFAULT_QLIB_REBALANCE_MODE = "dropout"
# Accepted spellings -> canonical rebalance mode. The canonical values are
# "dropout", "sell_all" and "target_weight"; _normalize_rebalance_mode() looks
# keys up after stripping and lower-casing the input.
QLIB_REBALANCE_MODE_ALIASES = {
    "dropout": "dropout",
    "topk_dropout": "dropout",
    "original": "dropout",
    "sell_all": "sell_all",
    "sell_all_buy_topk": "sell_all",
    "full_liquidate": "sell_all",
    "full_rebalance": "sell_all",
    "target_weight": "target_weight",
    "target_weight_topk": "target_weight",
    "sync_topk": "target_weight",
    "weight_sync": "target_weight",
}
def _market_regime(year: int) -> str:
    """Return the regime label configured for *year*, or ``"neutral"`` if none is."""
    year_key = int(year)
    if year_key in YEAR_REGIMES:
        return YEAR_REGIMES[year_key]
    return "neutral"
def _normalize_rebalance_mode(mode: str | None) -> str:
    """Map a user-supplied rebalance mode onto its canonical name.

    ``None``, an empty string, ``"none"`` and ``"null"`` all select
    ``DEFAULT_QLIB_REBALANCE_MODE``. Matching ignores case and surrounding
    whitespace.

    Raises:
        ValueError: if the (normalized) mode is not a known alias.
    """
    # Coerce with str() like the other normalizers in this module, so a
    # non-string value (e.g. an enum member or a number from a config file)
    # produces the descriptive ValueError below instead of an AttributeError
    # from ``.strip()``.
    key = str(mode or DEFAULT_QLIB_REBALANCE_MODE).strip().lower()
    if key in ("", "none", "null"):
        key = DEFAULT_QLIB_REBALANCE_MODE
    try:
        return QLIB_REBALANCE_MODE_ALIASES[key]
    except KeyError as exc:
        choices = ", ".join(sorted(set(QLIB_REBALANCE_MODE_ALIASES.values())))
        raise ValueError(f"Unsupported qlib rebalance_mode={mode!r}. Choose one of: {choices}") from exc
def _normalize_trade_guard_config(config: dict[str, Any] | bool | None) -> dict[str, Any] | None:
if config is None or config is False:
return None
if config is True:
return dict(DEFAULT_VN_TRADE_GUARD_CONFIG)
if not isinstance(config, dict):
raise TypeError("trade_guard_config must be a dict, True, False, or None")
normalized = dict(DEFAULT_VN_TRADE_GUARD_CONFIG)
normalized.update(config)
return normalized
def _cfg_float(config: dict[str, Any], key: str) -> float | None:
value = config.get(key)
if value is None:
return None
return float(value)
def _cfg_int(config: dict[str, Any], key: str) -> int | None:
value = config.get(key)
if value is None:
return None
return int(value)
def _get_frame_value(frame: pd.DataFrame | None, date: pd.Timestamp, inst: str, default: float = np.nan) -> float:
if frame is None:
return float(default)
try:
value = frame.loc[date, inst]
except (KeyError, TypeError):
return float(default)
if pd.isna(value):
return float(default)
return float(value)
def _compute_daily_cross_sectional_ic_frame(
factor_values: pd.Series,
close_prices: pd.DataFrame,
label_forward_days: int = 5,
) -> pd.DataFrame:
"""Compute daily cross-sectional IC / RankIC series on forward close returns."""
empty_ic_frame = pd.DataFrame(
{
"ic": pd.Series(dtype=float),
"rank_ic": pd.Series(dtype=float),
},
index=pd.DatetimeIndex([], name="date"),
)
if isinstance(factor_values, pd.DataFrame):
factor_values = factor_values.iloc[:, 0]
factor_wide = factor_values.unstack("instrument").sort_index()
close_wide = close_prices.sort_index()
common_dates = factor_wide.index.intersection(close_wide.index)
if len(common_dates) <= label_forward_days:
return empty_ic_frame
factor_wide = factor_wide.reindex(common_dates)
close_wide = close_wide.reindex(common_dates)
forward_returns = close_wide.shift(-label_forward_days) / close_wide - 1.0
rows: list[dict[str, Any]] = []
last_valid_idx = len(common_dates) - label_forward_days
for idx in range(last_valid_idx):
date = common_dates[idx]
factor_row = factor_wide.iloc[idx]
ret_row = forward_returns.iloc[idx]
valid = factor_row.notna() & ret_row.notna()
if valid.sum() < 2:
continue
x = factor_row[valid]
y = ret_row[valid]
if x.nunique(dropna=True) < 2 or y.nunique(dropna=True) < 2:
continue
ic = x.corr(y, method="pearson")
rank_ic = x.corr(y, method="spearman")
rows.append(
{
"date": date,
"ic": float(ic) if pd.notna(ic) else np.nan,
"rank_ic": float(rank_ic) if pd.notna(rank_ic) else np.nan,
}
)
if not rows:
return empty_ic_frame
return pd.DataFrame(rows).set_index("date").sort_index()
def _summarize_ic_frame(ic_frame: pd.DataFrame) -> dict[str, Any]:
if ic_frame is None or ic_frame.empty:
return {
"ic_mean": 0.0,
"ic_std": 0.0,
"icir": 0.0,
"rank_ic_mean": 0.0,
"rank_ic_std": 0.0,
"rank_icir": 0.0,
"n_ic_days": 0,
}
ic_values = ic_frame["ic"].dropna()
rank_values = ic_frame["rank_ic"].dropna()
ic_mean = float(ic_values.mean()) if len(ic_values) else 0.0
ic_std = float(ic_values.std(ddof=1)) if len(ic_values) > 1 else 0.0
rank_ic_mean = float(rank_values.mean()) if len(rank_values) else 0.0
rank_ic_std = float(rank_values.std(ddof=1)) if len(rank_values) > 1 else 0.0
return {
"ic_mean": round(ic_mean, 6),
"ic_std": round(ic_std, 6),
"icir": round(ic_mean / (ic_std + 1e-12), 6) if len(ic_values) > 1 else 0.0,
"rank_ic_mean": round(rank_ic_mean, 6),
"rank_ic_std": round(rank_ic_std, 6),
"rank_icir": round(rank_ic_mean / (rank_ic_std + 1e-12), 6) if len(rank_values) > 1 else 0.0,
"n_ic_days": int(len(ic_values)),
}
def _summarize_return_frame(
return_frame: pd.DataFrame,
ann_scaler: int = 252,
risk_free_rate_annual: float = 0.0,
) -> dict[str, Any]:
def _drawdown_duration_stats(cumulative_excess: np.ndarray) -> tuple[int, float, float]:
if len(cumulative_excess) == 0:
return 0, 0.0, 0.0
running_max = np.maximum.accumulate(cumulative_excess)
in_drawdown = cumulative_excess < running_max - 1e-12
durations: list[int] = []
current = 0
for flag in in_drawdown:
if flag:
current += 1
elif current > 0:
durations.append(current)
current = 0
if current > 0:
durations.append(current)
if not durations:
return 0, 0.0, 0.0
return int(max(durations)), float(np.mean(durations)), float(np.median(durations))
if return_frame is None or return_frame.empty:
return {
"ir": 0.0,
"annualized_return": 0.0,
"annualized_volatility": 0.0,
"sharpe": 0.0,
"winrate": 0.0,
"mdd": 0.0,
"excess_mdd": 0.0,
"portfolio_nav_mdd": 0.0,
"drawdown_duration_max": 0,
"drawdown_duration_mean": 0.0,
"drawdown_duration_median": 0.0,
"total_return": 0.0,
"performance_return": 0.0,
"benchmark_performance_return": 0.0,
"excess_compounded_return": 0.0,
"mean_daily_return": 0.0,
"std_daily_return": 0.0,
"n_days": 0,
}
portfolio_returns = return_frame["portfolio_return"].astype(float)
benchmark_returns = return_frame["benchmark_return"].astype(float)
excess_returns = portfolio_returns - benchmark_returns
daily_rf = float(risk_free_rate_annual) / float(ann_scaler)
risk_free_adjusted = portfolio_returns - daily_rf
mean_excess = float(excess_returns.mean())
std_excess = float(excess_returns.std(ddof=1)) if len(excess_returns) > 1 else 1e-8
mean_rf = float(risk_free_adjusted.mean())
std_rf = float(risk_free_adjusted.std(ddof=1)) if len(risk_free_adjusted) > 1 else 1e-8
performance_return = float((1.0 + portfolio_returns).prod() - 1.0)
benchmark_performance_return = float((1.0 + benchmark_returns).prod() - 1.0)
excess_compounded_return = performance_return - benchmark_performance_return
annualized_return = mean_excess * ann_scaler
information_ratio = mean_excess / (std_excess + 1e-12) * np.sqrt(ann_scaler)
sharpe_ratio = mean_rf / (std_rf + 1e-12) * np.sqrt(ann_scaler)
winrate = float(np.mean(excess_returns > 0)) if len(excess_returns) > 0 else 0.0
cumulative_excess = np.cumsum(excess_returns.to_numpy(dtype=float))
if len(cumulative_excess):
max_drawdown = float((cumulative_excess - np.maximum.accumulate(cumulative_excess)).min())
else:
max_drawdown = 0.0
drawdown_duration_max, drawdown_duration_mean, drawdown_duration_median = _drawdown_duration_stats(cumulative_excess)
nav = np.cumprod(1.0 + portfolio_returns.to_numpy(dtype=float))
if len(nav):
nav_peak = np.maximum.accumulate(nav)
portfolio_nav_mdd = float(np.min(nav / (nav_peak + 1e-12) - 1.0))
else:
portfolio_nav_mdd = 0.0
annualized_volatility = std_excess * np.sqrt(ann_scaler)
total_return = float(np.sum(portfolio_returns))
return {
"ir": round(float(information_ratio), 6),
"annualized_return": round(float(annualized_return), 6),
"annualized_volatility": round(float(annualized_volatility), 6),
"sharpe": round(float(sharpe_ratio), 6),
"winrate": round(float(winrate), 6),
"mdd": round(float(max_drawdown), 6),
"excess_mdd": round(float(max_drawdown), 6),
"portfolio_nav_mdd": round(float(portfolio_nav_mdd), 6),
"drawdown_duration_max": int(drawdown_duration_max),
"drawdown_duration_mean": round(float(drawdown_duration_mean), 4),
"drawdown_duration_median": round(float(drawdown_duration_median), 4),
"total_return": round(float(total_return), 6),
"performance_return": round(float(performance_return), 6),
"benchmark_performance_return": round(float(benchmark_performance_return), 6),
"excess_compounded_return": round(float(excess_compounded_return), 6),
"mean_daily_return": round(float(mean_excess), 8),
"std_daily_return": round(float(std_excess), 8),
"n_days": int(len(portfolio_returns)),
}
def _build_yearly_metrics(
    return_frame: pd.DataFrame,
    ic_frame: pd.DataFrame,
    yearly_trade_stats: dict[int, dict[str, float]],
    yearly_holding_stats: dict[int, dict[str, float]],
    ann_scaler: int,
    risk_free_rate_annual: float,
) -> dict[str, Any]:
    """Assemble one metrics dict per calendar year.

    The set of years is the union of the years present in the return frame
    index, the IC frame index, and both stats mappings. For each year the
    return metrics, IC metrics, market regime, holding counts and trade totals
    are merged into a single dict.

    Args:
        return_frame: Daily returns indexed by date; may be ``None`` or empty.
        ic_frame: Daily IC frame indexed by date; may be ``None`` or empty.
        yearly_trade_stats: Per-year trade totals.
        yearly_holding_stats: Per-year holding counters (``n_days``,
            ``holdings_count_sum``, ``max_holdings_count``).
        ann_scaler: Periods per year, forwarded to the return summary.
        risk_free_rate_annual: Annual risk-free rate, forwarded likewise.

    Returns:
        Mapping of ``str(year)`` to that year's metrics, in ascending year order.
    """
    # Evaluate the "usable frame" test once and reuse it in the loop. The
    # year-collection step has always tolerated None, so slicing must too.
    has_returns = return_frame is not None and not return_frame.empty
    has_ic = ic_frame is not None and not ic_frame.empty

    # Normalise the keys to int so lookups agree with the int years collected
    # below even if a caller keyed the stats by "2022" or a numpy integer.
    trade_by_year = {int(year): stats for year, stats in (yearly_trade_stats or {}).items()}
    holding_by_year = {int(year): stats for year, stats in (yearly_holding_stats or {}).items()}

    years: set[int] = set(trade_by_year) | set(holding_by_year)
    if has_returns:
        years.update(int(year) for year in return_frame.index.year.unique())
    if has_ic:
        years.update(int(year) for year in ic_frame.index.year.unique())

    yearly_metrics: dict[str, Any] = {}
    for year in sorted(years):
        year_return_frame = return_frame[return_frame.index.year == year] if has_returns else pd.DataFrame()
        year_ic_frame = ic_frame[ic_frame.index.year == year] if has_ic else pd.DataFrame()
        metrics = _summarize_return_frame(
            return_frame=year_return_frame,
            ann_scaler=ann_scaler,
            risk_free_rate_annual=risk_free_rate_annual,
        )
        metrics.update(_summarize_ic_frame(year_ic_frame))

        trade_stats = trade_by_year.get(year, {})
        holding_stats = holding_by_year.get(year, {})
        n_days = int(holding_stats.get("n_days", 0))
        holdings_count_sum = float(holding_stats.get("holdings_count_sum", 0.0))

        def _trade_total(name: str) -> float:
            return round(float(trade_stats.get(name, 0.0)), 6)

        metrics.update(
            {
                "year": int(year),
                "market_regime": _market_regime(year),
                "avg_holdings_count": round(holdings_count_sum / n_days, 4) if n_days else 0.0,
                "max_holdings_count": int(holding_stats.get("max_holdings_count", 0)),
                "drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0),
                "drawdown_duration_mean": round(float(metrics.get("drawdown_duration_mean", 0.0) or 0.0), 4),
                "drawdown_duration_median": round(float(metrics.get("drawdown_duration_median", 0.0) or 0.0), 4),
                "buy_trades": int(trade_stats.get("buy_trades", 0)),
                "sell_trades": int(trade_stats.get("sell_trades", 0)),
                "shares_bought": _trade_total("shares_bought"),
                "shares_sold": _trade_total("shares_sold"),
                "buy_gross_notional": _trade_total("buy_gross_notional"),
                "sell_gross_notional": _trade_total("sell_gross_notional"),
                "buy_cash_outflow": _trade_total("buy_cash_outflow"),
                "sell_net_proceeds": _trade_total("sell_net_proceeds"),
                "buy_transaction_cost": _trade_total("buy_transaction_cost"),
                "sell_transaction_cost": _trade_total("sell_transaction_cost"),
                "transaction_cost": _trade_total("transaction_cost"),
                "gross_turnover": _trade_total("gross_turnover"),
            }
        )
        yearly_metrics[str(year)] = metrics
    return yearly_metrics
def _build_stock_contribution_summary(
    daily_holding_records: list[dict[str, Any]],
    trade_records: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Compute per-year, per-instrument P&L contribution rows.

    For each year and each instrument that was held or traded in it, the
    contribution is
    ``end_value + sell_net_proceeds - start_value - buy_cash_outflow``, where
    ``start_value`` comes from the last holdings snapshot before 1 January and
    ``end_value`` from the snapshot on the year's last holding date. Only
    trades with a positive ``filled_shares`` (falling back to ``shares``)
    count. Rows are ranked within each year by absolute contribution.

    Args:
        daily_holding_records: Per-day, per-instrument holding rows.
        trade_records: Per-trade rows.

    Returns:
        List of row dicts sorted by year, then descending absolute
        contribution, then instrument; each row has a 1-based ``rank``.
    """
    if not (daily_holding_records or trade_records):
        return []

    holdings = pd.DataFrame(daily_holding_records)
    trades = pd.DataFrame(trade_records)
    for frame in (holdings, trades):
        if not frame.empty:
            frame["date"] = pd.to_datetime(frame["date"])

    all_years: set[int] = set()
    for frame in (holdings, trades):
        if not frame.empty:
            all_years.update(int(value) for value in frame["year"].dropna().unique())
    if not all_years:
        return []

    # Market value per instrument for every date that has holdings.
    snapshot_by_date: dict[pd.Timestamp, pd.Series] = {}
    if not holdings.empty:
        for day, day_rows in holdings.groupby("date"):
            snapshot_by_date[pd.Timestamp(day)] = day_rows.groupby("instrument")["market_value"].sum()
    snapshot_days = sorted(snapshot_by_date)

    def _subset(frame: pd.DataFrame, column: str, value: Any) -> pd.DataFrame:
        return frame[frame[column] == value] if not frame.empty else pd.DataFrame()

    def _agg(frame: pd.DataFrame, column: str, how: str) -> float:
        return float(getattr(frame[column], how)()) if not frame.empty else 0.0

    def _snapshot_value(snapshot: pd.Series, instrument: str) -> float:
        return float(snapshot.get(instrument, 0.0)) if not snapshot.empty else 0.0

    rows: list[dict[str, Any]] = []
    for year in sorted(all_years):
        year_holdings = _subset(holdings, "year", year)
        year_trades = _subset(trades, "year", year)

        instruments: set[str] = set()
        for frame in (year_holdings, year_trades):
            if not frame.empty:
                instruments.update(str(inst) for inst in frame["instrument"].unique())
        if not instruments:
            continue

        year_start = pd.Timestamp(f"{year}-01-01")
        earlier_days = [day for day in snapshot_days if day < year_start]
        opening = snapshot_by_date[earlier_days[-1]] if earlier_days else pd.Series(dtype=float)
        closing = pd.Series(dtype=float)
        if not year_holdings.empty:
            last_day = pd.Timestamp(year_holdings["date"].max())
            closing = snapshot_by_date.get(last_day, pd.Series(dtype=float))

        for instrument in sorted(instruments):
            held = _subset(year_holdings, "instrument", instrument)
            traded = _subset(year_trades, "instrument", instrument)

            buys = pd.DataFrame()
            sells = pd.DataFrame()
            if not traded.empty:
                fill_column = traded.get("filled_shares", traded.get("shares", 0.0))
                filled = traded[fill_column.astype(float) > 0]
                buys = filled[filled["action"] == "buy"]
                sells = filled[filled["action"] == "sell"]

            start_value = _snapshot_value(opening, instrument)
            end_value = _snapshot_value(closing, instrument)
            buy_cash_outflow = _agg(buys, "cash_outflow", "sum")
            sell_net_proceeds = _agg(sells, "net_proceeds", "sum")
            realized_pnl = _agg(sells, "realized_pnl", "sum")
            contribution = end_value + sell_net_proceeds - start_value - buy_cash_outflow

            ending_shares = 0.0
            if not held.empty:
                ending_shares = float(held.sort_values("date")["shares_held"].iloc[-1])

            rows.append(
                {
                    "year": int(year),
                    "market_regime": _market_regime(year),
                    "instrument": instrument,
                    "contribution_return": round(float(contribution), 6),
                    "abs_contribution_return": round(abs(float(contribution)), 6),
                    "realized_pnl": round(float(realized_pnl), 6),
                    "start_value": round(float(start_value), 6),
                    "end_value": round(float(end_value), 6),
                    "buy_trades": int(len(buys)),
                    "sell_trades": int(len(sells)),
                    "shares_bought": round(_agg(buys, "shares", "sum"), 6),
                    "shares_sold": round(_agg(sells, "shares", "sum"), 6),
                    "buy_cash_outflow": round(float(buy_cash_outflow), 6),
                    "sell_net_proceeds": round(float(sell_net_proceeds), 6),
                    "holding_days": int(held["date"].nunique()) if not held.empty else 0,
                    "avg_shares_held": round(_agg(held, "shares_held", "mean"), 6),
                    "ending_shares": round(ending_shares, 6),
                    "avg_market_value": round(_agg(held, "market_value", "mean"), 6),
                    "max_market_value": round(_agg(held, "market_value", "max"), 6),
                    "market_volume_sum": round(_agg(held, "market_volume", "sum"), 6),
                    "market_amount_sum": round(_agg(held, "market_amount", "sum"), 6),
                }
            )

    if not rows:
        return rows

    ranked = pd.DataFrame(rows).sort_values(
        ["year", "abs_contribution_return", "instrument"],
        ascending=[True, False, True],
    )
    ranked["rank"] = ranked.groupby("year").cumcount() + 1
    return ranked.to_dict("records")
def _round_down_lot(shares: float, lot_size: int) -> float:
if not np.isfinite(shares) or shares <= 0:
return 0.0
lot = max(int(lot_size), 1)
return float(np.floor(float(shares) / float(lot)) * float(lot))
def _market_value_from_close(
    close_prices: pd.DataFrame,
    date: pd.Timestamp,
    position: dict[str, Any],
    instrument: str,
) -> float:
    """Value a position at the day's close price.

    The close is looked up in *close_prices*; when it is unavailable the
    position's ``avg_cost_per_share`` (default 0.0) stands in. If the resulting
    price is not positive, the position's ``total_cost_basis`` is returned.
    """
    fallback_price = position.get("avg_cost_per_share", 0.0)
    close_px = _get_frame_value(close_prices, date, instrument, default=fallback_price)
    if not close_px > 0:
        return float(position["total_cost_basis"])
    return float(position["shares"]) * float(close_px)
def _liquidity_caps(
    *,
    market_volume: float,
    market_amount: float,
    price: float,
    lot_size: int,
    max_daily_volume_participation: float,
    max_daily_amount_participation: float,
) -> tuple[float, float]:
    """Return the ``(volume_cap, amount_cap)`` share limits for one trade.

    Each cap is the lot-rounded number of shares allowed by the matching
    participation rate. A cap is ``inf`` (no limit) when its market figure is
    missing or non-positive, or its participation rate is not positive. The
    amount cap additionally needs a positive *price* to convert currency into
    shares.
    """
    volume_cap = amount_cap = np.inf

    volume_limited = (
        np.isfinite(market_volume) and market_volume > 0 and max_daily_volume_participation > 0
    )
    if volume_limited:
        allowed_shares = float(market_volume) * float(max_daily_volume_participation)
        volume_cap = _round_down_lot(allowed_shares, lot_size)

    amount_limited = (
        np.isfinite(market_amount)
        and market_amount > 0
        and price > 0
        and max_daily_amount_participation > 0
    )
    if amount_limited:
        allowed_shares = (float(market_amount) * float(max_daily_amount_participation)) / float(price)
        amount_cap = _round_down_lot(allowed_shares, lot_size)

    return float(volume_cap), float(amount_cap)
def _clip_reason_text(reasons: list[str]) -> str:
deduped = []
for reason in reasons:
if reason and reason not in deduped:
deduped.append(reason)
return ",".join(deduped) if deduped else "none"
CUSTOM_WEIGHT_MODE_ALIASES = {
"equal": "equal",
"equal_weight": "equal",
"weight_equal": "equal",
"alpha_score": "alpha_score",
"score": "alpha_score",
"score_weight": "alpha_score",
"alpha_weight": "alpha_score",
}
def _normalize_custom_weight_mode(mode: str | None) -> str:
key = str(mode or "equal").strip().lower()
if key in ("", "none", "null"):
key = "equal"
try:
return CUSTOM_WEIGHT_MODE_ALIASES[key]
except KeyError as exc:
choices = ", ".join(sorted(set(CUSTOM_WEIGHT_MODE_ALIASES.values())))
raise ValueError(f"Unsupported custom_weight_mode={mode!r}. Choose one of: {choices}") from exc
def _normalize_positive_weights(
raw: pd.Series,
*,
total_weight: float,
max_pos_each_stock: float,
) -> pd.Series:
raw = raw.replace([np.inf, -np.inf], np.nan).dropna().astype(float)
raw = raw[raw > 0]
if raw.empty:
return pd.Series(dtype=float)
total_weight = max(float(total_weight), 0.0)
if total_weight <= 0:
return pd.Series(dtype=float)
cap = max(float(max_pos_each_stock), 0.0)
effective_total_weight = total_weight
if cap > 0:
effective_total_weight = min(effective_total_weight, float(len(raw)) * cap)
if effective_total_weight <= 1e-12:
return pd.Series(dtype=float)
weights = raw / float(raw.sum()) * effective_total_weight
if cap <= 0:
return weights.reindex(raw.index).fillna(0.0)
fixed: dict[str, float] = {}
free = weights.copy()
remaining = effective_total_weight
for _ in range(len(weights)):
over = free[free > cap]
if over.empty:
break
for code in over.index:
fixed[str(code)] = cap
free = free.drop(index=over.index)
remaining = max(effective_total_weight - float(sum(fixed.values())), 0.0)
if free.empty or remaining <= 1e-12:
free = pd.Series(dtype=float)
break
if float(free.sum()) <= 1e-12:
free = pd.Series(dtype=float)
break
free = free / float(free.sum()) * remaining
out = pd.Series(fixed, dtype=float)
if not free.empty:
out = pd.concat([out, free])
return out.reindex(raw.index).fillna(0.0)
def _build_custom_target_weights(
    *,
    target_scores: pd.Series,
    target_names: list[str],
    total_weight: float,
    max_pos_each_stock: float,
    weight_mode: str,
) -> pd.Series:
    """Turn the selected names into target portfolio weights.

    In ``"equal"`` mode every name gets the same raw score. In
    ``"alpha_score"`` mode the raw score is the name's score shifted so the
    lowest one is slightly positive. That mode falls back to equal weighting
    when any score is missing or non-numeric, or when all scores are
    (numerically) identical. The raw scores are then normalised and capped by
    ``_normalize_positive_weights``.

    Returns:
        Weights indexed by ``str(name)`` in the order of *target_names*, or an
        empty Series when there are no names or nothing to allocate.
    """
    names = [str(code) for code in target_names]
    if not names:
        return pd.Series(dtype=float)

    raw = pd.Series(1.0, index=names, dtype=float)
    if _normalize_custom_weight_mode(weight_mode) != "equal":
        scores = pd.to_numeric(target_scores.reindex(names), errors="coerce")
        if not scores.isna().any():
            spread = float(scores.max() - scores.min())
            if not spread <= 1e-12:
                # Shift scores into the positive domain so weights are stable
                # regardless of whether the factor's raw range is [-1, 1],
                # [0, 1], or something unbounded; the 5%-of-spread offset keeps
                # the weakest selected name above zero.
                raw = (scores - float(scores.min())) + spread * 0.05

    weights = _normalize_positive_weights(
        raw,
        total_weight=total_weight,
        max_pos_each_stock=max_pos_each_stock,
    )
    if weights.empty:
        return weights
    return weights.reindex(names).fillna(0.0)
def _normalize_backtest_engine(engine: str | None) -> str:
value = str(engine or "custom").strip().lower()
aliases = {
"custom": "custom",
"qlib_custom": "custom",
"qlib_customize": "custom",
"qlib_customized": "custom",
"customized": "custom",
"original": "qlib_original",
"qlib": "qlib_original",
"qlib_original": "qlib_original",
"spec": "spec_shares_cash",
"spec_cash": "spec_shares_cash",
"spec_shares_cash": "spec_shares_cash",
"qlib_spec": "spec_shares_cash",
"spec_return": "spec_return_based",
"spec_return_based": "spec_return_based",
"qlib_spec_return_based": "spec_return_based",
}
return aliases.get(value, value)
def _resolve_writable_cache_root() -> Path:
    """Return the first candidate cache directory that can actually be written.

    Candidates, in priority order:

    1. ``$ALPHAEVO_QLIB_CACHE_ROOT`` or ``$QLIB_CACHE_ROOT`` (user-expanded),
    2. ``/kaggle/working/.cache/qlib_vn`` when ``/kaggle/working`` exists,
    3. ``PROJECT_ROOT/.cache/qlib_vn``,
    4. ``~/.cache/alphaevo/qlib_vn``,
    5. ``/tmp/alphaevo_qlib_vn``.

    Each candidate is created if needed and checked by writing and deleting a
    small probe file.

    Raises:
        RuntimeError: if no candidate is writable; the message lists every
            path that was tried.
    """
    candidates: list[Path] = []
    tried: list[str] = []

    env_override = os.getenv("ALPHAEVO_QLIB_CACHE_ROOT") or os.getenv("QLIB_CACHE_ROOT")
    if env_override:
        try:
            candidates.append(Path(env_override).expanduser())
        except RuntimeError:
            # "~" cannot be expanded when the home directory is unresolvable
            # (e.g. a container running under an unknown UID). Skip the
            # override rather than abort the whole search.
            tried.append(env_override)

    kaggle_working = Path("/kaggle/working")
    if kaggle_working.exists():
        candidates.append(kaggle_working / ".cache" / "qlib_vn")

    candidates.append(PROJECT_ROOT / ".cache" / "qlib_vn")
    try:
        candidates.append(Path.home() / ".cache" / "alphaevo" / "qlib_vn")
    except RuntimeError:
        # No resolvable home directory: fall through to the /tmp candidate.
        pass
    candidates.append(Path("/tmp") / "alphaevo_qlib_vn")

    for candidate in candidates:
        try:
            root = candidate.resolve()
        except (OSError, RuntimeError):
            # One unresolvable candidate must not stop the later ones.
            tried.append(str(candidate))
            continue
        tried.append(str(root))
        try:
            root.mkdir(parents=True, exist_ok=True)
            probe = root / ".write_probe"
            probe.write_text("ok", encoding="utf-8")
            probe.unlink(missing_ok=True)
        except OSError:
            continue
        return root

    raise RuntimeError(
        "Could not find a writable cache directory for qlib_original. Tried: "
        + ", ".join(tried)
    )
def _get_qlib_cache_root() -> Path:
    """Return the process-wide cache root, resolving it on the first call only."""
    global _QLIB_CACHE_ROOT
    if _QLIB_CACHE_ROOT is not None:
        return _QLIB_CACHE_ROOT
    _QLIB_CACHE_ROOT = _resolve_writable_cache_root()
    return _QLIB_CACHE_ROOT
@contextlib.contextmanager
def _provider_build_lock(lock_path: Path, timeout_sec: float = 900.0):
lock_path.parent.mkdir(parents=True, exist_ok=True)
start = time.monotonic()
fd: int | None = None
while fd is None:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
os.write(fd, f"{os.getpid()}\n".encode("utf-8"))
except FileExistsError:
try:
age = time.time() - lock_path.stat().st_mtime
if age > timeout_sec:
lock_path.unlink(missing_ok=True)
continue
except FileNotFoundError:
continue
if time.monotonic() - start > timeout_sec:
raise TimeoutError(f"Timed out waiting for Qlib provider cache lock: {lock_path}")
time.sleep(0.25)
try:
yield
finally:
if fd is not None:
os.close(fd)
lock_path.unlink(missing_ok=True)
@contextlib.contextmanager
def _quiet_runtime_io() -> Any:
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
with contextlib.redirect_stdout(stdout_buffer), contextlib.redirect_stderr(stderr_buffer):
yield stdout_buffer, stderr_buffer
def _import_qlib_runtime() -> dict[str, Any]:
try:
with _quiet_runtime_io():
import qlib
from qlib.backtest import CommonInfrastructure, backtest_loop, get_exchange
from qlib.backtest.account import Account
from qlib.backtest.decision import Order, OrderDir, TradeDecisionWO
from qlib.backtest.executor import SimulatorExecutor
from qlib.backtest.position import Position
from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy
from qlib.log import set_global_logger_level_cm
except Exception as exc: # pragma: no cover - depends on optional runtime
raise RuntimeError(
"qlib_original requires the real Qlib package/runtime to be available in the environment"
) from exc
class ForceExitTopkDropoutStrategy(TopkDropoutStrategy):
"""Top-k dropout with explicit rebalance cadence and VN-specific overlays.
The wrapper stays close to Qlib's original TopkDropoutStrategy:
- keep `topk`, `n_drop`, and `hold_thresh`
- only rebalance every `rebalance_freq` bars
- restrict new buys to names that pass the precomputed `__buy_gate__`
- push `__force_exit__` names to the bottom of the sell ranking without
bypassing `n_drop`
- defer non-forced sells when `__defer_sell__` marks a panic-drop day
- avoid rebuying recently sold names for a short cooldown window
- hard-cap live stock count at `topk`; new buys only use pre-trade empty slots
- optionally run full liquidation or target-weight top-k sync modes
"""
SCORE_COL = "score"
BUY_GATE_COL = "__buy_gate__"
FORCE_EXIT_COL = "__force_exit__"
DEFER_SELL_COL = "__defer_sell__"
def __init__(
self,
*,
rebalance_freq: int = 5,
cooldown_period: int = 10,
rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE,
max_pos_each_stock: float = 1.0,
target_weight_eps: float = 0.001,
**kwargs,
):
super().__init__(**kwargs)
self.rebalance_freq = max(int(rebalance_freq), 1)
self.cooldown_period = max(int(cooldown_period), 0)
self.rebalance_mode = _normalize_rebalance_mode(rebalance_mode)
self.max_pos_each_stock = max(float(max_pos_each_stock), 0.0)
self.target_weight_eps = max(float(target_weight_eps), 0.0)
self._sell_cooldown_until: dict[str, int] = {}
def _score_to_target_weights(self, target_score: pd.Series) -> dict[str, float]:
target_score = target_score.replace([np.inf, -np.inf], np.nan).dropna().astype(float)
if target_score.empty:
return {}
spread = float(target_score.max() - target_score.min())
if spread <= 1e-12:
raw = pd.Series(1.0, index=target_score.index, dtype=float)
else:
# Keep the weakest selected name non-zero while still letting alpha strength size positions.
raw = (target_score - float(target_score.min())) + spread * 0.05
total_weight = max(min(float(self.risk_degree), 1.0), 0.0)
if raw.sum() <= 1e-12 or total_weight <= 0:
return {}
weights = raw / float(raw.sum()) * total_weight
cap = float(self.max_pos_each_stock)
if cap > 0:
fixed: dict[str, float] = {}
free = weights.copy()
remaining = total_weight
for _ in range(len(weights)):
over = free[free > cap]
if over.empty:
break
for code in over.index:
fixed[str(code)] = cap
free = free.drop(index=over.index)
remaining = max(total_weight - float(sum(fixed.values())), 0.0)
if free.empty or remaining <= 1e-12:
free = pd.Series(dtype=float)
break
free = free / float(free.sum()) * remaining
weights = pd.Series(fixed, dtype=float)
if not free.empty:
weights = pd.concat([weights, free])
return {str(code): float(weight) for code, weight in weights.items() if float(weight) > 1e-12}
def _deal_price(self, code, start_time, end_time, direction, fallback: float = np.nan) -> float:
try:
price = self.trade_exchange.get_deal_price(
stock_id=code,
start_time=start_time,
end_time=end_time,
direction=direction,
)
except Exception:
price = fallback
if price is None or not np.isfinite(float(price)) or float(price) <= 0:
return float(fallback)
return float(price)
def _round_trade_amount(self, code, amount: float, start_time, end_time) -> float:
if amount <= 0:
return 0.0
try:
factor = self.trade_exchange.get_factor(
stock_id=code,
start_time=start_time,
end_time=end_time,
)
return float(self.trade_exchange.round_amount_by_trade_unit(amount, factor))
except Exception:
return float(amount)
@staticmethod
def _stock_amount(position: Position, code) -> float:
try:
return float(position.get_stock_amount(code=code))
except Exception:
return 0.0
@staticmethod
def _stock_price(position: Position, code, default: float = np.nan) -> float:
try:
price = float(position.get_stock_price(code))
except Exception:
price = float(default)
return price if np.isfinite(price) and price > 0 else float(default)
def _is_stock_tradable(self, code, start_time, end_time, direction) -> bool:
try:
return bool(
self.trade_exchange.is_stock_tradable(
stock_id=code,
start_time=start_time,
end_time=end_time,
direction=None if self.forbid_all_trade_at_limit else direction,
)
)
except Exception:
return False
def _can_sell(self, position: Position, code, time_per_step) -> bool:
try:
return bool(position.get_stock_count(code, bar=time_per_step) >= self.hold_thresh)
except Exception:
return False
def _select_target_scores(
self,
score: pd.Series,
buy_gate: pd.Series,
force_exit: pd.Series,
trade_start_time,
trade_end_time,
) -> pd.Series:
target_score = score.replace([np.inf, -np.inf], np.nan).dropna().astype(float)
if target_score.empty:
return target_score
allowed = buy_gate.reindex(target_score.index).fillna(False).astype(bool)
forced = force_exit.reindex(target_score.index).fillna(False).astype(bool)
target_score = target_score[allowed & ~forced]
if self._sell_cooldown_until:
target_score = target_score[~target_score.index.isin(self._sell_cooldown_until.keys())]
target_score = target_score.sort_values(ascending=False)
selected = []
for code in target_score.index:
if self.only_tradable and not self._is_stock_tradable(
code,
trade_start_time,
trade_end_time,
OrderDir.BUY,
):
continue
selected.append(code)
if len(selected) >= int(self.topk):
break
return target_score.reindex(selected)
def _portfolio_value(
self,
position: Position,
stock_list: list[str],
trade_start_time,
trade_end_time,
) -> float:
value = float(position.get_cash())
for code in stock_list:
amount = self._stock_amount(position, code)
if amount <= 0:
continue
fallback = self._stock_price(position, code)
price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback)
if np.isfinite(price) and price > 0:
value += float(amount * price)
return float(value)
def _make_sell_order(
self,
position: Position,
code,
amount: float,
trade_start_time,
trade_end_time,
):
amount = min(float(amount), self._stock_amount(position, code))
if amount <= 1e-12:
return None
if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.SELL):
return None
sell_order = Order(
stock_id=code,
amount=amount,
start_time=trade_start_time,
end_time=trade_end_time,
direction=Order.SELL,
)
return sell_order if self.trade_exchange.check_order(sell_order) else None
def _make_buy_order(
self,
code,
cash_value: float,
trade_start_time,
trade_end_time,
):
if cash_value <= 1e-12:
return None
if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.BUY):
return None
buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY)
if not np.isfinite(buy_price) or buy_price <= 0:
return None
buy_amount = self._round_trade_amount(code, float(cash_value) / float(buy_price), trade_start_time, trade_end_time)
if buy_amount <= 1e-12:
return None
buy_order = Order(
stock_id=code,
amount=buy_amount,
start_time=trade_start_time,
end_time=trade_end_time,
direction=Order.BUY,
)
return buy_order if self.trade_exchange.check_order(buy_order) else None
def _generate_sell_all_decision(
    self,
    score: pd.Series,
    buy_gate: pd.Series,
    force_exit: pd.Series,
    trade_start_time,
    trade_end_time,
):
    """Build the ``sell_all`` rebalance decision.

    Works on a deep copy of ``self.trade_position`` so orders can be dealt
    against it without touching the live position.  Every holding that passes
    ``_can_sell`` and yields a valid sell order is sold in full; the cash
    after those simulated sells is then split equally (scaled by
    ``risk_degree`` clamped to [0, 1]) across the selected target codes that
    are not still held, limited to the free slots below ``topk``.

    Returns a ``TradeDecisionWO`` holding the sell orders followed by the buy
    orders.
    """
    shadow_position: Position = copy.deepcopy(self.trade_position)
    sells = []
    buys = []
    cash = float(shadow_position.get_cash())
    bar = self.trade_calendar.get_freq()

    # Phase 1: liquidate everything that is allowed to be sold.
    for code in list(shadow_position.get_stock_list()):
        if not self._can_sell(shadow_position, code, bar):
            continue
        held_amount = self._stock_amount(shadow_position, code)
        order = self._make_sell_order(shadow_position, code, held_amount, trade_start_time, trade_end_time)
        if order is None:
            continue
        sells.append(order)
        if self.cooldown_period > 0:
            # Registered before target selection, so a code sold here is excluded from this step's buys.
            self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period
        trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order(
            order,
            position=shadow_position,
        )
        cash += float(trade_val - trade_cost)

    # Phase 2: fill the free slots with the best targets that are not still held.
    targets = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time)
    still_held = set(shadow_position.get_stock_list())
    free_slots = max(int(self.topk) - len(still_held), 0)
    new_codes = [code for code in targets.index if code not in still_held]
    buy_codes = new_codes[:free_slots]

    if buy_codes:
        per_code_cash = cash * max(min(float(self.risk_degree), 1.0), 0.0) / len(buy_codes)
    else:
        per_code_cash = 0.0

    for code in buy_codes:
        order = self._make_buy_order(code, per_code_cash, trade_start_time, trade_end_time)
        if order is None:
            continue
        buys.append(order)
        trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order(
            order,
            position=shadow_position,
        )
        cash -= float(trade_val + trade_cost)

    return TradeDecisionWO(sells + buys, self)
def _generate_target_weight_decision(
    self,
    score: pd.Series,
    buy_gate: pd.Series,
    force_exit: pd.Series,
    trade_start_time,
    trade_end_time,
):
    """Build the ``target_weight`` rebalance decision.

    Selected target scores are converted to weights through
    ``self._score_to_target_weights``.  Holdings worth more than their target
    value are trimmed first (sell pass), then holdings/targets worth less
    than their target value are topped up (buy pass).  Orders are dealt
    against a deep copy of ``self.trade_position`` so later sizing sees the
    effect of earlier orders.

    Returns a ``TradeDecisionWO`` with sell orders followed by buy orders; an
    empty decision is returned when there are no target weights or the
    portfolio value is effectively zero.
    """
    current_temp: Position = copy.deepcopy(self.trade_position)
    sell_order_list = []
    buy_order_list = []
    time_per_step = self.trade_calendar.get_freq()
    target_scores = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time)
    target_weights = self._score_to_target_weights(target_scores)
    # No targets: leave the current holdings untouched.
    if not target_weights:
        return TradeDecisionWO([], self)
    current_stock_list = list(current_temp.get_stock_list())
    portfolio_value = self._portfolio_value(current_temp, current_stock_list, trade_start_time, trade_end_time)
    if portfolio_value <= 1e-12:
        return TradeDecisionWO([], self)
    # Value differences at or below this tolerance do not trigger a trade.
    eps_value = float(portfolio_value * self.target_weight_eps)
    # Sell pass: trim every holding whose value exceeds its target value.
    for code in current_stock_list:
        amount = self._stock_amount(current_temp, code)
        if amount <= 0:
            continue
        fallback = self._stock_price(current_temp, code)
        sell_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback)
        if not np.isfinite(sell_price) or sell_price <= 0:
            continue
        current_value = float(amount * sell_price)
        # target_weights is looked up by str(code); codes without a weight target zero value.
        target_value = float(target_weights.get(str(code), 0.0) * portfolio_value)
        sell_value = current_value - target_value
        if sell_value <= eps_value:
            continue
        if not self._can_sell(current_temp, code, time_per_step):
            continue
        sell_amount = self._round_trade_amount(code, sell_value / sell_price, trade_start_time, trade_end_time)
        sell_order = self._make_sell_order(current_temp, code, sell_amount, trade_start_time, trade_end_time)
        if sell_order is None:
            continue
        sell_order_list.append(sell_order)
        # Cooldown is only registered for codes that have no target weight at all.
        if str(code) not in target_weights and self.cooldown_period > 0:
            self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period
        self.trade_exchange.deal_order(sell_order, position=current_temp)
    # Re-measure the simulated portfolio after the sells before sizing the buys.
    current_stock_after_sell = set(current_temp.get_stock_list())
    portfolio_value_after_sell = self._portfolio_value(
        current_temp,
        list(current_stock_after_sell),
        trade_start_time,
        trade_end_time,
    )
    eps_value = float(portfolio_value_after_sell * self.target_weight_eps)
    cash = float(current_temp.get_cash())
    # Number of codes not currently held that may still be opened without exceeding topk.
    available_new_slots = max(int(self.topk) - len(current_stock_after_sell), 0)
    # Buy pass: top up every target whose current value is below its target value.
    for code in target_weights:
        fallback = self._stock_price(current_temp, code)
        buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY, fallback=fallback)
        if not np.isfinite(buy_price) or buy_price <= 0:
            continue
        current_amount = self._stock_amount(current_temp, code)
        current_value = float(current_amount * buy_price)
        target_value = float(target_weights[code] * portfolio_value_after_sell)
        buy_value = target_value - current_value
        if buy_value <= eps_value:
            continue
        if code not in current_stock_after_sell:
            if available_new_slots <= 0:
                continue
            # NOTE(review): the slot is consumed here even if the buy order below turns out to be None.
            available_new_slots -= 1
            current_stock_after_sell.add(code)
        # Never request more than the cash still available in the simulated position.
        buy_order = self._make_buy_order(code, min(float(buy_value), cash), trade_start_time, trade_end_time)
        if buy_order is None:
            continue
        buy_order_list.append(buy_order)
        trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order(
            buy_order,
            position=current_temp,
        )
        cash -= float(trade_val + trade_cost)
        if cash <= 1e-12:
            break
    return TradeDecisionWO(sell_order_list + buy_order_list, self)
@staticmethod
def _split_signal_frame(
pred_score: pd.Series | pd.DataFrame,
) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
if isinstance(pred_score, pd.DataFrame):
score = pred_score[ForceExitTopkDropoutStrategy.SCORE_COL] if ForceExitTopkDropoutStrategy.SCORE_COL in pred_score.columns else pred_score.iloc[:, 0]
buy_gate = (
pred_score[ForceExitTopkDropoutStrategy.BUY_GATE_COL].astype(bool)
if ForceExitTopkDropoutStrategy.BUY_GATE_COL in pred_score.columns
else pd.Series(True, index=pred_score.index)
)
force_exit = (
pred_score[ForceExitTopkDropoutStrategy.FORCE_EXIT_COL].astype(bool)
if ForceExitTopkDropoutStrategy.FORCE_EXIT_COL in pred_score.columns
else pd.Series(False, index=pred_score.index)
)
defer_sell = (
pred_score[ForceExitTopkDropoutStrategy.DEFER_SELL_COL].astype(bool)
if ForceExitTopkDropoutStrategy.DEFER_SELL_COL in pred_score.columns
else pd.Series(False, index=pred_score.index)
)
return (
score.astype(float),
buy_gate.reindex(score.index).fillna(False),
force_exit.reindex(score.index).fillna(False),
defer_sell.reindex(score.index).fillna(False),
)
score = pred_score.astype(float)
return (
score,
pd.Series(True, index=score.index),
pd.Series(False, index=score.index),
pd.Series(False, index=score.index),
)
def generate_trade_decision(self, execute_result=None):
    """Build the order list for the current trade step.

    Steps, in order:

    1. Drop expired entries from ``self._sell_cooldown_until``.
    2. Return an empty decision on steps that are not a multiple of
       ``self.rebalance_freq`` or when no signal is available.
    3. Read the signal for the previous step (``shift=1``) and split it into
       score / buy-gate / force-exit / defer-sell series.
    4. Dispatch to the ``sell_all`` or ``target_weight`` generators when
       ``self.rebalance_mode`` selects them.
    5. Otherwise run the dropout logic below: rank current holdings and
       gated outsiders, pick sells (``method_sell``) and buy candidates
       (``method_buy``), apply defer/hold-threshold filters, then emit sell
       orders followed by buy orders.

    ``execute_result`` is accepted but not used by this method.
    """
    trade_step = self.trade_calendar.get_trade_step()
    if self.cooldown_period > 0:
        # Keep only cooldown entries whose expiry step is still in the future.
        self._sell_cooldown_until = {
            code: until
            for code, until in self._sell_cooldown_until.items()
            if until > trade_step
        }
    # Only trade on rebalance steps.
    if trade_step % self.rebalance_freq != 0:
        return TradeDecisionWO([], self)
    trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step)
    # The signal window is the step before the trade step (shift=1).
    pred_start_time, pred_end_time = self.trade_calendar.get_step_time(trade_step, shift=1)
    pred_score_raw = self.signal.get_signal(start_time=pred_start_time, end_time=pred_end_time)
    if pred_score_raw is None:
        return TradeDecisionWO([], self)
    score, buy_gate, force_exit, defer_sell = self._split_signal_frame(pred_score_raw)
    if self.rebalance_mode == "sell_all":
        return self._generate_sell_all_decision(
            score,
            buy_gate,
            force_exit,
            trade_start_time,
            trade_end_time,
        )
    if self.rebalance_mode == "target_weight":
        return self._generate_target_weight_decision(
            score,
            buy_gate,
            force_exit,
            trade_start_time,
            trade_end_time,
        )
    # Any other rebalance_mode falls through to the dropout logic below.
    if self.only_tradable:
        # Selection helpers that skip codes the exchange reports as not tradable.
        def get_first_n(li, n, reverse=False):
            cur_n = 0
            res = []
            for si in reversed(li) if reverse else li:
                if self.trade_exchange.is_stock_tradable(
                    stock_id=si,
                    start_time=trade_start_time,
                    end_time=trade_end_time,
                ):
                    res.append(si)
                    cur_n += 1
                    # NOTE(review): the limit is checked after appending, so n <= 0 still yields one code.
                    if cur_n >= n:
                        break
            return res[::-1] if reverse else res
        def get_last_n(li, n):
            return get_first_n(li, n, reverse=True)
        def filter_stock(li):
            return [
                si
                for si in li
                if self.trade_exchange.is_stock_tradable(
                    stock_id=si,
                    start_time=trade_start_time,
                    end_time=trade_end_time,
                )
            ]
    else:
        # Plain slicing helpers when tradability is not considered.
        def get_first_n(li, n):
            # NOTE(review): a negative n slices off the tail instead of returning nothing.
            return list(li)[:n]
        def get_last_n(li, n):
            # NOTE(review): n == 0 makes [-0:] return the whole list.
            return list(li)[-n:]
        def filter_stock(li):
            return list(li)
    # Orders are dealt against a deep copy so the live position is not mutated here.
    current_temp: Position = copy.deepcopy(self.trade_position)
    sell_order_list = []
    buy_order_list = []
    cash = current_temp.get_cash()
    current_stock_list = current_temp.get_stock_list()
    rank_score = score.copy()
    # Force-exit holdings are pushed to the bottom of the ranking.
    force_exit_index = force_exit[force_exit].index.intersection(current_stock_list)
    if len(force_exit_index) > 0:
        rank_score.loc[force_exit_index] = -np.inf
    # Current holdings ordered best to worst; holdings without a score rank last.
    last = rank_score.reindex(current_stock_list).fillna(-np.inf).sort_values(ascending=False).index
    # Buy candidates: codes not held, with an open buy gate and not in cooldown.
    outsider_score = score[~score.index.isin(last)]
    outsider_score = outsider_score[buy_gate.reindex(outsider_score.index).fillna(False)]
    if self._sell_cooldown_until:
        outsider_score = outsider_score[~outsider_score.index.isin(self._sell_cooldown_until.keys())]
    outsider_score = outsider_score.sort_values(ascending=False)
    if self.method_buy == "top":
        today = get_first_n(
            outsider_score.index,
            self.n_drop + self.topk - len(last),
        )
    elif self.method_buy == "random":
        topk_candi = get_first_n(outsider_score.index, self.topk)
        candi = list(filter(lambda x: x not in last, topk_candi))
        n = self.n_drop + self.topk - len(last)
        try:
            today = np.random.choice(candi, n, replace=False)
        except ValueError:
            # np.random.choice rejected the request; fall back to every candidate.
            today = candi
    else:
        raise NotImplementedError(f"This type of input is not supported")
    # Holdings and candidates ranked together; the tail of this ranking is what gets dropped.
    comb = rank_score.reindex(last.union(pd.Index(today))).fillna(-np.inf).sort_values(ascending=False).index
    # Holdings beyond topk are sold regardless of the defer/hold-threshold filters below.
    over_cap_count = max(len(current_stock_list) - int(self.topk), 0)
    hard_cap_sell = pd.Index(get_last_n(last, over_cap_count)) if over_cap_count > 0 else pd.Index([])
    if self.method_sell == "bottom":
        sell = last[last.isin(get_last_n(comb, self.n_drop))]
    elif self.method_sell == "random":
        candi = filter_stock(last)
        try:
            sell = pd.Index(np.random.choice(candi, self.n_drop, replace=False) if len(last) else [])
        except ValueError:
            sell = candi
    else:
        raise NotImplementedError(f"This type of input is not supported")
    if len(hard_cap_sell) > 0:
        # Union of both sell sets, de-duplicated while preserving order.
        sell = pd.Index(list(dict.fromkeys(list(sell) + list(hard_cap_sell))))
    filtered_sell = []
    hard_cap_sell_set = set(hard_cap_sell)
    time_per_step = self.trade_calendar.get_freq()
    for code in sell:
        is_force_exit = bool(force_exit.reindex([code]).fillna(False).iloc[0])
        should_defer = bool(defer_sell.reindex([code]).fillna(False).iloc[0])
        is_hard_cap_exit = code in hard_cap_sell_set
        # A deferred sell is skipped unless it is a force exit or a hard-cap exit.
        if should_defer and not is_force_exit and not is_hard_cap_exit:
            continue
        # The hold threshold applies to everything except hard-cap exits.
        if (
            not is_hard_cap_exit
            and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh
        ):
            continue
        filtered_sell.append(code)
    sell = pd.Index(filtered_sell)
    # Buys are limited to the slots that were free before this step's sells.
    pre_trade_slots = max(int(self.topk) - len(current_stock_list), 0)
    buy = today[:pre_trade_slots]
    for code in current_stock_list:
        if not self.trade_exchange.is_stock_tradable(
            stock_id=code,
            start_time=trade_start_time,
            end_time=trade_end_time,
            direction=None if self.forbid_all_trade_at_limit else OrderDir.SELL,
        ):
            continue
        if code in sell:
            if (
                code not in hard_cap_sell_set
                and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh
            ):
                continue
            # Sell the whole holding.
            sell_amount = current_temp.get_stock_amount(code=code)
            sell_order = Order(
                stock_id=code,
                amount=sell_amount,
                start_time=trade_start_time,
                end_time=trade_end_time,
                direction=Order.SELL,
            )
            if self.trade_exchange.check_order(sell_order):
                sell_order_list.append(sell_order)
                if self.cooldown_period > 0:
                    self._sell_cooldown_until[code] = trade_step + self.cooldown_period
                trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(
                    sell_order,
                    position=current_temp,
                )
                # Sale proceeds are added to the cash used to size the buys below.
                cash += trade_val - trade_cost
    # Equal cash allocation per buy candidate, scaled by risk_degree.
    value = cash * self.risk_degree / len(buy) if len(buy) > 0 else 0
    for code in buy:
        if not self.trade_exchange.is_stock_tradable(
            stock_id=code,
            start_time=trade_start_time,
            end_time=trade_end_time,
            direction=None if self.forbid_all_trade_at_limit else OrderDir.BUY,
        ):
            continue
        buy_price = self.trade_exchange.get_deal_price(
            stock_id=code,
            start_time=trade_start_time,
            end_time=trade_end_time,
            direction=OrderDir.BUY,
        )
        # NOTE(review): unlike _make_buy_order, buy_price is not checked for NaN/zero here.
        buy_amount = value / buy_price
        factor = self.trade_exchange.get_factor(
            stock_id=code,
            start_time=trade_start_time,
            end_time=trade_end_time,
        )
        buy_amount = self.trade_exchange.round_amount_by_trade_unit(buy_amount, factor)
        buy_order = Order(
            stock_id=code,
            amount=buy_amount,
            start_time=trade_start_time,
            end_time=trade_end_time,
            direction=Order.BUY,
        )
        # Buy orders are appended without a check_order call or a simulated deal.
        buy_order_list.append(buy_order)
    return TradeDecisionWO(sell_order_list + buy_order_list, self)
return {
"qlib": qlib,
"Account": Account,
"CommonInfrastructure": CommonInfrastructure,
"SimulatorExecutor": SimulatorExecutor,
"TopkDropoutStrategy": TopkDropoutStrategy,
"ForceExitTopkDropoutStrategy": ForceExitTopkDropoutStrategy,
"backtest_loop": backtest_loop,
"get_exchange": get_exchange,
"set_global_logger_level_cm": set_global_logger_level_cm,
}
def _resolve_qlib_dump_script(qlib_module: Any) -> Path:
env_override = os.getenv("ALPHAEVO_QLIB_DUMP_SCRIPT")
if env_override:
override_path = Path(env_override).expanduser().resolve()
if override_path.exists():
return override_path
qlib_file = Path(qlib_module.__file__).resolve()
candidates = [
PROJECT_ROOT / "backtest" / "vendor" / "qlib_dump_bin.py", # bundled fallback for wheel installs
qlib_file.parent.parent / "scripts" / "dump_bin.py", # source checkout layout
qlib_file.parent / "scripts" / "dump_bin.py", # wheel layout with qlib/scripts
qlib_file.parent.parent / "qlib" / "scripts" / "dump_bin.py",
qlib_file.parents[2] / "scripts" / "dump_bin.py" if len(qlib_file.parents) > 2 else None,
]
tried: list[str] = []
for candidate in candidates:
if candidate is None:
continue
candidate = candidate.resolve()
tried.append(str(candidate))
if candidate.exists():
return candidate
for root in [qlib_file.parent, qlib_file.parent.parent]:
try:
for candidate in root.rglob("dump_bin.py"):
candidate = candidate.resolve()
tried.append(str(candidate))
if candidate.exists():
return candidate
except OSError:
continue
raise RuntimeError(
"Could not locate Qlib dump_bin.py. Tried: " + ", ".join(dict.fromkeys(tried))
)
def _provider_signature(price_df: pd.DataFrame) -> str:
cache_key = id(price_df)
cached = _PROVIDER_SIGNATURE_CACHE.get(cache_key)
if cached is not None:
return cached
reset_df = price_df.reset_index()
dates = pd.to_datetime(reset_df["datetime"])
summary = {
"rows": int(len(reset_df)),
"instruments": int(reset_df["instrument"].nunique()),
"start": dates.min().strftime("%Y-%m-%d") if len(reset_df) else "NA",
"end": dates.max().strftime("%Y-%m-%d") if len(reset_df) else "NA",
"cols": sorted(str(col) for col in price_df.columns),
"close_sum_head": round(float(pd.to_numeric(reset_df.get("$close"), errors="coerce").fillna(0.0).head(5000).sum()), 6),
"volume_sum_head": round(float(pd.to_numeric(reset_df.get("$volume"), errors="coerce").fillna(0.0).head(5000).sum()), 6),
}
digest = hashlib.sha256(json.dumps(summary, sort_keys=True).encode("utf-8")).hexdigest()
signature = digest[:16]
_PROVIDER_SIGNATURE_CACHE[cache_key] = signature
return signature
def _get_price_wide_frame(price_df: pd.DataFrame, column: str) -> pd.DataFrame | None:
if column not in price_df.columns:
return None
cache_key = (id(price_df), column)
cached = _PRICE_WIDE_CACHE.get(cache_key)
if cached is not None:
return cached
frame = price_df[column].unstack("instrument")
_PRICE_WIDE_CACHE[cache_key] = frame
return frame
def _get_available_dates(price_df: pd.DataFrame) -> pd.DatetimeIndex:
cache_key = id(price_df)
cached = _AVAILABLE_DATES_CACHE.get(cache_key)
if cached is not None:
return cached
dates = pd.DatetimeIndex(price_df.index.get_level_values("datetime").unique()).sort_values()
_AVAILABLE_DATES_CACHE[cache_key] = dates
return dates
def _write_qlib_source_csvs(price_df: pd.DataFrame, source_dir: Path) -> None:
source_dir.mkdir(parents=True, exist_ok=True)
reset_df = price_df.reset_index().rename(columns={"instrument": "symbol", "datetime": "date"}).copy()
reset_df["date"] = pd.to_datetime(reset_df["date"])
for raw_symbol, group in reset_df.groupby("symbol", sort=True):
symbol = str(raw_symbol)
group = group.sort_values("date").copy()
close_series = pd.to_numeric(group.get("$close"), errors="coerce")
change = close_series.pct_change(fill_method=None).replace([np.inf, -np.inf], np.nan).fillna(0.0)
out_df = pd.DataFrame(
{
"symbol": symbol,
"date": group["date"].dt.strftime("%Y-%m-%d"),
"open": pd.to_numeric(group.get("$open"), errors="coerce"),
"close": close_series,
"volume": pd.to_numeric(group.get("$volume"), errors="coerce").fillna(0.0),
"amount": pd.to_numeric(group.get("$amount"), errors="coerce").fillna(0.0),
"factor": 1.0,
"change": change,
}
)
out_df.to_csv(source_dir / f"{symbol}.csv", index=False)
def _ensure_qlib_provider(price_df: pd.DataFrame) -> Path:
    """Return the on-disk Qlib provider directory for ``price_df``, building it if needed.

    The provider lives under the Qlib cache root in a folder named after the
    frame's signature.  A ``ready.json`` marker next to the provider
    directory signals a completed build.  The build itself (CSV export
    followed by Qlib's dump script in a subprocess) runs under a build lock,
    and the marker is re-checked once the lock is held.

    Raises:
        RuntimeError: when the dump script exits with a non-zero status.
    """
    runtime = _import_qlib_runtime()
    provider_key = _provider_signature(price_df)
    cache_root = _get_qlib_cache_root() / provider_key
    provider_dir = cache_root / "provider"
    source_dir = cache_root / "csv"
    ready_path = cache_root / "ready.json"
    lock_path = cache_root.with_suffix(".lock")

    def _already_built() -> bool:
        return ready_path.exists() and provider_dir.exists()

    if _already_built():
        return provider_dir

    with _provider_build_lock(lock_path):
        # Another process may have finished the build while we waited for the lock.
        if _already_built():
            return provider_dir

        # Start from a clean directory so a half-finished earlier build cannot leak in.
        if cache_root.exists():
            shutil.rmtree(cache_root)
        cache_root.mkdir(parents=True, exist_ok=True)
        _write_qlib_source_csvs(price_df, source_dir)

        dump_script = _resolve_qlib_dump_script(runtime["qlib"])
        dump_options = [
            "dump_all",
            f"--data_path={source_dir}",
            f"--qlib_dir={provider_dir}",
            "--freq=day",
            "--date_field_name=date",
            "--symbol_field_name=symbol",
            "--include_fields=open,close,volume,amount,factor,change",
            "--file_suffix=.csv",
            "--max_workers=8",
        ]
        completed = subprocess.run(
            [sys.executable, str(dump_script), *dump_options],
            capture_output=True,
            text=True,
        )
        if completed.returncode != 0:
            failure_output = completed.stderr.strip() or completed.stdout.strip()
            raise RuntimeError("Failed to build Qlib provider cache: " + failure_output[:800])

        marker = {
            "provider_key": provider_key,
            "provider_dir": str(provider_dir),
        }
        ready_path.write_text(
            json.dumps(marker, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    return provider_dir
def _ensure_qlib_initialized(provider_dir: Path) -> dict[str, Any]:
    """Initialise Qlib against ``provider_dir`` unless it is already the active provider.

    The provider URI last used for initialisation is remembered in
    ``_QLIB_RUNTIME_STATE``; ``qlib.init`` is only called again when the
    requested provider differs.

    Returns:
        The runtime dictionary produced by ``_import_qlib_runtime``.
    """
    runtime = _import_qlib_runtime()
    provider_uri = str(provider_dir)
    if _QLIB_RUNTIME_STATE.get("provider_uri") == provider_uri:
        return runtime

    init_options = {
        "provider_uri": provider_uri,
        "kernels": 1,
        "joblib_backend": "threading",
        "expression_cache": None,
        "dataset_cache": None,
        "redis_port": -1,
        "clear_mem_cache": False,
        "logging_level": logging.ERROR,
    }
    with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR):
        runtime["qlib"].init(**init_options)
    _QLIB_RUNTIME_STATE["provider_uri"] = provider_uri
    return runtime
def _prepare_qlib_signal(factor_values: pd.Series) -> pd.Series:
signal = factor_values.copy()
if isinstance(signal, pd.DataFrame):
signal = signal.iloc[:, 0]
if "datetime" in signal.index.names and "instrument" in signal.index.names:
signal = signal.reorder_levels(["datetime", "instrument"]).sort_index()
else:
signal = signal.sort_index()
signal = signal.astype(float).dropna()
signal.name = "score"
return signal
def _prepare_force_exit_qlib_signal(
    factor_values: pd.Series,
    price_df: pd.DataFrame,
    trade_guard_config: dict[str, Any],
) -> pd.DataFrame:
    """Build the strategy signal frame: score plus buy-gate / force-exit / defer-sell flags.

    Every guard is optional and driven by ``trade_guard_config``:

    - trend: buying requires close above its per-instrument EMA; a run of
      ``force_exit_trend_days`` closes below the EMA forces an exit.
    - chase: a close-to-close or intraday gain above the configured
      threshold blocks buying.
    - panic: a close-to-close or intraday drop below the configured
      threshold defers selling (unless an exit is forced).
    - range: a (high - low) / close above the threshold blocks buying.
    - liquidity: traded amount relative to its rolling mean gates buying and
      can force an exit, which also cancels a deferred sell.

    Returns a DataFrame indexed like the prepared score with columns
    ``score``, ``__buy_gate__``, ``__force_exit__`` and ``__defer_sell__``.
    """
    score = _prepare_qlib_signal(factor_values)
    signal_index = score.index

    def _ordered(column: pd.Series) -> pd.Series:
        # Put the index in (datetime, instrument) order when both levels exist.
        names = column.index.names
        if "datetime" in names and "instrument" in names:
            return column.reorder_levels(["datetime", "instrument"]).sort_index()
        return column.sort_index()

    def _numeric_column(name: str) -> pd.Series:
        return pd.to_numeric(_ordered(price_df[name].copy()), errors="coerce")

    def _on_signal_index(mask: pd.Series) -> pd.Series:
        # Align a price-indexed mask to the signal; missing rows count as False.
        return mask.reindex(signal_index).fillna(False)

    close_series = _numeric_column("$close")
    buy_gate = pd.Series(True, index=signal_index, dtype=bool)
    force_exit = pd.Series(False, index=signal_index, dtype=bool)

    # --- trend guard -------------------------------------------------------
    trend_ema_span = _cfg_int(trade_guard_config, "trend_ema_span")
    force_exit_trend_days = _cfg_int(trade_guard_config, "force_exit_trend_days")
    if trend_ema_span is not None and trend_ema_span > 0:
        ema = close_series.groupby(level="instrument").transform(
            lambda x: x.ewm(span=trend_ema_span, adjust=False, min_periods=1).mean()
        )
        buy_gate &= _on_signal_index(close_series > ema)
        if force_exit_trend_days is not None and force_exit_trend_days > 0:
            below_trend = close_series < ema
            persistent_downtrend = below_trend.groupby(level="instrument").transform(
                lambda x: x.astype(float).rolling(force_exit_trend_days, min_periods=force_exit_trend_days).sum()
                >= force_exit_trend_days
            )
            force_exit |= _on_signal_index(persistent_downtrend)

    # --- daily returns used by the chase / panic guards ---------------------
    prev_close = close_series.groupby(level="instrument").shift(1)
    close_to_close_return = (close_series / prev_close - 1.0).replace([np.inf, -np.inf], np.nan)
    if "$open" in price_df.columns:
        open_series = _numeric_column("$open")
        intraday_return = (close_series / open_series - 1.0).replace([np.inf, -np.inf], np.nan)
    else:
        intraday_return = pd.Series(np.nan, index=close_series.index, dtype=float)

    buy_chase = pd.Series(False, index=signal_index, dtype=bool)
    buy_chase_ctc_thresh = _cfg_float(trade_guard_config, "buy_chase_ctc_thresh")
    if buy_chase_ctc_thresh is not None:
        buy_chase |= _on_signal_index(close_to_close_return > abs(buy_chase_ctc_thresh))
    buy_chase_intraday_thresh = _cfg_float(trade_guard_config, "buy_chase_intraday_thresh")
    if buy_chase_intraday_thresh is not None:
        buy_chase |= _on_signal_index(intraday_return > abs(buy_chase_intraday_thresh))

    panic_drop = pd.Series(False, index=signal_index, dtype=bool)
    panic_drop_ctc_thresh = _cfg_float(trade_guard_config, "panic_drop_ctc_thresh")
    if panic_drop_ctc_thresh is not None:
        panic_drop |= _on_signal_index(close_to_close_return < -abs(panic_drop_ctc_thresh))
    panic_drop_intraday_thresh = _cfg_float(trade_guard_config, "panic_drop_intraday_thresh")
    if panic_drop_intraday_thresh is not None:
        panic_drop |= _on_signal_index(intraday_return < -abs(panic_drop_intraday_thresh))

    # --- range guard -------------------------------------------------------
    extreme_range = pd.Series(False, index=signal_index)
    extreme_range_thresh = _cfg_float(trade_guard_config, "extreme_range_thresh")
    if extreme_range_thresh is not None and "$high" in price_df.columns and "$low" in price_df.columns:
        high_series = _numeric_column("$high")
        low_series = _numeric_column("$low")
        day_range = ((high_series - low_series) / close_series).replace([np.inf, -np.inf], np.nan)
        extreme_range = _on_signal_index(day_range > abs(extreme_range_thresh))

    buy_gate &= ~(buy_chase | extreme_range)
    defer_sell = panic_drop & ~force_exit

    # --- liquidity guard ---------------------------------------------------
    amount_ma_window = _cfg_int(trade_guard_config, "amount_ma_window")
    if amount_ma_window is not None and amount_ma_window > 0 and "$amount" in price_df.columns:
        amount_series = _numeric_column("$amount")
        amount_mean = amount_series.groupby(level="instrument").transform(
            lambda x: x.rolling(amount_ma_window, min_periods=1).mean()
        )
        amount_buy_min_ratio = _cfg_float(trade_guard_config, "amount_buy_min_ratio")
        if amount_buy_min_ratio is not None:
            buy_gate &= _on_signal_index(amount_series >= (float(amount_buy_min_ratio) * amount_mean))
        amount_force_exit_ratio = _cfg_float(trade_guard_config, "amount_force_exit_ratio")
        if amount_force_exit_ratio is not None:
            amount_weak = _on_signal_index(amount_series < (float(amount_force_exit_ratio) * amount_mean))
            force_exit |= amount_weak
            defer_sell &= ~amount_weak

    return pd.DataFrame(
        {
            "score": score.astype(float),
            "__buy_gate__": buy_gate.astype(bool),
            "__force_exit__": force_exit.astype(bool),
            "__defer_sell__": defer_sell.astype(bool),
        },
        index=signal_index,
    )
def _selection_log_number(value: Any, default: float | None = 0.0) -> float | None:
    """Return ``value`` as a float, or ``default`` when it is ``None`` or NaN."""
    if value is None or pd.isna(value):
        return default
    return float(value)


def _selection_log_join(values: Any, *, sort: bool) -> str:
    """Join the distinct non-null, non-empty string forms of ``values`` with ``|``."""
    texts = [str(v) for v in values if pd.notna(v) and str(v)]
    distinct = sorted(set(texts)) if sort else list(dict.fromkeys(texts))
    return "|".join(distinct)


def _selection_log_holdings_summary(holding_log: list[dict[str, Any]] | None) -> pd.DataFrame | None:
    """Aggregate the holding log per (date, instrument); ``None`` when it is unusable."""
    if not holding_log:
        return None
    holdings = pd.DataFrame(holding_log)
    if holdings.empty or not {"date", "instrument"}.issubset(holdings.columns):
        return None
    holdings["date"] = pd.to_datetime(holdings["date"], errors="coerce").dt.normalize()
    holdings["instrument"] = holdings["instrument"].astype(str)
    for col in ("market_value", "weight", "shares_held"):
        # Optional columns default to NaN so the aggregation below cannot raise KeyError.
        holdings[col] = pd.to_numeric(holdings[col], errors="coerce") if col in holdings.columns else np.nan
    summary = (
        holdings.groupby(["date", "instrument"], dropna=False)
        .agg(
            selected_eod=("instrument", "size"),
            shares_held_eod=("shares_held", "sum"),
            market_value_eod=("market_value", "sum"),
            weight_eod=("weight", "sum"),
        )
        .reset_index()
    )
    summary["selected_eod"] = summary["selected_eod"].fillna(0).astype(int) > 0
    summary["eod_hold_rank"] = (
        summary.groupby("date", dropna=False)["market_value_eod"]
        .rank(method="first", ascending=False)
    )
    return summary


def _selection_log_trades_summary(trade_log: list[dict[str, Any]] | None) -> pd.DataFrame | None:
    """Aggregate the trade log per (date, instrument); ``None`` when it is unusable."""
    if not trade_log:
        return None
    trades = pd.DataFrame(trade_log)
    if trades.empty or not {"date", "instrument"}.issubset(trades.columns):
        return None
    trades["date"] = pd.to_datetime(trades["date"], errors="coerce").dt.normalize()
    trades["instrument"] = trades["instrument"].astype(str)
    for col in ("requested_shares", "filled_shares", "gross_notional", "filled_value", "fill_ratio"):
        if col in trades.columns:
            trades[col] = pd.to_numeric(trades[col], errors="coerce")

    def _first_present(*names: str) -> pd.Series:
        # First available column in priority order, else zeros.
        for name in names:
            if name in trades.columns:
                return pd.to_numeric(trades[name], errors="coerce")
        return pd.Series(0.0, index=trades.index)

    trades["requested_shares_norm"] = _first_present("requested_shares", "filled_shares", "shares")
    trades["filled_shares_norm"] = _first_present("filled_shares", "shares")
    trades["requested_notional_norm"] = _first_present("order_value", "gross_notional", "filled_value")
    trades["filled_notional_norm"] = _first_present("filled_value", "gross_notional")
    for col, filler in (("action", None), ("fill_ratio", np.nan), ("clip_reason", None)):
        # Optional columns get neutral values so the aggregation below cannot raise KeyError.
        if col not in trades.columns:
            trades[col] = filler
    summary = (
        trades.groupby(["date", "instrument"], dropna=False)
        .agg(
            had_trade=("action", "size"),
            trade_actions=("action", lambda s: _selection_log_join(s, sort=False)),
            requested_shares_total=("requested_shares_norm", "sum"),
            filled_shares_total=("filled_shares_norm", "sum"),
            requested_notional_total=("requested_notional_norm", "sum"),
            filled_notional_total=("filled_notional_norm", "sum"),
            fill_ratio_mean=("fill_ratio", "mean"),
            clip_reason=("clip_reason", lambda s: _selection_log_join(s, sort=True)),
        )
        .reset_index()
    )
    summary["had_trade"] = summary["had_trade"].fillna(0).astype(int) > 0
    return summary


def build_signal_selection_log(
    *,
    factor_values: pd.Series,
    price_df: pd.DataFrame,
    top_k: int = 10,
    start_date: str | None = None,
    end_date: str | None = None,
    backtest_engine: str = "qlib_original",
    trade_guard_config: dict[str, Any] | bool | None = None,
    holding_log: list[dict[str, Any]] | None = None,
    trade_log: list[dict[str, Any]] | None = None,
    portfolio_log: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
    """Explain, per trade date, which instruments the signal ranked and what happened to them.

    Each signal row is mapped to the next available date in ``price_df`` (its
    trade date), ranked by score within that date, and joined with the
    end-of-day holdings and the trades recorded for that date.  Rows are kept
    when they rank within ``max(top_k, 5)``, were held at end of day, or were
    traded.

    Fixes relative to the previous implementation:

    - ``holding_log`` / ``trade_log`` may be omitted, empty, or lack the
      ``date`` / ``instrument`` columns; the corresponding join is then
      skipped instead of raising ``KeyError: 'date'``.
    - Optional log columns (``shares_held``, ``fill_ratio``, ``action`` ...)
      may be absent.
    - Numeric output fields fall back to ``0.0`` for unmatched rows instead
      of leaking NaN (``nan or 0.0`` evaluates to NaN).
    - Null actions / clip reasons are skipped instead of being rendered as
      ``"nan"`` / ``"None"``.

    Args:
        factor_values: Factor scores indexed by datetime and instrument.
        price_df: Price frame providing the date calendar (and guard inputs).
        top_k: Portfolio size used for the ``topk_by_score`` flag.
        start_date: Inclusive lower bound on the trade date.
        end_date: Inclusive upper bound on the trade date.
        backtest_engine: Engine name; trade guards only apply to ``qlib_original``.
        trade_guard_config: Optional guard configuration.
        holding_log: Optional end-of-day holding records.
        trade_log: Optional trade records.
        portfolio_log: Optional per-day records; when they flag rebalance
            dates, only those trade dates are kept.

    Returns:
        A list of plain dictionaries ordered by trade date, rank and instrument.
    """
    normalized_engine = _normalize_backtest_engine(backtest_engine)
    normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config)
    if normalized_engine == "qlib_original" and normalized_trade_guard_config is not None:
        signal_payload = _prepare_force_exit_qlib_signal(
            factor_values=factor_values,
            price_df=price_df,
            trade_guard_config=normalized_trade_guard_config,
        )
    else:
        score = _prepare_qlib_signal(factor_values)
        signal_payload = pd.DataFrame(
            {
                "score": score.astype(float),
                "__buy_gate__": True,
                "__force_exit__": False,
                "__defer_sell__": False,
            },
            index=score.index,
        )
    if signal_payload is None or len(signal_payload) == 0:
        return []

    if isinstance(signal_payload, pd.Series):
        signal_frame = signal_payload.to_frame(name="score")
    else:
        signal_frame = signal_payload.copy()
    if "score" not in signal_frame.columns:
        signal_frame["score"] = pd.to_numeric(signal_frame.iloc[:, 0], errors="coerce")
    if "datetime" in signal_frame.index.names and "instrument" in signal_frame.index.names:
        signal_frame = signal_frame.reorder_levels(["datetime", "instrument"]).sort_index()
    else:
        signal_frame = signal_frame.sort_index()
    signal_frame = signal_frame.reset_index()
    signal_frame["datetime"] = pd.to_datetime(signal_frame["datetime"], errors="coerce").dt.normalize()
    signal_frame["instrument"] = signal_frame["instrument"].astype(str)
    signal_frame["score"] = pd.to_numeric(signal_frame["score"], errors="coerce")
    signal_frame = signal_frame.dropna(subset=["datetime", "instrument", "score"])
    if signal_frame.empty:
        return []

    available_dates = _get_available_dates(price_df)
    if len(available_dates) <= 1:
        return []
    # A signal observed on date i is traded on the next available date i + 1.
    calendar = [pd.Timestamp(day).normalize() for day in available_dates]
    date_map = dict(zip(calendar[:-1], calendar[1:]))
    signal_frame["trade_date"] = signal_frame["datetime"].map(date_map)
    signal_frame = signal_frame.dropna(subset=["trade_date"]).copy()
    if signal_frame.empty:
        return []

    if start_date:
        signal_frame = signal_frame[signal_frame["trade_date"] >= pd.Timestamp(start_date)]
    if end_date:
        signal_frame = signal_frame[signal_frame["trade_date"] <= pd.Timestamp(end_date)]
    if signal_frame.empty:
        return []

    if portfolio_log:
        portfolio_df = pd.DataFrame(portfolio_log)
        if not portfolio_df.empty and {"date", "is_rebalance"}.issubset(portfolio_df.columns):
            portfolio_df["date"] = pd.to_datetime(portfolio_df["date"], errors="coerce").dt.normalize()
            portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool)
            rebalance_dates = set(portfolio_df.loc[portfolio_df["is_rebalance"], "date"].dropna().tolist())
            if rebalance_dates:
                signal_frame = signal_frame[signal_frame["trade_date"].isin(rebalance_dates)]
    if signal_frame.empty:
        return []

    # Work on an owned copy: the filters above may have produced a view.
    signal_frame = signal_frame.copy()
    for col, default in (
        ("__buy_gate__", True),
        ("__force_exit__", False),
        ("__defer_sell__", False),
    ):
        if col not in signal_frame.columns:
            signal_frame[col] = default
        signal_frame[col] = signal_frame[col].fillna(default).astype(bool)

    signal_frame["trade_score_rank"] = (
        signal_frame.groupby("trade_date", dropna=False)["score"]
        .rank(method="first", ascending=False)
        .astype(int)
    )
    debug_top_n = max(int(top_k), 5)
    signal_frame["top5_by_score"] = signal_frame["trade_score_rank"] <= 5
    signal_frame["topk_by_score"] = signal_frame["trade_score_rank"] <= int(top_k)

    merged = signal_frame
    for summary in (
        _selection_log_holdings_summary(holding_log),
        _selection_log_trades_summary(trade_log),
    ):
        if summary is None:
            # Nothing to join: the *_eod / trade fields fall back to their defaults below.
            continue
        merged = merged.merge(
            summary,
            left_on=["trade_date", "instrument"],
            right_on=["date", "instrument"],
            how="left",
        )
        if "date" in merged.columns:
            merged = merged.drop(columns=["date"])

    selected_source = merged["selected_eod"] if "selected_eod" in merged.columns else pd.Series(False, index=merged.index)
    had_trade_source = merged["had_trade"] if "had_trade" in merged.columns else pd.Series(False, index=merged.index)
    trade_actions_source = merged["trade_actions"] if "trade_actions" in merged.columns else pd.Series("", index=merged.index)
    merged["selected_eod"] = selected_source.where(selected_source.notna(), False).astype(bool)
    merged["had_trade"] = had_trade_source.where(had_trade_source.notna(), False).astype(bool)
    merged["trade_actions"] = trade_actions_source.where(trade_actions_source.notna(), "").astype(str)

    keep_mask = (
        (merged["trade_score_rank"] <= debug_top_n)
        | merged["selected_eod"]
        | merged["had_trade"]
    )
    merged = merged[keep_mask].copy()
    if merged.empty:
        return []
    merged = merged.sort_values(["trade_date", "trade_score_rank", "instrument"], ascending=[True, True, True])

    def _text(value: Any) -> str:
        # Unmatched join rows carry NaN; render them as an empty string.
        if value is None or (not isinstance(value, str) and pd.isna(value)):
            return ""
        return str(value)

    rows: list[dict[str, Any]] = []
    for row in merged.to_dict("records"):
        hold_rank = _selection_log_number(row.get("eod_hold_rank"), None)
        fill_ratio = _selection_log_number(row.get("fill_ratio_mean"), None)
        rows.append(
            {
                "signal_date": pd.Timestamp(row["datetime"]).strftime("%Y-%m-%d"),
                "trade_date": pd.Timestamp(row["trade_date"]).strftime("%Y-%m-%d"),
                "instrument": str(row["instrument"]),
                "score": round(_selection_log_number(row.get("score")), 10),
                "trade_score_rank": int(row.get("trade_score_rank", 0) or 0),
                "top5_by_score": bool(row.get("top5_by_score", False)),
                "topk_by_score": bool(row.get("topk_by_score", False)),
                "buy_gate": bool(row.get("__buy_gate__", True)),
                "force_exit": bool(row.get("__force_exit__", False)),
                "defer_sell": bool(row.get("__defer_sell__", False)),
                "selected_eod": bool(row.get("selected_eod", False)),
                "eod_hold_rank": int(hold_rank) if hold_rank is not None else None,
                "shares_held_eod": round(_selection_log_number(row.get("shares_held_eod")), 6),
                "market_value_eod": round(_selection_log_number(row.get("market_value_eod")), 6),
                "weight_eod": round(_selection_log_number(row.get("weight_eod")), 8),
                "had_trade": bool(row.get("had_trade", False)),
                "trade_actions": _text(row.get("trade_actions")),
                "requested_shares_total": round(_selection_log_number(row.get("requested_shares_total")), 6),
                "filled_shares_total": round(_selection_log_number(row.get("filled_shares_total")), 6),
                "requested_notional_total": round(_selection_log_number(row.get("requested_notional_total")), 6),
                "filled_notional_total": round(_selection_log_number(row.get("filled_notional_total")), 6),
                "fill_ratio_mean": round(fill_ratio, 6) if fill_ratio is not None else None,
                "clip_reason": _text(row.get("clip_reason")),
            }
        )
    return rows
def _extract_position_amounts(position: Any) -> dict[str, float]:
    """Return ``{instrument: shares}`` for every strictly positive holding.

    Args:
        position: Object exposing ``get_stock_list()`` and
            ``get_stock_amount(instrument)``, or ``None``.

    Returns:
        Mapping keyed by ``str(instrument)`` with the share amount as a float.
        Instruments whose amount is not greater than zero are left out, and
        ``None`` yields an empty mapping.
    """
    if position is None:
        return {}
    amounts: dict[str, float] = {}
    for instrument in position.get_stock_list():
        # Look the amount up once and reuse it for both the filter and the value.
        shares = float(position.get_stock_amount(instrument))
        if shares > 0:
            amounts[str(instrument)] = shares
    return amounts
def _build_holding_log_from_qlib_positions(
    positions_normal: dict[pd.Timestamp, Any],
    report_normal: pd.DataFrame,
    volume_frame: pd.DataFrame | None,
    amount_frame: pd.DataFrame | None,
    build_rows: bool = True,
) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]:
    """Summarise end-of-day holdings from per-date position snapshots.

    Args:
        positions_normal: Mapping of date -> position object. Keys are coerced
            with ``pd.Timestamp``. Each position must expose
            ``get_stock_list()`` and ``get_stock_amount()``; when rows are
            built it must also expose ``get_stock_price()`` and, as fallbacks,
            ``calculate_value()`` and ``get_cash(include_settle=True)``.
        report_normal: Per-date report. Its ``account`` and ``cash`` columns
            are used for portfolio value and cash when the date (and the
            column) is present; otherwise the position object is queried.
        volume_frame: Frame passed to ``_get_frame_value`` for ``market_volume``.
        amount_frame: Frame passed to ``_get_frame_value`` for ``market_amount``.
        build_rows: When false, only the yearly statistics are accumulated and
            the returned row list is empty.

    Returns:
        ``(rows, yearly_holding_stats)``. ``rows`` has one dict per
        (date, held instrument); ``yearly_holding_stats`` maps year to
        ``n_days``, ``holdings_count_sum`` and ``max_holdings_count``.
    """
    rows: list[dict[str, Any]] = []
    yearly_holding_stats: dict[int, dict[str, float]] = defaultdict(
        lambda: {
            "n_days": 0,
            "holdings_count_sum": 0.0,
            "max_holdings_count": 0,
        }
    )
    # The report is only consulted when rows are built; callers elsewhere treat
    # the "cash" column as optional, so do not assume either column exists.
    has_account_column = bool(build_rows) and "account" in report_normal.columns
    has_cash_column = bool(build_rows) and "cash" in report_normal.columns

    pos_by_date = {pd.Timestamp(date): position for date, position in positions_normal.items()}
    for date in sorted(pos_by_date):
        position = pos_by_date[date]
        # Resolve each amount once; keep only strictly positive holdings.
        held: list[tuple[Any, float]] = []
        for instrument in position.get_stock_list():
            shares = float(position.get_stock_amount(instrument))
            if shares > 0:
                held.append((instrument, shares))

        year = int(date.year)
        stats = yearly_holding_stats[year]
        stats["n_days"] += 1
        stats["holdings_count_sum"] += float(len(held))
        stats["max_holdings_count"] = max(int(stats["max_holdings_count"]), len(held))
        if not build_rows:
            continue

        in_report = date in report_normal.index
        if in_report and has_account_column:
            portfolio_value = float(report_normal.loc[date, "account"])
        else:
            portfolio_value = float(position.calculate_value())
        if in_report and has_cash_column:
            cash_eod = float(report_normal.loc[date, "cash"])
        else:
            cash_eod = float(position.get_cash(include_settle=True))

        for instrument, shares_held in held:
            close_price = float(position.get_stock_price(instrument))
            market_value = float(shares_held * close_price)
            rows.append(
                {
                    "date": date.strftime("%Y-%m-%d"),
                    "year": year,
                    "market_regime": _market_regime(year),
                    "instrument": instrument,
                    "shares_held": round(float(shares_held), 6),
                    "market_value": round(float(market_value), 6),
                    "close_price": round(float(close_price), 6),
                    "market_volume": round(_get_frame_value(volume_frame, date, instrument, default=0.0), 6),
                    "market_amount": round(_get_frame_value(amount_frame, date, instrument, default=0.0), 6),
                    "portfolio_value": round(float(portfolio_value), 6),
                    "cash_eod": round(float(cash_eod), 6),
                    "weight": round(float(market_value) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0,
                }
            )
    return rows, yearly_holding_stats
def _build_trade_log_from_qlib_positions(
    positions_normal: dict[pd.Timestamp, Any],
    open_prices: pd.DataFrame,
    close_prices: pd.DataFrame,
    volume_frame: pd.DataFrame | None,
    amount_frame: pd.DataFrame | None,
    cost_buy: float,
    cost_sell: float,
    build_rows: bool = True,
) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]:
    """Reconstruct a trade log by diffing consecutive position snapshots.

    For every date (in sorted order) the share amounts are compared with the
    previous snapshot; a positive change is recorded as a ``buy`` and a
    negative change as a ``sell``. Buys are priced from ``open_prices`` and
    sells from ``close_prices``, falling back to the close frame and then to
    the price stored on the relevant position object. A change with no usable
    price is skipped entirely (no row, no statistics).

    Args:
        positions_normal: Mapping of date -> position object.
        open_prices: Frame queried through ``_get_frame_value`` for buys.
        close_prices: Frame queried through ``_get_frame_value`` for sells and
            as the first fallback.
        volume_frame: Frame used for ``market_volume`` when rows are built.
        amount_frame: Frame used for ``market_amount`` when rows are built.
        cost_buy: Proportional cost applied to buy notional.
        cost_sell: Proportional cost applied to sell notional.
        build_rows: When false, only the yearly statistics are accumulated.

    Returns:
        ``(rows, yearly_trade_stats)`` where ``yearly_trade_stats`` maps year
        to trade counts, share totals, notionals, costs and gross turnover.
    """

    def _unusable(price: float) -> bool:
        # A price must be finite and strictly positive to value a trade.
        return not np.isfinite(price) or price <= 0

    trade_rows: list[dict[str, Any]] = []
    yearly_trade_stats: dict[int, dict[str, float]] = defaultdict(
        lambda: {
            "buy_trades": 0,
            "sell_trades": 0,
            "shares_bought": 0.0,
            "shares_sold": 0.0,
            "buy_gross_notional": 0.0,
            "sell_gross_notional": 0.0,
            "buy_cash_outflow": 0.0,
            "sell_net_proceeds": 0.0,
            "buy_transaction_cost": 0.0,
            "sell_transaction_cost": 0.0,
            "transaction_cost": 0.0,
            "gross_turnover": 0.0,
        }
    )

    snapshots = {pd.Timestamp(raw_date): snapshot for raw_date, snapshot in positions_normal.items()}
    previous_snapshot = None
    previous_holdings: dict[str, float] = {}

    for trade_date in sorted(snapshots):
        snapshot = snapshots[trade_date]
        holdings_now = _extract_position_amounts(snapshot)
        n_before = int(sum(1 for amount in previous_holdings.values() if amount > 0))
        n_after = int(sum(1 for amount in holdings_now.values() if amount > 0))
        year = int(trade_date.year)

        for instrument in sorted(set(previous_holdings) | set(holdings_now)):
            shares_before = float(previous_holdings.get(instrument, 0.0))
            shares_after = float(holdings_now.get(instrument, 0.0))
            share_change = shares_after - shares_before
            if abs(share_change) <= 1e-12:
                continue

            is_buy = share_change > 0
            action = "buy" if is_buy else "sell"
            traded_shares = abs(float(share_change))

            # Price lookup order: side-specific frame, close frame, position object.
            primary_frame = open_prices if is_buy else close_prices
            price = _get_frame_value(primary_frame, trade_date, instrument, default=np.nan)
            if _unusable(price):
                price = _get_frame_value(close_prices, trade_date, instrument, default=np.nan)
            if _unusable(price):
                if is_buy:
                    if instrument in holdings_now:
                        price = float(snapshot.get_stock_price(instrument))
                elif previous_snapshot is not None and instrument in previous_holdings:
                    price = float(previous_snapshot.get_stock_price(instrument))
            if _unusable(price):
                continue

            notional = float(traded_shares * price)
            cost_rate = cost_buy if is_buy else cost_sell
            fee = float(notional * cost_rate)
            if is_buy:
                cash_outflow = float(notional + fee)
                net_proceeds = 0.0
            else:
                cash_outflow = 0.0
                net_proceeds = float(notional - fee)

            days_held = 0
            if not is_buy and previous_snapshot is not None:
                # Best effort: the holding-day counter may be unavailable.
                try:
                    days_held = int(previous_snapshot.get_stock_count(instrument, "day"))
                except Exception:
                    days_held = 0

            if build_rows:
                market_volume = _get_frame_value(volume_frame, trade_date, instrument, default=np.nan)
                market_amount = _get_frame_value(amount_frame, trade_date, instrument, default=np.nan)
                volume_ok = np.isfinite(market_volume)
                amount_ok = np.isfinite(market_amount)
                trade_rows.append(
                    {
                        "date": trade_date.strftime("%Y-%m-%d"),
                        "year": year,
                        "market_regime": _market_regime(year),
                        "action": action,
                        "instrument": instrument,
                        "shares": round(float(traded_shares), 6),
                        "current_shares": round(float(shares_before), 6),
                        "target_shares": round(float(shares_after), 6),
                        "requested_shares": round(float(traded_shares), 6),
                        "filled_shares": round(float(traded_shares), 6),
                        "unfilled_shares": 0.0,
                        "fill_ratio": 1.0,
                        "price": round(float(price), 6),
                        "order_value": round(float(notional), 6),
                        "filled_value": round(float(notional), 6),
                        "gross_notional": round(float(notional), 6),
                        "net_proceeds": round(float(net_proceeds), 6),
                        "cash_outflow": round(float(cash_outflow), 6),
                        "transaction_cost": round(float(fee), 6),
                        "realized_pnl": 0.0,
                        "holdings_count_before": n_before,
                        "holdings_count_after": n_after,
                        "days_held": days_held,
                        "market_volume": round(float(market_volume), 6) if volume_ok else 0.0,
                        "market_amount": round(float(market_amount), 6) if amount_ok else 0.0,
                        "volume_participation": round(float(traded_shares / market_volume), 8)
                        if volume_ok and market_volume > 0
                        else 0.0,
                        "amount_participation": round(float(notional / market_amount), 8)
                        if amount_ok and market_amount > 0
                        else 0.0,
                        "clip_reason": "qlib_original",
                    }
                )

            year_stats = yearly_trade_stats[year]
            if is_buy:
                year_stats["buy_trades"] += 1
                year_stats["shares_bought"] += float(traded_shares)
                year_stats["buy_gross_notional"] += float(notional)
                year_stats["buy_cash_outflow"] += float(cash_outflow)
                year_stats["buy_transaction_cost"] += float(fee)
            else:
                year_stats["sell_trades"] += 1
                year_stats["shares_sold"] += float(traded_shares)
                year_stats["sell_gross_notional"] += float(notional)
                year_stats["sell_net_proceeds"] += float(net_proceeds)
                year_stats["sell_transaction_cost"] += float(fee)
            year_stats["transaction_cost"] += float(fee)
            year_stats["gross_turnover"] += float(notional)

        previous_snapshot = snapshot
        previous_holdings = holdings_now

    return trade_rows, yearly_trade_stats
def _build_portfolio_log_rows(
    *,
    dates: list[pd.Timestamp],
    portfolio_value: pd.Series,
    portfolio_return: pd.Series,
    benchmark_return: pd.Series,
    cash_series: pd.Series | None,
    holdings_count: dict[pd.Timestamp, int] | None,
    rebalance_freq: int,
    rebalance_flags: dict[pd.Timestamp, bool] | None = None,
) -> list[dict[str, Any]]:
    """Build one portfolio-log row per date.

    Args:
        dates: Dates to emit, in output order.
        portfolio_value: Portfolio value per date; missing or non-finite
            values are logged as ``None``.
        portfolio_return: Portfolio return per date.
        benchmark_return: Benchmark return per date.
        cash_series: Cash per date, or ``None`` when unavailable.
        holdings_count: Number of held names keyed by ``pd.Timestamp``.
        rebalance_freq: Step used for the default rebalance flag (every
            ``rebalance_freq``-th position in ``dates``; values below 1 are
            treated as 1).
        rebalance_flags: Optional explicit flag per ``pd.Timestamp`` that
            overrides the positional default.

    Returns:
        A list of row dicts. Missing or non-finite returns are logged as
        ``0.0`` so the excess-return column is always a finite number.
    """

    def _finite_or_zero(value: Any) -> float:
        # `value or 0.0` does not catch NaN (NaN is truthy), so test explicitly.
        if value is None:
            return 0.0
        number = float(value)
        return number if np.isfinite(number) else 0.0

    rows: list[dict[str, Any]] = []
    rebalance_step = max(int(rebalance_freq), 1)
    holdings_count = holdings_count or {}
    cash_series = cash_series if cash_series is not None else pd.Series(dtype=float)
    rebalance_flags = rebalance_flags or {}

    for idx, date in enumerate(dates):
        date_key = pd.Timestamp(date)
        year = int(date_key.year)
        value_at_date = float(portfolio_value.get(date, np.nan))
        return_at_date = _finite_or_zero(portfolio_return.get(date, 0.0))
        bench_at_date = _finite_or_zero(benchmark_return.get(date, 0.0))
        cash_at_date = float(cash_series.get(date, np.nan)) if date in cash_series.index else np.nan

        value_ok = bool(np.isfinite(value_at_date))
        cash_ok = bool(np.isfinite(cash_at_date))
        # Cash weight is only defined when both inputs are finite and the
        # portfolio value is not (numerically) zero.
        if cash_ok and value_ok and abs(value_at_date) > 1e-12:
            cash_weight = round(float(cash_at_date / value_at_date), 6)
        else:
            cash_weight = None

        rows.append(
            {
                "date": date_key.strftime("%Y-%m-%d"),
                "year": year,
                "market_regime": _market_regime(year),
                "portfolio_value": round(float(value_at_date), 6) if value_ok else None,
                "cash_eod": round(float(cash_at_date), 6) if cash_ok else None,
                "cash_weight": cash_weight,
                "n_held": int(holdings_count.get(date_key, 0)),
                "portfolio_return": round(float(return_at_date), 8),
                "benchmark_return": round(float(bench_at_date), 8),
                "excess_return": round(float(return_at_date - bench_at_date), 8),
                "is_rebalance": bool(rebalance_flags.get(date_key, idx % rebalance_step == 0)),
            }
        )
    return rows
def _compute_portfolio_ir_qlib(
    factor_values: pd.Series,
    price_df: pd.DataFrame,
    bench_return: pd.Series | None = None,
    top_k: int = 10,
    n_drop: int = 2,
    rebalance_freq: int = 5,
    cost_buy: float = 0.0013,
    cost_sell: float = 0.0013,
    hold_thresh: int = 2,
    label_forward_days: int = 5,
    ann_scaler: int = 252,
    start_date: str | None = None,
    end_date: str | None = None,
    risk_free_rate_annual: float = 0.0,
    start_cash: float = 200_000_000.0,
    position_size: float = 1.0,
    max_pos_each_stock: float = 1.0,
    lot_size: int = 100,
    max_daily_volume_participation: float = 0.0,
    max_daily_amount_participation: float = 0.0,
    capture_details: bool = False,
    trade_guard_config: dict[str, Any] | bool | None = None,
    rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE,
) -> dict:
    """Backtest a single factor through the Qlib simulator and summarise it.

    The factor is turned into a signal, run through
    ``ForceExitTopkDropoutStrategy`` with a daily ``SimulatorExecutor``
    (deal price ``("$open", "$close")``), and the resulting report is
    condensed with ``_summarize_return_frame`` / ``_summarize_ic_frame``.

    Args:
        factor_values: Factor scores; a DataFrame is reduced to its first
            column.
        price_df: Price data providing at least ``$open`` and ``$close``.
        bench_return: Optional benchmark return series; when omitted the
            report's ``bench`` column (or zeros) is used.
        top_k, n_drop, rebalance_freq, hold_thresh: Forwarded to the strategy.
        cost_buy, cost_sell: Exchange open/close cost rates.
        label_forward_days: Forward horizon for the IC computation.
        ann_scaler, risk_free_rate_annual: Forwarded to the summarisers.
        start_date, end_date: Optional inclusive bounds on the signal window.
        start_cash: Initial account cash (negative values are floored at 0).
        position_size: Clamped to ``[0, 1]`` and used as the risk degree.
        max_pos_each_stock: Per-name cap; also limits the risk degree to
            ``top_k * max_pos_each_stock``.
        lot_size: Exchange trade unit (at least 1).
        max_daily_volume_participation: When positive, installs a volume
            threshold on the exchange.
        max_daily_amount_participation: Not supported here; a warning is
            recorded when it is positive.
        capture_details: When true, trade/holding/portfolio logs and the stock
            contribution summary are built and returned.
        trade_guard_config: Optional trade-guard configuration.
        rebalance_mode: Rebalance mode, normalised before use.

    Returns:
        A result dict. Failures are reported through ``_empty_result`` with an
        explanatory message rather than raised.
    """
    if isinstance(factor_values, pd.DataFrame):
        factor_values = factor_values.iloc[:, 0]

    open_prices = _get_price_wide_frame(price_df, "$open")
    close_prices = _get_price_wide_frame(price_df, "$close")
    # Volume/amount are only needed for the detailed logs.
    volume_frame = _get_price_wide_frame(price_df, "$volume") if capture_details else None
    amount_frame = _get_price_wide_frame(price_df, "$amount") if capture_details else None
    if open_prices is None or close_prices is None:
        return _empty_result("Missing $open or $close in price_df")

    normalized_rebalance_mode = _normalize_rebalance_mode(rebalance_mode)
    normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config)
    if normalized_trade_guard_config is None:
        signal = _prepare_qlib_signal(factor_values)
        cooldown_period = 0
    else:
        signal = _prepare_force_exit_qlib_signal(factor_values, price_df, normalized_trade_guard_config)
        if "cooldown_period" in normalized_trade_guard_config:
            cooldown_value = normalized_trade_guard_config.get("cooldown_period")
            cooldown_period = 0 if cooldown_value is None else max(int(cooldown_value), 0)
        else:
            cooldown_period = max(int(rebalance_freq) * 2, int(hold_thresh))

    if start_date:
        signal = signal[signal.index.get_level_values("datetime") >= pd.Timestamp(start_date)]
    if end_date:
        signal = signal[signal.index.get_level_values("datetime") <= pd.Timestamp(end_date)]
    if signal.empty:
        return _empty_result("No valid signal values for qlib_original backtest")

    runtime = _ensure_qlib_initialized(_ensure_qlib_provider(price_df))
    ForceExitTopkDropoutStrategy = runtime["ForceExitTopkDropoutStrategy"]
    SimulatorExecutor = runtime["SimulatorExecutor"]
    Account = runtime["Account"]
    CommonInfrastructure = runtime["CommonInfrastructure"]
    backtest_loop = runtime["backtest_loop"]
    get_exchange = runtime["get_exchange"]

    qlib_warnings: list[str] = []
    effective_risk_degree = min(max(float(position_size), 0.0), 1.0)
    if max_pos_each_stock > 0 and top_k > 0:
        effective_risk_degree = min(effective_risk_degree, float(top_k) * float(max_pos_each_stock))
    if max_daily_amount_participation > 0:
        qlib_warnings.append("max_daily_amount_participation is not natively supported by qlib_original and is ignored")
    if normalized_trade_guard_config is not None:
        qlib_warnings.append("qlib_original trade_guard_config enabled")
    if normalized_rebalance_mode != DEFAULT_QLIB_REBALANCE_MODE:
        qlib_warnings.append(f"qlib_original rebalance_mode={normalized_rebalance_mode}")

    strategy = ForceExitTopkDropoutStrategy(
        signal=signal,
        topk=top_k,
        n_drop=n_drop,
        rebalance_freq=rebalance_freq,
        cooldown_period=cooldown_period,
        rebalance_mode=normalized_rebalance_mode,
        max_pos_each_stock=max_pos_each_stock,
        hold_thresh=hold_thresh,
        risk_degree=effective_risk_degree,
        only_tradable=False,
        forbid_all_trade_at_limit=False,
    )
    executor_obj = SimulatorExecutor(
        time_per_step="day",
        generate_portfolio_metrics=True,
        verbose=False,
    )
    exchange_kwargs: dict[str, Any] = {
        "freq": "day",
        "deal_price": ("$open", "$close"),
        "limit_threshold": None,
        "open_cost": cost_buy,
        "close_cost": cost_sell,
        "min_cost": 0.0,
        "trade_unit": max(int(lot_size), 1),
    }
    if max_daily_volume_participation > 0:
        exchange_kwargs["volume_threshold"] = ("current", f"{float(max_daily_volume_participation)} * $volume")

    available_dates = _get_available_dates(price_df)
    if len(available_dates) < 2:
        return _empty_result("Qlib original backtest requires at least 2 trading dates")
    # The backtest cannot end on the last available date: per the warning
    # below, the provider has no calendar bar after it.
    latest_safe_end = pd.Timestamp(available_dates[-2])
    bt_start = pd.Timestamp(start_date) if start_date else pd.Timestamp(signal.index.get_level_values("datetime").min())
    requested_end = pd.Timestamp(end_date) if end_date else pd.Timestamp(signal.index.get_level_values("datetime").max())
    bt_end = min(requested_end, latest_safe_end)
    if bt_end < bt_start:
        return _empty_result("Qlib original backtest window is empty after calendar clipping")
    if bt_end < requested_end:
        qlib_warnings.append(
            f"qlib_original clipped end_date from {requested_end.strftime('%Y-%m-%d')} "
            f"to {bt_end.strftime('%Y-%m-%d')} because the provider has no future calendar bar"
        )
    # A no-op unless the end was clipped above.
    signal = signal[signal.index.get_level_values("datetime") <= bt_end]
    if signal.empty:
        return _empty_result("No valid signal values remain after qlib_original calendar clipping")

    benchmark_series = None
    if bench_return is not None:
        benchmark_series = bench_return.astype(float).sort_index()
        benchmark_series = benchmark_series[benchmark_series.index >= bt_start]
        benchmark_series = benchmark_series[benchmark_series.index <= bt_end]

    try:
        with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR):
            trade_account = Account(
                init_cash=max(float(start_cash), 0.0),
                position_dict={},
                pos_type="Position",
                benchmark_config={"benchmark": None},
            )
            exchange_with_dates = dict(exchange_kwargs)
            exchange_with_dates.setdefault("start_time", bt_start)
            exchange_with_dates.setdefault("end_time", bt_end)
            trade_exchange = get_exchange(**exchange_with_dates)
            common_infra = CommonInfrastructure(trade_account=trade_account, trade_exchange=trade_exchange)
            strategy.reset_common_infra(common_infra)
            executor_obj.reset_common_infra(common_infra)
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", message="Mean of empty slice", category=RuntimeWarning)
                portfolio_metric_dict, _indicator_dict = backtest_loop(
                    start_time=bt_start,
                    end_time=bt_end,
                    trade_strategy=strategy,
                    trade_executor=executor_obj,
                )
    except Exception as exc:  # pragma: no cover - depends on qlib runtime
        # Boundary handler: report any runtime failure as an unsuccessful result.
        return _empty_result(f"Qlib original backtest failed: {type(exc).__name__}: {str(exc)[:500]}")

    if not portfolio_metric_dict:
        return _empty_result("Qlib original backtest returned no portfolio metrics")
    analysis_key = sorted(portfolio_metric_dict.keys())[0]
    report_normal, positions_normal = portfolio_metric_dict[analysis_key]
    if report_normal is None or report_normal.empty:
        return _empty_result("Qlib original backtest returned an empty report")
    report_normal = report_normal.sort_index()

    # Net daily return = reported return minus reported cost.
    portfolio_returns_with_cost = report_normal["return"].astype(float) - report_normal["cost"].astype(float)
    if benchmark_series is not None:
        benchmark_returns = benchmark_series.reindex(report_normal.index).fillna(0.0).astype(float)
    else:
        benchmark_returns = report_normal.get("bench", pd.Series(index=report_normal.index, dtype=float)).astype(float).fillna(0.0)
    return_frame = pd.DataFrame(
        {
            "portfolio_value": report_normal["account"].astype(float),
            "portfolio_return": portfolio_returns_with_cost,
            "benchmark_return": benchmark_returns,
        },
        index=report_normal.index,
    )

    ic_frame = _compute_daily_cross_sectional_ic_frame(
        factor_values=factor_values,
        close_prices=close_prices,
        label_forward_days=label_forward_days,
    )
    if start_date and not ic_frame.empty:
        ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)]
    if end_date and not ic_frame.empty:
        ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)]

    # Yearly statistics are always needed; per-row logs only with capture_details.
    holding_log, yearly_holding_stats = _build_holding_log_from_qlib_positions(
        positions_normal=positions_normal,
        report_normal=report_normal,
        volume_frame=volume_frame,
        amount_frame=amount_frame,
        build_rows=capture_details,
    )
    trade_log, yearly_trade_stats = _build_trade_log_from_qlib_positions(
        positions_normal=positions_normal,
        open_prices=open_prices,
        close_prices=close_prices,
        volume_frame=volume_frame,
        amount_frame=amount_frame,
        cost_buy=cost_buy,
        cost_sell=cost_sell,
        build_rows=capture_details,
    )

    # The portfolio log (and the per-date holdings count feeding it) is only
    # returned with capture_details, so skip building it otherwise.
    portfolio_log: list[dict[str, Any]] = []
    if capture_details:
        holdings_count_by_date = {
            pd.Timestamp(date): int(
                sum(1 for instrument in position.get_stock_list() if float(position.get_stock_amount(instrument)) > 0)
            )
            for date, position in positions_normal.items()
        }
        portfolio_log = _build_portfolio_log_rows(
            dates=list(return_frame.index),
            portfolio_value=return_frame["portfolio_value"].astype(float),
            portfolio_return=return_frame["portfolio_return"].astype(float),
            benchmark_return=return_frame["benchmark_return"].astype(float),
            cash_series=report_normal["cash"].astype(float) if "cash" in report_normal.columns else None,
            holdings_count=holdings_count_by_date,
            rebalance_freq=rebalance_freq,
        )

    total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values()))
    total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values()))

    result = _summarize_return_frame(
        return_frame=return_frame,
        ann_scaler=ann_scaler,
        risk_free_rate_annual=risk_free_rate_annual,
    )
    result.update(_summarize_ic_frame(ic_frame))
    result.update(
        {
            "success": True,
            "final_value": round(float(report_normal["account"].iloc[-1]), 6),
            "error": None,
            "qlib_warnings": qlib_warnings,
            "trade_guard_config": normalized_trade_guard_config,
            "rebalance_mode": normalized_rebalance_mode,
            "transaction_cost": round(total_transaction_cost, 6),
            "gross_turnover": round(total_gross_turnover, 6),
            "turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6),
            "yearly_metrics": _build_yearly_metrics(
                return_frame=return_frame,
                ic_frame=ic_frame,
                yearly_trade_stats=yearly_trade_stats,
                yearly_holding_stats=yearly_holding_stats,
                ann_scaler=ann_scaler,
                risk_free_rate_annual=risk_free_rate_annual,
            ),
            "trade_log": trade_log if capture_details else [],
            "stock_contrib": _build_stock_contribution_summary(holding_log, trade_log) if capture_details else [],
            "holding_log": holding_log if capture_details else [],
            "portfolio_log": portfolio_log,
        }
    )
    return result
def _compute_portfolio_ir_custom(
factor_values: pd.Series,
price_df: pd.DataFrame,
bench_return: pd.Series | None = None,
top_k: int = 10,
n_drop: int = 2,
rebalance_freq: int = 5,
cost_buy: float = 0.0013,
cost_sell: float = 0.0013,
hold_thresh: int = 2,
label_forward_days: int = 5,
ann_scaler: int = 252,
start_date: str | None = None,
end_date: str | None = None,
risk_free_rate_annual: float = 0.0,
start_cash: float = 200_000_000.0,
position_size: float = 1.0,
max_pos_each_stock: float = 1.0,
lot_size: int = 100,
max_daily_volume_participation: float = 0.0,
max_daily_amount_participation: float = 0.0,
capture_details: bool = False,
custom_weight_mode: str = "equal",
redistribute_unfilled_cash: bool = False,
enforce_cash_limit: bool = False,
) -> dict:
"""Compute portfolio-based IR matching AlphaAgent's Qlib config."""
if isinstance(factor_values, pd.DataFrame):
factor_values = factor_values.iloc[:, 0]
all_dates = sorted(factor_values.index.get_level_values("datetime").unique())
if start_date:
all_dates = [d for d in all_dates if d >= pd.Timestamp(start_date)]
if end_date:
all_dates = [d for d in all_dates if d <= pd.Timestamp(end_date)]
if len(all_dates) < rebalance_freq + hold_thresh + 1:
return _empty_result("Not enough dates for backtesting")
open_prices = price_df["$open"].unstack("instrument") if "$open" in price_df.columns else None
close_prices = price_df["$close"].unstack("instrument") if "$close" in price_df.columns else None
volume_frame = price_df["$volume"].unstack("instrument") if "$volume" in price_df.columns else None
amount_frame = price_df["$amount"].unstack("instrument") if "$amount" in price_df.columns else None
if open_prices is None or close_prices is None:
return _empty_result("Missing $open or $close in price_df")
effective_freq = max(rebalance_freq, hold_thresh)
rebalance_indices = list(range(0, len(all_dates), effective_freq))
rebalance_set = {all_dates[i] for i in rebalance_indices}
lot_size = max(int(lot_size), 1)
start_cash = max(float(start_cash), 0.0)
position_size = min(max(float(position_size), 0.0), 1.0)
max_pos_each_stock = min(max(float(max_pos_each_stock), 0.0), 1.0)
max_daily_volume_participation = max(float(max_daily_volume_participation), 0.0)
max_daily_amount_participation = max(float(max_daily_amount_participation), 0.0)
normalized_custom_weight_mode = _normalize_custom_weight_mode(custom_weight_mode)
redistribute_unfilled_cash = bool(redistribute_unfilled_cash)
enforce_cash_limit = bool(enforce_cash_limit)
holdings: dict[str, dict[str, Any]] = {}
cash = float(start_cash)
daily_portfolio_values: list[float] = []
daily_dates_out: list[pd.Timestamp] = []
daily_portfolio_records: list[dict[str, Any]] = []
daily_holding_records: list[dict[str, Any]] = []
trade_records: list[dict[str, Any]] = []
yearly_trade_stats: dict[int, dict[str, float]] = defaultdict(
lambda: {
"buy_trades": 0,
"sell_trades": 0,
"shares_bought": 0.0,
"shares_sold": 0.0,
"buy_gross_notional": 0.0,
"sell_gross_notional": 0.0,
"buy_cash_outflow": 0.0,
"sell_net_proceeds": 0.0,
"buy_transaction_cost": 0.0,
"sell_transaction_cost": 0.0,
"transaction_cost": 0.0,
"gross_turnover": 0.0,
}
)
yearly_holding_stats: dict[int, dict[str, float]] = defaultdict(
lambda: {
"n_days": 0,
"holdings_count_sum": 0.0,
"max_holdings_count": 0,
}
)
rebalance_execution_count = 0
for day_idx, date in enumerate(all_dates):
is_rebalance = date in rebalance_set
rebalance_executed = False
signal = pd.Series(dtype=float)
if is_rebalance and day_idx > 0:
prev_date = all_dates[day_idx - 1]
try:
signal = factor_values.xs(prev_date, level="datetime")
except KeyError:
signal = pd.Series(dtype=float)
signal = signal.dropna().sort_values(ascending=False)
if is_rebalance and day_idx > 0 and not signal.empty:
rebalance_executed = True
is_initial_rebalance = rebalance_execution_count == 0
effective_custom_weight_mode = "equal" if is_initial_rebalance else normalized_custom_weight_mode
effective_redistribute_unfilled_cash = False if is_initial_rebalance else redistribute_unfilled_cash
current_insts = set(holdings.keys())
locked_insts = {
inst for inst, pos in holdings.items() if day_idx - int(pos["buy_day_idx"]) < hold_thresh
}
new_top = set(signal.nlargest(top_k).index.tolist())
eligible_current = list(current_insts - locked_insts)
# Custom engine now rebalances directly to the current top-k target set:
# any sellable holding that falls out of the target top-k becomes a drop
# candidate, without an n_drop cap.
dropout_candidates = {inst for inst in eligible_current if inst not in new_top}
keep_insts = list(current_insts - dropout_candidates)
keep_insts_set = {str(inst) for inst in keep_insts}
equity = cash + sum(
_market_value_from_close(close_prices, date, pos, inst)
for inst, pos in holdings.items()
)
max_position_value = equity * max_pos_each_stock
if max_pos_each_stock >= 1.0 - 1e-12:
baseline_slot_value = equity * position_size
else:
baseline_slot_value = min(
max_position_value,
(equity * position_size / float(top_k)) if top_k > 0 else 0.0,
)
target_universe = list(keep_insts)
for inst in signal.index.tolist():
if len(target_universe) >= top_k:
break
if inst in target_universe or inst in dropout_candidates:
continue
px = _get_frame_value(open_prices, date, inst, default=np.nan)
if not np.isfinite(px) or px <= 0:
px = _get_frame_value(close_prices, date, inst, default=np.nan)
if not np.isfinite(px) or px <= 0:
continue
one_lot_cash = float(lot_size) * float(px) * (1.0 + cost_buy)
if baseline_slot_value > 0 and one_lot_cash > baseline_slot_value:
continue
target_universe.append(inst)
ranked_target_names = [str(inst) for inst in target_universe]
target_shares_map: dict[str, float] = {}
score_target_weights = pd.Series(dtype=float)
if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash:
target_count = max(len(ranked_target_names), 1)
target_value_per_name = min(
equity * position_size / float(target_count),
max_position_value,
)
for inst in set(ranked_target_names).union(current_insts):
current_shares = float(holdings.get(inst, {}).get("shares", 0.0))
if inst not in ranked_target_names or target_value_per_name <= 0:
target_shares = 0.0
else:
price_for_target = _get_frame_value(open_prices, date, inst, default=np.nan)
if not np.isfinite(price_for_target) or price_for_target <= 0:
price_for_target = _get_frame_value(close_prices, date, inst, default=np.nan)
if not np.isfinite(price_for_target) or price_for_target <= 0:
target_shares = current_shares
else:
target_shares = _round_down_lot(target_value_per_name / float(price_for_target), lot_size)
if inst in keep_insts_set and target_shares < current_shares:
target_shares = current_shares
target_shares_map[inst] = float(target_shares)
else:
ranked_target_scores = signal.reindex(pd.Index(ranked_target_names)).dropna().sort_values(ascending=False)
ranked_target_names = [str(inst) for inst in ranked_target_scores.index.tolist()]
score_target_weights = _build_custom_target_weights(
target_scores=ranked_target_scores,
target_names=ranked_target_names,
total_weight=1.0 if position_size > 0 else 0.0,
max_pos_each_stock=max_pos_each_stock,
weight_mode=effective_custom_weight_mode,
)
for inst in set(ranked_target_names).union(current_insts):
current_shares = float(holdings.get(inst, {}).get("shares", 0.0))
# Score-weighted custom mode treats sell decisions as membership-driven:
# if a name remains in the refreshed alpha basket, keep the current size
# and let the normalized score decide how much incremental cash to add
# after sells have completed.
if str(inst) not in ranked_target_names:
target_shares = 0.0
else:
target_shares = current_shares
if inst in keep_insts_set and target_shares < current_shares:
target_shares = current_shares
target_shares_map[inst] = float(target_shares)
for inst in list(holdings.keys()):
pos = holdings[inst]
current_shares = float(pos["shares"])
target_shares = float(target_shares_map.get(inst, 0.0))
requested_shares = max(current_shares - target_shares, 0.0)
if requested_shares <= 0:
continue
hold_count_before = len(holdings)
days_held = int(day_idx - int(pos["buy_day_idx"]))
sell_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0))
sell_px = sell_px if sell_px > 0 else float(pos.get("avg_cost_per_share", 0.0))
market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan)
market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan)
# Custom engine uses sell signals as hard exits: once an instrument
# falls out of the active alpha basket, liquidate the whole sellable
# position on that rebalance date instead of clipping the order into
# multiple partial sells across later rebalances.
clip_reasons: list[str] = []
if not np.isfinite(sell_px) or sell_px <= 0:
filled_shares = 0.0
clip_reasons.append("invalid_price")
else:
filled_shares = min(requested_shares, current_shares)
order_value = requested_shares * sell_px
filled_value = filled_shares * sell_px
transaction_cost = filled_value * cost_sell
net_proceeds = filled_value - transaction_cost
realized_pnl = filled_value - transaction_cost - float(pos["avg_cost_per_share"]) * filled_shares
if filled_shares > 0:
cash += net_proceeds
pos["shares"] = float(pos["shares"]) - filled_shares
pos["total_cost_basis"] = max(
float(pos["total_cost_basis"]) - float(pos["avg_cost_per_share"]) * filled_shares,
0.0,
)
if pos["shares"] > 0:
pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"])
else:
del holdings[inst]
year = int(pd.Timestamp(date).year)
stats = yearly_trade_stats[year]
stats["sell_trades"] += 1
stats["shares_sold"] += float(filled_shares)
stats["sell_gross_notional"] += float(filled_value)
stats["sell_net_proceeds"] += float(net_proceeds)
stats["sell_transaction_cost"] += float(transaction_cost)
stats["transaction_cost"] += float(transaction_cost)
stats["gross_turnover"] += float(filled_value)
if capture_details:
year = int(pd.Timestamp(date).year)
trade_records.append(
{
"date": pd.Timestamp(date).strftime("%Y-%m-%d"),
"year": year,
"market_regime": _market_regime(year),
"action": "sell",
"instrument": inst,
"shares": round(float(filled_shares), 6),
"current_shares": round(float(current_shares), 6),
"target_shares": round(float(target_shares), 6),
"requested_shares": round(float(requested_shares), 6),
"filled_shares": round(float(filled_shares), 6),
"unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6),
"fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0,
"price": round(float(sell_px), 6),
"order_value": round(float(order_value), 6),
"filled_value": round(float(filled_value), 6),
"redistributed_notional": 0.0,
"gross_notional": round(float(filled_value), 6),
"net_proceeds": round(float(net_proceeds), 6),
"cash_outflow": 0.0,
"transaction_cost": round(float(transaction_cost), 6),
"realized_pnl": round(float(realized_pnl), 6),
"holdings_count_before": int(hold_count_before),
"holdings_count_after": int(len(holdings)),
"days_held": days_held,
"market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0,
"market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0,
"volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0,
"amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0,
"clip_reason": _clip_reason_text(clip_reasons),
}
)
score_buy_budget_by_inst: dict[str, float] = {}
if not score_target_weights.empty:
available_buy_budget = max(float(cash), 0.0)
score_buy_budget_by_inst = {
str(inst): float(weight) * available_buy_budget
for inst, weight in score_target_weights.items()
if float(weight) > 0
}
carry_budget = 0.0
buy_filled_by_inst: dict[str, float] = {}
buy_names = ranked_target_names if ranked_target_names else []
for inst in buy_names:
current_shares = float(holdings.get(inst, {}).get("shares", 0.0))
target_shares = float(target_shares_map.get(inst, 0.0))
reported_target_shares = float(target_shares)
buy_px = _get_frame_value(open_prices, date, inst, default=np.nan)
if not np.isfinite(buy_px) or buy_px <= 0:
buy_px = _get_frame_value(close_prices, date, inst, default=np.nan)
hold_count_before = len(holdings)
year = int(pd.Timestamp(date).year)
market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan)
market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan)
clip_reasons: list[str] = []
filled_shares = 0.0
requested_shares = 0.0
order_value = 0.0
filled_value = 0.0
transaction_cost = 0.0
cash_outflow = 0.0
redistributed_notional = 0.0
if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash:
requested_shares = max(target_shares - current_shares, 0.0)
if requested_shares <= 0:
continue
order_value = requested_shares * float(buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0
if not np.isfinite(buy_px) or buy_px <= 0:
clip_reasons.append("invalid_price")
else:
volume_cap, amount_cap = _liquidity_caps(
market_volume=market_volume,
market_amount=market_amount,
price=buy_px,
lot_size=lot_size,
max_daily_volume_participation=max_daily_volume_participation,
max_daily_amount_participation=max_daily_amount_participation,
)
cash_cap = (
_round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size)
if enforce_cash_limit
else np.inf
)
filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap)
if volume_cap < requested_shares:
clip_reasons.append("volume_limit")
if amount_cap < requested_shares:
clip_reasons.append("amount_limit")
if enforce_cash_limit and cash_cap < requested_shares:
clip_reasons.append("cash_limit")
if filled_shares < requested_shares and filled_shares <= 0:
clip_reasons.append("lot_rounding")
filled_value = filled_shares * float(buy_px)
transaction_cost = filled_value * cost_buy
cash_outflow = filled_value + transaction_cost
else:
current_value = float(current_shares * buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0
base_requested_budget = float(score_buy_budget_by_inst.get(str(inst), 0.0))
requested_budget = float(base_requested_budget)
carry_in_budget = 0.0
if effective_redistribute_unfilled_cash:
carry_in_budget = float(carry_budget)
requested_budget += carry_in_budget
carry_budget = 0.0
if max_pos_each_stock > 0 and np.isfinite(buy_px) and buy_px > 0:
per_name_value_cap = float(equity * max_pos_each_stock)
budget_cap = max(per_name_value_cap - current_value, 0.0)
carry_budget += max(requested_budget - budget_cap, 0.0)
requested_budget = min(requested_budget, budget_cap)
if requested_budget <= 1e-12 and carry_budget <= 1e-12:
continue
if not np.isfinite(buy_px) or buy_px <= 0:
clip_reasons.append("invalid_price")
if effective_redistribute_unfilled_cash:
carry_budget += requested_budget
else:
requested_shares = _round_down_lot(
requested_budget / (float(buy_px) * (1.0 + cost_buy)),
lot_size,
)
reported_target_shares = float(current_shares + requested_shares)
order_value = requested_shares * float(buy_px)
volume_cap, amount_cap = _liquidity_caps(
market_volume=market_volume,
market_amount=market_amount,
price=buy_px,
lot_size=lot_size,
max_daily_volume_participation=max_daily_volume_participation,
max_daily_amount_participation=max_daily_amount_participation,
)
cash_cap = (
_round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size)
if enforce_cash_limit
else np.inf
)
filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap)
if volume_cap < requested_shares:
clip_reasons.append("volume_limit")
if amount_cap < requested_shares:
clip_reasons.append("amount_limit")
if enforce_cash_limit and cash_cap < requested_shares:
clip_reasons.append("cash_limit")
if requested_budget > 1e-12 and requested_shares <= 0:
clip_reasons.append("lot_rounding")
if filled_shares < requested_shares and filled_shares <= 0:
clip_reasons.append("lot_rounding")
filled_value = filled_shares * float(buy_px)
transaction_cost = filled_value * cost_buy
cash_outflow = filled_value + transaction_cost
if carry_in_budget > 0 and filled_shares > 0:
redistributed_notional = min(float(cash_outflow), float(carry_in_budget))
if effective_redistribute_unfilled_cash:
carry_budget += max(requested_budget - cash_outflow, 0.0)
if filled_shares > 0:
cash -= cash_outflow
buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares)
if inst in holdings:
pos = holdings[inst]
pos["shares"] = float(pos["shares"]) + filled_shares
pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow
pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"])
else:
holdings[inst] = {
"shares": float(filled_shares),
"buy_date": pd.Timestamp(date),
"buy_day_idx": int(day_idx),
"avg_cost_per_share": float(cash_outflow / filled_shares),
"total_cost_basis": float(cash_outflow),
}
stats = yearly_trade_stats[year]
stats["buy_trades"] += 1
stats["shares_bought"] += float(filled_shares)
stats["buy_gross_notional"] += float(filled_value)
stats["buy_cash_outflow"] += float(cash_outflow)
stats["buy_transaction_cost"] += float(transaction_cost)
stats["transaction_cost"] += float(transaction_cost)
stats["gross_turnover"] += float(filled_value)
if capture_details:
trade_records.append(
{
"date": pd.Timestamp(date).strftime("%Y-%m-%d"),
"year": year,
"market_regime": _market_regime(year),
"action": "buy",
"instrument": inst,
"shares": round(float(filled_shares), 6),
"current_shares": round(float(current_shares), 6),
"target_shares": round(float(reported_target_shares), 6),
"requested_shares": round(float(requested_shares), 6),
"filled_shares": round(float(filled_shares), 6),
"unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6),
"fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0,
"price": round(float(buy_px), 6) if np.isfinite(buy_px) else 0.0,
"order_value": round(float(order_value), 6),
"filled_value": round(float(filled_value), 6),
"redistributed_notional": round(float(redistributed_notional), 6),
"gross_notional": round(float(filled_value), 6),
"net_proceeds": 0.0,
"cash_outflow": round(float(cash_outflow), 6),
"transaction_cost": round(float(transaction_cost), 6),
"realized_pnl": 0.0,
"holdings_count_before": int(hold_count_before),
"holdings_count_after": int(len(holdings)),
"days_held": 0,
"market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0,
"market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0,
"volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0,
"amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0,
"clip_reason": _clip_reason_text(clip_reasons),
}
)
if effective_redistribute_unfilled_cash and carry_budget > 1e-12 and buy_names:
residual_budget = (
min(float(carry_budget), float(cash))
if enforce_cash_limit
else float(carry_budget)
)
for inst in buy_names:
if residual_budget <= 1e-12:
break
buy_px = _get_frame_value(open_prices, date, inst, default=np.nan)
if not np.isfinite(buy_px) or buy_px <= 0:
buy_px = _get_frame_value(close_prices, date, inst, default=np.nan)
if not np.isfinite(buy_px) or buy_px <= 0:
continue
one_lot_cash = float(lot_size) * float(buy_px) * (1.0 + cost_buy)
if one_lot_cash <= 0 or residual_budget + 1e-12 < one_lot_cash or (enforce_cash_limit and cash + 1e-12 < one_lot_cash):
continue
current_shares = float(holdings.get(inst, {}).get("shares", 0.0))
current_value = float(current_shares * float(buy_px))
extra_budget_cap = float(residual_budget)
if max_pos_each_stock > 0:
per_name_value_cap = float(equity * max_pos_each_stock)
extra_budget_cap = min(extra_budget_cap, max(per_name_value_cap - current_value, 0.0))
if extra_budget_cap + 1e-12 < one_lot_cash:
continue
market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan)
market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan)
volume_cap, amount_cap = _liquidity_caps(
market_volume=market_volume,
market_amount=market_amount,
price=buy_px,
lot_size=lot_size,
max_daily_volume_participation=max_daily_volume_participation,
max_daily_amount_participation=max_daily_amount_participation,
)
used_shares_today = float(buy_filled_by_inst.get(inst, 0.0))
remaining_volume_cap = max(float(volume_cap) - used_shares_today, 0.0)
remaining_amount_cap = max(float(amount_cap) - used_shares_today, 0.0)
remaining_liquidity_cap = min(remaining_volume_cap, remaining_amount_cap)
budget_cap_shares = _round_down_lot(extra_budget_cap / (float(buy_px) * (1.0 + cost_buy)), lot_size)
cash_cap_shares = (
_round_down_lot(min(float(cash), float(residual_budget)) / (float(buy_px) * (1.0 + cost_buy)), lot_size)
if enforce_cash_limit
else budget_cap_shares
)
filled_shares = min(remaining_liquidity_cap, budget_cap_shares, cash_cap_shares)
if filled_shares <= 0:
continue
hold_count_before = len(holdings)
year = int(pd.Timestamp(date).year)
filled_value = filled_shares * float(buy_px)
transaction_cost = filled_value * cost_buy
cash_outflow = filled_value + transaction_cost
cash -= cash_outflow
residual_budget = max(float(residual_budget) - float(cash_outflow), 0.0)
buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares)
if inst in holdings:
pos = holdings[inst]
pos["shares"] = float(pos["shares"]) + filled_shares
pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow
pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"])
else:
holdings[inst] = {
"shares": float(filled_shares),
"buy_date": pd.Timestamp(date),
"buy_day_idx": int(day_idx),
"avg_cost_per_share": float(cash_outflow / filled_shares),
"total_cost_basis": float(cash_outflow),
}
stats = yearly_trade_stats[year]
stats["buy_trades"] += 1
stats["shares_bought"] += float(filled_shares)
stats["buy_gross_notional"] += float(filled_value)
stats["buy_cash_outflow"] += float(cash_outflow)
stats["buy_transaction_cost"] += float(transaction_cost)
stats["transaction_cost"] += float(transaction_cost)
stats["gross_turnover"] += float(filled_value)
if capture_details:
trade_records.append(
{
"date": pd.Timestamp(date).strftime("%Y-%m-%d"),
"year": year,
"market_regime": _market_regime(year),
"action": "buy",
"instrument": inst,
"shares": round(float(filled_shares), 6),
"current_shares": round(float(current_shares), 6),
"target_shares": round(float(target_shares_map.get(inst, 0.0)), 6),
"requested_shares": round(float(filled_shares), 6),
"filled_shares": round(float(filled_shares), 6),
"unfilled_shares": 0.0,
"fill_ratio": 1.0,
"price": round(float(buy_px), 6),
"order_value": round(float(filled_value), 6),
"filled_value": round(float(filled_value), 6),
"redistributed_notional": round(float(cash_outflow), 6),
"gross_notional": round(float(filled_value), 6),
"net_proceeds": 0.0,
"cash_outflow": round(float(cash_outflow), 6),
"transaction_cost": round(float(transaction_cost), 6),
"realized_pnl": 0.0,
"holdings_count_before": int(hold_count_before),
"holdings_count_after": int(len(holdings)),
"days_held": 0,
"market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0,
"market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0,
"volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0,
"amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0,
"clip_reason": "cash_sweep",
}
)
rebalance_execution_count += 1
year = int(pd.Timestamp(date).year)
holding_stats = yearly_holding_stats[year]
holding_stats["n_days"] += 1
holding_stats["holdings_count_sum"] += float(len(holdings))
holding_stats["max_holdings_count"] = max(int(holding_stats["max_holdings_count"]), len(holdings))
eod_rows: list[dict[str, Any]] = []
holdings_market_value = 0.0
for inst, pos in holdings.items():
close_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0))
if close_px > 0:
market_value = float(pos["shares"]) * float(close_px)
else:
market_value = float(pos["total_cost_basis"])
holdings_market_value += float(market_value)
if capture_details:
eod_rows.append(
{
"date": pd.Timestamp(date).strftime("%Y-%m-%d"),
"year": year,
"market_regime": _market_regime(year),
"instrument": inst,
"shares_held": round(float(pos["shares"]), 6),
"market_value": round(float(market_value), 6),
"close_price": round(float(close_px), 6),
"market_volume": round(_get_frame_value(volume_frame, date, inst, default=0.0), 6),
"market_amount": round(_get_frame_value(amount_frame, date, inst, default=0.0), 6),
}
)
portfolio_value = float(cash + holdings_market_value)
daily_portfolio_values.append(portfolio_value)
daily_dates_out.append(pd.Timestamp(date))
daily_portfolio_records.append(
{
"Date": pd.Timestamp(date),
"portfolio_value": float(portfolio_value),
"cash": float(cash),
"n_held": int(len(holdings)),
"is_rebalance": bool(rebalance_executed),
}
)
if capture_details and eod_rows:
for row in eod_rows:
row["portfolio_value"] = round(float(portfolio_value), 6)
row["cash_eod"] = round(float(cash), 6)
row["weight"] = round(float(row["market_value"]) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0
daily_holding_records.append(row)
if len(daily_portfolio_values) < 10:
return _empty_result("Not enough daily returns")
pv_series = pd.Series(daily_portfolio_values, index=pd.DatetimeIndex(daily_dates_out), name="portfolio_value")
portfolio_returns = pv_series.pct_change().dropna()
if len(portfolio_returns) < 2:
return _empty_result("Not enough daily returns")
benchmark_returns = (
bench_return.reindex(portfolio_returns.index).fillna(0.0)
if bench_return is not None
else pd.Series(0.0, index=portfolio_returns.index)
)
return_frame = pd.DataFrame(
{
"portfolio_value": pv_series.reindex(portfolio_returns.index),
"portfolio_return": portfolio_returns.astype(float),
"benchmark_return": benchmark_returns.astype(float),
},
index=portfolio_returns.index,
)
portfolio_dates = list(pv_series.index)
portfolio_returns_aligned = portfolio_returns.reindex(portfolio_dates).fillna(0.0).astype(float)
benchmark_returns_aligned = benchmark_returns.reindex(portfolio_dates).fillna(0.0).astype(float)
cash_series_aligned = pd.Series(
{
pd.Timestamp(row["Date"]): float(row["cash"])
for row in daily_portfolio_records
}
)
holdings_count_aligned = {
pd.Timestamp(row["Date"]): int(row["n_held"])
for row in daily_portfolio_records
}
rebalance_flags_aligned = {
pd.Timestamp(row["Date"]): bool(row.get("is_rebalance", False))
for row in daily_portfolio_records
}
portfolio_log = _build_portfolio_log_rows(
dates=portfolio_dates,
portfolio_value=pv_series.astype(float),
portfolio_return=portfolio_returns_aligned,
benchmark_return=benchmark_returns_aligned,
cash_series=cash_series_aligned,
holdings_count=holdings_count_aligned,
rebalance_freq=rebalance_freq,
rebalance_flags=rebalance_flags_aligned,
)
ic_frame = _compute_daily_cross_sectional_ic_frame(
factor_values=factor_values,
close_prices=close_prices,
label_forward_days=label_forward_days,
)
if start_date and not ic_frame.empty:
ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)]
if end_date and not ic_frame.empty:
ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)]
result = _summarize_return_frame(
return_frame=return_frame,
ann_scaler=ann_scaler,
risk_free_rate_annual=risk_free_rate_annual,
)
result.update(_summarize_ic_frame(ic_frame))
total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values()))
total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values()))
result.update(
{
"success": True,
"final_value": round(float(pv_series.iloc[-1]), 6),
"error": None,
"rebalance_mode": DEFAULT_QLIB_REBALANCE_MODE,
"custom_weight_mode": normalized_custom_weight_mode,
"redistribute_unfilled_cash": redistribute_unfilled_cash,
"enforce_cash_limit": enforce_cash_limit,
"transaction_cost": round(total_transaction_cost, 6),
"gross_turnover": round(total_gross_turnover, 6),
"turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6),
"yearly_metrics": _build_yearly_metrics(
return_frame=return_frame,
ic_frame=ic_frame,
yearly_trade_stats=yearly_trade_stats,
yearly_holding_stats=yearly_holding_stats,
ann_scaler=ann_scaler,
risk_free_rate_annual=risk_free_rate_annual,
),
"trade_log": trade_records if capture_details else [],
"stock_contrib": _build_stock_contribution_summary(daily_holding_records, trade_records) if capture_details else [],
"holding_log": daily_holding_records if capture_details else [],
"portfolio_log": portfolio_log if capture_details else [],
}
)
return result
def compute_portfolio_ir(
    factor_values: pd.Series,
    price_df: pd.DataFrame,
    bench_return: pd.Series | None = None,
    top_k: int = 10,
    n_drop: int = 2,
    rebalance_freq: int = 5,
    cost_buy: float = 0.0013,
    cost_sell: float = 0.0013,
    hold_thresh: int = 2,
    label_forward_days: int = 5,
    ann_scaler: int = 252,
    start_date: str | None = None,
    end_date: str | None = None,
    risk_free_rate_annual: float = 0.0,
    start_cash: float = 200_000_000.0,
    position_size: float = 1.0,
    max_pos_each_stock: float = 1.0,
    lot_size: int = 100,
    max_daily_volume_participation: float = 0.0,
    max_daily_amount_participation: float = 0.0,
    capture_details: bool = False,
    engine: str = "custom",
    trade_guard_config: dict[str, Any] | bool | None = None,
    rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE,
    custom_weight_mode: str = "equal",
    redistribute_unfilled_cash: bool = False,
    enforce_cash_limit: bool = False,
) -> dict:
    """Run a single-factor portfolio backtest with the selected engine.

    The ``engine`` name is normalized through ``_normalize_backtest_engine``
    and the call is forwarded to the matching implementation:

    * ``"custom"`` -> ``_compute_portfolio_ir_custom``. It additionally
      receives ``custom_weight_mode``, ``redistribute_unfilled_cash`` and
      ``enforce_cash_limit``; ``trade_guard_config`` and ``rebalance_mode``
      are not forwarded to it.
    * ``"qlib_original"`` -> ``_compute_portfolio_ir_qlib``. It additionally
      receives ``trade_guard_config`` and ``rebalance_mode``.
    * ``"spec_shares_cash"`` / ``"spec_return_based"`` ->
      ``compute_portfolio_ir_spec_bridge`` (imported only when needed), with
      ``mode`` set to ``"shares_cash"`` or ``"return_based"`` respectively,
      plus ``trade_guard_config`` and ``rebalance_mode``.

    All other parameters are passed through unchanged to whichever engine is
    chosen; their exact semantics are defined by those implementations.

    Returns:
        The result dict produced by the selected engine, or the
        ``_empty_result`` failure dict carrying an
        ``"Unsupported backtest engine: ..."`` message when the normalized
        engine name matches none of the branches above.
    """
    engine_key = _normalize_backtest_engine(engine)

    # Keyword arguments that every engine implementation is called with,
    # kept in the same order the engines have always received them.
    shared_kwargs: dict[str, Any] = {
        "factor_values": factor_values,
        "price_df": price_df,
        "bench_return": bench_return,
        "top_k": top_k,
        "n_drop": n_drop,
        "rebalance_freq": rebalance_freq,
        "cost_buy": cost_buy,
        "cost_sell": cost_sell,
        "hold_thresh": hold_thresh,
        "label_forward_days": label_forward_days,
        "ann_scaler": ann_scaler,
        "start_date": start_date,
        "end_date": end_date,
        "risk_free_rate_annual": risk_free_rate_annual,
        "start_cash": start_cash,
        "position_size": position_size,
        "max_pos_each_stock": max_pos_each_stock,
        "lot_size": lot_size,
        "max_daily_volume_participation": max_daily_volume_participation,
        "max_daily_amount_participation": max_daily_amount_participation,
        "capture_details": capture_details,
    }

    if engine_key == "custom":
        return _compute_portfolio_ir_custom(
            **shared_kwargs,
            custom_weight_mode=custom_weight_mode,
            redistribute_unfilled_cash=redistribute_unfilled_cash,
            enforce_cash_limit=enforce_cash_limit,
        )

    if engine_key == "qlib_original":
        return _compute_portfolio_ir_qlib(
            **shared_kwargs,
            trade_guard_config=trade_guard_config,
            rebalance_mode=rebalance_mode,
        )

    if engine_key in {"spec_shares_cash", "spec_return_based"}:
        # Imported here rather than at module level, so the bridge module is
        # only loaded when one of the spec engines is actually requested.
        from backtest.spec_bridge_backtester import compute_portfolio_ir_spec_bridge

        if engine_key == "spec_shares_cash":
            bridge_mode = "shares_cash"
        else:
            bridge_mode = "return_based"
        return compute_portfolio_ir_spec_bridge(
            **shared_kwargs,
            mode=bridge_mode,
            trade_guard_config=trade_guard_config,
            rebalance_mode=rebalance_mode,
        )

    # The error message reports the caller's original engine string, not the
    # normalized one.
    return _empty_result(f"Unsupported backtest engine: {engine}")
def _empty_result(error: str) -> dict:
return {
"success": False,
"ir": 0.0,
"ic_mean": 0.0,
"ic_std": 0.0,
"icir": 0.0,
"rank_ic_mean": 0.0,
"rank_ic_std": 0.0,
"rank_icir": 0.0,
"n_ic_days": 0,
"annualized_return": 0.0,
"annualized_volatility": 0.0,
"sharpe": 0.0,
"winrate": 0.0,
"mdd": 0.0,
"excess_mdd": 0.0,
"portfolio_nav_mdd": 0.0,
"drawdown_duration_max": 0,
"drawdown_duration_mean": 0.0,
"drawdown_duration_median": 0.0,
"total_return": 0.0,
"performance_return": 0.0,
"benchmark_performance_return": 0.0,
"excess_compounded_return": 0.0,
"mean_daily_return": 0.0,
"std_daily_return": 0.0,
"n_days": 0,
"final_value": 1.0,
"yearly_metrics": {},
"trade_log": [],
"stock_contrib": [],
"holding_log": [],
"portfolio_log": [],
"error": error,
}