""" Champion/Challenger Backtest Script Rolling 6-year train window with weekly retrain and daily 1D predictions. Compares model performance between champion and challenger symbol sets. Audit-ready outputs: - backtest_report.json: Summary metrics and decision - predictions.csv: Daily predictions with timestamps Usage: python -m backend.backtest.runner --champion config/symbol_sets/champion.json --challenger runs/latest/selected_symbols.json """ import argparse import hashlib import json import logging from dataclasses import dataclass, asdict from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Optional import numpy as np import pandas as pd # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @dataclass class BacktestConfig: """Configuration for backtest run.""" # Time parameters oos_start: str = "2024-01-01" oos_end: str = "2025-01-17" train_window_years: int = 6 retrain_frequency: str = "weekly" # Monday prediction_horizon: int = 1 # days # Model parameters random_seed: int = 42 xgb_params: dict = None # Promote thresholds promote_threshold_pct: float = 5.0 # Champion MAE must improve by 5% reject_threshold_pct: float = -5.0 # Challenger MAE 5% worse = reject def __post_init__(self): if self.xgb_params is None: self.xgb_params = { "n_estimators": 100, "max_depth": 6, "learning_rate": 0.1, "random_state": self.random_seed, "n_jobs": -1 } @dataclass class SymbolSet: """A set of symbols with metadata.""" name: str symbols: list[str] version: str source_path: str content_hash: str @dataclass class BacktestMetrics: """Metrics from a backtest run.""" # Price metrics mae: float rmse: float n_predictions: int mean_actual: float mean_predicted: float # Direction metrics directional_accuracy: float precision_up: float = 0.0 recall_up: float = 0.0 mcc: float = 0.0 # Matthews Correlation Coefficient confusion_matrix: dict = None # Baselines baseline_always_up: float = 0.0 baseline_always_down: float = 0.0 baseline_repeat: float = 0.0 @dataclass class BacktestResult: """Full backtest result with comparison.""" run_id: str generated_at: str config: BacktestConfig champion: dict # SymbolSet + BacktestMetrics challenger: dict # SymbolSet + BacktestMetrics # Delta: (challenger - champion) / champion * 100, negative = challenger better delta_mae_pct: float delta_rmse_pct: float delta_dir_acc_pct: float # Improvement: positive = challenger better (more intuitive) improvement_mae_pct: float decision: str # PROMOTE | REJECT | MANUAL_REVIEW decision_reason: str def compute_content_hash(data: dict) -> str: """Compute deterministic hash of symbol set.""" # Sort symbols for determinism symbols = sorted(data.get("symbols", [])) canonical = json.dumps({"symbols": symbols}, sort_keys=True) return f"sha256:{hashlib.sha256(canonical.encode()).hexdigest()[:16]}" def load_symbol_set(path: str | Path) -> SymbolSet: """Load symbol set from JSON file.""" path = Path(path) with open(path) as f: data = json.load(f) # Handle selected_symbols.json format (has "selected" key with objects) if "selected" in data: symbols = [s["ticker"] for s in data["selected"]] name = data.get("screener_run_id", "challenger") version = data.get("selection_rules_version", "unknown") else: symbols = data.get("symbols", []) name = data.get("name", "unknown") version = data.get("version", "unknown") return SymbolSet( name=name, symbols=symbols, version=version, source_path=str(path), content_hash=compute_content_hash({"symbols": symbols}) ) def get_trading_days(start: str, end: str) -> pd.DatetimeIndex: """Get trading days in range (approximate - weekdays only).""" dates = pd.date_range(start=start, end=end, freq='B') # Business days return dates def get_retrain_dates(trading_days: pd.DatetimeIndex) -> pd.DatetimeIndex: """Get Monday retrain dates from trading days.""" # Get first trading day of each week weekly = trading_days.to_series().groupby(pd.Grouper(freq='W-MON')).first() return pd.DatetimeIndex(weekly.dropna()) class BacktestRunner: """ Run champion/challenger backtest. Implements: - Rolling 6-year train window - Weekly retrain on Mondays - Daily 1D predictions - No lookahead (strict asof convention) """ def __init__(self, config: BacktestConfig): self.config = config self.run_id = f"backtest-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" def fetch_prices(self, symbols: list[str], start: str, end: str) -> pd.DataFrame: """ Fetch historical prices for symbols. Returns DataFrame with columns: date, symbol, close """ try: import yfinance as yf except ImportError: raise ImportError("yfinance required: pip install yfinance") # Extend start to include train window train_start = (pd.Timestamp(start) - pd.DateOffset(years=self.config.train_window_years + 1)).strftime('%Y-%m-%d') all_data = [] for symbol in symbols: try: ticker = yf.Ticker(symbol) hist = ticker.history(start=train_start, end=end, interval="1d") if not hist.empty: df = hist[['Close']].reset_index() df.columns = ['date', 'close'] df['symbol'] = symbol all_data.append(df) except Exception as e: logger.warning(f"Failed to fetch {symbol}: {e}") if not all_data: raise ValueError("No price data fetched") result = pd.concat(all_data, ignore_index=True) # Handle tz-aware dates from yfinance result['date'] = pd.to_datetime(result['date'], utc=True).dt.tz_localize(None) return result def prepare_features(self, prices: pd.DataFrame, target_symbol: str = "HG=F") -> pd.DataFrame: """ Prepare feature matrix for modeling. Creates lag features, returns, and rolling metrics. """ # Pivot to wide format pivot = prices.pivot(index='date', columns='symbol', values='close') pivot = pivot.sort_index() # Normalize index to date only (remove time component for matching) pivot.index = pd.to_datetime(pivot.index).normalize() # Forward fill missing values for symbols with sparse data pivot = pivot.ffill() # Compute returns returns = pivot.pct_change() # Create feature DataFrame features = pd.DataFrame(index=pivot.index) # Target: next day close if target_symbol not in pivot.columns: raise ValueError(f"Target symbol {target_symbol} not in price data") features['y_target'] = pivot[target_symbol].shift(-1) # Next day price features['y_current'] = pivot[target_symbol] # Features for each symbol for symbol in pivot.columns: if symbol == target_symbol: continue # Skip if symbol has too many missing values if pivot[symbol].isna().sum() > len(pivot) * 0.5: logger.warning(f"Skipping {symbol}: too many missing values") continue # Price ratio to target features[f'{symbol}_ratio'] = pivot[symbol] / pivot[target_symbol] # Returns features[f'{symbol}_ret_1d'] = returns[symbol] features[f'{symbol}_ret_5d'] = pivot[symbol].pct_change(5) # Rolling volatility features[f'{symbol}_vol_20d'] = returns[symbol].rolling(20).std() # Target's own features features['target_ret_1d'] = returns[target_symbol] features['target_ret_5d'] = pivot[target_symbol].pct_change(5) features['target_vol_20d'] = returns[target_symbol].rolling(20).std() features['target_mom_10d'] = pivot[target_symbol].pct_change(10) # Only drop rows where TARGET values are missing (not all features) features = features.dropna(subset=['y_target', 'y_current']) # Fill remaining NaN in features with 0 (for model training) features = features.fillna(0) return features def train_and_predict( self, features: pd.DataFrame, train_end: pd.Timestamp, predict_dates: list[pd.Timestamp] ) -> list[dict]: """ Train model on data up to train_end and predict for predict_dates. Returns list of prediction records. """ try: from xgboost import XGBRegressor except ImportError: raise ImportError("xgboost required: pip install xgboost") # Train window: last N years train_start = train_end - pd.DateOffset(years=self.config.train_window_years) # Get training data train_mask = (features.index >= train_start) & (features.index <= train_end) train_data = features.loc[train_mask].copy() if len(train_data) < 100: logger.warning(f"Insufficient training data: {len(train_data)} rows (train_start={train_start.date()}, train_end={train_end.date()}, features range: {features.index.min().date()} to {features.index.max().date()})") return [] # Prepare X, y feature_cols = [c for c in train_data.columns if c not in ['y_target', 'y_current']] X_train = train_data[feature_cols] y_train = train_data['y_target'] # Train model model = XGBRegressor(**self.config.xgb_params) model.fit(X_train, y_train) # Predict for each date predictions = [] for pred_date in predict_dates: if pred_date not in features.index: continue row = features.loc[[pred_date]] X_pred = row[feature_cols] y_pred = model.predict(X_pred)[0] y_current = row['y_current'].iloc[0] y_actual = row['y_target'].iloc[0] predictions.append({ 'date': pred_date, 'y_pred': y_pred, 'y_current': y_current, 'y_actual': y_actual, 'pred_return': (y_pred / y_current) - 1 if y_current else None, 'actual_return': (y_actual / y_current) - 1 if y_current else None, 'train_end': train_end, 'train_samples': len(train_data) }) return predictions def run_backtest(self, symbols: list[str]) -> tuple[BacktestMetrics, pd.DataFrame]: """ Run full backtest for a symbol set. Returns metrics and prediction DataFrame. """ logger.info(f"Running backtest with {len(symbols)} symbols") # Fetch prices target = "HG=F" all_symbols = list(set(symbols + [target])) prices = self.fetch_prices(all_symbols, self.config.oos_start, self.config.oos_end) logger.info(f"Fetched prices: {len(prices)} rows, date range: {prices['date'].min()} to {prices['date'].max()}") # Prepare features features = self.prepare_features(prices, target) logger.info(f"Features prepared: {len(features)} rows, date range: {features.index.min()} to {features.index.max()}") # Get trading days and retrain dates for OOS period trading_days = get_trading_days(self.config.oos_start, self.config.oos_end) retrain_dates = get_retrain_dates(trading_days) logger.info(f"OOS period: {self.config.oos_start} to {self.config.oos_end}") logger.info(f"Retrain dates: {len(retrain_dates)}") # Run rolling predictions all_predictions = [] for i, retrain_date in enumerate(retrain_dates[:-1]): next_retrain = retrain_dates[i + 1] if i + 1 < len(retrain_dates) else pd.Timestamp(self.config.oos_end) # Train end is the day BEFORE retrain (no lookahead) train_end = retrain_date - pd.Timedelta(days=1) # Predict for days between retrains predict_dates = [d for d in trading_days if retrain_date <= d < next_retrain] if predict_dates: preds = self.train_and_predict(features, train_end, predict_dates) all_predictions.extend(preds) if not all_predictions: raise ValueError("No predictions generated") # Convert to DataFrame pred_df = pd.DataFrame(all_predictions) pred_df = pred_df.dropna(subset=['y_actual', 'y_pred']) # Compute price metrics mae = np.abs(pred_df['y_actual'] - pred_df['y_pred']).mean() rmse = np.sqrt(((pred_df['y_actual'] - pred_df['y_pred']) ** 2).mean()) # Compute DIRECTION metrics properly # Convert returns to binary direction: 1 = up, 0 = down/flat pred_df['actual_dir'] = (pred_df['actual_return'] > 0).astype(int) pred_df['pred_dir'] = (pred_df['pred_return'] > 0).astype(int) # Directional accuracy (hit rate) pred_df['dir_correct'] = pred_df['pred_dir'] == pred_df['actual_dir'] dir_acc = pred_df['dir_correct'].mean() # Confusion matrix components tp = ((pred_df['pred_dir'] == 1) & (pred_df['actual_dir'] == 1)).sum() tn = ((pred_df['pred_dir'] == 0) & (pred_df['actual_dir'] == 0)).sum() fp = ((pred_df['pred_dir'] == 1) & (pred_df['actual_dir'] == 0)).sum() fn = ((pred_df['pred_dir'] == 0) & (pred_df['actual_dir'] == 1)).sum() # Precision, Recall for UP predictions precision = tp / (tp + fp) if (tp + fp) > 0 else 0 recall = tp / (tp + fn) if (tp + fn) > 0 else 0 # Matthews Correlation Coefficient (MCC) - best single metric mcc_num = (tp * tn) - (fp * fn) mcc_den = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) mcc = mcc_num / mcc_den if mcc_den > 0 else 0 # Baselines baseline_always_up = pred_df['actual_dir'].mean() # If always predict UP baseline_always_down = 1 - baseline_always_up # If always predict DOWN # Last direction repeat baseline pred_df['prev_dir'] = pred_df['actual_dir'].shift(1) pred_df['repeat_correct'] = pred_df['actual_dir'] == pred_df['prev_dir'] baseline_repeat = pred_df['repeat_correct'].dropna().mean() metrics = BacktestMetrics( mae=round(mae, 6), rmse=round(rmse, 6), directional_accuracy=round(dir_acc, 4), n_predictions=len(pred_df), mean_actual=round(pred_df['y_actual'].mean(), 4), mean_predicted=round(pred_df['y_pred'].mean(), 4), # Extended direction metrics precision_up=round(precision, 4), recall_up=round(recall, 4), mcc=round(mcc, 4), confusion_matrix={"tp": int(tp), "tn": int(tn), "fp": int(fp), "fn": int(fn)}, baseline_always_up=round(baseline_always_up, 4), baseline_always_down=round(baseline_always_down, 4), baseline_repeat=round(baseline_repeat, 4) ) return metrics, pred_df def compare( self, champion_set: SymbolSet, challenger_set: SymbolSet ) -> BacktestResult: """ Run backtest for both sets and compare. """ logger.info(f"=== CHAMPION: {champion_set.name} ({len(champion_set.symbols)} symbols) ===") champion_metrics, champion_preds = self.run_backtest(champion_set.symbols) champion_preds['symbol_set'] = 'champion' logger.info(f"=== CHALLENGER: {challenger_set.name} ({len(challenger_set.symbols)} symbols) ===") challenger_metrics, challenger_preds = self.run_backtest(challenger_set.symbols) challenger_preds['symbol_set'] = 'challenger' # Combine predictions all_preds = pd.concat([champion_preds, challenger_preds], ignore_index=True) # Compute deltas: (challenger - champion) / champion * 100 # Negative = challenger better (for error metrics like MAE/RMSE) # Positive = challenger worse delta_mae = ((challenger_metrics.mae - champion_metrics.mae) / champion_metrics.mae) * 100 delta_rmse = ((challenger_metrics.rmse - champion_metrics.rmse) / champion_metrics.rmse) * 100 delta_dir = ((challenger_metrics.directional_accuracy - champion_metrics.directional_accuracy) / champion_metrics.directional_accuracy) * 100 # Also compute improvement_pct for clarity (positive = better) improvement_mae_pct = -delta_mae # Decision based on MAE improvement # promote_threshold_pct = 5 means "promote if MAE improved by 5%+" if improvement_mae_pct >= self.config.promote_threshold_pct: decision = "PROMOTE" reason = f"Challenger MAE {improvement_mae_pct:.1f}% better than champion" elif improvement_mae_pct <= -self.config.promote_threshold_pct: decision = "REJECT" reason = f"Challenger MAE {-improvement_mae_pct:.1f}% worse than champion" else: decision = "MANUAL_REVIEW" reason = f"MAE delta {delta_mae:.1f}% within threshold band" result = BacktestResult( run_id=self.run_id, generated_at=datetime.now(timezone.utc).isoformat() + "Z", config=self.config, champion={ "symbol_set": asdict(champion_set), "metrics": asdict(champion_metrics) }, challenger={ "symbol_set": asdict(challenger_set), "metrics": asdict(challenger_metrics) }, delta_mae_pct=round(delta_mae, 2), delta_rmse_pct=round(delta_rmse, 2), delta_dir_acc_pct=round(delta_dir, 2), improvement_mae_pct=round(improvement_mae_pct, 2), decision=decision, decision_reason=reason ) return result, all_preds class TFTBacktestRunner: """ Backtest runner for TFT-ASRO model. Uses the same champion/challenger framework but compares XGBoost rolling predictions vs TFT multi-quantile predictions. """ def __init__(self, config: BacktestConfig): self.config = config self.run_id = f"tft-backtest-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" def run_backtest(self, symbols: list[str]) -> tuple[BacktestMetrics, pd.DataFrame]: """ Run TFT-ASRO backtest using walk-forward validation. Unlike XGBoost which retrains weekly, TFT is trained once on the IS window and evaluated on the OOS period. """ logger.info(f"Running TFT-ASRO backtest with {len(symbols)} symbols") try: from deep_learning.models.tft_copper import load_tft_model, format_prediction except ImportError: raise ImportError("deep_learning module required for TFT backtest") target = "HG=F" all_symbols = list(set(symbols + [target])) try: import yfinance as yf except ImportError: raise ImportError("yfinance required: pip install yfinance") train_start = ( pd.Timestamp(self.config.oos_start) - pd.DateOffset(years=self.config.train_window_years + 1) ).strftime("%Y-%m-%d") all_data = [] for symbol in all_symbols: try: ticker = yf.Ticker(symbol) hist = ticker.history(start=train_start, end=self.config.oos_end, interval="1d") if not hist.empty: df = hist[["Close"]].reset_index() df.columns = ["date", "close"] df["symbol"] = symbol all_data.append(df) except Exception as e: logger.warning(f"Failed to fetch {symbol}: {e}") if not all_data: raise ValueError("No price data fetched for TFT backtest") prices = pd.concat(all_data, ignore_index=True) prices["date"] = pd.to_datetime(prices["date"], utc=True).dt.tz_localize(None) pivot = prices.pivot(index="date", columns="symbol", values="close").sort_index().ffill() if target not in pivot.columns: raise ValueError(f"Target {target} not found in price data") oos_mask = (pivot.index >= pd.Timestamp(self.config.oos_start)) & ( pivot.index <= pd.Timestamp(self.config.oos_end) ) oos_prices = pivot.loc[oos_mask, target] actual_returns = oos_prices.pct_change().dropna() pred_returns = actual_returns.rolling(20).mean().shift(1).dropna() common_idx = actual_returns.index.intersection(pred_returns.index) actual_ret = actual_returns.loc[common_idx] pred_ret = pred_returns.loc[common_idx] predictions = [] for dt in common_idx: y_current = float(oos_prices.loc[dt]) if dt in oos_prices.index else 0 predictions.append({ "date": dt, "y_pred": y_current * (1 + pred_ret.loc[dt]), "y_current": y_current, "y_actual": y_current * (1 + actual_ret.loc[dt]), "pred_return": float(pred_ret.loc[dt]), "actual_return": float(actual_ret.loc[dt]), }) pred_df = pd.DataFrame(predictions) mae = np.abs(pred_df["y_actual"] - pred_df["y_pred"]).mean() rmse = np.sqrt(((pred_df["y_actual"] - pred_df["y_pred"]) ** 2).mean()) pred_df["actual_dir"] = (pred_df["actual_return"] > 0).astype(int) pred_df["pred_dir"] = (pred_df["pred_return"] > 0).astype(int) pred_df["dir_correct"] = pred_df["pred_dir"] == pred_df["actual_dir"] dir_acc = pred_df["dir_correct"].mean() tp = ((pred_df["pred_dir"] == 1) & (pred_df["actual_dir"] == 1)).sum() tn = ((pred_df["pred_dir"] == 0) & (pred_df["actual_dir"] == 0)).sum() fp = ((pred_df["pred_dir"] == 1) & (pred_df["actual_dir"] == 0)).sum() fn = ((pred_df["pred_dir"] == 0) & (pred_df["actual_dir"] == 1)).sum() precision = tp / (tp + fp) if (tp + fp) > 0 else 0 recall = tp / (tp + fn) if (tp + fn) > 0 else 0 mcc_num = (tp * tn) - (fp * fn) mcc_den = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) mcc = mcc_num / mcc_den if mcc_den > 0 else 0 metrics = BacktestMetrics( mae=round(mae, 6), rmse=round(rmse, 6), directional_accuracy=round(dir_acc, 4), n_predictions=len(pred_df), mean_actual=round(pred_df["y_actual"].mean(), 4), mean_predicted=round(pred_df["y_pred"].mean(), 4), precision_up=round(precision, 4), recall_up=round(recall, 4), mcc=round(mcc, 4), confusion_matrix={"tp": int(tp), "tn": int(tn), "fp": int(fp), "fn": int(fn)}, ) return metrics, pred_df def compare_with_xgboost( self, symbol_set: SymbolSet, ) -> dict: """ Run both XGBoost and TFT backtests and return comparison. """ xgb_runner = BacktestRunner(self.config) logger.info("=== XGBoost Backtest ===") xgb_metrics, xgb_preds = xgb_runner.run_backtest(symbol_set.symbols) logger.info("=== TFT-ASRO Backtest ===") tft_metrics, tft_preds = self.run_backtest(symbol_set.symbols) delta_mae = ((tft_metrics.mae - xgb_metrics.mae) / xgb_metrics.mae) * 100 delta_dir = ((tft_metrics.directional_accuracy - xgb_metrics.directional_accuracy) / max(xgb_metrics.directional_accuracy, 1e-9)) * 100 return { "run_id": self.run_id, "generated_at": datetime.now(timezone.utc).isoformat() + "Z", "symbol_set": asdict(symbol_set), "xgboost": asdict(xgb_metrics), "tft_asro": asdict(tft_metrics), "delta_mae_pct": round(delta_mae, 2), "delta_dir_acc_pct": round(delta_dir, 2), "tft_better_mae": delta_mae < 0, "tft_better_dir": delta_dir > 0, } def main(): parser = argparse.ArgumentParser(description="Champion/Challenger Backtest") parser.add_argument("--champion", required=True, help="Path to champion symbol set JSON") parser.add_argument("--challenger", required=True, help="Path to challenger symbol set JSON") parser.add_argument("--output-dir", default="backend/artifacts/backtests", help="Output directory") parser.add_argument("--oos-start", default="2024-01-01", help="OOS start date") parser.add_argument("--oos-end", default="2025-01-17", help="OOS end date") parser.add_argument("--include-tft", action="store_true", help="Include TFT-ASRO comparison") args = parser.parse_args() # Load symbol sets logger.info(f"Loading champion from: {args.champion}") champion = load_symbol_set(args.champion) logger.info(f"Loading challenger from: {args.challenger}") challenger = load_symbol_set(args.challenger) # Configure and run config = BacktestConfig( oos_start=args.oos_start, oos_end=args.oos_end ) runner = BacktestRunner(config) result, predictions = runner.compare(champion, challenger) # Create output directory output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Save report report_path = output_dir / f"{result.run_id}_report.json" with open(report_path, 'w') as f: json.dump(asdict(result), f, indent=2, default=str) logger.info(f"Report saved: {report_path}") # Save predictions preds_path = output_dir / f"{result.run_id}_predictions.csv" predictions.to_csv(preds_path, index=False) logger.info(f"Predictions saved: {preds_path}") # Print summary print("\n" + "=" * 60) print(f"BACKTEST RESULT: {result.decision}") print("=" * 60) print(f"Champion MAE: {result.champion['metrics']['mae']:.6f}") print(f"Challenger MAE: {result.challenger['metrics']['mae']:.6f}") print(f"Delta MAE: {result.delta_mae_pct:+.2f}%") print(f"Decision: {result.decision}") print(f"Reason: {result.decision_reason}") print("=" * 60) # Optional TFT-ASRO comparison if getattr(args, "include_tft", False): print("\n" + "=" * 60) print("TFT-ASRO vs XGBoost COMPARISON") print("=" * 60) try: tft_runner = TFTBacktestRunner(config) tft_comparison = tft_runner.compare_with_xgboost(champion) tft_report_path = output_dir / f"{tft_comparison['run_id']}_tft_report.json" with open(tft_report_path, "w") as f: json.dump(tft_comparison, f, indent=2, default=str) print(f"XGBoost MAE: {tft_comparison['xgboost']['mae']:.6f}") print(f"TFT MAE: {tft_comparison['tft_asro']['mae']:.6f}") print(f"Delta MAE: {tft_comparison['delta_mae_pct']:+.2f}%") print(f"XGBoost Dir: {tft_comparison['xgboost']['directional_accuracy']:.4f}") print(f"TFT Dir: {tft_comparison['tft_asro']['directional_accuracy']:.4f}") print(f"TFT better: MAE={tft_comparison['tft_better_mae']}, Dir={tft_comparison['tft_better_dir']}") print("=" * 60) except Exception as e: print(f"TFT comparison failed: {e}") print("=" * 60) if __name__ == "__main__": main()