Spaces:
Sleeping
Sleeping
| """ | |
| backtester.py — Portfolio simulation engine. | |
| Signals come from the Sniper model. Execution logic is improved from backtest-old.py: | |
| - ATR-based stops/targets (mirrors trainer labels instead of fixed %) | |
| - Volatility-adjusted or equal-weight position sizing | |
| - Compound or realistic (profit-withdrawal) account modes | |
| - Cooldown periods, max positions, benchmark overlay | |
| """ | |
| import gc | |
| import logging | |
| from dataclasses import dataclass | |
| from datetime import timedelta | |
| from typing import Callable, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from src.features import build_features, compute_atr | |
| from src.data_loader import extract_market_series, get_current_regime | |
| from src.registry import ArtifactBundle, predict_proba | |
| logger = logging.getLogger("SniperBacktest") | |
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
@dataclass
class BacktestConfig:
    """Tunable parameters for one backtest run.

    Declared as a dataclass so individual settings can be overridden by
    keyword (``BacktestConfig(max_positions=3)``) while every other field
    keeps the default listed here.
    """

    # Date range (inclusive) of the simulation, as ISO date strings.
    start_date: str = "2021-01-01"
    end_date: str = "2024-12-31"

    # Capital: starting balance of the trading account.
    initial_cash: float = 10_000.0

    # Signal: minimum model probability required to consider an entry.
    conviction_threshold: float = 0.50
    use_regime_routing: bool = True

    # Risk / position
    max_positions: int = 5
    pt_multiplier: float = 3.0  # Take-profit: entry + N × ATR
    sl_multiplier: float = 0.5  # Stop-loss: entry - N × ATR
    atr_period: int = 14
    horizon_days: int = 15  # calendar days before a time-based exit
    cooldown_days: int = 2  # calendar days a ticker is blocked after an exit

    # Sizing
    sizing_mode: str = "volatility_adjusted"  # or "equal_weight"
    risk_fraction: float = 0.02  # fraction of NAV risked per trade

    # Costs: charged once on entry and once again on exit.
    transaction_pct: float = 0.001  # 0.1% per side

    # Account mode
    account_mode: str = "compound"  # or "realistic"
    withdrawal_fraction: float = 0.20  # fraction of profit withdrawn (realistic mode)

    # Confluence filter
    use_confluence: bool = False
    min_confluence_score: int = 3

    # Benchmark
    benchmark: str = "SPY"  # "SPY", "QQQ", or "None"
# Ready-made parameter bundles, keyed by the label used to select them.
# Each entry overrides only the fields listed; everything else keeps the
# BacktestConfig default.
PRESETS = {
    # Higher conviction bar, fewer and smaller positions, longer horizon.
    "Conservative": BacktestConfig(
        sizing_mode="volatility_adjusted",
        risk_fraction=0.01,
        max_positions=3,
        conviction_threshold=0.65,
        pt_multiplier=2.0,
        sl_multiplier=0.35,
        horizon_days=20,
    ),
    # Unmodified defaults.
    "Balanced": BacktestConfig(),
    # Lower conviction bar, more positions, wider stops and targets.
    "Aggressive": BacktestConfig(
        sizing_mode="equal_weight",
        risk_fraction=0.04,
        max_positions=10,
        conviction_threshold=0.35,
        pt_multiplier=4.0,
        sl_multiplier=0.75,
        horizon_days=10,
    ),
    # Lowest conviction bar, most positions, shortest horizon.
    "Paper test": BacktestConfig(
        sizing_mode="equal_weight",
        risk_fraction=0.05,
        max_positions=15,
        conviction_threshold=0.30,
        pt_multiplier=3.0,
        sl_multiplier=1.0,
        horizon_days=7,
    ),
}
| # --------------------------------------------------------------------------- | |
| # Result containers | |
| # --------------------------------------------------------------------------- | |
@dataclass
class BacktestResult:
    """Everything a finished backtest hands back to the caller.

    Declared as a dataclass so it can be built by keyword and so that
    ``__post_init__`` runs to give each instance its own ``warnings`` list.
    """

    nav_df: pd.DataFrame  # Date, NAV, Exchange Cash, Personal Cash, Holdings
    trades_df: pd.DataFrame  # closed trades
    benchmark_df: pd.DataFrame  # benchmark NAV (normalized)
    metrics: dict
    config: "BacktestConfig"
    n_tickers_processed: int = 0
    # None is replaced by a fresh list in __post_init__ so instances never
    # share one mutable default.
    warnings: Optional[list] = None

    def __post_init__(self):
        if self.warnings is None:
            self.warnings = []
