"""Robustness-focused factor execution engine. Loads daily_pv.h5 into memory, parses factor expressions via expr_parser, executes them using function_lib operators, and computes IC/IR metrics. Supports temporal data splits for proper train/val/test evaluation: - Factor computation uses ALL data (needs lookback for TS operators) - IC/IR metrics are computed only on the specified period's date range This module is intentionally independent from the stable production path in `backtest/factor_executor.py`. Use it for robustness experiments where we want to vary execution or portfolio construction assumptions without touching the current baseline backtest flow. """ import os import sys import time import traceback from pathlib import Path import numpy as np import pandas as pd # Add project root to path PROJECT_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from expression_manager.expr_parser import parse_expression, parse_symbol from expression_manager import function_lib # Default data path DATA_PATH = Path(__file__).resolve().parent / "data" / "daily_pv.h5" # Python keywords that can't be used as variable names _PYTHON_KEYWORDS = {'return', 'class', 'import', 'from', 'def', 'if', 'else', 'for', 'while', 'try', 'except', 'with', 'as', 'in', 'is', 'not', 'and', 'or', 'pass', 'break', 'continue', 'yield', 'lambda', 'global', 'nonlocal', 'del', 'raise', 'assert'} # Cached dataframe (full data — used for factor computation) _cached_df: pd.DataFrame | None = None _cached_data_path: Path | None = None _cached_columns: dict[str, pd.Series] = {} _cached_backtest_price_df: pd.DataFrame | None = None _cached_bench_return: pd.Series | None = None # Temporal split config: period_name -> (start_date, end_date) # Dates are inclusive. Set via configure_periods(). _period_ranges: dict[str, tuple[str, str]] = {} # Cached boolean masks per period (index-aligned with _cached_df) _period_masks: dict[str, np.ndarray] = {} _SCORE_TRANSFORM_ALIASES = { "": "identity", "identity": "identity", "none": "identity", "raw": "identity", "rank": "rank", "pct_rank": "rank", "zscore": "zscore", "rank_zscore": "rank_zscore", "signed": "signed", "sign": "signed", "clip_zscore": "clip_zscore", } _UNIVERSE_FILTER_ALIASES = { "": "none", "none": "none", "all": "none", "top_amount": "top_amount", "amount": "top_amount", "top_volume": "top_volume", "volume": "top_volume", } def configure_periods(periods: dict[str, dict[str, str]]) -> None: """Configure temporal date ranges for train/val/test splits. Args: periods: dict like {"train": {"start": "2016-01-01", "end": "2020-12-31"}, "val": {"start": "2021-01-01", "end": "2021-12-31"}, "test": {"start": "2022-01-01", "end": "2026-12-31"}} """ global _period_ranges, _period_masks _period_ranges.clear() _period_masks.clear() for name, cfg in periods.items(): _period_ranges[name] = (cfg["start"], cfg["end"]) print(f"[FactorExecutor] Configured periods: {_period_ranges}") # Rebuild masks if data already loaded if _cached_df is not None: _build_period_masks() def _build_period_masks() -> None: """Build boolean masks for each configured period.""" global _period_masks _period_masks.clear() if _cached_df is None or not _period_ranges: return dates = _cached_df.index.get_level_values("datetime") for name, (start, end) in _period_ranges.items(): mask = (dates >= pd.Timestamp(start)) & (dates <= pd.Timestamp(end)) mask_arr = mask.values if hasattr(mask, 'values') else np.asarray(mask) _period_masks[name] = mask_arr n_rows = mask_arr.sum() n_days = dates[mask_arr].nunique() print(f"[FactorExecutor] Period '{name}': {start} to {end} — {n_days} days, {n_rows} rows") def _normalize_score_transform(mode: str | None) -> str: key = str(mode or "").strip().lower() if key not in _SCORE_TRANSFORM_ALIASES: choices = ", ".join(sorted(set(_SCORE_TRANSFORM_ALIASES.values()))) raise ValueError(f"Unsupported score_transform={mode!r}. Choose one of: {choices}") return _SCORE_TRANSFORM_ALIASES[key] def _normalize_universe_filter(mode: str | None) -> str: key = str(mode or "").strip().lower() if key not in _UNIVERSE_FILTER_ALIASES: choices = ", ".join(sorted(set(_UNIVERSE_FILTER_ALIASES.values()))) raise ValueError(f"Unsupported universe_filter={mode!r}. Choose one of: {choices}") return _UNIVERSE_FILTER_ALIASES[key] def _cross_sectional_zscore(series: pd.Series) -> pd.Series: grouped = series.groupby(level="datetime") mean = grouped.transform("mean") std = grouped.transform("std").replace(0.0, np.nan) z = (series - mean) / std return z.replace([np.inf, -np.inf], np.nan).fillna(0.0) def _apply_score_transform( factor_values: pd.Series, *, score_transform: str, score_clip: float | None = None, ) -> pd.Series: mode = _normalize_score_transform(score_transform) out = pd.to_numeric(factor_values, errors="coerce").astype(float) if mode == "identity": return out if mode == "rank": return out.groupby(level="datetime").rank(method="average", pct=True) if mode == "zscore": return _cross_sectional_zscore(out) if mode == "rank_zscore": ranked = out.groupby(level="datetime").rank(method="average", pct=True) return _cross_sectional_zscore(ranked) if mode == "signed": return np.sign(out).astype(float) if mode == "clip_zscore": z = _cross_sectional_zscore(out) clip = float(score_clip) if score_clip is not None and np.isfinite(score_clip) else 3.0 clip = max(clip, 0.0) return z.clip(-clip, clip) return out def _build_liquidity_universe_mask( liquidity_series: pd.Series, *, top_n: int, lookback_days: int, ) -> pd.Series: if top_n <= 0: return pd.Series(True, index=liquidity_series.index) wide = liquidity_series.unstack("instrument").sort_index() lookback = max(int(lookback_days), 1) rolling_metric = wide.rolling(lookback, min_periods=1).mean().shift(1) ranks = rolling_metric.rank(axis=1, method="first", ascending=False) keep = ranks <= int(top_n) mask = keep.stack(dropna=False) mask.index.names = liquidity_series.index.names return mask.reindex(liquidity_series.index).fillna(False).astype(bool) def _apply_universe_filter( factor_values: pd.Series, *, universe_filter: str, universe_top_n: int, universe_lookback_days: int, ) -> pd.Series: mode = _normalize_universe_filter(universe_filter) if mode == "none" or universe_top_n <= 0: return factor_values if mode == "top_amount": liquidity_series = _cached_columns.get("$amount") else: liquidity_series = _cached_columns.get("$volume") if liquidity_series is None: raise ValueError( f"Universe filter {mode!r} requires {'$amount' if mode == 'top_amount' else '$volume'} in the dataset." ) mask = _build_liquidity_universe_mask( pd.to_numeric(liquidity_series, errors="coerce").astype(float), top_n=int(universe_top_n), lookback_days=int(universe_lookback_days), ) return factor_values.where(mask.reindex(factor_values.index).fillna(False)) def _resolve_data_path(data_path: str | Path | None = None) -> Path: if data_path: return Path(data_path).expanduser().resolve() for env_name in ("ALPHAEVO_DATA_PATH", "AAE_DATA_PATH", "DAILY_PV_PATH"): env_value = os.environ.get(env_name, "").strip() if env_value: return Path(env_value).expanduser().resolve() return DATA_PATH.resolve() def _reset_cached_data() -> None: global _cached_df, _cached_data_path, _cached_columns, _cached_backtest_price_df, _cached_bench_return _cached_df = None _cached_data_path = None _cached_columns = {} _cached_backtest_price_df = None _cached_bench_return = None def load_data(data_path: str | Path | None = None) -> pd.DataFrame: """Load daily_pv.h5 into memory (cached on first call).""" global _cached_df, _cached_data_path, _cached_columns, _cached_backtest_price_df, _cached_bench_return path = _resolve_data_path(data_path) if _cached_df is not None and _cached_data_path is not None and _cached_data_path != path: print(f"[FactorExecutor] Data path changed: {_cached_data_path} -> {path}; reloading cache") _reset_cached_data() if _cached_df is not None: if _cached_backtest_price_df is None: _cached_backtest_price_df = _cached_df[[col for col in ('$open', '$close', '$volume', '$amount') if col in _cached_df.columns]] if _cached_bench_return is None and '$bench_return' in _cached_columns: _cached_bench_return = _cached_columns['$bench_return'].groupby('datetime').first() return _cached_df print(f"[FactorExecutor] Loading data from {path}...") _cached_df = pd.read_hdf(str(path)) _cached_data_path = path # Pre-extract column Series for fast access during expression execution for col in _cached_df.columns: _cached_columns[col] = _cached_df[col] _cached_backtest_price_df = _cached_df[[col for col in ('$open', '$close', '$volume', '$amount') if col in _cached_df.columns]] if '$bench_return' in _cached_columns: _cached_bench_return = _cached_columns['$bench_return'].groupby('datetime').first() print(f"[FactorExecutor] Loaded {_cached_df.shape[0]} rows, " f"{len(_cached_df.columns)} columns, " f"instruments: {_cached_df.index.get_level_values('instrument').nunique()}") # Build period masks if periods already configured if _period_ranges: _build_period_masks() return _cached_df def execute_expression( factor_expr: str, data_path: str | Path | None = None, period: str | None = None, start_date: str | None = None, end_date: str | None = None, label_forward_days: int = 5, top_k: int = 10, n_drop: int = 2, start_cash: float = 200_000_000.0, position_size: float = 1.0, max_pos_each_stock: float = 1.0, lot_size: int = 100, max_daily_volume_participation: float = 0.0, max_daily_amount_participation: float = 0.0, rebalance_freq: int = 5, cost_buy: float = 0.0013, cost_sell: float = 0.0013, backtest_engine: str = "custom", capture_details: bool = False, trade_guard_config: dict | bool | None = None, rebalance_mode: str = "dropout", custom_weight_mode: str = "equal", redistribute_unfilled_cash: bool = False, enforce_cash_limit: bool = False, score_transform: str = "identity", score_clip: float = 3.0, universe_filter: str = "none", universe_top_n: int = 0, universe_lookback_days: int = 20, ) -> dict: """Execute a factor expression and compute IC/IR metrics. Args: factor_expr: Factor expression string, e.g. "RANK($close)" data_path: Optional path to daily_pv.h5 period: Optional period name ("train", "val", "test"). If set, IC/IR is computed only on that period's date range. Factor values are still computed on ALL data (TS operators need lookback). If None, uses all data. start_date: Optional explicit evaluation start date override end_date: Optional explicit evaluation end date override label_forward_days: Forward horizon for IC / RankIC computation capture_details: If True, return yearly metrics plus trade/stock detail artifacts trade_guard_config: Optional qlib_original trade guard config. None disables guards. Ignored by custom/spec engines. rebalance_mode: qlib_original rebalance mode ("dropout", "sell_all", or "target_weight"). Ignored by custom/spec engines. custom_weight_mode: custom engine buy sizing mode ("equal" or "alpha_score"). Ignored by qlib_original/spec engines. redistribute_unfilled_cash: When True, custom engine carries an unfilled buy budget down the remaining ranked names within the selected top-k. Ignored by qlib_original/spec engines. enforce_cash_limit: When False, custom engine allows buy-side cash to go negative instead of clipping orders by available cash. Ignored by qlib_original/spec engines. score_transform: Optional cross-sectional transform applied to raw scores. score_clip: Clip threshold used by score_transform="clip_zscore". universe_filter: Optional liquidity-derived universe filter. universe_top_n: Keep top-N names after applying universe_filter. universe_lookback_days: Rolling lookback used by liquidity-derived universes. Returns: dict with keys: success, score (IR), ic_mean, ic_std, ir, error, exec_time """ start_time = time.time() try: # Load data df = load_data(data_path) # Step 1: Parse expression into executable Python code parsed_code = parse_expression(factor_expr) # Step 2: Replace $variables with safe Python variable names # Build a safe name mapping: $return → col_return (avoid Python keywords) columns = [col for col in df.columns] safe_name_map = {} for col in columns: clean = col.replace('$', '') if clean in _PYTHON_KEYWORDS: safe_name_map[col] = f'col_{clean}' else: safe_name_map[col] = clean # First do standard parse_symbol to strip $ executable_code = parse_symbol(parsed_code, columns) # Then fix any Python keyword collisions for col in columns: clean = col.replace('$', '') safe = safe_name_map[col] if clean != safe: # Replace the bare keyword with the safe name # Use word-boundary replacement to avoid partial matches import re as _re executable_code = _re.sub( r'\b' + _re.escape(clean) + r'\b', safe, executable_code ) # Step 3: Build execution namespace with function_lib functions + data columns exec_namespace = {} # Import all functions from function_lib for name in dir(function_lib): obj = getattr(function_lib, name) if callable(obj) and not name.startswith('_'): exec_namespace[name] = obj # Add numpy and pandas exec_namespace['np'] = np exec_namespace['pd'] = pd # Add column data using safe variable names for col in columns: safe_name = safe_name_map[col] exec_namespace[safe_name] = _cached_columns[col].copy() # Step 4: Execute the parsed expression factor_values = eval(executable_code, exec_namespace) # Ensure result is a Series with the same index as data if isinstance(factor_values, pd.DataFrame): factor_values = factor_values.iloc[:, 0] elif isinstance(factor_values, np.ndarray): factor_values = pd.Series(factor_values, index=df.index) # Step 5: Compute portfolio-based IR (Qlib-style) from backtest.robust_qlib_backtester import build_signal_selection_log, compute_portfolio_ir if not isinstance(factor_values, pd.Series): factor_values = pd.Series(factor_values, index=df.index) factor_values = _apply_score_transform( factor_values, score_transform=score_transform, score_clip=score_clip, ) factor_values = _apply_universe_filter( factor_values, universe_filter=universe_filter, universe_top_n=universe_top_n, universe_lookback_days=universe_lookback_days, ) # Determine evaluation period eval_start_date = start_date eval_end_date = end_date if (eval_start_date is None or eval_end_date is None) and period and period in _period_ranges: period_start, period_end = _period_ranges[period] eval_start_date = eval_start_date or period_start eval_end_date = eval_end_date or period_end # Get benchmark return bench_return = _cached_bench_return result = compute_portfolio_ir( factor_values=factor_values, price_df=_cached_backtest_price_df if _cached_backtest_price_df is not None else df[[col for col in ('$open', '$close', '$volume', '$amount') if col in df.columns]], bench_return=bench_return, top_k=top_k, n_drop=n_drop, rebalance_freq=rebalance_freq, cost_buy=cost_buy, cost_sell=cost_sell, hold_thresh=2, label_forward_days=label_forward_days, ann_scaler=252, start_date=eval_start_date, end_date=eval_end_date, start_cash=start_cash, position_size=position_size, max_pos_each_stock=max_pos_each_stock, lot_size=lot_size, max_daily_volume_participation=max_daily_volume_participation, max_daily_amount_participation=max_daily_amount_participation, capture_details=capture_details, engine=backtest_engine, trade_guard_config=trade_guard_config, rebalance_mode=rebalance_mode, custom_weight_mode=custom_weight_mode, redistribute_unfilled_cash=redistribute_unfilled_cash, enforce_cash_limit=enforce_cash_limit, ) signal_selection_log = [] if capture_details and result.get('success'): try: signal_selection_log = build_signal_selection_log( factor_values=factor_values, price_df=_cached_backtest_price_df if _cached_backtest_price_df is not None else df[[col for col in ('$open', '$close', '$volume', '$amount') if col in df.columns]], top_k=top_k, start_date=eval_start_date, end_date=eval_end_date, backtest_engine=backtest_engine, trade_guard_config=trade_guard_config, holding_log=result.get('holding_log', []), trade_log=result.get('trade_log', []), portfolio_log=result.get('portfolio_log', []), ) except Exception: signal_selection_log = [] exec_time = time.time() - start_time return { 'success': result['success'], 'score': float(result['ir']), 'ic_mean': float(result.get('ic_mean', 0.0)), 'ic_std': float(result.get('ic_std', 0.0)), 'icir': float(result.get('icir', 0.0)), 'ir': float(result['ir']), 'rank_ic_mean': float(result.get('rank_ic_mean', 0.0)), 'rank_ic_std': float(result.get('rank_ic_std', 0.0)), 'rank_icir': float(result.get('rank_icir', 0.0)), 'annualized_return': float(result.get('annualized_return', 0.0)), 'annualized_volatility': float(result.get('annualized_volatility', 0.0)), 'performance_return': float(result.get('performance_return', 0.0)), 'benchmark_performance_return': float(result.get('benchmark_performance_return', 0.0)), 'excess_compounded_return': float(result.get('excess_compounded_return', 0.0)), 'sharpe': float(result.get('sharpe', 0.0)), 'winrate': float(result.get('winrate', 0.0)), 'mdd': float(result.get('mdd', 0.0)), 'excess_mdd': float(result.get('excess_mdd', result.get('mdd', 0.0))), 'portfolio_nav_mdd': float(result.get('portfolio_nav_mdd', 0.0)), 'drawdown_duration_max': int(result.get('drawdown_duration_max', 0) or 0), 'drawdown_duration_mean': float(result.get('drawdown_duration_mean', 0.0)), 'drawdown_duration_median': float(result.get('drawdown_duration_median', 0.0)), 'total_return': float(result.get('total_return', 0.0)), 'final_value': float(result.get('final_value', 0.0)), 'n_days': result.get('n_days', 0), 'n_ic_days': result.get('n_ic_days', 0), 'yearly_metrics': result.get('yearly_metrics', {}), 'trade_log': result.get('trade_log', []), 'stock_contrib': result.get('stock_contrib', []), 'holding_log': result.get('holding_log', []), 'portfolio_log': result.get('portfolio_log', []), 'signal_selection_log': signal_selection_log, 'backtest_engine': backtest_engine, 'label_forward_days': label_forward_days, 'qlib_warnings': result.get('qlib_warnings', []), 'trade_guard_config': result.get('trade_guard_config'), 'rebalance_mode': result.get('rebalance_mode', rebalance_mode), 'rebalance_freq': int(rebalance_freq), 'top_k': int(top_k), 'n_drop': int(n_drop), 'custom_weight_mode': result.get('custom_weight_mode', custom_weight_mode), 'redistribute_unfilled_cash': bool(result.get('redistribute_unfilled_cash', redistribute_unfilled_cash)), 'enforce_cash_limit': bool(result.get('enforce_cash_limit', enforce_cash_limit)), 'cost_buy': float(cost_buy), 'cost_sell': float(cost_sell), 'score_transform': _normalize_score_transform(score_transform), 'score_clip': float(score_clip), 'universe_filter': _normalize_universe_filter(universe_filter), 'universe_top_n': int(universe_top_n), 'universe_lookback_days': int(universe_lookback_days), 'transaction_cost': float(result.get('transaction_cost', 0.0)), 'gross_turnover': float(result.get('gross_turnover', 0.0)), 'turnover_ratio': float(result.get('turnover_ratio', 0.0)), 'error': result.get('error'), 'exec_time': round(exec_time, 3), } except Exception as e: exec_time = time.time() - start_time return { 'success': False, 'score': 0.0, 'ic_mean': 0.0, 'ic_std': 0.0, 'icir': 0.0, 'ir': 0.0, 'rank_ic_mean': 0.0, 'rank_ic_std': 0.0, 'rank_icir': 0.0, 'annualized_return': 0.0, 'annualized_volatility': 0.0, 'performance_return': 0.0, 'benchmark_performance_return': 0.0, 'excess_compounded_return': 0.0, 'sharpe': 0.0, 'winrate': 0.0, 'mdd': 0.0, 'excess_mdd': 0.0, 'portfolio_nav_mdd': 0.0, 'drawdown_duration_max': 0, 'drawdown_duration_mean': 0.0, 'drawdown_duration_median': 0.0, 'total_return': 0.0, 'final_value': 0.0, 'n_days': 0, 'n_ic_days': 0, 'yearly_metrics': {}, 'trade_log': [], 'stock_contrib': [], 'holding_log': [], 'portfolio_log': [], 'signal_selection_log': [], 'backtest_engine': backtest_engine, 'label_forward_days': label_forward_days, 'qlib_warnings': [], 'trade_guard_config': None, 'rebalance_mode': rebalance_mode, 'rebalance_freq': int(rebalance_freq), 'top_k': int(top_k), 'n_drop': int(n_drop), 'custom_weight_mode': custom_weight_mode, 'redistribute_unfilled_cash': bool(redistribute_unfilled_cash), 'enforce_cash_limit': bool(enforce_cash_limit), 'cost_buy': float(cost_buy), 'cost_sell': float(cost_sell), 'score_transform': _normalize_score_transform(score_transform), 'score_clip': float(score_clip), 'universe_filter': _normalize_universe_filter(universe_filter), 'universe_top_n': int(universe_top_n), 'universe_lookback_days': int(universe_lookback_days), 'transaction_cost': 0.0, 'gross_turnover': 0.0, 'turnover_ratio': 0.0, 'error': f"{type(e).__name__}: {str(e)[:500]}", 'exec_time': round(exec_time, 3), } if __name__ == "__main__": # Quick test test_exprs = [ "RANK($close)", "RANK(TS_MEAN($return, 5) / (TS_STD($return, 20) + 1e-8)) * SIGN(DELTA($close, 5) / ($close + 1e-8))", "TS_CORR(RANK($volume), RANK($close), 10) - TS_CORR(RANK($volume), RANK($close), 40)", ] for expr in test_exprs: print(f"\nExpression: {expr}") result = execute_expression(expr) print(f"Result: {result}")