| """Robustness-only duplicate of the single-factor portfolio backtester. |
| |
| Strictly matches AlphaAgent's conf_vn_combined_kdd_ver.yaml: |
| |
| Strategy: ForceExitTopkDropoutStrategy |
| - topk: 10 (absolute, not percentage) |
| - n_drop: 2 |
| - hold_thresh: 2 (T+2 settlement) |
| - rebalance_mode="dropout" keeps legacy TopKDropout behavior |
| - rebalance_mode="sell_all" sells all sellable holdings, then buys the new alpha top-k |
| - rebalance_mode="target_weight" syncs holdings to alpha top-k target weights |
| |
| Execution: |
| - Buy at OPEN price on rebalance dates |
| - Sell at CLOSE price on rebalance dates |
| - Optional trade guards are disabled by default and enabled only via trade_guard_config |
| - Signal uses data up to T-1 |
| |
| Costs: |
| - open_cost: 0.13% (buy) |
| - close_cost: 0.13% (sell) |
| |
| Metrics (Qlib "sum" mode): |
| - IR = mean(daily_excess_return_vs_benchmark) / std(daily_excess_return_vs_benchmark) * sqrt(252) |
| - Sharpe = mean(daily_return - daily_risk_free_rate) / std(daily_return - daily_risk_free_rate) * sqrt(252) |
| - annualized_return = mean daily excess return * 252 |
| - performance_return = compounded portfolio return over the evaluated window |
| - benchmark_performance_return = compounded benchmark return over the evaluated window |
| - excess_compounded_return = performance_return - benchmark_performance_return |
| - max_drawdown = (cumsum(excess_return) - cummax(cumsum(excess_return))).min() |
| - portfolio_nav_mdd = ((cumprod(1 + portfolio_return) / cummax(cumprod(1 + portfolio_return))) - 1).min() |
| - ann_scaler: 252 |
| |
| Additional factor metrics: |
| - IC = mean daily cross-sectional Pearson correlation between factor_t and |
| close-to-close forward return over the next `label_forward_days` |
| - ICIR = IC / std(IC) |
| - RankIC = mean daily cross-sectional Spearman correlation |
| - RankICIR = RankIC / std(RankIC) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import contextlib |
| import copy |
| from collections import defaultdict |
| import hashlib |
| import io |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from typing import Any |
| import warnings |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
| PROJECT_ROOT = Path(__file__).resolve().parent.parent |
| _QLIB_CACHE_ROOT: Path | None = None |
| _QLIB_RUNTIME_STATE: dict[str, str | None] = {"provider_uri": None} |
| _PROVIDER_SIGNATURE_CACHE: dict[int, str] = {} |
| _PRICE_WIDE_CACHE: dict[tuple[int, str], pd.DataFrame] = {} |
| _AVAILABLE_DATES_CACHE: dict[int, pd.DatetimeIndex] = {} |
|
|
|
|
| YEAR_REGIMES = { |
| 2022: "bearish", |
| 2025: "bullish", |
| } |
|
|
| DEFAULT_VN_TRADE_GUARD_CONFIG: dict[str, Any] = { |
| "trend_ema_span": 20, |
| "force_exit_trend_days": 3, |
| "buy_chase_ctc_thresh": 0.05, |
| "buy_chase_intraday_thresh": 0.04, |
| "panic_drop_ctc_thresh": 0.05, |
| "panic_drop_intraday_thresh": 0.04, |
| "extreme_range_thresh": 0.10, |
| "amount_ma_window": 20, |
| "amount_buy_min_ratio": 1.0, |
| "amount_force_exit_ratio": 0.5, |
| } |
|
|
| DEFAULT_QLIB_REBALANCE_MODE = "dropout" |
| QLIB_REBALANCE_MODE_ALIASES = { |
| "dropout": "dropout", |
| "topk_dropout": "dropout", |
| "original": "dropout", |
| "sell_all": "sell_all", |
| "sell_all_buy_topk": "sell_all", |
| "full_liquidate": "sell_all", |
| "full_rebalance": "sell_all", |
| "target_weight": "target_weight", |
| "target_weight_topk": "target_weight", |
| "sync_topk": "target_weight", |
| "weight_sync": "target_weight", |
| } |
|
|
|
|
| def _market_regime(year: int) -> str: |
| return YEAR_REGIMES.get(int(year), "neutral") |
|
|
|
|
| def _normalize_rebalance_mode(mode: str | None) -> str: |
| key = (mode or DEFAULT_QLIB_REBALANCE_MODE).strip().lower() |
| if key in ("", "none", "null"): |
| key = DEFAULT_QLIB_REBALANCE_MODE |
| try: |
| return QLIB_REBALANCE_MODE_ALIASES[key] |
| except KeyError as exc: |
| choices = ", ".join(sorted(set(QLIB_REBALANCE_MODE_ALIASES.values()))) |
| raise ValueError(f"Unsupported qlib rebalance_mode={mode!r}. Choose one of: {choices}") from exc |
|
|
|
|
| def _normalize_trade_guard_config(config: dict[str, Any] | bool | None) -> dict[str, Any] | None: |
| if config is None or config is False: |
| return None |
| if config is True: |
| return dict(DEFAULT_VN_TRADE_GUARD_CONFIG) |
| if not isinstance(config, dict): |
| raise TypeError("trade_guard_config must be a dict, True, False, or None") |
| normalized = dict(DEFAULT_VN_TRADE_GUARD_CONFIG) |
| normalized.update(config) |
| return normalized |
|
|
|
|
| def _cfg_float(config: dict[str, Any], key: str) -> float | None: |
| value = config.get(key) |
| if value is None: |
| return None |
| return float(value) |
|
|
|
|
| def _cfg_int(config: dict[str, Any], key: str) -> int | None: |
| value = config.get(key) |
| if value is None: |
| return None |
| return int(value) |
|
|
|
|
| def _get_frame_value(frame: pd.DataFrame | None, date: pd.Timestamp, inst: str, default: float = np.nan) -> float: |
| if frame is None: |
| return float(default) |
| try: |
| value = frame.loc[date, inst] |
| except (KeyError, TypeError): |
| return float(default) |
| if pd.isna(value): |
| return float(default) |
| return float(value) |
|
|
|
|
| def _compute_daily_cross_sectional_ic_frame( |
| factor_values: pd.Series, |
| close_prices: pd.DataFrame, |
| label_forward_days: int = 5, |
| ) -> pd.DataFrame: |
| """Compute daily cross-sectional IC / RankIC series on forward close returns.""" |
| empty_ic_frame = pd.DataFrame( |
| { |
| "ic": pd.Series(dtype=float), |
| "rank_ic": pd.Series(dtype=float), |
| }, |
| index=pd.DatetimeIndex([], name="date"), |
| ) |
| if isinstance(factor_values, pd.DataFrame): |
| factor_values = factor_values.iloc[:, 0] |
|
|
| factor_wide = factor_values.unstack("instrument").sort_index() |
| close_wide = close_prices.sort_index() |
|
|
| common_dates = factor_wide.index.intersection(close_wide.index) |
| if len(common_dates) <= label_forward_days: |
| return empty_ic_frame |
|
|
| factor_wide = factor_wide.reindex(common_dates) |
| close_wide = close_wide.reindex(common_dates) |
| forward_returns = close_wide.shift(-label_forward_days) / close_wide - 1.0 |
|
|
| rows: list[dict[str, Any]] = [] |
| last_valid_idx = len(common_dates) - label_forward_days |
| for idx in range(last_valid_idx): |
| date = common_dates[idx] |
| factor_row = factor_wide.iloc[idx] |
| ret_row = forward_returns.iloc[idx] |
| valid = factor_row.notna() & ret_row.notna() |
| if valid.sum() < 2: |
| continue |
|
|
| x = factor_row[valid] |
| y = ret_row[valid] |
| if x.nunique(dropna=True) < 2 or y.nunique(dropna=True) < 2: |
| continue |
|
|
| ic = x.corr(y, method="pearson") |
| rank_ic = x.corr(y, method="spearman") |
| rows.append( |
| { |
| "date": date, |
| "ic": float(ic) if pd.notna(ic) else np.nan, |
| "rank_ic": float(rank_ic) if pd.notna(rank_ic) else np.nan, |
| } |
| ) |
|
|
| if not rows: |
| return empty_ic_frame |
|
|
| return pd.DataFrame(rows).set_index("date").sort_index() |
|
|
|
|
| def _summarize_ic_frame(ic_frame: pd.DataFrame) -> dict[str, Any]: |
| if ic_frame is None or ic_frame.empty: |
| return { |
| "ic_mean": 0.0, |
| "ic_std": 0.0, |
| "icir": 0.0, |
| "rank_ic_mean": 0.0, |
| "rank_ic_std": 0.0, |
| "rank_icir": 0.0, |
| "n_ic_days": 0, |
| } |
|
|
| ic_values = ic_frame["ic"].dropna() |
| rank_values = ic_frame["rank_ic"].dropna() |
|
|
| ic_mean = float(ic_values.mean()) if len(ic_values) else 0.0 |
| ic_std = float(ic_values.std(ddof=1)) if len(ic_values) > 1 else 0.0 |
| rank_ic_mean = float(rank_values.mean()) if len(rank_values) else 0.0 |
| rank_ic_std = float(rank_values.std(ddof=1)) if len(rank_values) > 1 else 0.0 |
|
|
| return { |
| "ic_mean": round(ic_mean, 6), |
| "ic_std": round(ic_std, 6), |
| "icir": round(ic_mean / (ic_std + 1e-12), 6) if len(ic_values) > 1 else 0.0, |
| "rank_ic_mean": round(rank_ic_mean, 6), |
| "rank_ic_std": round(rank_ic_std, 6), |
| "rank_icir": round(rank_ic_mean / (rank_ic_std + 1e-12), 6) if len(rank_values) > 1 else 0.0, |
| "n_ic_days": int(len(ic_values)), |
| } |
|
|
|
|
| def _summarize_return_frame( |
| return_frame: pd.DataFrame, |
| ann_scaler: int = 252, |
| risk_free_rate_annual: float = 0.0, |
| ) -> dict[str, Any]: |
| def _drawdown_duration_stats(cumulative_excess: np.ndarray) -> tuple[int, float, float]: |
| if len(cumulative_excess) == 0: |
| return 0, 0.0, 0.0 |
| running_max = np.maximum.accumulate(cumulative_excess) |
| in_drawdown = cumulative_excess < running_max - 1e-12 |
| durations: list[int] = [] |
| current = 0 |
| for flag in in_drawdown: |
| if flag: |
| current += 1 |
| elif current > 0: |
| durations.append(current) |
| current = 0 |
| if current > 0: |
| durations.append(current) |
| if not durations: |
| return 0, 0.0, 0.0 |
| return int(max(durations)), float(np.mean(durations)), float(np.median(durations)) |
|
|
| if return_frame is None or return_frame.empty: |
| return { |
| "ir": 0.0, |
| "annualized_return": 0.0, |
| "annualized_volatility": 0.0, |
| "sharpe": 0.0, |
| "winrate": 0.0, |
| "mdd": 0.0, |
| "excess_mdd": 0.0, |
| "portfolio_nav_mdd": 0.0, |
| "drawdown_duration_max": 0, |
| "drawdown_duration_mean": 0.0, |
| "drawdown_duration_median": 0.0, |
| "total_return": 0.0, |
| "performance_return": 0.0, |
| "benchmark_performance_return": 0.0, |
| "excess_compounded_return": 0.0, |
| "mean_daily_return": 0.0, |
| "std_daily_return": 0.0, |
| "n_days": 0, |
| } |
|
|
| portfolio_returns = return_frame["portfolio_return"].astype(float) |
| benchmark_returns = return_frame["benchmark_return"].astype(float) |
| excess_returns = portfolio_returns - benchmark_returns |
|
|
| daily_rf = float(risk_free_rate_annual) / float(ann_scaler) |
| risk_free_adjusted = portfolio_returns - daily_rf |
|
|
| mean_excess = float(excess_returns.mean()) |
| std_excess = float(excess_returns.std(ddof=1)) if len(excess_returns) > 1 else 1e-8 |
| mean_rf = float(risk_free_adjusted.mean()) |
| std_rf = float(risk_free_adjusted.std(ddof=1)) if len(risk_free_adjusted) > 1 else 1e-8 |
|
|
| performance_return = float((1.0 + portfolio_returns).prod() - 1.0) |
| benchmark_performance_return = float((1.0 + benchmark_returns).prod() - 1.0) |
| excess_compounded_return = performance_return - benchmark_performance_return |
| annualized_return = mean_excess * ann_scaler |
| information_ratio = mean_excess / (std_excess + 1e-12) * np.sqrt(ann_scaler) |
| sharpe_ratio = mean_rf / (std_rf + 1e-12) * np.sqrt(ann_scaler) |
| winrate = float(np.mean(excess_returns > 0)) if len(excess_returns) > 0 else 0.0 |
|
|
| cumulative_excess = np.cumsum(excess_returns.to_numpy(dtype=float)) |
| if len(cumulative_excess): |
| max_drawdown = float((cumulative_excess - np.maximum.accumulate(cumulative_excess)).min()) |
| else: |
| max_drawdown = 0.0 |
| drawdown_duration_max, drawdown_duration_mean, drawdown_duration_median = _drawdown_duration_stats(cumulative_excess) |
|
|
| nav = np.cumprod(1.0 + portfolio_returns.to_numpy(dtype=float)) |
| if len(nav): |
| nav_peak = np.maximum.accumulate(nav) |
| portfolio_nav_mdd = float(np.min(nav / (nav_peak + 1e-12) - 1.0)) |
| else: |
| portfolio_nav_mdd = 0.0 |
|
|
| annualized_volatility = std_excess * np.sqrt(ann_scaler) |
| total_return = float(np.sum(portfolio_returns)) |
|
|
| return { |
| "ir": round(float(information_ratio), 6), |
| "annualized_return": round(float(annualized_return), 6), |
| "annualized_volatility": round(float(annualized_volatility), 6), |
| "sharpe": round(float(sharpe_ratio), 6), |
| "winrate": round(float(winrate), 6), |
| "mdd": round(float(max_drawdown), 6), |
| "excess_mdd": round(float(max_drawdown), 6), |
| "portfolio_nav_mdd": round(float(portfolio_nav_mdd), 6), |
| "drawdown_duration_max": int(drawdown_duration_max), |
| "drawdown_duration_mean": round(float(drawdown_duration_mean), 4), |
| "drawdown_duration_median": round(float(drawdown_duration_median), 4), |
| "total_return": round(float(total_return), 6), |
| "performance_return": round(float(performance_return), 6), |
| "benchmark_performance_return": round(float(benchmark_performance_return), 6), |
| "excess_compounded_return": round(float(excess_compounded_return), 6), |
| "mean_daily_return": round(float(mean_excess), 8), |
| "std_daily_return": round(float(std_excess), 8), |
| "n_days": int(len(portfolio_returns)), |
| } |
|
|
|
|
| def _build_yearly_metrics( |
| return_frame: pd.DataFrame, |
| ic_frame: pd.DataFrame, |
| yearly_trade_stats: dict[int, dict[str, float]], |
| yearly_holding_stats: dict[int, dict[str, float]], |
| ann_scaler: int, |
| risk_free_rate_annual: float, |
| ) -> dict[str, Any]: |
| years: set[int] = set() |
| if return_frame is not None and not return_frame.empty: |
| years.update(int(year) for year in return_frame.index.year.unique()) |
| if ic_frame is not None and not ic_frame.empty: |
| years.update(int(year) for year in ic_frame.index.year.unique()) |
| years.update(int(year) for year in yearly_trade_stats.keys()) |
| years.update(int(year) for year in yearly_holding_stats.keys()) |
|
|
| yearly_metrics: dict[str, Any] = {} |
| for year in sorted(years): |
| year_return_frame = return_frame[return_frame.index.year == year] if not return_frame.empty else pd.DataFrame() |
| year_ic_frame = ic_frame[ic_frame.index.year == year] if not ic_frame.empty else pd.DataFrame() |
|
|
| metrics = _summarize_return_frame( |
| return_frame=year_return_frame, |
| ann_scaler=ann_scaler, |
| risk_free_rate_annual=risk_free_rate_annual, |
| ) |
| metrics.update(_summarize_ic_frame(year_ic_frame)) |
|
|
| trade_stats = yearly_trade_stats.get(year, {}) |
| holding_stats = yearly_holding_stats.get(year, {}) |
| n_days = int(holding_stats.get("n_days", 0)) |
| holdings_count_sum = float(holding_stats.get("holdings_count_sum", 0.0)) |
|
|
| metrics.update( |
| { |
| "year": int(year), |
| "market_regime": _market_regime(year), |
| "avg_holdings_count": round(holdings_count_sum / n_days, 4) if n_days else 0.0, |
| "max_holdings_count": int(holding_stats.get("max_holdings_count", 0)), |
| "drawdown_duration_max": int(metrics.get("drawdown_duration_max", 0) or 0), |
| "drawdown_duration_mean": round(float(metrics.get("drawdown_duration_mean", 0.0) or 0.0), 4), |
| "drawdown_duration_median": round(float(metrics.get("drawdown_duration_median", 0.0) or 0.0), 4), |
| "buy_trades": int(trade_stats.get("buy_trades", 0)), |
| "sell_trades": int(trade_stats.get("sell_trades", 0)), |
| "shares_bought": round(float(trade_stats.get("shares_bought", 0.0)), 6), |
| "shares_sold": round(float(trade_stats.get("shares_sold", 0.0)), 6), |
| "buy_gross_notional": round(float(trade_stats.get("buy_gross_notional", 0.0)), 6), |
| "sell_gross_notional": round(float(trade_stats.get("sell_gross_notional", 0.0)), 6), |
| "buy_cash_outflow": round(float(trade_stats.get("buy_cash_outflow", 0.0)), 6), |
| "sell_net_proceeds": round(float(trade_stats.get("sell_net_proceeds", 0.0)), 6), |
| "buy_transaction_cost": round(float(trade_stats.get("buy_transaction_cost", 0.0)), 6), |
| "sell_transaction_cost": round(float(trade_stats.get("sell_transaction_cost", 0.0)), 6), |
| "transaction_cost": round(float(trade_stats.get("transaction_cost", 0.0)), 6), |
| "gross_turnover": round(float(trade_stats.get("gross_turnover", 0.0)), 6), |
| } |
| ) |
| yearly_metrics[str(year)] = metrics |
|
|
| return yearly_metrics |
|
|
|
|
| def _build_stock_contribution_summary( |
| daily_holding_records: list[dict[str, Any]], |
| trade_records: list[dict[str, Any]], |
| ) -> list[dict[str, Any]]: |
| if not daily_holding_records and not trade_records: |
| return [] |
|
|
| hold_df = pd.DataFrame(daily_holding_records) |
| trade_df = pd.DataFrame(trade_records) |
|
|
| if not hold_df.empty: |
| hold_df["date"] = pd.to_datetime(hold_df["date"]) |
| if not trade_df.empty: |
| trade_df["date"] = pd.to_datetime(trade_df["date"]) |
|
|
| years: set[int] = set() |
| if not hold_df.empty: |
| years.update(int(year) for year in hold_df["year"].dropna().unique()) |
| if not trade_df.empty: |
| years.update(int(year) for year in trade_df["year"].dropna().unique()) |
| if not years: |
| return [] |
|
|
| holdings_by_date: dict[pd.Timestamp, pd.Series] = {} |
| if not hold_df.empty: |
| for date, grp in hold_df.groupby("date"): |
| holdings_by_date[pd.Timestamp(date)] = grp.groupby("instrument")["market_value"].sum() |
|
|
| snapshot_dates = sorted(holdings_by_date.keys()) |
| rows: list[dict[str, Any]] = [] |
|
|
| for year in sorted(years): |
| year_hold = hold_df[hold_df["year"] == year] if not hold_df.empty else pd.DataFrame() |
| year_trade = trade_df[trade_df["year"] == year] if not trade_df.empty else pd.DataFrame() |
|
|
| instruments: set[str] = set() |
| if not year_hold.empty: |
| instruments.update(str(inst) for inst in year_hold["instrument"].unique()) |
| if not year_trade.empty: |
| instruments.update(str(inst) for inst in year_trade["instrument"].unique()) |
| if not instruments: |
| continue |
|
|
| year_start = pd.Timestamp(f"{year}-01-01") |
| prior_dates = [date for date in snapshot_dates if date < year_start] |
| prior_snapshot = holdings_by_date[prior_dates[-1]] if prior_dates else pd.Series(dtype=float) |
|
|
| if not year_hold.empty: |
| year_end_date = pd.Timestamp(year_hold["date"].max()) |
| end_snapshot = holdings_by_date.get(year_end_date, pd.Series(dtype=float)) |
| else: |
| end_snapshot = pd.Series(dtype=float) |
|
|
| for instrument in sorted(instruments): |
| hold_inst = year_hold[year_hold["instrument"] == instrument] if not year_hold.empty else pd.DataFrame() |
| trade_inst = year_trade[year_trade["instrument"] == instrument] if not year_trade.empty else pd.DataFrame() |
|
|
| if not trade_inst.empty: |
| filled_mask = trade_inst.get("filled_shares", trade_inst.get("shares", 0.0)).astype(float) > 0 |
| filled_trade_inst = trade_inst[filled_mask] |
| buy_inst = filled_trade_inst[filled_trade_inst["action"] == "buy"] |
| sell_inst = filled_trade_inst[filled_trade_inst["action"] == "sell"] |
| else: |
| buy_inst = pd.DataFrame() |
| sell_inst = pd.DataFrame() |
|
|
| start_value = float(prior_snapshot.get(instrument, 0.0)) if not prior_snapshot.empty else 0.0 |
| end_value = float(end_snapshot.get(instrument, 0.0)) if not end_snapshot.empty else 0.0 |
| buy_cash_outflow = float(buy_inst["cash_outflow"].sum()) if not buy_inst.empty else 0.0 |
| sell_net_proceeds = float(sell_inst["net_proceeds"].sum()) if not sell_inst.empty else 0.0 |
| realized_pnl = float(sell_inst["realized_pnl"].sum()) if not sell_inst.empty else 0.0 |
|
|
| contribution_return = end_value + sell_net_proceeds - start_value - buy_cash_outflow |
| rows.append( |
| { |
| "year": int(year), |
| "market_regime": _market_regime(year), |
| "instrument": instrument, |
| "contribution_return": round(float(contribution_return), 6), |
| "abs_contribution_return": round(abs(float(contribution_return)), 6), |
| "realized_pnl": round(float(realized_pnl), 6), |
| "start_value": round(float(start_value), 6), |
| "end_value": round(float(end_value), 6), |
| "buy_trades": int(len(buy_inst)), |
| "sell_trades": int(len(sell_inst)), |
| "shares_bought": round(float(buy_inst["shares"].sum()) if not buy_inst.empty else 0.0, 6), |
| "shares_sold": round(float(sell_inst["shares"].sum()) if not sell_inst.empty else 0.0, 6), |
| "buy_cash_outflow": round(float(buy_cash_outflow), 6), |
| "sell_net_proceeds": round(float(sell_net_proceeds), 6), |
| "holding_days": int(hold_inst["date"].nunique()) if not hold_inst.empty else 0, |
| "avg_shares_held": round(float(hold_inst["shares_held"].mean()) if not hold_inst.empty else 0.0, 6), |
| "ending_shares": round(float(hold_inst.sort_values("date")["shares_held"].iloc[-1]) if not hold_inst.empty else 0.0, 6), |
| "avg_market_value": round(float(hold_inst["market_value"].mean()) if not hold_inst.empty else 0.0, 6), |
| "max_market_value": round(float(hold_inst["market_value"].max()) if not hold_inst.empty else 0.0, 6), |
| "market_volume_sum": round(float(hold_inst["market_volume"].sum()) if not hold_inst.empty else 0.0, 6), |
| "market_amount_sum": round(float(hold_inst["market_amount"].sum()) if not hold_inst.empty else 0.0, 6), |
| } |
| ) |
|
|
| if not rows: |
| return rows |
|
|
| contrib_df = pd.DataFrame(rows) |
| contrib_df = contrib_df.sort_values(["year", "abs_contribution_return", "instrument"], ascending=[True, False, True]) |
| contrib_df["rank"] = contrib_df.groupby("year").cumcount() + 1 |
| return contrib_df.to_dict("records") |
|
|
|
|
| def _round_down_lot(shares: float, lot_size: int) -> float: |
| if not np.isfinite(shares) or shares <= 0: |
| return 0.0 |
| lot = max(int(lot_size), 1) |
| return float(np.floor(float(shares) / float(lot)) * float(lot)) |
|
|
|
|
| def _market_value_from_close( |
| close_prices: pd.DataFrame, |
| date: pd.Timestamp, |
| position: dict[str, Any], |
| instrument: str, |
| ) -> float: |
| close_px = _get_frame_value(close_prices, date, instrument, default=position.get("avg_cost_per_share", 0.0)) |
| if close_px > 0: |
| return float(position["shares"]) * float(close_px) |
| return float(position["total_cost_basis"]) |
|
|
|
|
| def _liquidity_caps( |
| *, |
| market_volume: float, |
| market_amount: float, |
| price: float, |
| lot_size: int, |
| max_daily_volume_participation: float, |
| max_daily_amount_participation: float, |
| ) -> tuple[float, float]: |
| volume_cap = np.inf |
| if np.isfinite(market_volume) and market_volume > 0 and max_daily_volume_participation > 0: |
| volume_cap = _round_down_lot(float(market_volume) * float(max_daily_volume_participation), lot_size) |
|
|
| amount_cap = np.inf |
| if np.isfinite(market_amount) and market_amount > 0 and price > 0 and max_daily_amount_participation > 0: |
| amount_cap = _round_down_lot( |
| (float(market_amount) * float(max_daily_amount_participation)) / float(price), |
| lot_size, |
| ) |
| return float(volume_cap), float(amount_cap) |
|
|
|
|
| def _clip_reason_text(reasons: list[str]) -> str: |
| deduped = [] |
| for reason in reasons: |
| if reason and reason not in deduped: |
| deduped.append(reason) |
| return ",".join(deduped) if deduped else "none" |
|
|
|
|
| CUSTOM_WEIGHT_MODE_ALIASES = { |
| "equal": "equal", |
| "equal_weight": "equal", |
| "weight_equal": "equal", |
| "alpha_score": "alpha_score", |
| "score": "alpha_score", |
| "score_weight": "alpha_score", |
| "alpha_weight": "alpha_score", |
| } |
|
|
|
|
| def _normalize_custom_weight_mode(mode: str | None) -> str: |
| key = str(mode or "equal").strip().lower() |
| if key in ("", "none", "null"): |
| key = "equal" |
| try: |
| return CUSTOM_WEIGHT_MODE_ALIASES[key] |
| except KeyError as exc: |
| choices = ", ".join(sorted(set(CUSTOM_WEIGHT_MODE_ALIASES.values()))) |
| raise ValueError(f"Unsupported custom_weight_mode={mode!r}. Choose one of: {choices}") from exc |
|
|
|
|
| def _normalize_positive_weights( |
| raw: pd.Series, |
| *, |
| total_weight: float, |
| max_pos_each_stock: float, |
| ) -> pd.Series: |
| raw = raw.replace([np.inf, -np.inf], np.nan).dropna().astype(float) |
| raw = raw[raw > 0] |
| if raw.empty: |
| return pd.Series(dtype=float) |
|
|
| total_weight = max(float(total_weight), 0.0) |
| if total_weight <= 0: |
| return pd.Series(dtype=float) |
|
|
| cap = max(float(max_pos_each_stock), 0.0) |
| effective_total_weight = total_weight |
| if cap > 0: |
| effective_total_weight = min(effective_total_weight, float(len(raw)) * cap) |
| if effective_total_weight <= 1e-12: |
| return pd.Series(dtype=float) |
|
|
| weights = raw / float(raw.sum()) * effective_total_weight |
| if cap <= 0: |
| return weights.reindex(raw.index).fillna(0.0) |
|
|
| fixed: dict[str, float] = {} |
| free = weights.copy() |
| remaining = effective_total_weight |
| for _ in range(len(weights)): |
| over = free[free > cap] |
| if over.empty: |
| break |
| for code in over.index: |
| fixed[str(code)] = cap |
| free = free.drop(index=over.index) |
| remaining = max(effective_total_weight - float(sum(fixed.values())), 0.0) |
| if free.empty or remaining <= 1e-12: |
| free = pd.Series(dtype=float) |
| break |
| if float(free.sum()) <= 1e-12: |
| free = pd.Series(dtype=float) |
| break |
| free = free / float(free.sum()) * remaining |
|
|
| out = pd.Series(fixed, dtype=float) |
| if not free.empty: |
| out = pd.concat([out, free]) |
| return out.reindex(raw.index).fillna(0.0) |
|
|
|
|
| def _build_custom_target_weights( |
| *, |
| target_scores: pd.Series, |
| target_names: list[str], |
| total_weight: float, |
| max_pos_each_stock: float, |
| weight_mode: str, |
| ) -> pd.Series: |
| ordered_names = [str(code) for code in target_names] |
| if not ordered_names: |
| return pd.Series(dtype=float) |
|
|
| normalized_mode = _normalize_custom_weight_mode(weight_mode) |
| if normalized_mode == "equal": |
| raw = pd.Series(1.0, index=ordered_names, dtype=float) |
| else: |
| ordered_scores = pd.to_numeric(target_scores.reindex(ordered_names), errors="coerce") |
| if ordered_scores.isna().any(): |
| raw = pd.Series(1.0, index=ordered_names, dtype=float) |
| else: |
| spread = float(ordered_scores.max() - ordered_scores.min()) |
| if spread <= 1e-12: |
| raw = pd.Series(1.0, index=ordered_names, dtype=float) |
| else: |
| |
| |
| |
| raw = (ordered_scores - float(ordered_scores.min())) + spread * 0.05 |
|
|
| weights = _normalize_positive_weights( |
| raw, |
| total_weight=total_weight, |
| max_pos_each_stock=max_pos_each_stock, |
| ) |
| if weights.empty: |
| return weights |
| return weights.reindex(ordered_names).fillna(0.0) |
|
|
|
|
| def _normalize_backtest_engine(engine: str | None) -> str: |
| value = str(engine or "custom").strip().lower() |
| aliases = { |
| "custom": "custom", |
| "qlib_custom": "custom", |
| "qlib_customize": "custom", |
| "qlib_customized": "custom", |
| "customized": "custom", |
| "original": "qlib_original", |
| "qlib": "qlib_original", |
| "qlib_original": "qlib_original", |
| "spec": "spec_shares_cash", |
| "spec_cash": "spec_shares_cash", |
| "spec_shares_cash": "spec_shares_cash", |
| "qlib_spec": "spec_shares_cash", |
| "spec_return": "spec_return_based", |
| "spec_return_based": "spec_return_based", |
| "qlib_spec_return_based": "spec_return_based", |
| } |
| return aliases.get(value, value) |
|
|
|
|
| def _resolve_writable_cache_root() -> Path: |
| candidates: list[Path] = [] |
|
|
| env_override = os.getenv("ALPHAEVO_QLIB_CACHE_ROOT") or os.getenv("QLIB_CACHE_ROOT") |
| if env_override: |
| candidates.append(Path(env_override).expanduser()) |
|
|
| if Path("/kaggle/working").exists(): |
| candidates.append(Path("/kaggle/working") / ".cache" / "qlib_vn") |
|
|
| candidates.extend( |
| [ |
| PROJECT_ROOT / ".cache" / "qlib_vn", |
| Path.home() / ".cache" / "alphaevo" / "qlib_vn", |
| Path("/tmp") / "alphaevo_qlib_vn", |
| ] |
| ) |
|
|
| tried: list[str] = [] |
| for root in candidates: |
| root = root.resolve() |
| tried.append(str(root)) |
| try: |
| root.mkdir(parents=True, exist_ok=True) |
| probe = root / ".write_probe" |
| probe.write_text("ok", encoding="utf-8") |
| probe.unlink(missing_ok=True) |
| return root |
| except OSError: |
| continue |
|
|
| raise RuntimeError( |
| "Could not find a writable cache directory for qlib_original. Tried: " |
| + ", ".join(tried) |
| ) |
|
|
|
|
| def _get_qlib_cache_root() -> Path: |
| global _QLIB_CACHE_ROOT |
| if _QLIB_CACHE_ROOT is None: |
| _QLIB_CACHE_ROOT = _resolve_writable_cache_root() |
| return _QLIB_CACHE_ROOT |
|
|
|
|
| @contextlib.contextmanager |
| def _provider_build_lock(lock_path: Path, timeout_sec: float = 900.0): |
| lock_path.parent.mkdir(parents=True, exist_ok=True) |
| start = time.monotonic() |
| fd: int | None = None |
| while fd is None: |
| try: |
| fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR) |
| os.write(fd, f"{os.getpid()}\n".encode("utf-8")) |
| except FileExistsError: |
| try: |
| age = time.time() - lock_path.stat().st_mtime |
| if age > timeout_sec: |
| lock_path.unlink(missing_ok=True) |
| continue |
| except FileNotFoundError: |
| continue |
| if time.monotonic() - start > timeout_sec: |
| raise TimeoutError(f"Timed out waiting for Qlib provider cache lock: {lock_path}") |
| time.sleep(0.25) |
|
|
| try: |
| yield |
| finally: |
| if fd is not None: |
| os.close(fd) |
| lock_path.unlink(missing_ok=True) |
|
|
|
|
| @contextlib.contextmanager |
| def _quiet_runtime_io() -> Any: |
| stdout_buffer = io.StringIO() |
| stderr_buffer = io.StringIO() |
| with contextlib.redirect_stdout(stdout_buffer), contextlib.redirect_stderr(stderr_buffer): |
| yield stdout_buffer, stderr_buffer |
|
|
|
|
| def _import_qlib_runtime() -> dict[str, Any]: |
| try: |
| with _quiet_runtime_io(): |
| import qlib |
| from qlib.backtest import CommonInfrastructure, backtest_loop, get_exchange |
| from qlib.backtest.account import Account |
| from qlib.backtest.decision import Order, OrderDir, TradeDecisionWO |
| from qlib.backtest.executor import SimulatorExecutor |
| from qlib.backtest.position import Position |
| from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy |
| from qlib.log import set_global_logger_level_cm |
| except Exception as exc: |
| raise RuntimeError( |
| "qlib_original requires the real Qlib package/runtime to be available in the environment" |
| ) from exc |
|
|
| class ForceExitTopkDropoutStrategy(TopkDropoutStrategy): |
| """Top-k dropout with explicit rebalance cadence and VN-specific overlays. |
| |
| The wrapper stays close to Qlib's original TopkDropoutStrategy: |
| - keep `topk`, `n_drop`, and `hold_thresh` |
| - only rebalance every `rebalance_freq` bars |
| - restrict new buys to names that pass the precomputed `__buy_gate__` |
| - push `__force_exit__` names to the bottom of the sell ranking without |
| bypassing `n_drop` |
| - defer non-forced sells when `__defer_sell__` marks a panic-drop day |
| - avoid rebuying recently sold names for a short cooldown window |
| - hard-cap live stock count at `topk`; new buys only use pre-trade empty slots |
| - optionally run full liquidation or target-weight top-k sync modes |
| """ |
|
|
| SCORE_COL = "score" |
| BUY_GATE_COL = "__buy_gate__" |
| FORCE_EXIT_COL = "__force_exit__" |
| DEFER_SELL_COL = "__defer_sell__" |
|
|
| def __init__( |
| self, |
| *, |
| rebalance_freq: int = 5, |
| cooldown_period: int = 10, |
| rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, |
| max_pos_each_stock: float = 1.0, |
| target_weight_eps: float = 0.001, |
| **kwargs, |
| ): |
| super().__init__(**kwargs) |
| self.rebalance_freq = max(int(rebalance_freq), 1) |
| self.cooldown_period = max(int(cooldown_period), 0) |
| self.rebalance_mode = _normalize_rebalance_mode(rebalance_mode) |
| self.max_pos_each_stock = max(float(max_pos_each_stock), 0.0) |
| self.target_weight_eps = max(float(target_weight_eps), 0.0) |
| self._sell_cooldown_until: dict[str, int] = {} |
|
|
| def _score_to_target_weights(self, target_score: pd.Series) -> dict[str, float]: |
| target_score = target_score.replace([np.inf, -np.inf], np.nan).dropna().astype(float) |
| if target_score.empty: |
| return {} |
|
|
| spread = float(target_score.max() - target_score.min()) |
| if spread <= 1e-12: |
| raw = pd.Series(1.0, index=target_score.index, dtype=float) |
| else: |
| |
| raw = (target_score - float(target_score.min())) + spread * 0.05 |
|
|
| total_weight = max(min(float(self.risk_degree), 1.0), 0.0) |
| if raw.sum() <= 1e-12 or total_weight <= 0: |
| return {} |
|
|
| weights = raw / float(raw.sum()) * total_weight |
| cap = float(self.max_pos_each_stock) |
| if cap > 0: |
| fixed: dict[str, float] = {} |
| free = weights.copy() |
| remaining = total_weight |
| for _ in range(len(weights)): |
| over = free[free > cap] |
| if over.empty: |
| break |
| for code in over.index: |
| fixed[str(code)] = cap |
| free = free.drop(index=over.index) |
| remaining = max(total_weight - float(sum(fixed.values())), 0.0) |
| if free.empty or remaining <= 1e-12: |
| free = pd.Series(dtype=float) |
| break |
| free = free / float(free.sum()) * remaining |
| weights = pd.Series(fixed, dtype=float) |
| if not free.empty: |
| weights = pd.concat([weights, free]) |
|
|
| return {str(code): float(weight) for code, weight in weights.items() if float(weight) > 1e-12} |
|
|
| def _deal_price(self, code, start_time, end_time, direction, fallback: float = np.nan) -> float: |
| try: |
| price = self.trade_exchange.get_deal_price( |
| stock_id=code, |
| start_time=start_time, |
| end_time=end_time, |
| direction=direction, |
| ) |
| except Exception: |
| price = fallback |
| if price is None or not np.isfinite(float(price)) or float(price) <= 0: |
| return float(fallback) |
| return float(price) |
|
|
| def _round_trade_amount(self, code, amount: float, start_time, end_time) -> float: |
| if amount <= 0: |
| return 0.0 |
| try: |
| factor = self.trade_exchange.get_factor( |
| stock_id=code, |
| start_time=start_time, |
| end_time=end_time, |
| ) |
| return float(self.trade_exchange.round_amount_by_trade_unit(amount, factor)) |
| except Exception: |
| return float(amount) |
|
|
| @staticmethod |
| def _stock_amount(position: Position, code) -> float: |
| try: |
| return float(position.get_stock_amount(code=code)) |
| except Exception: |
| return 0.0 |
|
|
| @staticmethod |
| def _stock_price(position: Position, code, default: float = np.nan) -> float: |
| try: |
| price = float(position.get_stock_price(code)) |
| except Exception: |
| price = float(default) |
| return price if np.isfinite(price) and price > 0 else float(default) |
|
|
| def _is_stock_tradable(self, code, start_time, end_time, direction) -> bool: |
| try: |
| return bool( |
| self.trade_exchange.is_stock_tradable( |
| stock_id=code, |
| start_time=start_time, |
| end_time=end_time, |
| direction=None if self.forbid_all_trade_at_limit else direction, |
| ) |
| ) |
| except Exception: |
| return False |
|
|
| def _can_sell(self, position: Position, code, time_per_step) -> bool: |
| try: |
| return bool(position.get_stock_count(code, bar=time_per_step) >= self.hold_thresh) |
| except Exception: |
| return False |
|
|
| def _select_target_scores( |
| self, |
| score: pd.Series, |
| buy_gate: pd.Series, |
| force_exit: pd.Series, |
| trade_start_time, |
| trade_end_time, |
| ) -> pd.Series: |
| target_score = score.replace([np.inf, -np.inf], np.nan).dropna().astype(float) |
| if target_score.empty: |
| return target_score |
| allowed = buy_gate.reindex(target_score.index).fillna(False).astype(bool) |
| forced = force_exit.reindex(target_score.index).fillna(False).astype(bool) |
| target_score = target_score[allowed & ~forced] |
| if self._sell_cooldown_until: |
| target_score = target_score[~target_score.index.isin(self._sell_cooldown_until.keys())] |
| target_score = target_score.sort_values(ascending=False) |
|
|
| selected = [] |
| for code in target_score.index: |
| if self.only_tradable and not self._is_stock_tradable( |
| code, |
| trade_start_time, |
| trade_end_time, |
| OrderDir.BUY, |
| ): |
| continue |
| selected.append(code) |
| if len(selected) >= int(self.topk): |
| break |
| return target_score.reindex(selected) |
|
|
| def _portfolio_value( |
| self, |
| position: Position, |
| stock_list: list[str], |
| trade_start_time, |
| trade_end_time, |
| ) -> float: |
| value = float(position.get_cash()) |
| for code in stock_list: |
| amount = self._stock_amount(position, code) |
| if amount <= 0: |
| continue |
| fallback = self._stock_price(position, code) |
| price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback) |
| if np.isfinite(price) and price > 0: |
| value += float(amount * price) |
| return float(value) |
|
|
| def _make_sell_order( |
| self, |
| position: Position, |
| code, |
| amount: float, |
| trade_start_time, |
| trade_end_time, |
| ): |
| amount = min(float(amount), self._stock_amount(position, code)) |
| if amount <= 1e-12: |
| return None |
| if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.SELL): |
| return None |
| sell_order = Order( |
| stock_id=code, |
| amount=amount, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=Order.SELL, |
| ) |
| return sell_order if self.trade_exchange.check_order(sell_order) else None |
|
|
| def _make_buy_order( |
| self, |
| code, |
| cash_value: float, |
| trade_start_time, |
| trade_end_time, |
| ): |
| if cash_value <= 1e-12: |
| return None |
| if not self._is_stock_tradable(code, trade_start_time, trade_end_time, OrderDir.BUY): |
| return None |
| buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY) |
| if not np.isfinite(buy_price) or buy_price <= 0: |
| return None |
| buy_amount = self._round_trade_amount(code, float(cash_value) / float(buy_price), trade_start_time, trade_end_time) |
| if buy_amount <= 1e-12: |
| return None |
| buy_order = Order( |
| stock_id=code, |
| amount=buy_amount, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=Order.BUY, |
| ) |
| return buy_order if self.trade_exchange.check_order(buy_order) else None |
|
|
| def _generate_sell_all_decision( |
| self, |
| score: pd.Series, |
| buy_gate: pd.Series, |
| force_exit: pd.Series, |
| trade_start_time, |
| trade_end_time, |
| ): |
| current_temp: Position = copy.deepcopy(self.trade_position) |
| sell_order_list = [] |
| buy_order_list = [] |
| cash = float(current_temp.get_cash()) |
| time_per_step = self.trade_calendar.get_freq() |
|
|
| for code in list(current_temp.get_stock_list()): |
| if not self._can_sell(current_temp, code, time_per_step): |
| continue |
| amount = self._stock_amount(current_temp, code) |
| sell_order = self._make_sell_order(current_temp, code, amount, trade_start_time, trade_end_time) |
| if sell_order is None: |
| continue |
| sell_order_list.append(sell_order) |
| if self.cooldown_period > 0: |
| self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period |
| trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( |
| sell_order, |
| position=current_temp, |
| ) |
| cash += float(trade_val - trade_cost) |
|
|
| target_scores = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time) |
| remaining = set(current_temp.get_stock_list()) |
| available_slots = max(int(self.topk) - len(remaining), 0) |
| buy_codes = [code for code in target_scores.index if code not in remaining][:available_slots] |
| buy_value = cash * max(min(float(self.risk_degree), 1.0), 0.0) / len(buy_codes) if buy_codes else 0.0 |
| for code in buy_codes: |
| buy_order = self._make_buy_order(code, buy_value, trade_start_time, trade_end_time) |
| if buy_order is None: |
| continue |
| buy_order_list.append(buy_order) |
| trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( |
| buy_order, |
| position=current_temp, |
| ) |
| cash -= float(trade_val + trade_cost) |
|
|
| return TradeDecisionWO(sell_order_list + buy_order_list, self) |
|
|
| def _generate_target_weight_decision( |
| self, |
| score: pd.Series, |
| buy_gate: pd.Series, |
| force_exit: pd.Series, |
| trade_start_time, |
| trade_end_time, |
| ): |
| current_temp: Position = copy.deepcopy(self.trade_position) |
| sell_order_list = [] |
| buy_order_list = [] |
| time_per_step = self.trade_calendar.get_freq() |
|
|
| target_scores = self._select_target_scores(score, buy_gate, force_exit, trade_start_time, trade_end_time) |
| target_weights = self._score_to_target_weights(target_scores) |
| if not target_weights: |
| return TradeDecisionWO([], self) |
|
|
| current_stock_list = list(current_temp.get_stock_list()) |
| portfolio_value = self._portfolio_value(current_temp, current_stock_list, trade_start_time, trade_end_time) |
| if portfolio_value <= 1e-12: |
| return TradeDecisionWO([], self) |
|
|
| eps_value = float(portfolio_value * self.target_weight_eps) |
| for code in current_stock_list: |
| amount = self._stock_amount(current_temp, code) |
| if amount <= 0: |
| continue |
| fallback = self._stock_price(current_temp, code) |
| sell_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.SELL, fallback=fallback) |
| if not np.isfinite(sell_price) or sell_price <= 0: |
| continue |
| current_value = float(amount * sell_price) |
| target_value = float(target_weights.get(str(code), 0.0) * portfolio_value) |
| sell_value = current_value - target_value |
| if sell_value <= eps_value: |
| continue |
| if not self._can_sell(current_temp, code, time_per_step): |
| continue |
| sell_amount = self._round_trade_amount(code, sell_value / sell_price, trade_start_time, trade_end_time) |
| sell_order = self._make_sell_order(current_temp, code, sell_amount, trade_start_time, trade_end_time) |
| if sell_order is None: |
| continue |
| sell_order_list.append(sell_order) |
| if str(code) not in target_weights and self.cooldown_period > 0: |
| self._sell_cooldown_until[code] = self.trade_calendar.get_trade_step() + self.cooldown_period |
| self.trade_exchange.deal_order(sell_order, position=current_temp) |
|
|
| current_stock_after_sell = set(current_temp.get_stock_list()) |
| portfolio_value_after_sell = self._portfolio_value( |
| current_temp, |
| list(current_stock_after_sell), |
| trade_start_time, |
| trade_end_time, |
| ) |
| eps_value = float(portfolio_value_after_sell * self.target_weight_eps) |
| cash = float(current_temp.get_cash()) |
| available_new_slots = max(int(self.topk) - len(current_stock_after_sell), 0) |
|
|
| for code in target_weights: |
| fallback = self._stock_price(current_temp, code) |
| buy_price = self._deal_price(code, trade_start_time, trade_end_time, OrderDir.BUY, fallback=fallback) |
| if not np.isfinite(buy_price) or buy_price <= 0: |
| continue |
| current_amount = self._stock_amount(current_temp, code) |
| current_value = float(current_amount * buy_price) |
| target_value = float(target_weights[code] * portfolio_value_after_sell) |
| buy_value = target_value - current_value |
| if buy_value <= eps_value: |
| continue |
| if code not in current_stock_after_sell: |
| if available_new_slots <= 0: |
| continue |
| available_new_slots -= 1 |
| current_stock_after_sell.add(code) |
| buy_order = self._make_buy_order(code, min(float(buy_value), cash), trade_start_time, trade_end_time) |
| if buy_order is None: |
| continue |
| buy_order_list.append(buy_order) |
| trade_val, trade_cost, _trade_price = self.trade_exchange.deal_order( |
| buy_order, |
| position=current_temp, |
| ) |
| cash -= float(trade_val + trade_cost) |
| if cash <= 1e-12: |
| break |
|
|
| return TradeDecisionWO(sell_order_list + buy_order_list, self) |
|
|
| @staticmethod |
| def _split_signal_frame( |
| pred_score: pd.Series | pd.DataFrame, |
| ) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]: |
| if isinstance(pred_score, pd.DataFrame): |
| score = pred_score[ForceExitTopkDropoutStrategy.SCORE_COL] if ForceExitTopkDropoutStrategy.SCORE_COL in pred_score.columns else pred_score.iloc[:, 0] |
| buy_gate = ( |
| pred_score[ForceExitTopkDropoutStrategy.BUY_GATE_COL].astype(bool) |
| if ForceExitTopkDropoutStrategy.BUY_GATE_COL in pred_score.columns |
| else pd.Series(True, index=pred_score.index) |
| ) |
| force_exit = ( |
| pred_score[ForceExitTopkDropoutStrategy.FORCE_EXIT_COL].astype(bool) |
| if ForceExitTopkDropoutStrategy.FORCE_EXIT_COL in pred_score.columns |
| else pd.Series(False, index=pred_score.index) |
| ) |
| defer_sell = ( |
| pred_score[ForceExitTopkDropoutStrategy.DEFER_SELL_COL].astype(bool) |
| if ForceExitTopkDropoutStrategy.DEFER_SELL_COL in pred_score.columns |
| else pd.Series(False, index=pred_score.index) |
| ) |
| return ( |
| score.astype(float), |
| buy_gate.reindex(score.index).fillna(False), |
| force_exit.reindex(score.index).fillna(False), |
| defer_sell.reindex(score.index).fillna(False), |
| ) |
| score = pred_score.astype(float) |
| return ( |
| score, |
| pd.Series(True, index=score.index), |
| pd.Series(False, index=score.index), |
| pd.Series(False, index=score.index), |
| ) |
|
|
| def generate_trade_decision(self, execute_result=None): |
| trade_step = self.trade_calendar.get_trade_step() |
| if self.cooldown_period > 0: |
| self._sell_cooldown_until = { |
| code: until |
| for code, until in self._sell_cooldown_until.items() |
| if until > trade_step |
| } |
| if trade_step % self.rebalance_freq != 0: |
| return TradeDecisionWO([], self) |
|
|
| trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step) |
| pred_start_time, pred_end_time = self.trade_calendar.get_step_time(trade_step, shift=1) |
| pred_score_raw = self.signal.get_signal(start_time=pred_start_time, end_time=pred_end_time) |
| if pred_score_raw is None: |
| return TradeDecisionWO([], self) |
|
|
| score, buy_gate, force_exit, defer_sell = self._split_signal_frame(pred_score_raw) |
|
|
| if self.rebalance_mode == "sell_all": |
| return self._generate_sell_all_decision( |
| score, |
| buy_gate, |
| force_exit, |
| trade_start_time, |
| trade_end_time, |
| ) |
| if self.rebalance_mode == "target_weight": |
| return self._generate_target_weight_decision( |
| score, |
| buy_gate, |
| force_exit, |
| trade_start_time, |
| trade_end_time, |
| ) |
|
|
| if self.only_tradable: |
| def get_first_n(li, n, reverse=False): |
| cur_n = 0 |
| res = [] |
| for si in reversed(li) if reverse else li: |
| if self.trade_exchange.is_stock_tradable( |
| stock_id=si, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| ): |
| res.append(si) |
| cur_n += 1 |
| if cur_n >= n: |
| break |
| return res[::-1] if reverse else res |
|
|
| def get_last_n(li, n): |
| return get_first_n(li, n, reverse=True) |
|
|
| def filter_stock(li): |
| return [ |
| si |
| for si in li |
| if self.trade_exchange.is_stock_tradable( |
| stock_id=si, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| ) |
| ] |
| else: |
| def get_first_n(li, n): |
| return list(li)[:n] |
|
|
| def get_last_n(li, n): |
| return list(li)[-n:] |
|
|
| def filter_stock(li): |
| return list(li) |
|
|
| current_temp: Position = copy.deepcopy(self.trade_position) |
| sell_order_list = [] |
| buy_order_list = [] |
|
|
| cash = current_temp.get_cash() |
| current_stock_list = current_temp.get_stock_list() |
|
|
| rank_score = score.copy() |
| force_exit_index = force_exit[force_exit].index.intersection(current_stock_list) |
| if len(force_exit_index) > 0: |
| rank_score.loc[force_exit_index] = -np.inf |
|
|
| last = rank_score.reindex(current_stock_list).fillna(-np.inf).sort_values(ascending=False).index |
|
|
| outsider_score = score[~score.index.isin(last)] |
| outsider_score = outsider_score[buy_gate.reindex(outsider_score.index).fillna(False)] |
| if self._sell_cooldown_until: |
| outsider_score = outsider_score[~outsider_score.index.isin(self._sell_cooldown_until.keys())] |
| outsider_score = outsider_score.sort_values(ascending=False) |
|
|
| if self.method_buy == "top": |
| today = get_first_n( |
| outsider_score.index, |
| self.n_drop + self.topk - len(last), |
| ) |
| elif self.method_buy == "random": |
| topk_candi = get_first_n(outsider_score.index, self.topk) |
| candi = list(filter(lambda x: x not in last, topk_candi)) |
| n = self.n_drop + self.topk - len(last) |
| try: |
| today = np.random.choice(candi, n, replace=False) |
| except ValueError: |
| today = candi |
| else: |
| raise NotImplementedError(f"This type of input is not supported") |
|
|
| comb = rank_score.reindex(last.union(pd.Index(today))).fillna(-np.inf).sort_values(ascending=False).index |
|
|
| over_cap_count = max(len(current_stock_list) - int(self.topk), 0) |
| hard_cap_sell = pd.Index(get_last_n(last, over_cap_count)) if over_cap_count > 0 else pd.Index([]) |
|
|
| if self.method_sell == "bottom": |
| sell = last[last.isin(get_last_n(comb, self.n_drop))] |
| elif self.method_sell == "random": |
| candi = filter_stock(last) |
| try: |
| sell = pd.Index(np.random.choice(candi, self.n_drop, replace=False) if len(last) else []) |
| except ValueError: |
| sell = candi |
| else: |
| raise NotImplementedError(f"This type of input is not supported") |
| if len(hard_cap_sell) > 0: |
| sell = pd.Index(list(dict.fromkeys(list(sell) + list(hard_cap_sell)))) |
|
|
| filtered_sell = [] |
| hard_cap_sell_set = set(hard_cap_sell) |
| time_per_step = self.trade_calendar.get_freq() |
| for code in sell: |
| is_force_exit = bool(force_exit.reindex([code]).fillna(False).iloc[0]) |
| should_defer = bool(defer_sell.reindex([code]).fillna(False).iloc[0]) |
| is_hard_cap_exit = code in hard_cap_sell_set |
| if should_defer and not is_force_exit and not is_hard_cap_exit: |
| continue |
| if ( |
| not is_hard_cap_exit |
| and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh |
| ): |
| continue |
| filtered_sell.append(code) |
| sell = pd.Index(filtered_sell) |
| pre_trade_slots = max(int(self.topk) - len(current_stock_list), 0) |
| buy = today[:pre_trade_slots] |
|
|
| for code in current_stock_list: |
| if not self.trade_exchange.is_stock_tradable( |
| stock_id=code, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=None if self.forbid_all_trade_at_limit else OrderDir.SELL, |
| ): |
| continue |
| if code in sell: |
| if ( |
| code not in hard_cap_sell_set |
| and current_temp.get_stock_count(code, bar=time_per_step) < self.hold_thresh |
| ): |
| continue |
| sell_amount = current_temp.get_stock_amount(code=code) |
| sell_order = Order( |
| stock_id=code, |
| amount=sell_amount, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=Order.SELL, |
| ) |
| if self.trade_exchange.check_order(sell_order): |
| sell_order_list.append(sell_order) |
| if self.cooldown_period > 0: |
| self._sell_cooldown_until[code] = trade_step + self.cooldown_period |
| trade_val, trade_cost, trade_price = self.trade_exchange.deal_order( |
| sell_order, |
| position=current_temp, |
| ) |
| cash += trade_val - trade_cost |
|
|
| value = cash * self.risk_degree / len(buy) if len(buy) > 0 else 0 |
| for code in buy: |
| if not self.trade_exchange.is_stock_tradable( |
| stock_id=code, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=None if self.forbid_all_trade_at_limit else OrderDir.BUY, |
| ): |
| continue |
| buy_price = self.trade_exchange.get_deal_price( |
| stock_id=code, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=OrderDir.BUY, |
| ) |
| buy_amount = value / buy_price |
| factor = self.trade_exchange.get_factor( |
| stock_id=code, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| ) |
| buy_amount = self.trade_exchange.round_amount_by_trade_unit(buy_amount, factor) |
| buy_order = Order( |
| stock_id=code, |
| amount=buy_amount, |
| start_time=trade_start_time, |
| end_time=trade_end_time, |
| direction=Order.BUY, |
| ) |
| buy_order_list.append(buy_order) |
| return TradeDecisionWO(sell_order_list + buy_order_list, self) |
|
|
| return { |
| "qlib": qlib, |
| "Account": Account, |
| "CommonInfrastructure": CommonInfrastructure, |
| "SimulatorExecutor": SimulatorExecutor, |
| "TopkDropoutStrategy": TopkDropoutStrategy, |
| "ForceExitTopkDropoutStrategy": ForceExitTopkDropoutStrategy, |
| "backtest_loop": backtest_loop, |
| "get_exchange": get_exchange, |
| "set_global_logger_level_cm": set_global_logger_level_cm, |
| } |
|
|
|
|
| def _resolve_qlib_dump_script(qlib_module: Any) -> Path: |
| env_override = os.getenv("ALPHAEVO_QLIB_DUMP_SCRIPT") |
| if env_override: |
| override_path = Path(env_override).expanduser().resolve() |
| if override_path.exists(): |
| return override_path |
|
|
| qlib_file = Path(qlib_module.__file__).resolve() |
| candidates = [ |
| PROJECT_ROOT / "backtest" / "vendor" / "qlib_dump_bin.py", |
| qlib_file.parent.parent / "scripts" / "dump_bin.py", |
| qlib_file.parent / "scripts" / "dump_bin.py", |
| qlib_file.parent.parent / "qlib" / "scripts" / "dump_bin.py", |
| qlib_file.parents[2] / "scripts" / "dump_bin.py" if len(qlib_file.parents) > 2 else None, |
| ] |
|
|
| tried: list[str] = [] |
| for candidate in candidates: |
| if candidate is None: |
| continue |
| candidate = candidate.resolve() |
| tried.append(str(candidate)) |
| if candidate.exists(): |
| return candidate |
|
|
| for root in [qlib_file.parent, qlib_file.parent.parent]: |
| try: |
| for candidate in root.rglob("dump_bin.py"): |
| candidate = candidate.resolve() |
| tried.append(str(candidate)) |
| if candidate.exists(): |
| return candidate |
| except OSError: |
| continue |
|
|
| raise RuntimeError( |
| "Could not locate Qlib dump_bin.py. Tried: " + ", ".join(dict.fromkeys(tried)) |
| ) |
|
|
|
|
| def _provider_signature(price_df: pd.DataFrame) -> str: |
| cache_key = id(price_df) |
| cached = _PROVIDER_SIGNATURE_CACHE.get(cache_key) |
| if cached is not None: |
| return cached |
|
|
| reset_df = price_df.reset_index() |
| dates = pd.to_datetime(reset_df["datetime"]) |
| summary = { |
| "rows": int(len(reset_df)), |
| "instruments": int(reset_df["instrument"].nunique()), |
| "start": dates.min().strftime("%Y-%m-%d") if len(reset_df) else "NA", |
| "end": dates.max().strftime("%Y-%m-%d") if len(reset_df) else "NA", |
| "cols": sorted(str(col) for col in price_df.columns), |
| "close_sum_head": round(float(pd.to_numeric(reset_df.get("$close"), errors="coerce").fillna(0.0).head(5000).sum()), 6), |
| "volume_sum_head": round(float(pd.to_numeric(reset_df.get("$volume"), errors="coerce").fillna(0.0).head(5000).sum()), 6), |
| } |
| digest = hashlib.sha256(json.dumps(summary, sort_keys=True).encode("utf-8")).hexdigest() |
| signature = digest[:16] |
| _PROVIDER_SIGNATURE_CACHE[cache_key] = signature |
| return signature |
|
|
|
|
| def _get_price_wide_frame(price_df: pd.DataFrame, column: str) -> pd.DataFrame | None: |
| if column not in price_df.columns: |
| return None |
| cache_key = (id(price_df), column) |
| cached = _PRICE_WIDE_CACHE.get(cache_key) |
| if cached is not None: |
| return cached |
| frame = price_df[column].unstack("instrument") |
| _PRICE_WIDE_CACHE[cache_key] = frame |
| return frame |
|
|
|
|
| def _get_available_dates(price_df: pd.DataFrame) -> pd.DatetimeIndex: |
| cache_key = id(price_df) |
| cached = _AVAILABLE_DATES_CACHE.get(cache_key) |
| if cached is not None: |
| return cached |
| dates = pd.DatetimeIndex(price_df.index.get_level_values("datetime").unique()).sort_values() |
| _AVAILABLE_DATES_CACHE[cache_key] = dates |
| return dates |
|
|
|
|
| def _write_qlib_source_csvs(price_df: pd.DataFrame, source_dir: Path) -> None: |
| source_dir.mkdir(parents=True, exist_ok=True) |
| reset_df = price_df.reset_index().rename(columns={"instrument": "symbol", "datetime": "date"}).copy() |
| reset_df["date"] = pd.to_datetime(reset_df["date"]) |
| for raw_symbol, group in reset_df.groupby("symbol", sort=True): |
| symbol = str(raw_symbol) |
| group = group.sort_values("date").copy() |
| close_series = pd.to_numeric(group.get("$close"), errors="coerce") |
| change = close_series.pct_change(fill_method=None).replace([np.inf, -np.inf], np.nan).fillna(0.0) |
| out_df = pd.DataFrame( |
| { |
| "symbol": symbol, |
| "date": group["date"].dt.strftime("%Y-%m-%d"), |
| "open": pd.to_numeric(group.get("$open"), errors="coerce"), |
| "close": close_series, |
| "volume": pd.to_numeric(group.get("$volume"), errors="coerce").fillna(0.0), |
| "amount": pd.to_numeric(group.get("$amount"), errors="coerce").fillna(0.0), |
| "factor": 1.0, |
| "change": change, |
| } |
| ) |
| out_df.to_csv(source_dir / f"{symbol}.csv", index=False) |
|
|
|
|
| def _ensure_qlib_provider(price_df: pd.DataFrame) -> Path: |
| runtime = _import_qlib_runtime() |
| provider_key = _provider_signature(price_df) |
| cache_root = _get_qlib_cache_root() / provider_key |
| provider_dir = cache_root / "provider" |
| source_dir = cache_root / "csv" |
| ready_path = cache_root / "ready.json" |
| lock_path = cache_root.with_suffix(".lock") |
|
|
| if ready_path.exists() and provider_dir.exists(): |
| return provider_dir |
|
|
| with _provider_build_lock(lock_path): |
| if ready_path.exists() and provider_dir.exists(): |
| return provider_dir |
|
|
| if cache_root.exists(): |
| shutil.rmtree(cache_root) |
| cache_root.mkdir(parents=True, exist_ok=True) |
| _write_qlib_source_csvs(price_df, source_dir) |
|
|
| dump_script = _resolve_qlib_dump_script(runtime["qlib"]) |
| cmd = [ |
| sys.executable, |
| str(dump_script), |
| "dump_all", |
| f"--data_path={source_dir}", |
| f"--qlib_dir={provider_dir}", |
| "--freq=day", |
| "--date_field_name=date", |
| "--symbol_field_name=symbol", |
| "--include_fields=open,close,volume,amount,factor,change", |
| "--file_suffix=.csv", |
| "--max_workers=8", |
| ] |
| proc = subprocess.run(cmd, capture_output=True, text=True) |
| if proc.returncode != 0: |
| raise RuntimeError( |
| "Failed to build Qlib provider cache: " |
| + (proc.stderr.strip() or proc.stdout.strip())[:800] |
| ) |
|
|
| ready_path.write_text( |
| json.dumps( |
| { |
| "provider_key": provider_key, |
| "provider_dir": str(provider_dir), |
| }, |
| ensure_ascii=False, |
| indent=2, |
| ), |
| encoding="utf-8", |
| ) |
| return provider_dir |
|
|
|
|
| def _ensure_qlib_initialized(provider_dir: Path) -> dict[str, Any]: |
| runtime = _import_qlib_runtime() |
| provider_uri = str(provider_dir) |
| if _QLIB_RUNTIME_STATE.get("provider_uri") != provider_uri: |
| with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR): |
| runtime["qlib"].init( |
| provider_uri=provider_uri, |
| kernels=1, |
| joblib_backend="threading", |
| expression_cache=None, |
| dataset_cache=None, |
| redis_port=-1, |
| clear_mem_cache=False, |
| logging_level=logging.ERROR, |
| ) |
| _QLIB_RUNTIME_STATE["provider_uri"] = provider_uri |
| return runtime |
|
|
|
|
| def _prepare_qlib_signal(factor_values: pd.Series) -> pd.Series: |
| signal = factor_values.copy() |
| if isinstance(signal, pd.DataFrame): |
| signal = signal.iloc[:, 0] |
| if "datetime" in signal.index.names and "instrument" in signal.index.names: |
| signal = signal.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| signal = signal.sort_index() |
| signal = signal.astype(float).dropna() |
| signal.name = "score" |
| return signal |
|
|
|
|
| def _prepare_force_exit_qlib_signal( |
| factor_values: pd.Series, |
| price_df: pd.DataFrame, |
| trade_guard_config: dict[str, Any], |
| ) -> pd.DataFrame: |
| score = _prepare_qlib_signal(factor_values) |
| close_series = price_df["$close"].copy() |
| if "datetime" in close_series.index.names and "instrument" in close_series.index.names: |
| close_series = close_series.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| close_series = close_series.sort_index() |
| close_series = pd.to_numeric(close_series, errors="coerce") |
|
|
| buy_gate = pd.Series(True, index=score.index, dtype=bool) |
| force_exit = pd.Series(False, index=score.index, dtype=bool) |
| defer_sell = pd.Series(False, index=score.index, dtype=bool) |
|
|
| trend_ema_span = _cfg_int(trade_guard_config, "trend_ema_span") |
| force_exit_trend_days = _cfg_int(trade_guard_config, "force_exit_trend_days") |
| if trend_ema_span is not None and trend_ema_span > 0: |
| ema = close_series.groupby(level="instrument").transform( |
| lambda x: x.ewm(span=trend_ema_span, adjust=False, min_periods=1).mean() |
| ) |
| trend_ok = (close_series > ema).reindex(score.index).fillna(False) |
| buy_gate &= trend_ok |
| if force_exit_trend_days is not None and force_exit_trend_days > 0: |
| trend_bad = close_series < ema |
| trend_bad_streak = trend_bad.groupby(level="instrument").transform( |
| lambda x: x.astype(float).rolling(force_exit_trend_days, min_periods=force_exit_trend_days).sum() |
| >= force_exit_trend_days |
| ) |
| force_exit |= trend_bad_streak.reindex(score.index).fillna(False) |
|
|
| prev_close = close_series.groupby(level="instrument").shift(1) |
| close_to_close_return = (close_series / prev_close - 1.0).replace([np.inf, -np.inf], np.nan) |
|
|
| intraday_return = pd.Series(np.nan, index=close_series.index, dtype=float) |
| if "$open" in price_df.columns: |
| open_series = price_df["$open"].copy() |
| if "datetime" in open_series.index.names and "instrument" in open_series.index.names: |
| open_series = open_series.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| open_series = open_series.sort_index() |
| open_series = pd.to_numeric(open_series, errors="coerce") |
| intraday_return = (close_series / open_series - 1.0).replace([np.inf, -np.inf], np.nan) |
|
|
| buy_chase = pd.Series(False, index=score.index, dtype=bool) |
| buy_chase_ctc_thresh = _cfg_float(trade_guard_config, "buy_chase_ctc_thresh") |
| if buy_chase_ctc_thresh is not None: |
| buy_chase |= (close_to_close_return > abs(buy_chase_ctc_thresh)).reindex(score.index).fillna(False) |
| buy_chase_intraday_thresh = _cfg_float(trade_guard_config, "buy_chase_intraday_thresh") |
| if buy_chase_intraday_thresh is not None: |
| buy_chase |= (intraday_return > abs(buy_chase_intraday_thresh)).reindex(score.index).fillna(False) |
|
|
| panic_drop = pd.Series(False, index=score.index, dtype=bool) |
| panic_drop_ctc_thresh = _cfg_float(trade_guard_config, "panic_drop_ctc_thresh") |
| if panic_drop_ctc_thresh is not None: |
| panic_drop |= (close_to_close_return < -abs(panic_drop_ctc_thresh)).reindex(score.index).fillna(False) |
| panic_drop_intraday_thresh = _cfg_float(trade_guard_config, "panic_drop_intraday_thresh") |
| if panic_drop_intraday_thresh is not None: |
| panic_drop |= (intraday_return < -abs(panic_drop_intraday_thresh)).reindex(score.index).fillna(False) |
|
|
| extreme_range = pd.Series(False, index=score.index) |
| extreme_range_thresh = _cfg_float(trade_guard_config, "extreme_range_thresh") |
| if extreme_range_thresh is not None and "$high" in price_df.columns and "$low" in price_df.columns: |
| high_series = price_df["$high"].copy() |
| low_series = price_df["$low"].copy() |
| if "datetime" in high_series.index.names and "instrument" in high_series.index.names: |
| high_series = high_series.reorder_levels(["datetime", "instrument"]).sort_index() |
| low_series = low_series.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| high_series = high_series.sort_index() |
| low_series = low_series.sort_index() |
| high_series = pd.to_numeric(high_series, errors="coerce") |
| low_series = pd.to_numeric(low_series, errors="coerce") |
| day_range = ((high_series - low_series) / close_series).replace([np.inf, -np.inf], np.nan) |
| extreme_range = (day_range > abs(extreme_range_thresh)).reindex(score.index).fillna(False) |
|
|
| buy_gate &= ~(buy_chase | extreme_range) |
| defer_sell = panic_drop & ~force_exit |
|
|
| amount_ma_window = _cfg_int(trade_guard_config, "amount_ma_window") |
| if amount_ma_window is not None and amount_ma_window > 0 and "$amount" in price_df.columns: |
| amount_series = price_df["$amount"].copy() |
| if "datetime" in amount_series.index.names and "instrument" in amount_series.index.names: |
| amount_series = amount_series.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| amount_series = amount_series.sort_index() |
| amount_series = pd.to_numeric(amount_series, errors="coerce") |
| amount_ma20 = amount_series.groupby(level="instrument").transform( |
| lambda x: x.rolling(amount_ma_window, min_periods=1).mean() |
| ) |
| amount_buy_min_ratio = _cfg_float(trade_guard_config, "amount_buy_min_ratio") |
| if amount_buy_min_ratio is not None: |
| amount_ok = (amount_series >= (float(amount_buy_min_ratio) * amount_ma20)).reindex(score.index).fillna(False) |
| buy_gate &= amount_ok |
| amount_force_exit_ratio = _cfg_float(trade_guard_config, "amount_force_exit_ratio") |
| if amount_force_exit_ratio is not None: |
| amount_weak = (amount_series < (float(amount_force_exit_ratio) * amount_ma20)).reindex(score.index).fillna(False) |
| force_exit |= amount_weak |
| defer_sell &= ~amount_weak |
|
|
| strategy_signal = pd.DataFrame( |
| { |
| "score": score.astype(float), |
| "__buy_gate__": buy_gate.astype(bool), |
| "__force_exit__": force_exit.astype(bool), |
| "__defer_sell__": defer_sell.astype(bool), |
| }, |
| index=score.index, |
| ) |
| return strategy_signal |
|
|
|
|
| def build_signal_selection_log( |
| *, |
| factor_values: pd.Series, |
| price_df: pd.DataFrame, |
| top_k: int = 10, |
| start_date: str | None = None, |
| end_date: str | None = None, |
| backtest_engine: str = "qlib_original", |
| trade_guard_config: dict[str, Any] | bool | None = None, |
| holding_log: list[dict[str, Any]] | None = None, |
| trade_log: list[dict[str, Any]] | None = None, |
| portfolio_log: list[dict[str, Any]] | None = None, |
| ) -> list[dict[str, Any]]: |
| normalized_engine = _normalize_backtest_engine(backtest_engine) |
| normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config) |
|
|
| if normalized_engine == "qlib_original" and normalized_trade_guard_config is not None: |
| signal_payload = _prepare_force_exit_qlib_signal( |
| factor_values=factor_values, |
| price_df=price_df, |
| trade_guard_config=normalized_trade_guard_config, |
| ) |
| else: |
| score = _prepare_qlib_signal(factor_values) |
| signal_payload = pd.DataFrame( |
| { |
| "score": score.astype(float), |
| "__buy_gate__": True, |
| "__force_exit__": False, |
| "__defer_sell__": False, |
| }, |
| index=score.index, |
| ) |
|
|
| if signal_payload is None or len(signal_payload) == 0: |
| return [] |
|
|
| if isinstance(signal_payload, pd.Series): |
| signal_frame = signal_payload.to_frame(name="score") |
| else: |
| signal_frame = signal_payload.copy() |
| if "score" not in signal_frame.columns: |
| signal_frame["score"] = pd.to_numeric(signal_frame.iloc[:, 0], errors="coerce") |
| if "datetime" in signal_frame.index.names and "instrument" in signal_frame.index.names: |
| signal_frame = signal_frame.reorder_levels(["datetime", "instrument"]).sort_index() |
| else: |
| signal_frame = signal_frame.sort_index() |
| signal_frame = signal_frame.reset_index() |
| signal_frame["datetime"] = pd.to_datetime(signal_frame["datetime"], errors="coerce").dt.normalize() |
| signal_frame["instrument"] = signal_frame["instrument"].astype(str) |
| signal_frame["score"] = pd.to_numeric(signal_frame["score"], errors="coerce") |
| signal_frame = signal_frame.dropna(subset=["datetime", "instrument", "score"]) |
| if signal_frame.empty: |
| return [] |
|
|
| available_dates = _get_available_dates(price_df) |
| if len(available_dates) <= 1: |
| return [] |
| date_map = { |
| pd.Timestamp(available_dates[i]).normalize(): pd.Timestamp(available_dates[i + 1]).normalize() |
| for i in range(0, len(available_dates) - 1) |
| } |
| signal_frame["trade_date"] = signal_frame["datetime"].map(date_map) |
| signal_frame = signal_frame.dropna(subset=["trade_date"]).copy() |
| if signal_frame.empty: |
| return [] |
|
|
| if start_date: |
| signal_frame = signal_frame[signal_frame["trade_date"] >= pd.Timestamp(start_date)] |
| if end_date: |
| signal_frame = signal_frame[signal_frame["trade_date"] <= pd.Timestamp(end_date)] |
| if signal_frame.empty: |
| return [] |
|
|
| if portfolio_log: |
| portfolio_df = pd.DataFrame(portfolio_log).copy() |
| if not portfolio_df.empty and "date" in portfolio_df.columns: |
| portfolio_df["date"] = pd.to_datetime(portfolio_df["date"], errors="coerce").dt.normalize() |
| if "is_rebalance" in portfolio_df.columns: |
| portfolio_df["is_rebalance"] = portfolio_df["is_rebalance"].fillna(False).astype(bool) |
| rebalance_dates = set(portfolio_df.loc[portfolio_df["is_rebalance"], "date"].dropna().tolist()) |
| if rebalance_dates: |
| signal_frame = signal_frame[signal_frame["trade_date"].isin(rebalance_dates)] |
| if signal_frame.empty: |
| return [] |
|
|
| for col, default in ( |
| ("__buy_gate__", True), |
| ("__force_exit__", False), |
| ("__defer_sell__", False), |
| ): |
| if col not in signal_frame.columns: |
| signal_frame[col] = default |
| signal_frame[col] = signal_frame[col].fillna(default).astype(bool) |
|
|
| signal_frame["trade_score_rank"] = ( |
| signal_frame.groupby("trade_date", dropna=False)["score"] |
| .rank(method="first", ascending=False) |
| .astype(int) |
| ) |
| debug_top_n = max(int(top_k), 5) |
| signal_frame["top5_by_score"] = signal_frame["trade_score_rank"] <= 5 |
| signal_frame["topk_by_score"] = signal_frame["trade_score_rank"] <= int(top_k) |
|
|
| hold_summary = pd.DataFrame() |
| if holding_log: |
| hold_summary = pd.DataFrame(holding_log).copy() |
| if not hold_summary.empty and {"date", "instrument"}.issubset(hold_summary.columns): |
| hold_summary["date"] = pd.to_datetime(hold_summary["date"], errors="coerce").dt.normalize() |
| hold_summary["instrument"] = hold_summary["instrument"].astype(str) |
| for col in ["market_value", "weight", "shares_held"]: |
| if col in hold_summary.columns: |
| hold_summary[col] = pd.to_numeric(hold_summary[col], errors="coerce") |
| hold_summary = ( |
| hold_summary.groupby(["date", "instrument"], dropna=False) |
| .agg( |
| selected_eod=("instrument", "size"), |
| shares_held_eod=("shares_held", "sum"), |
| market_value_eod=("market_value", "sum"), |
| weight_eod=("weight", "sum"), |
| ) |
| .reset_index() |
| ) |
| hold_summary["selected_eod"] = hold_summary["selected_eod"].fillna(0).astype(int) > 0 |
| hold_summary["eod_hold_rank"] = ( |
| hold_summary.groupby("date", dropna=False)["market_value_eod"] |
| .rank(method="first", ascending=False) |
| ) |
|
|
| trade_summary = pd.DataFrame() |
| if trade_log: |
| trade_summary = pd.DataFrame(trade_log).copy() |
| if not trade_summary.empty and {"date", "instrument"}.issubset(trade_summary.columns): |
| trade_summary["date"] = pd.to_datetime(trade_summary["date"], errors="coerce").dt.normalize() |
| trade_summary["instrument"] = trade_summary["instrument"].astype(str) |
| for col in ["requested_shares", "filled_shares", "gross_notional", "filled_value", "fill_ratio"]: |
| if col in trade_summary.columns: |
| trade_summary[col] = pd.to_numeric(trade_summary[col], errors="coerce") |
| trade_summary["requested_shares_norm"] = pd.to_numeric( |
| trade_summary.get("requested_shares", trade_summary.get("filled_shares", trade_summary.get("shares", 0.0))), |
| errors="coerce", |
| ) |
| trade_summary["filled_shares_norm"] = pd.to_numeric( |
| trade_summary.get("filled_shares", trade_summary.get("shares", 0.0)), |
| errors="coerce", |
| ) |
| trade_summary["requested_notional_norm"] = pd.to_numeric( |
| trade_summary.get("order_value", trade_summary.get("gross_notional", trade_summary.get("filled_value", 0.0))), |
| errors="coerce", |
| ) |
| trade_summary["filled_notional_norm"] = pd.to_numeric( |
| trade_summary.get("filled_value", trade_summary.get("gross_notional", 0.0)), |
| errors="coerce", |
| ) |
| trade_summary = ( |
| trade_summary.groupby(["date", "instrument"], dropna=False) |
| .agg( |
| had_trade=("action", "size"), |
| trade_actions=("action", lambda s: "|".join(dict.fromkeys(str(v) for v in s if str(v)))), |
| requested_shares_total=("requested_shares_norm", "sum"), |
| filled_shares_total=("filled_shares_norm", "sum"), |
| requested_notional_total=("requested_notional_norm", "sum"), |
| filled_notional_total=("filled_notional_norm", "sum"), |
| fill_ratio_mean=("fill_ratio", "mean"), |
| clip_reason=("clip_reason", lambda s: "|".join(sorted({str(v) for v in s if str(v)}))), |
| ) |
| .reset_index() |
| ) |
| trade_summary["had_trade"] = trade_summary["had_trade"].fillna(0).astype(int) > 0 |
|
|
| merged = signal_frame.merge( |
| hold_summary, |
| left_on=["trade_date", "instrument"], |
| right_on=["date", "instrument"], |
| how="left", |
| ) |
| if "date" in merged.columns: |
| merged = merged.drop(columns=["date"]) |
| merged = merged.merge( |
| trade_summary, |
| left_on=["trade_date", "instrument"], |
| right_on=["date", "instrument"], |
| how="left", |
| ) |
| if "date" in merged.columns: |
| merged = merged.drop(columns=["date"]) |
|
|
| selected_source = merged["selected_eod"] if "selected_eod" in merged.columns else pd.Series(False, index=merged.index) |
| had_trade_source = merged["had_trade"] if "had_trade" in merged.columns else pd.Series(False, index=merged.index) |
| trade_actions_source = merged["trade_actions"] if "trade_actions" in merged.columns else pd.Series("", index=merged.index) |
| merged["selected_eod"] = selected_source.where(selected_source.notna(), False).astype(bool) |
| merged["had_trade"] = had_trade_source.where(had_trade_source.notna(), False).astype(bool) |
| merged["trade_actions"] = trade_actions_source.where(trade_actions_source.notna(), "").astype(str) |
|
|
| keep_mask = ( |
| (merged["trade_score_rank"] <= debug_top_n) |
| | merged["selected_eod"] |
| | merged["had_trade"] |
| ) |
| merged = merged[keep_mask].copy() |
| if merged.empty: |
| return [] |
|
|
| merged = merged.sort_values(["trade_date", "trade_score_rank", "instrument"], ascending=[True, True, True]) |
| rows: list[dict[str, Any]] = [] |
| for row in merged.to_dict("records"): |
| rows.append( |
| { |
| "signal_date": pd.Timestamp(row["datetime"]).strftime("%Y-%m-%d"), |
| "trade_date": pd.Timestamp(row["trade_date"]).strftime("%Y-%m-%d"), |
| "instrument": str(row["instrument"]), |
| "score": round(float(row.get("score", 0.0) or 0.0), 10), |
| "trade_score_rank": int(row.get("trade_score_rank", 0) or 0), |
| "top5_by_score": bool(row.get("top5_by_score", False)), |
| "topk_by_score": bool(row.get("topk_by_score", False)), |
| "buy_gate": bool(row.get("__buy_gate__", True)), |
| "force_exit": bool(row.get("__force_exit__", False)), |
| "defer_sell": bool(row.get("__defer_sell__", False)), |
| "selected_eod": bool(row.get("selected_eod", False)), |
| "eod_hold_rank": int(row["eod_hold_rank"]) if pd.notna(row.get("eod_hold_rank")) else None, |
| "shares_held_eod": round(float(row.get("shares_held_eod", 0.0) or 0.0), 6), |
| "market_value_eod": round(float(row.get("market_value_eod", 0.0) or 0.0), 6), |
| "weight_eod": round(float(row.get("weight_eod", 0.0) or 0.0), 8), |
| "had_trade": bool(row.get("had_trade", False)), |
| "trade_actions": str(row.get("trade_actions", "") or ""), |
| "requested_shares_total": round(float(row.get("requested_shares_total", 0.0) or 0.0), 6), |
| "filled_shares_total": round(float(row.get("filled_shares_total", 0.0) or 0.0), 6), |
| "requested_notional_total": round(float(row.get("requested_notional_total", 0.0) or 0.0), 6), |
| "filled_notional_total": round(float(row.get("filled_notional_total", 0.0) or 0.0), 6), |
| "fill_ratio_mean": round(float(row.get("fill_ratio_mean", 0.0) or 0.0), 6) if pd.notna(row.get("fill_ratio_mean")) else None, |
| "clip_reason": str(row.get("clip_reason", "") or ""), |
| } |
| ) |
| return rows |
|
|
|
|
| def _extract_position_amounts(position: Any) -> dict[str, float]: |
| if position is None: |
| return {} |
| return { |
| str(instrument): float(position.get_stock_amount(instrument)) |
| for instrument in position.get_stock_list() |
| if float(position.get_stock_amount(instrument)) > 0 |
| } |
|
|
|
|
| def _build_holding_log_from_qlib_positions( |
| positions_normal: dict[pd.Timestamp, Any], |
| report_normal: pd.DataFrame, |
| volume_frame: pd.DataFrame | None, |
| amount_frame: pd.DataFrame | None, |
| build_rows: bool = True, |
| ) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]: |
| rows: list[dict[str, Any]] = [] |
| yearly_holding_stats: dict[int, dict[str, float]] = defaultdict( |
| lambda: { |
| "n_days": 0, |
| "holdings_count_sum": 0.0, |
| "max_holdings_count": 0, |
| } |
| ) |
|
|
| pos_by_date = {pd.Timestamp(date): position for date, position in positions_normal.items()} |
| for date in sorted(pos_by_date): |
| position = pos_by_date[date] |
| stock_list = [inst for inst in position.get_stock_list() if float(position.get_stock_amount(inst)) > 0] |
| year = int(date.year) |
| stats = yearly_holding_stats[year] |
| stats["n_days"] += 1 |
| stats["holdings_count_sum"] += float(len(stock_list)) |
| stats["max_holdings_count"] = max(int(stats["max_holdings_count"]), len(stock_list)) |
|
|
| if not build_rows: |
| continue |
|
|
| portfolio_value = float(report_normal.loc[date, "account"]) if date in report_normal.index else float(position.calculate_value()) |
| cash_eod = float(report_normal.loc[date, "cash"]) if date in report_normal.index else float(position.get_cash(include_settle=True)) |
|
|
| for instrument in stock_list: |
| shares_held = float(position.get_stock_amount(instrument)) |
| close_price = float(position.get_stock_price(instrument)) |
| market_value = float(shares_held * close_price) |
| rows.append( |
| { |
| "date": date.strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "instrument": instrument, |
| "shares_held": round(float(shares_held), 6), |
| "market_value": round(float(market_value), 6), |
| "close_price": round(float(close_price), 6), |
| "market_volume": round(_get_frame_value(volume_frame, date, instrument, default=0.0), 6), |
| "market_amount": round(_get_frame_value(amount_frame, date, instrument, default=0.0), 6), |
| "portfolio_value": round(float(portfolio_value), 6), |
| "cash_eod": round(float(cash_eod), 6), |
| "weight": round(float(market_value) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0, |
| } |
| ) |
|
|
| return rows, yearly_holding_stats |
|
|
|
|
| def _build_trade_log_from_qlib_positions( |
| positions_normal: dict[pd.Timestamp, Any], |
| open_prices: pd.DataFrame, |
| close_prices: pd.DataFrame, |
| volume_frame: pd.DataFrame | None, |
| amount_frame: pd.DataFrame | None, |
| cost_buy: float, |
| cost_sell: float, |
| build_rows: bool = True, |
| ) -> tuple[list[dict[str, Any]], dict[int, dict[str, float]]]: |
| rows: list[dict[str, Any]] = [] |
| yearly_trade_stats: dict[int, dict[str, float]] = defaultdict( |
| lambda: { |
| "buy_trades": 0, |
| "sell_trades": 0, |
| "shares_bought": 0.0, |
| "shares_sold": 0.0, |
| "buy_gross_notional": 0.0, |
| "sell_gross_notional": 0.0, |
| "buy_cash_outflow": 0.0, |
| "sell_net_proceeds": 0.0, |
| "buy_transaction_cost": 0.0, |
| "sell_transaction_cost": 0.0, |
| "transaction_cost": 0.0, |
| "gross_turnover": 0.0, |
| } |
| ) |
|
|
| pos_by_date = {pd.Timestamp(date): position for date, position in positions_normal.items()} |
| prev_position = None |
| prev_amounts: dict[str, float] = {} |
|
|
| for date in sorted(pos_by_date): |
| position = pos_by_date[date] |
| current_amounts = _extract_position_amounts(position) |
| holdings_count_before = int(sum(1 for value in prev_amounts.values() if value > 0)) |
| holdings_count_after = int(sum(1 for value in current_amounts.values() if value > 0)) |
|
|
| for instrument in sorted(set(prev_amounts) | set(current_amounts)): |
| current_shares = float(prev_amounts.get(instrument, 0.0)) |
| target_shares = float(current_amounts.get(instrument, 0.0)) |
| delta_shares = target_shares - current_shares |
| if abs(delta_shares) <= 1e-12: |
| continue |
|
|
| action = "buy" if delta_shares > 0 else "sell" |
| filled_shares = abs(float(delta_shares)) |
| deal_price = _get_frame_value(open_prices if action == "buy" else close_prices, date, instrument, default=np.nan) |
| if not np.isfinite(deal_price) or deal_price <= 0: |
| deal_price = _get_frame_value(close_prices, date, instrument, default=np.nan) |
| if (not np.isfinite(deal_price) or deal_price <= 0) and action == "buy" and instrument in current_amounts: |
| deal_price = float(position.get_stock_price(instrument)) |
| if (not np.isfinite(deal_price) or deal_price <= 0) and action == "sell" and prev_position is not None and instrument in prev_amounts: |
| deal_price = float(prev_position.get_stock_price(instrument)) |
| if not np.isfinite(deal_price) or deal_price <= 0: |
| continue |
|
|
| order_value = float(filled_shares * deal_price) |
| transaction_cost = float(order_value * (cost_buy if action == "buy" else cost_sell)) |
| cash_outflow = float(order_value + transaction_cost) if action == "buy" else 0.0 |
| net_proceeds = float(order_value - transaction_cost) if action == "sell" else 0.0 |
| days_held = 0 |
| if action == "sell" and prev_position is not None: |
| try: |
| days_held = int(prev_position.get_stock_count(instrument, "day")) |
| except Exception: |
| days_held = 0 |
|
|
| year = int(date.year) |
| if build_rows: |
| market_volume = _get_frame_value(volume_frame, date, instrument, default=np.nan) |
| market_amount = _get_frame_value(amount_frame, date, instrument, default=np.nan) |
| rows.append( |
| { |
| "date": date.strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "action": action, |
| "instrument": instrument, |
| "shares": round(float(filled_shares), 6), |
| "current_shares": round(float(current_shares), 6), |
| "target_shares": round(float(target_shares), 6), |
| "requested_shares": round(float(filled_shares), 6), |
| "filled_shares": round(float(filled_shares), 6), |
| "unfilled_shares": 0.0, |
| "fill_ratio": 1.0, |
| "price": round(float(deal_price), 6), |
| "order_value": round(float(order_value), 6), |
| "filled_value": round(float(order_value), 6), |
| "gross_notional": round(float(order_value), 6), |
| "net_proceeds": round(float(net_proceeds), 6), |
| "cash_outflow": round(float(cash_outflow), 6), |
| "transaction_cost": round(float(transaction_cost), 6), |
| "realized_pnl": 0.0, |
| "holdings_count_before": holdings_count_before, |
| "holdings_count_after": holdings_count_after, |
| "days_held": days_held, |
| "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, |
| "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, |
| "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, |
| "amount_participation": round(float(order_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, |
| "clip_reason": "qlib_original", |
| } |
| ) |
|
|
| stats = yearly_trade_stats[year] |
| if action == "buy": |
| stats["buy_trades"] += 1 |
| stats["shares_bought"] += float(filled_shares) |
| stats["buy_gross_notional"] += float(order_value) |
| stats["buy_cash_outflow"] += float(cash_outflow) |
| stats["buy_transaction_cost"] += float(transaction_cost) |
| else: |
| stats["sell_trades"] += 1 |
| stats["shares_sold"] += float(filled_shares) |
| stats["sell_gross_notional"] += float(order_value) |
| stats["sell_net_proceeds"] += float(net_proceeds) |
| stats["sell_transaction_cost"] += float(transaction_cost) |
| stats["transaction_cost"] += float(transaction_cost) |
| stats["gross_turnover"] += float(order_value) |
|
|
| prev_position = position |
| prev_amounts = current_amounts |
|
|
| return rows, yearly_trade_stats |
|
|
|
|
| def _build_portfolio_log_rows( |
| *, |
| dates: list[pd.Timestamp], |
| portfolio_value: pd.Series, |
| portfolio_return: pd.Series, |
| benchmark_return: pd.Series, |
| cash_series: pd.Series | None, |
| holdings_count: dict[pd.Timestamp, int] | None, |
| rebalance_freq: int, |
| rebalance_flags: dict[pd.Timestamp, bool] | None = None, |
| ) -> list[dict[str, Any]]: |
| rows: list[dict[str, Any]] = [] |
| rebalance_step = max(int(rebalance_freq), 1) |
| holdings_count = holdings_count or {} |
| cash_series = cash_series if cash_series is not None else pd.Series(dtype=float) |
| rebalance_flags = rebalance_flags or {} |
| for idx, date in enumerate(dates): |
| date_key = pd.Timestamp(date) |
| portfolio_value_at_date = float(portfolio_value.get(date, np.nan)) |
| portfolio_return_at_date = float(portfolio_return.get(date, 0.0) or 0.0) |
| benchmark_return_at_date = float(benchmark_return.get(date, 0.0) or 0.0) |
| cash_at_date = float(cash_series.get(date, np.nan)) if date in cash_series.index else np.nan |
| is_rebalance = bool(rebalance_flags.get(date_key, idx % rebalance_step == 0)) |
| rows.append( |
| { |
| "date": date_key.strftime("%Y-%m-%d"), |
| "year": int(date_key.year), |
| "market_regime": _market_regime(int(date_key.year)), |
| "portfolio_value": round(float(portfolio_value_at_date), 6) if np.isfinite(portfolio_value_at_date) else None, |
| "cash_eod": round(float(cash_at_date), 6) if np.isfinite(cash_at_date) else None, |
| "cash_weight": round(float(cash_at_date / portfolio_value_at_date), 6) |
| if np.isfinite(cash_at_date) and np.isfinite(portfolio_value_at_date) and abs(portfolio_value_at_date) > 1e-12 |
| else None, |
| "n_held": int(holdings_count.get(date_key, 0)), |
| "portfolio_return": round(float(portfolio_return_at_date), 8), |
| "benchmark_return": round(float(benchmark_return_at_date), 8), |
| "excess_return": round(float(portfolio_return_at_date - benchmark_return_at_date), 8), |
| "is_rebalance": is_rebalance, |
| } |
| ) |
| return rows |
|
|
|
|
| def _compute_portfolio_ir_qlib( |
| factor_values: pd.Series, |
| price_df: pd.DataFrame, |
| bench_return: pd.Series | None = None, |
| top_k: int = 10, |
| n_drop: int = 2, |
| rebalance_freq: int = 5, |
| cost_buy: float = 0.0013, |
| cost_sell: float = 0.0013, |
| hold_thresh: int = 2, |
| label_forward_days: int = 5, |
| ann_scaler: int = 252, |
| start_date: str | None = None, |
| end_date: str | None = None, |
| risk_free_rate_annual: float = 0.0, |
| start_cash: float = 200_000_000.0, |
| position_size: float = 1.0, |
| max_pos_each_stock: float = 1.0, |
| lot_size: int = 100, |
| max_daily_volume_participation: float = 0.0, |
| max_daily_amount_participation: float = 0.0, |
| capture_details: bool = False, |
| trade_guard_config: dict[str, Any] | bool | None = None, |
| rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, |
| ) -> dict: |
| if isinstance(factor_values, pd.DataFrame): |
| factor_values = factor_values.iloc[:, 0] |
|
|
| open_prices = _get_price_wide_frame(price_df, "$open") |
| close_prices = _get_price_wide_frame(price_df, "$close") |
| volume_frame = _get_price_wide_frame(price_df, "$volume") if capture_details else None |
| amount_frame = _get_price_wide_frame(price_df, "$amount") if capture_details else None |
|
|
| if open_prices is None or close_prices is None: |
| return _empty_result("Missing $open or $close in price_df") |
|
|
| normalized_rebalance_mode = _normalize_rebalance_mode(rebalance_mode) |
| normalized_trade_guard_config = _normalize_trade_guard_config(trade_guard_config) |
| if normalized_trade_guard_config is None: |
| signal = _prepare_qlib_signal(factor_values) |
| cooldown_period = 0 |
| else: |
| signal = _prepare_force_exit_qlib_signal(factor_values, price_df, normalized_trade_guard_config) |
| if "cooldown_period" in normalized_trade_guard_config: |
| cooldown_value = normalized_trade_guard_config.get("cooldown_period") |
| cooldown_period = 0 if cooldown_value is None else max(int(cooldown_value), 0) |
| else: |
| cooldown_period = max(int(rebalance_freq) * 2, int(hold_thresh)) |
| if start_date: |
| signal = signal[signal.index.get_level_values("datetime") >= pd.Timestamp(start_date)] |
| if end_date: |
| signal = signal[signal.index.get_level_values("datetime") <= pd.Timestamp(end_date)] |
| if signal.empty: |
| return _empty_result("No valid signal values for qlib_original backtest") |
|
|
| runtime = _ensure_qlib_initialized(_ensure_qlib_provider(price_df)) |
| ForceExitTopkDropoutStrategy = runtime["ForceExitTopkDropoutStrategy"] |
| SimulatorExecutor = runtime["SimulatorExecutor"] |
| Account = runtime["Account"] |
| CommonInfrastructure = runtime["CommonInfrastructure"] |
| backtest_loop = runtime["backtest_loop"] |
| get_exchange = runtime["get_exchange"] |
|
|
| qlib_warnings: list[str] = [] |
| effective_risk_degree = min(max(float(position_size), 0.0), 1.0) |
| if max_pos_each_stock > 0 and top_k > 0: |
| effective_risk_degree = min(effective_risk_degree, float(top_k) * float(max_pos_each_stock)) |
| if max_daily_amount_participation > 0: |
| qlib_warnings.append("max_daily_amount_participation is not natively supported by qlib_original and is ignored") |
| if normalized_trade_guard_config is not None: |
| qlib_warnings.append("qlib_original trade_guard_config enabled") |
| if normalized_rebalance_mode != DEFAULT_QLIB_REBALANCE_MODE: |
| qlib_warnings.append(f"qlib_original rebalance_mode={normalized_rebalance_mode}") |
|
|
| strategy = ForceExitTopkDropoutStrategy( |
| signal=signal, |
| topk=top_k, |
| n_drop=n_drop, |
| rebalance_freq=rebalance_freq, |
| cooldown_period=cooldown_period, |
| rebalance_mode=normalized_rebalance_mode, |
| max_pos_each_stock=max_pos_each_stock, |
| hold_thresh=hold_thresh, |
| risk_degree=effective_risk_degree, |
| only_tradable=False, |
| forbid_all_trade_at_limit=False, |
| ) |
| executor_obj = SimulatorExecutor( |
| time_per_step="day", |
| generate_portfolio_metrics=True, |
| verbose=False, |
| ) |
|
|
| exchange_kwargs: dict[str, Any] = { |
| "freq": "day", |
| "deal_price": ("$open", "$close"), |
| "limit_threshold": None, |
| "open_cost": cost_buy, |
| "close_cost": cost_sell, |
| "min_cost": 0.0, |
| "trade_unit": max(int(lot_size), 1), |
| } |
| if max_daily_volume_participation > 0: |
| exchange_kwargs["volume_threshold"] = ("current", f"{float(max_daily_volume_participation)} * $volume") |
|
|
| available_dates = _get_available_dates(price_df) |
| if len(available_dates) < 2: |
| return _empty_result("Qlib original backtest requires at least 2 trading dates") |
|
|
| latest_safe_end = pd.Timestamp(available_dates[-2]) |
| bt_start = pd.Timestamp(start_date) if start_date else pd.Timestamp(signal.index.get_level_values("datetime").min()) |
| requested_end = pd.Timestamp(end_date) if end_date else pd.Timestamp(signal.index.get_level_values("datetime").max()) |
| bt_end = min(requested_end, latest_safe_end) |
| if bt_end < bt_start: |
| return _empty_result("Qlib original backtest window is empty after calendar clipping") |
| if bt_end < requested_end: |
| qlib_warnings.append( |
| f"qlib_original clipped end_date from {requested_end.strftime('%Y-%m-%d')} " |
| f"to {bt_end.strftime('%Y-%m-%d')} because the provider has no future calendar bar" |
| ) |
|
|
| signal = signal[signal.index.get_level_values("datetime") <= bt_end] |
| if signal.empty: |
| return _empty_result("No valid signal values remain after qlib_original calendar clipping") |
|
|
| benchmark_series = None |
| if bench_return is not None: |
| benchmark_series = bench_return.astype(float).sort_index() |
| benchmark_series = benchmark_series[benchmark_series.index >= bt_start] |
| benchmark_series = benchmark_series[benchmark_series.index <= bt_end] |
|
|
| try: |
| with _quiet_runtime_io(), runtime["set_global_logger_level_cm"](logging.ERROR): |
| trade_account = Account( |
| init_cash=max(float(start_cash), 0.0), |
| position_dict={}, |
| pos_type="Position", |
| benchmark_config={"benchmark": None}, |
| ) |
| exchange_with_dates = dict(exchange_kwargs) |
| exchange_with_dates.setdefault("start_time", bt_start) |
| exchange_with_dates.setdefault("end_time", bt_end) |
| trade_exchange = get_exchange(**exchange_with_dates) |
| common_infra = CommonInfrastructure(trade_account=trade_account, trade_exchange=trade_exchange) |
| strategy.reset_common_infra(common_infra) |
| executor_obj.reset_common_infra(common_infra) |
| with warnings.catch_warnings(): |
| warnings.filterwarnings("ignore", message="Mean of empty slice", category=RuntimeWarning) |
| portfolio_metric_dict, _indicator_dict = backtest_loop( |
| start_time=bt_start, |
| end_time=bt_end, |
| trade_strategy=strategy, |
| trade_executor=executor_obj, |
| ) |
| except Exception as exc: |
| return _empty_result(f"Qlib original backtest failed: {type(exc).__name__}: {str(exc)[:500]}") |
|
|
| if not portfolio_metric_dict: |
| return _empty_result("Qlib original backtest returned no portfolio metrics") |
|
|
| analysis_key = sorted(portfolio_metric_dict.keys())[0] |
| report_normal, positions_normal = portfolio_metric_dict[analysis_key] |
| if report_normal is None or report_normal.empty: |
| return _empty_result("Qlib original backtest returned an empty report") |
|
|
| report_normal = report_normal.sort_index() |
| portfolio_returns_with_cost = report_normal["return"].astype(float) - report_normal["cost"].astype(float) |
| if benchmark_series is not None: |
| benchmark_returns = benchmark_series.reindex(report_normal.index).fillna(0.0).astype(float) |
| else: |
| benchmark_returns = report_normal.get("bench", pd.Series(index=report_normal.index, dtype=float)).astype(float).fillna(0.0) |
| return_frame = pd.DataFrame( |
| { |
| "portfolio_value": report_normal["account"].astype(float), |
| "portfolio_return": portfolio_returns_with_cost, |
| "benchmark_return": benchmark_returns, |
| }, |
| index=report_normal.index, |
| ) |
|
|
| ic_frame = _compute_daily_cross_sectional_ic_frame( |
| factor_values=factor_values, |
| close_prices=close_prices, |
| label_forward_days=label_forward_days, |
| ) |
| if start_date and not ic_frame.empty: |
| ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)] |
| if end_date and not ic_frame.empty: |
| ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)] |
|
|
| holding_log, yearly_holding_stats = _build_holding_log_from_qlib_positions( |
| positions_normal=positions_normal, |
| report_normal=report_normal, |
| volume_frame=volume_frame, |
| amount_frame=amount_frame, |
| build_rows=capture_details, |
| ) |
| trade_log, yearly_trade_stats = _build_trade_log_from_qlib_positions( |
| positions_normal=positions_normal, |
| open_prices=open_prices, |
| close_prices=close_prices, |
| volume_frame=volume_frame, |
| amount_frame=amount_frame, |
| cost_buy=cost_buy, |
| cost_sell=cost_sell, |
| build_rows=capture_details, |
| ) |
| holdings_count_by_date = { |
| pd.Timestamp(date): int( |
| sum(1 for instrument in position.get_stock_list() if float(position.get_stock_amount(instrument)) > 0) |
| ) |
| for date, position in positions_normal.items() |
| } |
| portfolio_log = _build_portfolio_log_rows( |
| dates=list(return_frame.index), |
| portfolio_value=return_frame["portfolio_value"].astype(float), |
| portfolio_return=return_frame["portfolio_return"].astype(float), |
| benchmark_return=return_frame["benchmark_return"].astype(float), |
| cash_series=report_normal["cash"].astype(float) if "cash" in report_normal.columns else None, |
| holdings_count=holdings_count_by_date, |
| rebalance_freq=rebalance_freq, |
| ) |
| total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values())) |
| total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values())) |
|
|
| result = _summarize_return_frame( |
| return_frame=return_frame, |
| ann_scaler=ann_scaler, |
| risk_free_rate_annual=risk_free_rate_annual, |
| ) |
| result.update(_summarize_ic_frame(ic_frame)) |
| result.update( |
| { |
| "success": True, |
| "final_value": round(float(report_normal["account"].iloc[-1]), 6), |
| "error": None, |
| "qlib_warnings": qlib_warnings, |
| "trade_guard_config": normalized_trade_guard_config, |
| "rebalance_mode": normalized_rebalance_mode, |
| "transaction_cost": round(total_transaction_cost, 6), |
| "gross_turnover": round(total_gross_turnover, 6), |
| "turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6), |
| "yearly_metrics": _build_yearly_metrics( |
| return_frame=return_frame, |
| ic_frame=ic_frame, |
| yearly_trade_stats=yearly_trade_stats, |
| yearly_holding_stats=yearly_holding_stats, |
| ann_scaler=ann_scaler, |
| risk_free_rate_annual=risk_free_rate_annual, |
| ), |
| "trade_log": trade_log if capture_details else [], |
| "stock_contrib": _build_stock_contribution_summary(holding_log, trade_log) if capture_details else [], |
| "holding_log": holding_log if capture_details else [], |
| "portfolio_log": portfolio_log if capture_details else [], |
| } |
| ) |
| return result |
|
|
|
|
| def _compute_portfolio_ir_custom( |
| factor_values: pd.Series, |
| price_df: pd.DataFrame, |
| bench_return: pd.Series | None = None, |
| top_k: int = 10, |
| n_drop: int = 2, |
| rebalance_freq: int = 5, |
| cost_buy: float = 0.0013, |
| cost_sell: float = 0.0013, |
| hold_thresh: int = 2, |
| label_forward_days: int = 5, |
| ann_scaler: int = 252, |
| start_date: str | None = None, |
| end_date: str | None = None, |
| risk_free_rate_annual: float = 0.0, |
| start_cash: float = 200_000_000.0, |
| position_size: float = 1.0, |
| max_pos_each_stock: float = 1.0, |
| lot_size: int = 100, |
| max_daily_volume_participation: float = 0.0, |
| max_daily_amount_participation: float = 0.0, |
| capture_details: bool = False, |
| custom_weight_mode: str = "equal", |
| redistribute_unfilled_cash: bool = False, |
| enforce_cash_limit: bool = False, |
| ) -> dict: |
| """Compute portfolio-based IR matching AlphaAgent's Qlib config.""" |
| if isinstance(factor_values, pd.DataFrame): |
| factor_values = factor_values.iloc[:, 0] |
|
|
| all_dates = sorted(factor_values.index.get_level_values("datetime").unique()) |
| if start_date: |
| all_dates = [d for d in all_dates if d >= pd.Timestamp(start_date)] |
| if end_date: |
| all_dates = [d for d in all_dates if d <= pd.Timestamp(end_date)] |
|
|
| if len(all_dates) < rebalance_freq + hold_thresh + 1: |
| return _empty_result("Not enough dates for backtesting") |
|
|
| open_prices = price_df["$open"].unstack("instrument") if "$open" in price_df.columns else None |
| close_prices = price_df["$close"].unstack("instrument") if "$close" in price_df.columns else None |
| volume_frame = price_df["$volume"].unstack("instrument") if "$volume" in price_df.columns else None |
| amount_frame = price_df["$amount"].unstack("instrument") if "$amount" in price_df.columns else None |
|
|
| if open_prices is None or close_prices is None: |
| return _empty_result("Missing $open or $close in price_df") |
|
|
| effective_freq = max(rebalance_freq, hold_thresh) |
| rebalance_indices = list(range(0, len(all_dates), effective_freq)) |
| rebalance_set = {all_dates[i] for i in rebalance_indices} |
|
|
| lot_size = max(int(lot_size), 1) |
| start_cash = max(float(start_cash), 0.0) |
| position_size = min(max(float(position_size), 0.0), 1.0) |
| max_pos_each_stock = min(max(float(max_pos_each_stock), 0.0), 1.0) |
| max_daily_volume_participation = max(float(max_daily_volume_participation), 0.0) |
| max_daily_amount_participation = max(float(max_daily_amount_participation), 0.0) |
| normalized_custom_weight_mode = _normalize_custom_weight_mode(custom_weight_mode) |
| redistribute_unfilled_cash = bool(redistribute_unfilled_cash) |
| enforce_cash_limit = bool(enforce_cash_limit) |
|
|
| holdings: dict[str, dict[str, Any]] = {} |
| cash = float(start_cash) |
|
|
| daily_portfolio_values: list[float] = [] |
| daily_dates_out: list[pd.Timestamp] = [] |
| daily_portfolio_records: list[dict[str, Any]] = [] |
| daily_holding_records: list[dict[str, Any]] = [] |
| trade_records: list[dict[str, Any]] = [] |
|
|
| yearly_trade_stats: dict[int, dict[str, float]] = defaultdict( |
| lambda: { |
| "buy_trades": 0, |
| "sell_trades": 0, |
| "shares_bought": 0.0, |
| "shares_sold": 0.0, |
| "buy_gross_notional": 0.0, |
| "sell_gross_notional": 0.0, |
| "buy_cash_outflow": 0.0, |
| "sell_net_proceeds": 0.0, |
| "buy_transaction_cost": 0.0, |
| "sell_transaction_cost": 0.0, |
| "transaction_cost": 0.0, |
| "gross_turnover": 0.0, |
| } |
| ) |
| yearly_holding_stats: dict[int, dict[str, float]] = defaultdict( |
| lambda: { |
| "n_days": 0, |
| "holdings_count_sum": 0.0, |
| "max_holdings_count": 0, |
| } |
| ) |
| rebalance_execution_count = 0 |
|
|
| for day_idx, date in enumerate(all_dates): |
| is_rebalance = date in rebalance_set |
| rebalance_executed = False |
| signal = pd.Series(dtype=float) |
| if is_rebalance and day_idx > 0: |
| prev_date = all_dates[day_idx - 1] |
| try: |
| signal = factor_values.xs(prev_date, level="datetime") |
| except KeyError: |
| signal = pd.Series(dtype=float) |
| signal = signal.dropna().sort_values(ascending=False) |
|
|
| if is_rebalance and day_idx > 0 and not signal.empty: |
| rebalance_executed = True |
| is_initial_rebalance = rebalance_execution_count == 0 |
| effective_custom_weight_mode = "equal" if is_initial_rebalance else normalized_custom_weight_mode |
| effective_redistribute_unfilled_cash = False if is_initial_rebalance else redistribute_unfilled_cash |
| current_insts = set(holdings.keys()) |
| locked_insts = { |
| inst for inst, pos in holdings.items() if day_idx - int(pos["buy_day_idx"]) < hold_thresh |
| } |
| new_top = set(signal.nlargest(top_k).index.tolist()) |
| eligible_current = list(current_insts - locked_insts) |
| |
| |
| |
| dropout_candidates = {inst for inst in eligible_current if inst not in new_top} |
|
|
| keep_insts = list(current_insts - dropout_candidates) |
| keep_insts_set = {str(inst) for inst in keep_insts} |
| equity = cash + sum( |
| _market_value_from_close(close_prices, date, pos, inst) |
| for inst, pos in holdings.items() |
| ) |
| max_position_value = equity * max_pos_each_stock |
| if max_pos_each_stock >= 1.0 - 1e-12: |
| baseline_slot_value = equity * position_size |
| else: |
| baseline_slot_value = min( |
| max_position_value, |
| (equity * position_size / float(top_k)) if top_k > 0 else 0.0, |
| ) |
|
|
| target_universe = list(keep_insts) |
| for inst in signal.index.tolist(): |
| if len(target_universe) >= top_k: |
| break |
| if inst in target_universe or inst in dropout_candidates: |
| continue |
| px = _get_frame_value(open_prices, date, inst, default=np.nan) |
| if not np.isfinite(px) or px <= 0: |
| px = _get_frame_value(close_prices, date, inst, default=np.nan) |
| if not np.isfinite(px) or px <= 0: |
| continue |
| one_lot_cash = float(lot_size) * float(px) * (1.0 + cost_buy) |
| if baseline_slot_value > 0 and one_lot_cash > baseline_slot_value: |
| continue |
| target_universe.append(inst) |
|
|
| ranked_target_names = [str(inst) for inst in target_universe] |
| target_shares_map: dict[str, float] = {} |
| score_target_weights = pd.Series(dtype=float) |
| if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash: |
| target_count = max(len(ranked_target_names), 1) |
| target_value_per_name = min( |
| equity * position_size / float(target_count), |
| max_position_value, |
| ) |
| for inst in set(ranked_target_names).union(current_insts): |
| current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) |
| if inst not in ranked_target_names or target_value_per_name <= 0: |
| target_shares = 0.0 |
| else: |
| price_for_target = _get_frame_value(open_prices, date, inst, default=np.nan) |
| if not np.isfinite(price_for_target) or price_for_target <= 0: |
| price_for_target = _get_frame_value(close_prices, date, inst, default=np.nan) |
| if not np.isfinite(price_for_target) or price_for_target <= 0: |
| target_shares = current_shares |
| else: |
| target_shares = _round_down_lot(target_value_per_name / float(price_for_target), lot_size) |
| if inst in keep_insts_set and target_shares < current_shares: |
| target_shares = current_shares |
| target_shares_map[inst] = float(target_shares) |
| else: |
| ranked_target_scores = signal.reindex(pd.Index(ranked_target_names)).dropna().sort_values(ascending=False) |
| ranked_target_names = [str(inst) for inst in ranked_target_scores.index.tolist()] |
| score_target_weights = _build_custom_target_weights( |
| target_scores=ranked_target_scores, |
| target_names=ranked_target_names, |
| total_weight=1.0 if position_size > 0 else 0.0, |
| max_pos_each_stock=max_pos_each_stock, |
| weight_mode=effective_custom_weight_mode, |
| ) |
| for inst in set(ranked_target_names).union(current_insts): |
| current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) |
| |
| |
| |
| |
| if str(inst) not in ranked_target_names: |
| target_shares = 0.0 |
| else: |
| target_shares = current_shares |
| if inst in keep_insts_set and target_shares < current_shares: |
| target_shares = current_shares |
| target_shares_map[inst] = float(target_shares) |
|
|
| for inst in list(holdings.keys()): |
| pos = holdings[inst] |
| current_shares = float(pos["shares"]) |
| target_shares = float(target_shares_map.get(inst, 0.0)) |
| requested_shares = max(current_shares - target_shares, 0.0) |
| if requested_shares <= 0: |
| continue |
|
|
| hold_count_before = len(holdings) |
| days_held = int(day_idx - int(pos["buy_day_idx"])) |
| sell_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0)) |
| sell_px = sell_px if sell_px > 0 else float(pos.get("avg_cost_per_share", 0.0)) |
| market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) |
| market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) |
|
|
| |
| |
| |
| |
| clip_reasons: list[str] = [] |
| if not np.isfinite(sell_px) or sell_px <= 0: |
| filled_shares = 0.0 |
| clip_reasons.append("invalid_price") |
| else: |
| filled_shares = min(requested_shares, current_shares) |
|
|
| order_value = requested_shares * sell_px |
| filled_value = filled_shares * sell_px |
| transaction_cost = filled_value * cost_sell |
| net_proceeds = filled_value - transaction_cost |
| realized_pnl = filled_value - transaction_cost - float(pos["avg_cost_per_share"]) * filled_shares |
|
|
| if filled_shares > 0: |
| cash += net_proceeds |
| pos["shares"] = float(pos["shares"]) - filled_shares |
| pos["total_cost_basis"] = max( |
| float(pos["total_cost_basis"]) - float(pos["avg_cost_per_share"]) * filled_shares, |
| 0.0, |
| ) |
| if pos["shares"] > 0: |
| pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) |
| else: |
| del holdings[inst] |
|
|
| year = int(pd.Timestamp(date).year) |
| stats = yearly_trade_stats[year] |
| stats["sell_trades"] += 1 |
| stats["shares_sold"] += float(filled_shares) |
| stats["sell_gross_notional"] += float(filled_value) |
| stats["sell_net_proceeds"] += float(net_proceeds) |
| stats["sell_transaction_cost"] += float(transaction_cost) |
| stats["transaction_cost"] += float(transaction_cost) |
| stats["gross_turnover"] += float(filled_value) |
|
|
| if capture_details: |
| year = int(pd.Timestamp(date).year) |
| trade_records.append( |
| { |
| "date": pd.Timestamp(date).strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "action": "sell", |
| "instrument": inst, |
| "shares": round(float(filled_shares), 6), |
| "current_shares": round(float(current_shares), 6), |
| "target_shares": round(float(target_shares), 6), |
| "requested_shares": round(float(requested_shares), 6), |
| "filled_shares": round(float(filled_shares), 6), |
| "unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6), |
| "fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0, |
| "price": round(float(sell_px), 6), |
| "order_value": round(float(order_value), 6), |
| "filled_value": round(float(filled_value), 6), |
| "redistributed_notional": 0.0, |
| "gross_notional": round(float(filled_value), 6), |
| "net_proceeds": round(float(net_proceeds), 6), |
| "cash_outflow": 0.0, |
| "transaction_cost": round(float(transaction_cost), 6), |
| "realized_pnl": round(float(realized_pnl), 6), |
| "holdings_count_before": int(hold_count_before), |
| "holdings_count_after": int(len(holdings)), |
| "days_held": days_held, |
| "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, |
| "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, |
| "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, |
| "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, |
| "clip_reason": _clip_reason_text(clip_reasons), |
| } |
| ) |
|
|
| score_buy_budget_by_inst: dict[str, float] = {} |
| if not score_target_weights.empty: |
| available_buy_budget = max(float(cash), 0.0) |
| score_buy_budget_by_inst = { |
| str(inst): float(weight) * available_buy_budget |
| for inst, weight in score_target_weights.items() |
| if float(weight) > 0 |
| } |
|
|
| carry_budget = 0.0 |
| buy_filled_by_inst: dict[str, float] = {} |
| buy_names = ranked_target_names if ranked_target_names else [] |
| for inst in buy_names: |
| current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) |
| target_shares = float(target_shares_map.get(inst, 0.0)) |
| reported_target_shares = float(target_shares) |
| buy_px = _get_frame_value(open_prices, date, inst, default=np.nan) |
| if not np.isfinite(buy_px) or buy_px <= 0: |
| buy_px = _get_frame_value(close_prices, date, inst, default=np.nan) |
|
|
| hold_count_before = len(holdings) |
| year = int(pd.Timestamp(date).year) |
| market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) |
| market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) |
| clip_reasons: list[str] = [] |
| filled_shares = 0.0 |
| requested_shares = 0.0 |
| order_value = 0.0 |
| filled_value = 0.0 |
| transaction_cost = 0.0 |
| cash_outflow = 0.0 |
| redistributed_notional = 0.0 |
|
|
| if effective_custom_weight_mode == "equal" and not effective_redistribute_unfilled_cash: |
| requested_shares = max(target_shares - current_shares, 0.0) |
| if requested_shares <= 0: |
| continue |
| order_value = requested_shares * float(buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0 |
| if not np.isfinite(buy_px) or buy_px <= 0: |
| clip_reasons.append("invalid_price") |
| else: |
| volume_cap, amount_cap = _liquidity_caps( |
| market_volume=market_volume, |
| market_amount=market_amount, |
| price=buy_px, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| ) |
| cash_cap = ( |
| _round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size) |
| if enforce_cash_limit |
| else np.inf |
| ) |
| filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap) |
|
|
| if volume_cap < requested_shares: |
| clip_reasons.append("volume_limit") |
| if amount_cap < requested_shares: |
| clip_reasons.append("amount_limit") |
| if enforce_cash_limit and cash_cap < requested_shares: |
| clip_reasons.append("cash_limit") |
| if filled_shares < requested_shares and filled_shares <= 0: |
| clip_reasons.append("lot_rounding") |
|
|
| filled_value = filled_shares * float(buy_px) |
| transaction_cost = filled_value * cost_buy |
| cash_outflow = filled_value + transaction_cost |
| else: |
| current_value = float(current_shares * buy_px) if np.isfinite(buy_px) and buy_px > 0 else 0.0 |
| base_requested_budget = float(score_buy_budget_by_inst.get(str(inst), 0.0)) |
| requested_budget = float(base_requested_budget) |
| carry_in_budget = 0.0 |
| if effective_redistribute_unfilled_cash: |
| carry_in_budget = float(carry_budget) |
| requested_budget += carry_in_budget |
| carry_budget = 0.0 |
| if max_pos_each_stock > 0 and np.isfinite(buy_px) and buy_px > 0: |
| per_name_value_cap = float(equity * max_pos_each_stock) |
| budget_cap = max(per_name_value_cap - current_value, 0.0) |
| carry_budget += max(requested_budget - budget_cap, 0.0) |
| requested_budget = min(requested_budget, budget_cap) |
| if requested_budget <= 1e-12 and carry_budget <= 1e-12: |
| continue |
| if not np.isfinite(buy_px) or buy_px <= 0: |
| clip_reasons.append("invalid_price") |
| if effective_redistribute_unfilled_cash: |
| carry_budget += requested_budget |
| else: |
| requested_shares = _round_down_lot( |
| requested_budget / (float(buy_px) * (1.0 + cost_buy)), |
| lot_size, |
| ) |
| reported_target_shares = float(current_shares + requested_shares) |
| order_value = requested_shares * float(buy_px) |
| volume_cap, amount_cap = _liquidity_caps( |
| market_volume=market_volume, |
| market_amount=market_amount, |
| price=buy_px, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| ) |
| cash_cap = ( |
| _round_down_lot(cash / (float(buy_px) * (1.0 + cost_buy)), lot_size) |
| if enforce_cash_limit |
| else np.inf |
| ) |
| filled_shares = min(requested_shares, volume_cap, amount_cap, cash_cap) |
|
|
| if volume_cap < requested_shares: |
| clip_reasons.append("volume_limit") |
| if amount_cap < requested_shares: |
| clip_reasons.append("amount_limit") |
| if enforce_cash_limit and cash_cap < requested_shares: |
| clip_reasons.append("cash_limit") |
| if requested_budget > 1e-12 and requested_shares <= 0: |
| clip_reasons.append("lot_rounding") |
| if filled_shares < requested_shares and filled_shares <= 0: |
| clip_reasons.append("lot_rounding") |
|
|
| filled_value = filled_shares * float(buy_px) |
| transaction_cost = filled_value * cost_buy |
| cash_outflow = filled_value + transaction_cost |
| if carry_in_budget > 0 and filled_shares > 0: |
| redistributed_notional = min(float(cash_outflow), float(carry_in_budget)) |
| if effective_redistribute_unfilled_cash: |
| carry_budget += max(requested_budget - cash_outflow, 0.0) |
|
|
| if filled_shares > 0: |
| cash -= cash_outflow |
| buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares) |
| if inst in holdings: |
| pos = holdings[inst] |
| pos["shares"] = float(pos["shares"]) + filled_shares |
| pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow |
| pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) |
| else: |
| holdings[inst] = { |
| "shares": float(filled_shares), |
| "buy_date": pd.Timestamp(date), |
| "buy_day_idx": int(day_idx), |
| "avg_cost_per_share": float(cash_outflow / filled_shares), |
| "total_cost_basis": float(cash_outflow), |
| } |
|
|
| stats = yearly_trade_stats[year] |
| stats["buy_trades"] += 1 |
| stats["shares_bought"] += float(filled_shares) |
| stats["buy_gross_notional"] += float(filled_value) |
| stats["buy_cash_outflow"] += float(cash_outflow) |
| stats["buy_transaction_cost"] += float(transaction_cost) |
| stats["transaction_cost"] += float(transaction_cost) |
| stats["gross_turnover"] += float(filled_value) |
|
|
| if capture_details: |
| trade_records.append( |
| { |
| "date": pd.Timestamp(date).strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "action": "buy", |
| "instrument": inst, |
| "shares": round(float(filled_shares), 6), |
| "current_shares": round(float(current_shares), 6), |
| "target_shares": round(float(reported_target_shares), 6), |
| "requested_shares": round(float(requested_shares), 6), |
| "filled_shares": round(float(filled_shares), 6), |
| "unfilled_shares": round(float(max(requested_shares - filled_shares, 0.0)), 6), |
| "fill_ratio": round(float(filled_shares / requested_shares), 6) if requested_shares > 0 else 0.0, |
| "price": round(float(buy_px), 6) if np.isfinite(buy_px) else 0.0, |
| "order_value": round(float(order_value), 6), |
| "filled_value": round(float(filled_value), 6), |
| "redistributed_notional": round(float(redistributed_notional), 6), |
| "gross_notional": round(float(filled_value), 6), |
| "net_proceeds": 0.0, |
| "cash_outflow": round(float(cash_outflow), 6), |
| "transaction_cost": round(float(transaction_cost), 6), |
| "realized_pnl": 0.0, |
| "holdings_count_before": int(hold_count_before), |
| "holdings_count_after": int(len(holdings)), |
| "days_held": 0, |
| "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, |
| "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, |
| "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, |
| "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, |
| "clip_reason": _clip_reason_text(clip_reasons), |
| } |
| ) |
|
|
| if effective_redistribute_unfilled_cash and carry_budget > 1e-12 and buy_names: |
| residual_budget = ( |
| min(float(carry_budget), float(cash)) |
| if enforce_cash_limit |
| else float(carry_budget) |
| ) |
| for inst in buy_names: |
| if residual_budget <= 1e-12: |
| break |
|
|
| buy_px = _get_frame_value(open_prices, date, inst, default=np.nan) |
| if not np.isfinite(buy_px) or buy_px <= 0: |
| buy_px = _get_frame_value(close_prices, date, inst, default=np.nan) |
| if not np.isfinite(buy_px) or buy_px <= 0: |
| continue |
|
|
| one_lot_cash = float(lot_size) * float(buy_px) * (1.0 + cost_buy) |
| if one_lot_cash <= 0 or residual_budget + 1e-12 < one_lot_cash or (enforce_cash_limit and cash + 1e-12 < one_lot_cash): |
| continue |
|
|
| current_shares = float(holdings.get(inst, {}).get("shares", 0.0)) |
| current_value = float(current_shares * float(buy_px)) |
| extra_budget_cap = float(residual_budget) |
| if max_pos_each_stock > 0: |
| per_name_value_cap = float(equity * max_pos_each_stock) |
| extra_budget_cap = min(extra_budget_cap, max(per_name_value_cap - current_value, 0.0)) |
| if extra_budget_cap + 1e-12 < one_lot_cash: |
| continue |
|
|
| market_volume = _get_frame_value(volume_frame, date, inst, default=np.nan) |
| market_amount = _get_frame_value(amount_frame, date, inst, default=np.nan) |
| volume_cap, amount_cap = _liquidity_caps( |
| market_volume=market_volume, |
| market_amount=market_amount, |
| price=buy_px, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| ) |
| used_shares_today = float(buy_filled_by_inst.get(inst, 0.0)) |
| remaining_volume_cap = max(float(volume_cap) - used_shares_today, 0.0) |
| remaining_amount_cap = max(float(amount_cap) - used_shares_today, 0.0) |
| remaining_liquidity_cap = min(remaining_volume_cap, remaining_amount_cap) |
| budget_cap_shares = _round_down_lot(extra_budget_cap / (float(buy_px) * (1.0 + cost_buy)), lot_size) |
| cash_cap_shares = ( |
| _round_down_lot(min(float(cash), float(residual_budget)) / (float(buy_px) * (1.0 + cost_buy)), lot_size) |
| if enforce_cash_limit |
| else budget_cap_shares |
| ) |
| filled_shares = min(remaining_liquidity_cap, budget_cap_shares, cash_cap_shares) |
| if filled_shares <= 0: |
| continue |
|
|
| hold_count_before = len(holdings) |
| year = int(pd.Timestamp(date).year) |
| filled_value = filled_shares * float(buy_px) |
| transaction_cost = filled_value * cost_buy |
| cash_outflow = filled_value + transaction_cost |
|
|
| cash -= cash_outflow |
| residual_budget = max(float(residual_budget) - float(cash_outflow), 0.0) |
| buy_filled_by_inst[inst] = float(buy_filled_by_inst.get(inst, 0.0)) + float(filled_shares) |
|
|
| if inst in holdings: |
| pos = holdings[inst] |
| pos["shares"] = float(pos["shares"]) + filled_shares |
| pos["total_cost_basis"] = float(pos["total_cost_basis"]) + cash_outflow |
| pos["avg_cost_per_share"] = float(pos["total_cost_basis"]) / float(pos["shares"]) |
| else: |
| holdings[inst] = { |
| "shares": float(filled_shares), |
| "buy_date": pd.Timestamp(date), |
| "buy_day_idx": int(day_idx), |
| "avg_cost_per_share": float(cash_outflow / filled_shares), |
| "total_cost_basis": float(cash_outflow), |
| } |
|
|
| stats = yearly_trade_stats[year] |
| stats["buy_trades"] += 1 |
| stats["shares_bought"] += float(filled_shares) |
| stats["buy_gross_notional"] += float(filled_value) |
| stats["buy_cash_outflow"] += float(cash_outflow) |
| stats["buy_transaction_cost"] += float(transaction_cost) |
| stats["transaction_cost"] += float(transaction_cost) |
| stats["gross_turnover"] += float(filled_value) |
|
|
| if capture_details: |
| trade_records.append( |
| { |
| "date": pd.Timestamp(date).strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "action": "buy", |
| "instrument": inst, |
| "shares": round(float(filled_shares), 6), |
| "current_shares": round(float(current_shares), 6), |
| "target_shares": round(float(target_shares_map.get(inst, 0.0)), 6), |
| "requested_shares": round(float(filled_shares), 6), |
| "filled_shares": round(float(filled_shares), 6), |
| "unfilled_shares": 0.0, |
| "fill_ratio": 1.0, |
| "price": round(float(buy_px), 6), |
| "order_value": round(float(filled_value), 6), |
| "filled_value": round(float(filled_value), 6), |
| "redistributed_notional": round(float(cash_outflow), 6), |
| "gross_notional": round(float(filled_value), 6), |
| "net_proceeds": 0.0, |
| "cash_outflow": round(float(cash_outflow), 6), |
| "transaction_cost": round(float(transaction_cost), 6), |
| "realized_pnl": 0.0, |
| "holdings_count_before": int(hold_count_before), |
| "holdings_count_after": int(len(holdings)), |
| "days_held": 0, |
| "market_volume": round(float(market_volume), 6) if np.isfinite(market_volume) else 0.0, |
| "market_amount": round(float(market_amount), 6) if np.isfinite(market_amount) else 0.0, |
| "volume_participation": round(float(filled_shares / market_volume), 8) if np.isfinite(market_volume) and market_volume > 0 else 0.0, |
| "amount_participation": round(float(filled_value / market_amount), 8) if np.isfinite(market_amount) and market_amount > 0 else 0.0, |
| "clip_reason": "cash_sweep", |
| } |
| ) |
|
|
| rebalance_execution_count += 1 |
|
|
| year = int(pd.Timestamp(date).year) |
| holding_stats = yearly_holding_stats[year] |
| holding_stats["n_days"] += 1 |
| holding_stats["holdings_count_sum"] += float(len(holdings)) |
| holding_stats["max_holdings_count"] = max(int(holding_stats["max_holdings_count"]), len(holdings)) |
|
|
| eod_rows: list[dict[str, Any]] = [] |
| holdings_market_value = 0.0 |
| for inst, pos in holdings.items(): |
| close_px = _get_frame_value(close_prices, date, inst, default=pos.get("avg_cost_per_share", 0.0)) |
| if close_px > 0: |
| market_value = float(pos["shares"]) * float(close_px) |
| else: |
| market_value = float(pos["total_cost_basis"]) |
| holdings_market_value += float(market_value) |
| if capture_details: |
| eod_rows.append( |
| { |
| "date": pd.Timestamp(date).strftime("%Y-%m-%d"), |
| "year": year, |
| "market_regime": _market_regime(year), |
| "instrument": inst, |
| "shares_held": round(float(pos["shares"]), 6), |
| "market_value": round(float(market_value), 6), |
| "close_price": round(float(close_px), 6), |
| "market_volume": round(_get_frame_value(volume_frame, date, inst, default=0.0), 6), |
| "market_amount": round(_get_frame_value(amount_frame, date, inst, default=0.0), 6), |
| } |
| ) |
|
|
| portfolio_value = float(cash + holdings_market_value) |
| daily_portfolio_values.append(portfolio_value) |
| daily_dates_out.append(pd.Timestamp(date)) |
| daily_portfolio_records.append( |
| { |
| "Date": pd.Timestamp(date), |
| "portfolio_value": float(portfolio_value), |
| "cash": float(cash), |
| "n_held": int(len(holdings)), |
| "is_rebalance": bool(rebalance_executed), |
| } |
| ) |
|
|
| if capture_details and eod_rows: |
| for row in eod_rows: |
| row["portfolio_value"] = round(float(portfolio_value), 6) |
| row["cash_eod"] = round(float(cash), 6) |
| row["weight"] = round(float(row["market_value"]) / float(portfolio_value), 6) if portfolio_value > 0 else 0.0 |
| daily_holding_records.append(row) |
|
|
| if len(daily_portfolio_values) < 10: |
| return _empty_result("Not enough daily returns") |
|
|
| pv_series = pd.Series(daily_portfolio_values, index=pd.DatetimeIndex(daily_dates_out), name="portfolio_value") |
| portfolio_returns = pv_series.pct_change().dropna() |
| if len(portfolio_returns) < 2: |
| return _empty_result("Not enough daily returns") |
|
|
| benchmark_returns = ( |
| bench_return.reindex(portfolio_returns.index).fillna(0.0) |
| if bench_return is not None |
| else pd.Series(0.0, index=portfolio_returns.index) |
| ) |
| return_frame = pd.DataFrame( |
| { |
| "portfolio_value": pv_series.reindex(portfolio_returns.index), |
| "portfolio_return": portfolio_returns.astype(float), |
| "benchmark_return": benchmark_returns.astype(float), |
| }, |
| index=portfolio_returns.index, |
| ) |
| portfolio_dates = list(pv_series.index) |
| portfolio_returns_aligned = portfolio_returns.reindex(portfolio_dates).fillna(0.0).astype(float) |
| benchmark_returns_aligned = benchmark_returns.reindex(portfolio_dates).fillna(0.0).astype(float) |
| cash_series_aligned = pd.Series( |
| { |
| pd.Timestamp(row["Date"]): float(row["cash"]) |
| for row in daily_portfolio_records |
| } |
| ) |
| holdings_count_aligned = { |
| pd.Timestamp(row["Date"]): int(row["n_held"]) |
| for row in daily_portfolio_records |
| } |
| rebalance_flags_aligned = { |
| pd.Timestamp(row["Date"]): bool(row.get("is_rebalance", False)) |
| for row in daily_portfolio_records |
| } |
| portfolio_log = _build_portfolio_log_rows( |
| dates=portfolio_dates, |
| portfolio_value=pv_series.astype(float), |
| portfolio_return=portfolio_returns_aligned, |
| benchmark_return=benchmark_returns_aligned, |
| cash_series=cash_series_aligned, |
| holdings_count=holdings_count_aligned, |
| rebalance_freq=rebalance_freq, |
| rebalance_flags=rebalance_flags_aligned, |
| ) |
|
|
| ic_frame = _compute_daily_cross_sectional_ic_frame( |
| factor_values=factor_values, |
| close_prices=close_prices, |
| label_forward_days=label_forward_days, |
| ) |
| if start_date and not ic_frame.empty: |
| ic_frame = ic_frame[ic_frame.index >= pd.Timestamp(start_date)] |
| if end_date and not ic_frame.empty: |
| ic_frame = ic_frame[ic_frame.index <= pd.Timestamp(end_date)] |
|
|
| result = _summarize_return_frame( |
| return_frame=return_frame, |
| ann_scaler=ann_scaler, |
| risk_free_rate_annual=risk_free_rate_annual, |
| ) |
| result.update(_summarize_ic_frame(ic_frame)) |
| total_transaction_cost = float(sum(stats.get("transaction_cost", 0.0) for stats in yearly_trade_stats.values())) |
| total_gross_turnover = float(sum(stats.get("gross_turnover", 0.0) for stats in yearly_trade_stats.values())) |
| result.update( |
| { |
| "success": True, |
| "final_value": round(float(pv_series.iloc[-1]), 6), |
| "error": None, |
| "rebalance_mode": DEFAULT_QLIB_REBALANCE_MODE, |
| "custom_weight_mode": normalized_custom_weight_mode, |
| "redistribute_unfilled_cash": redistribute_unfilled_cash, |
| "enforce_cash_limit": enforce_cash_limit, |
| "transaction_cost": round(total_transaction_cost, 6), |
| "gross_turnover": round(total_gross_turnover, 6), |
| "turnover_ratio": round(total_gross_turnover / max(float(start_cash), 1e-12), 6), |
| "yearly_metrics": _build_yearly_metrics( |
| return_frame=return_frame, |
| ic_frame=ic_frame, |
| yearly_trade_stats=yearly_trade_stats, |
| yearly_holding_stats=yearly_holding_stats, |
| ann_scaler=ann_scaler, |
| risk_free_rate_annual=risk_free_rate_annual, |
| ), |
| "trade_log": trade_records if capture_details else [], |
| "stock_contrib": _build_stock_contribution_summary(daily_holding_records, trade_records) if capture_details else [], |
| "holding_log": daily_holding_records if capture_details else [], |
| "portfolio_log": portfolio_log if capture_details else [], |
| } |
| ) |
| return result |
|
|
|
|
| def compute_portfolio_ir( |
| factor_values: pd.Series, |
| price_df: pd.DataFrame, |
| bench_return: pd.Series | None = None, |
| top_k: int = 10, |
| n_drop: int = 2, |
| rebalance_freq: int = 5, |
| cost_buy: float = 0.0013, |
| cost_sell: float = 0.0013, |
| hold_thresh: int = 2, |
| label_forward_days: int = 5, |
| ann_scaler: int = 252, |
| start_date: str | None = None, |
| end_date: str | None = None, |
| risk_free_rate_annual: float = 0.0, |
| start_cash: float = 200_000_000.0, |
| position_size: float = 1.0, |
| max_pos_each_stock: float = 1.0, |
| lot_size: int = 100, |
| max_daily_volume_participation: float = 0.0, |
| max_daily_amount_participation: float = 0.0, |
| capture_details: bool = False, |
| engine: str = "custom", |
| trade_guard_config: dict[str, Any] | bool | None = None, |
| rebalance_mode: str = DEFAULT_QLIB_REBALANCE_MODE, |
| custom_weight_mode: str = "equal", |
| redistribute_unfilled_cash: bool = False, |
| enforce_cash_limit: bool = False, |
| ) -> dict: |
| normalized_engine = _normalize_backtest_engine(engine) |
| if normalized_engine == "custom": |
| return _compute_portfolio_ir_custom( |
| factor_values=factor_values, |
| price_df=price_df, |
| bench_return=bench_return, |
| top_k=top_k, |
| n_drop=n_drop, |
| rebalance_freq=rebalance_freq, |
| cost_buy=cost_buy, |
| cost_sell=cost_sell, |
| hold_thresh=hold_thresh, |
| label_forward_days=label_forward_days, |
| ann_scaler=ann_scaler, |
| start_date=start_date, |
| end_date=end_date, |
| risk_free_rate_annual=risk_free_rate_annual, |
| start_cash=start_cash, |
| position_size=position_size, |
| max_pos_each_stock=max_pos_each_stock, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| capture_details=capture_details, |
| custom_weight_mode=custom_weight_mode, |
| redistribute_unfilled_cash=redistribute_unfilled_cash, |
| enforce_cash_limit=enforce_cash_limit, |
| ) |
| if normalized_engine == "qlib_original": |
| return _compute_portfolio_ir_qlib( |
| factor_values=factor_values, |
| price_df=price_df, |
| bench_return=bench_return, |
| top_k=top_k, |
| n_drop=n_drop, |
| rebalance_freq=rebalance_freq, |
| cost_buy=cost_buy, |
| cost_sell=cost_sell, |
| hold_thresh=hold_thresh, |
| label_forward_days=label_forward_days, |
| ann_scaler=ann_scaler, |
| start_date=start_date, |
| end_date=end_date, |
| risk_free_rate_annual=risk_free_rate_annual, |
| start_cash=start_cash, |
| position_size=position_size, |
| max_pos_each_stock=max_pos_each_stock, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| capture_details=capture_details, |
| trade_guard_config=trade_guard_config, |
| rebalance_mode=rebalance_mode, |
| ) |
| if normalized_engine in {"spec_shares_cash", "spec_return_based"}: |
| from backtest.spec_bridge_backtester import compute_portfolio_ir_spec_bridge |
|
|
| return compute_portfolio_ir_spec_bridge( |
| factor_values=factor_values, |
| price_df=price_df, |
| bench_return=bench_return, |
| top_k=top_k, |
| n_drop=n_drop, |
| rebalance_freq=rebalance_freq, |
| cost_buy=cost_buy, |
| cost_sell=cost_sell, |
| hold_thresh=hold_thresh, |
| label_forward_days=label_forward_days, |
| ann_scaler=ann_scaler, |
| start_date=start_date, |
| end_date=end_date, |
| risk_free_rate_annual=risk_free_rate_annual, |
| start_cash=start_cash, |
| position_size=position_size, |
| max_pos_each_stock=max_pos_each_stock, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| capture_details=capture_details, |
| mode="shares_cash" if normalized_engine == "spec_shares_cash" else "return_based", |
| trade_guard_config=trade_guard_config, |
| rebalance_mode=rebalance_mode, |
| ) |
| return _empty_result(f"Unsupported backtest engine: {engine}") |
|
|
|
|
| def _empty_result(error: str) -> dict: |
| return { |
| "success": False, |
| "ir": 0.0, |
| "ic_mean": 0.0, |
| "ic_std": 0.0, |
| "icir": 0.0, |
| "rank_ic_mean": 0.0, |
| "rank_ic_std": 0.0, |
| "rank_icir": 0.0, |
| "n_ic_days": 0, |
| "annualized_return": 0.0, |
| "annualized_volatility": 0.0, |
| "sharpe": 0.0, |
| "winrate": 0.0, |
| "mdd": 0.0, |
| "excess_mdd": 0.0, |
| "portfolio_nav_mdd": 0.0, |
| "drawdown_duration_max": 0, |
| "drawdown_duration_mean": 0.0, |
| "drawdown_duration_median": 0.0, |
| "total_return": 0.0, |
| "performance_return": 0.0, |
| "benchmark_performance_return": 0.0, |
| "excess_compounded_return": 0.0, |
| "mean_daily_return": 0.0, |
| "std_daily_return": 0.0, |
| "n_days": 0, |
| "final_value": 1.0, |
| "yearly_metrics": {}, |
| "trade_log": [], |
| "stock_contrib": [], |
| "holding_log": [], |
| "portfolio_log": [], |
| "error": error, |
| } |
|
|