| # --------------------------------------------------------------------------- | |
| # Core engine | |
| # --------------------------------------------------------------------------- | |
def run_backtest(
    ticker_data: dict[str, pd.DataFrame],
    bundle: ArtifactBundle,
    config: BacktestConfig,
    progress_cb: Optional[Callable] = None,
) -> BacktestResult:
    """
    Main backtest loop. ticker_data must already be downloaded and filtered.
    bundle must already be loaded (call registry.load_bundle first).

    Args:
        ticker_data: Price frames keyed by ticker. Each frame is read for its
            "High", "Low" and "Close" columns and is looked up by date through
            its index. Keys starting with "^" are not traded.
        bundle: Loaded model artifacts; supplies ``feature_list`` and
            ``has_regime_models`` and is passed to ``predict_proba``.
        config: Run parameters (dates, sizing, stops, costs, account mode).
        progress_cb: Optional ``callback(message, fraction)`` invoked at each
            stage; every message is also written to the module logger.

    Returns:
        BacktestResult holding the daily NAV table, the closed-trade table,
        the benchmark curve, the metrics dict and the config used.
    """

    def _cb(msg, frac=None):
        if progress_cb:
            progress_cb(msg, frac)
        logger.info(msg)

    _cb("Building feature matrix for all tickers...", 0.38)
    vix_data, sp500_data = extract_market_series(ticker_data)
    feature_list = bundle.feature_list
    # Loop-invariant: depends only on the config and the bundle.
    use_regime = config.use_regime_routing and bundle.has_regime_models

    # -----------------------------------------------------------------------
    # Step 1: Build features & generate signals for all tickers
    # -----------------------------------------------------------------------
    signal_cache: dict[pd.Timestamp, list] = {}  # date -> [{ticker, prob, price, atr}]
    process_tickers = [t for t in ticker_data if not t.startswith("^")]
    n_proc = len(process_tickers)

    for i, ticker in enumerate(process_tickers):
        if i % 50 == 0:
            frac = 0.38 + 0.30 * (i / max(1, n_proc))
            _cb(f"Engineering features: {ticker} ({i+1}/{n_proc})...", frac)

        df = ticker_data[ticker]
        try:
            feat = build_features(df, vix_data=vix_data, sp500_data=sp500_data)
        except Exception as e:
            logger.warning(f"Feature build failed for {ticker}: {e}")
            continue

        if feature_list:
            # Columns the model expects but the builder did not produce are
            # zero-filled so the frame can be ordered like feature_list.
            missing = [f for f in feature_list if f not in feat.columns]
            for m in missing:
                feat[m] = 0.0
            feat = feat[feature_list]
        feat_clean = feat.fillna(0).replace([float("inf"), float("-inf")], 0)

        # ATR for sizing/stops
        try:
            atr_series = compute_atr(df, period=config.atr_period)
        except Exception:
            atr_series = pd.Series(np.nan, index=df.index)

        # NOTE(review): sp500_above_sma is passed "data is present", not an
        # actual price-vs-SMA comparison, and vix_high is always False —
        # confirm this is the intended regime input for predict_proba.
        try:
            probas = predict_proba(
                bundle, feat_clean,
                use_regime=use_regime,
                sp500_above_sma=(sp500_data is not None),
                vix_high=False,
            )
        except Exception as e:
            logger.warning(f"Prediction failed for {ticker}: {e}")
            continue

        # Build per-date signals
        for j, date in enumerate(feat_clean.index):
            prob = float(probas[j])
            if prob < config.conviction_threshold:
                continue
            # price = Close on that date
            if date not in df.index:
                continue
            price = float(df.loc[date, "Close"])
            if hasattr(atr_series, "get"):
                atr_val = float(atr_series.get(date, np.nan))
            elif date in atr_series.index:
                atr_val = float(atr_series.loc[date])
            else:
                atr_val = float(np.nan)
            if np.isnan(price) or price <= 0:
                continue
            # A missing or non-positive ATR would put the stop at (or above)
            # the entry price, so both cases use the 2% fallback.
            if np.isnan(atr_val) or atr_val <= 0:
                atr_val = price * 0.02  # fallback 2%
            ts = pd.Timestamp(date)
            signal_cache.setdefault(ts, []).append({
                "ticker": ticker,
                "prob": prob,
                "price": price,
                "atr": atr_val,
            })

    _cb(f"Signal generation complete. {len(signal_cache)} active signal days.", 0.70)

    # -----------------------------------------------------------------------
    # Step 2: Portfolio simulation — day by day
    # -----------------------------------------------------------------------
    _cb("Running portfolio simulation...", 0.72)
    start_ts = pd.Timestamp(config.start_date)
    end_ts = pd.Timestamp(config.end_date)

    # Simulate every business day in range (even if no signals) plus any
    # in-range signal date that is not a business day.
    date_range = pd.date_range(config.start_date, config.end_date, freq="B")
    sim_dates = sorted(set(date_range.tolist()) | set(signal_cache))
    sim_dates = [d for d in sim_dates if start_ts <= d <= end_ts]

    exchange_cash = config.initial_cash
    personal_cash = 0.0
    positions = []  # list of dicts
    cooldowns: dict[str, pd.Timestamp] = {}
    nav_history = []
    closed_trades = []

    for current_date in sim_dates:
        current_holdings_val = 0.0
        active_positions = []

        # ---- Exit logic ----
        for pos in positions:
            ticker = pos["ticker"]
            df_t = ticker_data.get(ticker)
            if df_t is None:
                # No price data: keep the position and carry it at entry
                # price so it does not vanish from NAV.
                active_positions.append(pos)
                current_holdings_val += pos["shares"] * pos["entry_price"]
                continue
            # Most recent bar at or before current_date (-1 if none exists).
            idx_matches = df_t.index.get_indexer([current_date], method="ffill")
            if idx_matches[0] < 0:
                active_positions.append(pos)
                current_holdings_val += pos["shares"] * pos["entry_price"]
                continue
            row_idx = idx_matches[0]
            curr_high = float(df_t.iloc[row_idx]["High"])
            curr_low = float(df_t.iloc[row_idx]["Low"])
            curr_price = float(df_t.iloc[row_idx]["Close"])
            days_held = (current_date - pos["entry_date"]).days

            exit_signal = False
            exit_reason = ""
            exit_price = curr_price
            # Priority 1: Stop loss
            if curr_low <= pos["stop_price"]:
                exit_signal = True
                exit_reason = "Stop Loss"
                exit_price = pos["stop_price"]
            # Priority 2: Take profit
            elif curr_high >= pos["take_price"]:
                exit_signal = True
                exit_reason = "Take Profit"
                exit_price = pos["take_price"]
            # Priority 3: Time horizon
            elif days_held >= config.horizon_days:
                exit_signal = True
                exit_reason = "Time Horizon"
                exit_price = curr_price

            if exit_signal:
                gross = pos["shares"] * exit_price
                cost_basis = pos["shares"] * pos["entry_price"] * (1 + config.transaction_pct)
                net_proceeds = gross * (1 - config.transaction_pct)
                net_profit = net_proceeds - cost_basis
                if net_profit > 0:
                    # Only profits are split; in compound mode nothing is
                    # withdrawn to personal cash.
                    if config.account_mode == "compound":
                        withdrawal = 0.0
                    else:
                        withdrawal = net_profit * config.withdrawal_fraction
                    reinvest = net_profit - withdrawal
                    personal_cash += withdrawal
                    exchange_cash += (cost_basis + reinvest)
                else:
                    exchange_cash += net_proceeds
                closed_trades.append({
                    "Ticker": ticker,
                    "Entry Date": pos["entry_date"].date(),
                    "Exit Date": current_date.date(),
                    "Exit Reason": exit_reason,
                    "Entry Price": round(pos["entry_price"], 4),
                    "Exit Price": round(exit_price, 4),
                    "Shares": pos["shares"],
                    "Profit $": round(net_profit, 2),
                    "Return %": round((exit_price / pos["entry_price"] - 1) * 100, 2),
                    "Days Held": days_held,
                    "Entry Prob": round(pos.get("entry_prob", 0), 4),
                })
                cooldowns[ticker] = current_date + timedelta(days=config.cooldown_days)
            else:
                active_positions.append(pos)
                current_holdings_val += pos["shares"] * curr_price

        positions = active_positions
        # NAV before today's entries; used as the sizing base below.
        total_nav = exchange_cash + current_holdings_val + personal_cash

        # ---- Entry logic ----
        if len(positions) < config.max_positions and current_date in signal_cache:
            todays_signals = sorted(
                signal_cache[current_date], key=lambda x: x["prob"], reverse=True
            )
            for sig in todays_signals:
                if len(positions) >= config.max_positions:
                    break
                ticker = sig["ticker"]
                if ticker in cooldowns and current_date < cooldowns[ticker]:
                    continue
                if any(p["ticker"] == ticker for p in positions):
                    continue
                price = sig["price"]
                atr_val = sig["atr"]
                stop_price = price - config.sl_multiplier * atr_val
                take_price = price + config.pt_multiplier * atr_val
                # Floor the per-share risk so a tiny stop distance cannot
                # produce an enormous share count.
                risk_per_share = max(price - stop_price, price * 0.001)
                if config.sizing_mode == "volatility_adjusted":
                    risk_amt = total_nav * config.risk_fraction
                    shares = max(0, int(risk_amt / risk_per_share))
                else:
                    slots_free = config.max_positions - len(positions)
                    alloc = exchange_cash / max(1, slots_free)
                    shares = max(0, int(alloc / price))
                if shares < 1:
                    continue
                cost = shares * price * (1 + config.transaction_pct)
                if cost > exchange_cash:
                    continue
                exchange_cash -= cost
                # Count the new position in today's holdings; otherwise the
                # NAV snapshot would fall by the whole purchase amount on the
                # entry day instead of just the transaction fee.
                current_holdings_val += shares * price
                positions.append({
                    "ticker": ticker,
                    "entry_date": current_date,
                    "entry_price": price,
                    "stop_price": stop_price,
                    "take_price": take_price,
                    "shares": shares,
                    "entry_prob": sig["prob"],
                })

        # NAV snapshot
        total_nav = exchange_cash + current_holdings_val + personal_cash
        nav_history.append({
            "Date": current_date.date(),
            "NAV": round(total_nav, 2),
            "Exchange Cash": round(exchange_cash, 2),
            "Holdings": round(current_holdings_val, 2),
            "Personal Cash": round(personal_cash, 2),
            "Open Positions": len(positions),
        })

    _cb("Simulation complete. Computing metrics...", 0.92)
    nav_df = pd.DataFrame(nav_history)
    trades_df = pd.DataFrame(closed_trades) if closed_trades else pd.DataFrame()

    # -----------------------------------------------------------------------
    # Step 3: Benchmark
    # -----------------------------------------------------------------------
    benchmark_df = _build_benchmark(
        config.benchmark, config.start_date, config.end_date, config.initial_cash, ticker_data
    )

    # -----------------------------------------------------------------------
    # Step 4: Metrics
    # -----------------------------------------------------------------------
    metrics = _compute_metrics(nav_df, trades_df, config.initial_cash)
    _cb("Metrics computed. Rendering results...", 0.97)
    gc.collect()
    return BacktestResult(
        nav_df=nav_df,
        trades_df=trades_df,
        benchmark_df=benchmark_df,
        metrics=metrics,
        config=config,
        n_tickers_processed=n_proc,
    )
