""" Backtesting Engine. Event-driven backtester that simulates strategy execution over historical data: - Iterates daily bars - Applies strategy entry/exit logic - Tracks positions and portfolio value - Accounts for transaction costs and slippage - Produces equity curve, trade log, and comprehensive metrics """ from __future__ import annotations import json import logging from datetime import date, timedelta from typing import Any, Dict, List, Optional import numpy as np import pandas as pd from app.services.data_ingestion.yahoo import yahoo_adapter from app.services.feature_engineering.pipeline import feature_pipeline logger = logging.getLogger(__name__) class BacktestEngine: """Event-driven portfolio backtester.""" async def run_backtest( self, strategy_config: Dict[str, Any], start_date: date, end_date: date, initial_capital: float = 1_000_000.0, commission_pct: float = 0.001, slippage_pct: float = 0.0005, benchmark_ticker: str = "SPY", rebalance_frequency: str = "monthly", ) -> Dict[str, Any]: """ Run a complete backtest simulation. Returns: Dict with metrics, equity curve, trades, and monthly returns. """ universe = strategy_config.get("universe", []) if not universe: return {"status": "failed", "error": "Empty universe"} # 1. Fetch historical data price_frames: Dict[str, pd.DataFrame] = {} for ticker in universe: df = await yahoo_adapter.get_price_dataframe( ticker, period="max" ) if not df.empty: # Filter by date range df.index = pd.to_datetime(df.index) mask = (df.index >= pd.Timestamp(start_date)) & ( df.index <= pd.Timestamp(end_date) ) filtered = df.loc[mask] if not filtered.empty: price_frames[ticker] = filtered if not price_frames: return {"status": "failed", "error": "No price data available for universe"} # Fetch benchmark bench_df = await yahoo_adapter.get_price_dataframe(benchmark_ticker, period="max") bench_prices = None if not bench_df.empty: bench_df.index = pd.to_datetime(bench_df.index) mask = (bench_df.index >= pd.Timestamp(start_date)) & ( bench_df.index <= pd.Timestamp(end_date) ) bench_filtered = bench_df.loc[mask] if not bench_filtered.empty: bench_prices = bench_filtered["Close"] # 2. Build aligned price matrix close_prices = pd.DataFrame( {t: df["Close"] for t, df in price_frames.items()} ).dropna() if close_prices.empty: return {"status": "failed", "error": "No overlapping price data"} # 3. Compute features for signal generation featured_data: Dict[str, pd.DataFrame] = {} for ticker, df in price_frames.items(): featured_data[ticker] = feature_pipeline.compute_all_features(df) # 4. Run simulation dates = close_prices.index.tolist() portfolio_value = initial_capital cash = initial_capital positions: Dict[str, float] = {} # ticker -> shares weights: Dict[str, float] = {} equity_curve: List[Dict[str, Any]] = [] trades: List[Dict[str, Any]] = [] total_commission = 0.0 total_slippage = 0.0 # Determine rebalance dates rebal_dates = self._get_rebalance_dates(dates, rebalance_frequency) # Strategy parameters entry_rules = strategy_config.get("entry_rules", []) exit_rules = strategy_config.get("exit_rules", []) sizing_method = strategy_config.get("position_sizing", {}).get("method", "equal_weight") max_position = strategy_config.get("position_sizing", {}).get("max_position_pct", 0.15) stop_loss = strategy_config.get("risk_management", {}).get("stop_loss_pct", 0.05) take_profit = strategy_config.get("risk_management", {}).get("take_profit_pct", 0.20) entry_prices: Dict[str, float] = {} for i, dt in enumerate(dates): day_prices = close_prices.loc[dt] # Update portfolio value position_value = sum( positions.get(t, 0) * day_prices.get(t, 0) for t in positions if t in day_prices.index ) portfolio_value = cash + position_value # Record equity curve point prev_value = equity_curve[-1]["portfolio_value"] if equity_curve else initial_capital daily_ret = (portfolio_value / prev_value - 1) if prev_value > 0 else 0 bench_val = None if bench_prices is not None and dt in bench_prices.index: bench_val = float(bench_prices.loc[dt]) equity_curve.append({ "date": dt.strftime("%Y-%m-%d"), "portfolio_value": round(portfolio_value, 2), "benchmark_value": bench_val, "daily_return": round(daily_ret, 6), }) # Check stop loss / take profit for existing positions for ticker in list(positions.keys()): if ticker not in day_prices.index or positions[ticker] == 0: continue current_price = day_prices[ticker] entry_price = entry_prices.get(ticker, current_price) pnl_pct = (current_price / entry_price - 1) if entry_price > 0 else 0 should_exit = False exit_reason = "" if pnl_pct <= -stop_loss: should_exit = True exit_reason = "stop_loss" elif pnl_pct >= take_profit: should_exit = True exit_reason = "take_profit" if should_exit: shares = positions[ticker] trade_value = shares * current_price comm = trade_value * commission_pct slip = trade_value * slippage_pct total_commission += comm total_slippage += slip cash += trade_value - comm - slip pnl = (current_price - entry_price) * shares trades.append({ "date": dt.strftime("%Y-%m-%d"), "ticker": ticker, "action": "sell", "quantity": round(shares, 2), "price": round(current_price, 2), "commission": round(comm, 2), "slippage": round(slip, 2), "pnl": round(pnl, 2), "reason": exit_reason, }) del positions[ticker] del entry_prices[ticker] # Rebalance on schedule if dt in rebal_dates: # Generate new target weights target_weights = self._compute_target_weights( universe, day_prices, featured_data, dt, sizing_method, max_position ) # Execute trades to reach target for ticker, target_weight in target_weights.items(): if ticker not in day_prices.index: continue price = day_prices[ticker] if price <= 0: continue target_value = portfolio_value * target_weight current_shares = positions.get(ticker, 0) current_value = current_shares * price delta_value = target_value - current_value if abs(delta_value) < portfolio_value * 0.005: continue # Skip tiny trades delta_shares = delta_value / price trade_value = abs(delta_value) comm = trade_value * commission_pct slip = trade_value * slippage_pct total_commission += comm total_slippage += slip if delta_shares > 0: cash -= (trade_value + comm + slip) action = "buy" entry_prices[ticker] = price else: cash += (trade_value - comm - slip) action = "sell" new_shares = current_shares + delta_shares if abs(new_shares) < 0.01: positions.pop(ticker, None) entry_prices.pop(ticker, None) else: positions[ticker] = new_shares trades.append({ "date": dt.strftime("%Y-%m-%d"), "ticker": ticker, "action": action, "quantity": round(abs(delta_shares), 2), "price": round(price, 2), "commission": round(comm, 2), "slippage": round(slip, 2), "pnl": None, }) # 5. Compute final metrics equity_values = [p["portfolio_value"] for p in equity_curve] daily_returns = [p["daily_return"] for p in equity_curve if p["daily_return"] is not None] metrics = self._compute_backtest_metrics( equity_values, daily_returns, trades, initial_capital, total_commission, total_slippage, bench_prices, ) # Monthly returns monthly_returns = self._compute_monthly_returns(equity_curve) # Drawdown series for point in equity_curve: idx = equity_curve.index(point) peak = max(p["portfolio_value"] for p in equity_curve[: idx + 1]) point["drawdown"] = round((point["portfolio_value"] - peak) / peak, 6) if peak > 0 else 0 return { "status": "completed", "start_date": start_date.isoformat(), "end_date": end_date.isoformat(), "initial_capital": initial_capital, "final_value": round(equity_values[-1], 2) if equity_values else initial_capital, "metrics": metrics, "equity_curve": equity_curve, "trades": trades, "monthly_returns": monthly_returns, } def _compute_target_weights( self, universe: List[str], prices: pd.Series, featured_data: Dict[str, pd.DataFrame], current_date: pd.Timestamp, method: str, max_weight: float, ) -> Dict[str, float]: """Compute target portfolio weights for a rebalance date.""" valid_tickers = [t for t in universe if t in prices.index and prices[t] > 0] if not valid_tickers: return {} n = len(valid_tickers) if method == "equal_weight": w = min(1.0 / n, max_weight) return {t: w for t in valid_tickers} elif method == "score_weighted": # Use momentum as weight proxy scores = {} for t in valid_tickers: if t in featured_data and "momentum_10" in featured_data[t].columns: feat_df = featured_data[t] mask = feat_df.index <= current_date if mask.any(): mom = feat_df.loc[mask, "momentum_10"].iloc[-1] scores[t] = max(float(mom) if pd.notna(mom) else 0, 0) else: scores[t] = 0 else: scores[t] = 0 total = sum(scores.values()) or 1.0 return {t: min(s / total, max_weight) for t, s in scores.items()} else: w = min(1.0 / n, max_weight) return {t: w for t in valid_tickers} @staticmethod def _get_rebalance_dates( dates: List[pd.Timestamp], frequency: str, ) -> set: """Determine which dates are rebalance dates.""" if not dates: return set() rebal = set() rebal.add(dates[0]) # Always rebalance on first date if frequency == "daily": return set(dates) elif frequency == "weekly": for d in dates: if d.weekday() == 0: # Monday rebal.add(d) elif frequency == "monthly": current_month = dates[0].month for d in dates: if d.month != current_month: rebal.add(d) current_month = d.month elif frequency == "quarterly": current_quarter = (dates[0].month - 1) // 3 for d in dates: q = (d.month - 1) // 3 if q != current_quarter: rebal.add(d) current_quarter = q return rebal @staticmethod def _compute_backtest_metrics( equity_values: List[float], daily_returns: List[float], trades: List[Dict], initial_capital: float, total_commission: float, total_slippage: float, bench_prices: Optional[pd.Series], ) -> Dict[str, Any]: """Compute comprehensive backtest metrics.""" if not equity_values or not daily_returns: return {} returns = np.array(daily_returns) final = equity_values[-1] n_days = len(returns) n_years = n_days / 252 total_return = (final / initial_capital - 1) ann_return = (1 + total_return) ** (1 / n_years) - 1 if n_years > 0 else 0 ann_vol = float(np.std(returns, ddof=1) * np.sqrt(252)) if len(returns) > 1 else 0 sharpe = (ann_return - 0.04) / ann_vol if ann_vol > 0 else 0 # Sortino downside = returns[returns < 0] down_dev = float(np.std(downside, ddof=1) * np.sqrt(252)) if len(downside) > 1 else ann_vol sortino = (ann_return - 0.04) / down_dev if down_dev > 0 else 0 # Max drawdown cum = np.cumprod(1 + returns) peak = np.maximum.accumulate(cum) dd = (cum - peak) / peak max_dd = float(np.min(dd)) if len(dd) > 0 else 0 # Calmar calmar = ann_return / abs(max_dd) if max_dd != 0 else 0 # Win rate and profit factor trade_pnls = [t.get("pnl", 0) for t in trades if t.get("pnl") is not None] wins = [p for p in trade_pnls if p > 0] losses = [p for p in trade_pnls if p < 0] win_rate = len(wins) / len(trade_pnls) if trade_pnls else 0 profit_factor = ( sum(wins) / abs(sum(losses)) if losses else float("inf") if wins else 0 ) avg_trade_return = float(np.mean(trade_pnls)) if trade_pnls else 0 metrics = { "total_return": round(total_return, 4), "annualized_return": round(ann_return, 4), "sharpe_ratio": round(sharpe, 4), "sortino_ratio": round(sortino, 4), "max_drawdown": round(max_dd, 4), "volatility": round(ann_vol, 4), "calmar_ratio": round(calmar, 4), "win_rate": round(win_rate, 4), "profit_factor": round(profit_factor, 4) if profit_factor != float("inf") else None, "total_trades": len(trades), "avg_trade_return": round(avg_trade_return, 2), "total_commission": round(total_commission, 2), "total_slippage": round(total_slippage, 2), } # Alpha/beta vs benchmark if bench_prices is not None and len(bench_prices) > 10: bench_ret = bench_prices.pct_change().dropna().values min_len = min(len(returns), len(bench_ret)) if min_len > 10: r = returns[:min_len] b = bench_ret[:min_len] cov_rb = np.cov(r, b)[0, 1] var_b = np.var(b, ddof=1) beta = cov_rb / var_b if var_b > 0 else 1.0 alpha = ann_return - beta * float(np.mean(b) * 252) metrics["beta"] = round(float(beta), 4) metrics["alpha"] = round(float(alpha), 4) return metrics @staticmethod def _compute_monthly_returns( equity_curve: List[Dict[str, Any]], ) -> Dict[str, float]: """Compute monthly returns from equity curve.""" if not equity_curve: return {} monthly: Dict[str, float] = {} prev_value = equity_curve[0]["portfolio_value"] current_month = equity_curve[0]["date"][:7] for point in equity_curve: month = point["date"][:7] if month != current_month: monthly[current_month] = round( (point["portfolio_value"] / prev_value - 1), 4 ) prev_value = point["portfolio_value"] current_month = month # Final month if equity_curve: monthly[current_month] = round( (equity_curve[-1]["portfolio_value"] / prev_value - 1), 4 ) return monthly backtest_engine = BacktestEngine()