| # --------------------------------------------------------------------------- | |
| # Benchmark | |
| # --------------------------------------------------------------------------- | |
| def _build_benchmark( | |
| symbol: str, | |
| start: str, | |
| end: str, | |
| initial_cash: float, | |
| ticker_data: dict, | |
| ) -> pd.DataFrame: | |
| if symbol == "None": | |
| return pd.DataFrame() | |
| df_b = ticker_data.get(symbol) | |
| if df_b is None: | |
| # Try downloading | |
| try: | |
| import yfinance as yf | |
| df_b = yf.download(symbol, start=start, end=end, auto_adjust=True, progress=False) | |
| except Exception: | |
| return pd.DataFrame() | |
| if df_b is None or df_b.empty: | |
| return pd.DataFrame() | |
| df_b = df_b.loc[start:end] | |
| if df_b.empty: | |
| return pd.DataFrame() | |
| first_price = float(df_b["Close"].iloc[0]) | |
| bdf = pd.DataFrame({ | |
| "Date": [d.date() if hasattr(d, "date") else d for d in df_b.index], | |
| "Benchmark NAV": (df_b["Close"] / first_price * initial_cash).round(2).values, | |
| }) | |
| return bdf | |
| # --------------------------------------------------------------------------- | |
| # Metrics | |
| # --------------------------------------------------------------------------- | |
| def _compute_metrics(nav_df: pd.DataFrame, trades_df: pd.DataFrame, initial_cash: float) -> dict: | |
| if nav_df.empty: | |
| return {} | |
| final_nav = float(nav_df["NAV"].iloc[-1]) | |
| total_return_pct = (final_nav / initial_cash - 1) * 100 | |
| nav_s = nav_df["NAV"].values.astype(float) | |
| daily_rets = pd.Series(nav_s).pct_change().dropna() | |
| n_days = len(nav_df) | |
| n_years = max(n_days / 252, 0.01) | |
| ann_return = ((final_nav / initial_cash) ** (1 / n_years) - 1) * 100 | |
| sharpe = 0.0 | |
| if daily_rets.std() > 0: | |
| sharpe = (daily_rets.mean() / daily_rets.std()) * np.sqrt(252) | |
| # Max drawdown | |
| peak = pd.Series(nav_s).cummax() | |
| drawdown = (pd.Series(nav_s) - peak) / peak | |
| max_dd = float(drawdown.min()) * 100 | |
| # Calmar | |
| calmar = ann_return / abs(max_dd) if max_dd != 0 else 0.0 | |
| # Trade stats | |
| n_trades = len(trades_df) | |
| win_rate = 0.0 | |
| avg_win = 0.0 | |
| avg_loss = 0.0 | |
| profit_factor = 0.0 | |
| avg_hold = 0.0 | |
| exit_reasons = {} | |
| if n_trades > 0 and "Profit $" in trades_df.columns: | |
| wins = trades_df[trades_df["Profit $"] > 0] | |
| losses = trades_df[trades_df["Profit $"] <= 0] | |
| win_rate = len(wins) / n_trades * 100 | |
| avg_win = float(wins["Profit $"].mean()) if len(wins) > 0 else 0.0 | |
| avg_loss = float(losses["Profit $"].mean()) if len(losses) > 0 else 0.0 | |
| total_win = wins["Profit $"].sum() | |
| total_loss = abs(losses["Profit $"].sum()) | |
| profit_factor = total_win / max(total_loss, 0.01) | |
| if "Days Held" in trades_df.columns: | |
| avg_hold = float(trades_df["Days Held"].mean()) | |
| if "Exit Reason" in trades_df.columns: | |
| exit_reasons = trades_df["Exit Reason"].value_counts().to_dict() | |
| return { | |
| "Final NAV": round(final_nav, 2), | |
| "Total Return %": round(total_return_pct, 2), | |
| "Annualized Return %": round(ann_return, 2), | |
| "Sharpe Ratio": round(sharpe, 3), | |
| "Max Drawdown %": round(max_dd, 2), | |
| "Calmar Ratio": round(calmar, 3), | |
| "Total Trades": n_trades, | |
| "Win Rate %": round(win_rate, 1), | |
| "Avg Win $": round(avg_win, 2), | |
| "Avg Loss $": round(avg_loss, 2), | |
| "Profit Factor": round(profit_factor, 3), | |
| "Avg Hold Days": round(avg_hold, 1), | |
| "Exit Reasons": exit_reasons, | |
| "Initial Capital": initial_cash, | |
| } |