Spaces:
Sleeping
Sleeping
| """ | |
| Backtesting Engine. | |
| Event-driven backtester that simulates strategy execution over historical data: | |
| - Iterates daily bars | |
| - Applies strategy entry/exit logic | |
| - Tracks positions and portfolio value | |
| - Accounts for transaction costs and slippage | |
| - Produces equity curve, trade log, and comprehensive metrics | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from datetime import date, timedelta | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from app.services.data_ingestion.yahoo import yahoo_adapter | |
| from app.services.feature_engineering.pipeline import feature_pipeline | |
# Module-scoped logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class BacktestEngine:
    """Event-driven portfolio backtester.

    For every trading day (dates on which every ticker with data has a close)
    the engine:

    1. marks open positions to market at the close and records an equity point;
    2. closes positions whose gain/loss versus their average cost basis
       crosses the stop-loss or take-profit threshold;
    3. on scheduled rebalance dates, trades toward the weights returned by
       ``_compute_target_weights``.

    All fills are at the day's close; commission and slippage are charged as
    fractions of traded notional.
    """

    # Trading days per year used to annualize daily statistics.
    TRADING_DAYS_PER_YEAR = 252
    # Rebalance legs smaller than this fraction of portfolio value are skipped.
    MIN_TRADE_FRACTION = 0.005
    # Holdings below this many shares are treated as closed.
    MIN_POSITION_SHARES = 0.01

    async def run_backtest(
        self,
        strategy_config: Dict[str, Any],
        start_date: date,
        end_date: date,
        initial_capital: float = 1_000_000.0,
        commission_pct: float = 0.001,
        slippage_pct: float = 0.0005,
        benchmark_ticker: str = "SPY",
        rebalance_frequency: str = "monthly",
        risk_free_rate: float = 0.04,
    ) -> Dict[str, Any]:
        """
        Run a complete backtest simulation.

        Args:
            strategy_config: Strategy definition. Keys read here: ``universe``
                (list of tickers), ``position_sizing`` (``method``,
                ``max_position_pct``) and ``risk_management``
                (``stop_loss_pct``, ``take_profit_pct``). ``entry_rules`` and
                ``exit_rules`` are not evaluated by this engine.
            start_date: First date (inclusive) of the simulation window.
            end_date: Last date (inclusive) of the simulation window.
            initial_capital: Starting cash; must be positive.
            commission_pct: Commission as a fraction of traded notional.
            slippage_pct: Slippage as a fraction of traded notional.
            benchmark_ticker: Ticker used for benchmark values and alpha/beta.
            rebalance_frequency: ``"daily"``, ``"weekly"``, ``"monthly"`` or
                ``"quarterly"``; any other value rebalances on the first day only.
            risk_free_rate: Annual risk-free rate used by Sharpe and Sortino.

        Returns:
            Dict with metrics, equity curve, trades, and monthly returns, or
            ``{"status": "failed", "error": ...}`` when the run cannot start.
        """
        # Deduplicate while preserving order so repeated tickers are neither
        # fetched twice nor double-counted in equal weighting.
        universe = list(dict.fromkeys(strategy_config.get("universe") or []))
        if not universe:
            return {"status": "failed", "error": "Empty universe"}
        if initial_capital <= 0:
            return {"status": "failed", "error": "initial_capital must be positive"}

        # 1. Fetch historical data restricted to [start_date, end_date].
        price_frames: Dict[str, pd.DataFrame] = {}
        for ticker in universe:
            df = await yahoo_adapter.get_price_dataframe(ticker, period="max")
            filtered = self._filter_date_range(df, start_date, end_date)
            if not filtered.empty:
                price_frames[ticker] = filtered
        if not price_frames:
            return {"status": "failed", "error": "No price data available for universe"}

        bench_df = await yahoo_adapter.get_price_dataframe(benchmark_ticker, period="max")
        bench_filtered = self._filter_date_range(bench_df, start_date, end_date)
        bench_prices: Optional[pd.Series] = (
            bench_filtered["Close"] if not bench_filtered.empty else None
        )

        # 2. Aligned close-price matrix: keep only dates where every ticker traded.
        close_prices = pd.DataFrame(
            {t: frame["Close"] for t, frame in price_frames.items()}
        ).dropna()
        if close_prices.empty:
            return {"status": "failed", "error": "No overlapping price data"}

        # Benchmark re-indexed onto the simulation dates (NaN where it did not
        # trade) so metrics can pair returns by date rather than by position.
        aligned_bench: Optional[pd.Series] = None
        if bench_prices is not None:
            unique_bench = bench_prices[~bench_prices.index.duplicated(keep="last")]
            aligned_bench = unique_bench.reindex(close_prices.index)

        # 3. Features for signal generation.
        featured_data: Dict[str, pd.DataFrame] = {
            t: feature_pipeline.compute_all_features(frame)
            for t, frame in price_frames.items()
        }

        # 4. Simulation.
        # NOTE: strategy_config "entry_rules" / "exit_rules" are not evaluated
        # here; entries come from target weights and exits from the
        # stop-loss / take-profit thresholds below.
        sizing = strategy_config.get("position_sizing") or {}
        risk = strategy_config.get("risk_management") or {}
        sizing_method = sizing.get("method", "equal_weight")
        max_position = sizing.get("max_position_pct", 0.15)
        stop_loss = risk.get("stop_loss_pct", 0.05)
        take_profit = risk.get("take_profit_pct", 0.20)

        dates = close_prices.index.tolist()
        rebal_dates = self._get_rebalance_dates(dates, rebalance_frequency)

        cash = initial_capital
        positions: Dict[str, float] = {}  # ticker -> shares held
        entry_prices: Dict[str, float] = {}  # ticker -> average cost per share
        equity_curve: List[Dict[str, Any]] = []
        trades: List[Dict[str, Any]] = []
        total_commission = 0.0
        total_slippage = 0.0
        prev_value = initial_capital

        for dt in dates:
            day_prices = close_prices.loc[dt]
            date_str = dt.strftime("%Y-%m-%d")

            # Mark to market at the close, before today's trades.
            portfolio_value = cash + self._position_value(positions, day_prices)
            daily_ret = portfolio_value / prev_value - 1 if prev_value > 0 else 0.0
            prev_value = portfolio_value

            bench_val = None
            if aligned_bench is not None:
                raw_bench = aligned_bench.loc[dt]
                if pd.notna(raw_bench):
                    bench_val = float(raw_bench)

            equity_curve.append({
                "date": date_str,
                "portfolio_value": round(float(portfolio_value), 2),
                "benchmark_value": bench_val,
                "daily_return": round(float(daily_ret), 6),
            })

            # Stop-loss / take-profit exits, measured against average cost.
            for ticker in list(positions):
                shares = positions[ticker]
                if ticker not in day_prices.index or shares == 0:
                    continue
                price = float(day_prices[ticker])
                basis = entry_prices.get(ticker, price)
                pnl_pct = price / basis - 1 if basis > 0 else 0.0
                if pnl_pct <= -stop_loss:
                    exit_reason = "stop_loss"
                elif pnl_pct >= take_profit:
                    exit_reason = "take_profit"
                else:
                    continue

                trade_value = shares * price
                comm = trade_value * commission_pct
                slip = trade_value * slippage_pct
                total_commission += comm
                total_slippage += slip
                cash += trade_value - comm - slip
                trades.append({
                    "date": date_str,
                    "ticker": ticker,
                    "action": "sell",
                    "quantity": round(shares, 2),
                    "price": round(price, 2),
                    "commission": round(comm, 2),
                    "slippage": round(slip, 2),
                    "pnl": round((price - basis) * shares, 2),
                    "reason": exit_reason,
                })
                positions.pop(ticker, None)
                entry_prices.pop(ticker, None)

            if dt not in rebal_dates:
                continue

            # Re-mark: exits above reduced equity by their transaction costs.
            portfolio_value = cash + self._position_value(positions, day_prices)
            target_weights = self._compute_target_weights(
                universe, day_prices, featured_data, dt, sizing_method, max_position
            )
            min_trade_value = portfolio_value * self.MIN_TRADE_FRACTION

            # NOTE: buy legs are sized on pre-cost value, so fully-invested
            # targets can push cash slightly below zero (costs funded at no
            # interest).
            for ticker, target_weight in target_weights.items():
                if ticker not in day_prices.index:
                    continue
                price = float(day_prices[ticker])
                if price <= 0:
                    continue
                current_shares = positions.get(ticker, 0.0)
                delta_value = portfolio_value * target_weight - current_shares * price
                if abs(delta_value) < min_trade_value:
                    continue  # Skip tiny trades

                delta_shares = delta_value / price
                trade_value = abs(delta_value)
                comm = trade_value * commission_pct
                slip = trade_value * slippage_pct
                total_commission += comm
                total_slippage += slip
                new_shares = current_shares + delta_shares
                basis = entry_prices.get(ticker, price)

                if delta_shares > 0:
                    action = "buy"
                    realized_pnl = None
                    cash -= trade_value + comm + slip
                    # Blend the new lot into the average cost basis.
                    entry_prices[ticker] = (
                        (current_shares * basis + delta_shares * price) / new_shares
                        if new_shares > 0
                        else price
                    )
                else:
                    action = "sell"
                    # Realized gain on the shares sold, versus average cost.
                    realized_pnl = round((price - basis) * -delta_shares, 2)
                    cash += trade_value - comm - slip

                if abs(new_shares) < self.MIN_POSITION_SHARES:
                    positions.pop(ticker, None)
                    entry_prices.pop(ticker, None)
                else:
                    positions[ticker] = new_shares

                trades.append({
                    "date": date_str,
                    "ticker": ticker,
                    "action": action,
                    "quantity": round(abs(delta_shares), 2),
                    "price": round(price, 2),
                    "commission": round(comm, 2),
                    "slippage": round(slip, 2),
                    "pnl": realized_pnl,
                    "reason": "rebalance",
                })

        # 5. Final metrics.
        equity_values = [p["portfolio_value"] for p in equity_curve]
        daily_returns = [p["daily_return"] for p in equity_curve]
        metrics = self._compute_backtest_metrics(
            equity_values,
            daily_returns,
            trades,
            initial_capital,
            total_commission,
            total_slippage,
            aligned_bench,
            risk_free_rate=risk_free_rate,
        )
        monthly_returns = self._compute_monthly_returns(equity_curve)
        self._annotate_drawdowns(equity_curve)

        return {
            "status": "completed",
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat(),
            "initial_capital": initial_capital,
            "final_value": round(equity_values[-1], 2) if equity_values else initial_capital,
            "metrics": metrics,
            "equity_curve": equity_curve,
            "trades": trades,
            "monthly_returns": monthly_returns,
        }

    @staticmethod
    def _filter_date_range(
        df: Optional[pd.DataFrame],
        start_date: date,
        end_date: date,
    ) -> pd.DataFrame:
        """Return the rows of ``df`` dated within [start_date, end_date].

        The index is parsed to datetimes and made timezone-naive so it can be
        compared with naive ``pd.Timestamp`` bounds. The input frame is not
        modified. ``None`` or an empty frame yields an empty DataFrame.
        """
        if df is None or df.empty:
            return pd.DataFrame()
        index = pd.to_datetime(df.index)
        if index.tz is not None:
            index = index.tz_localize(None)
        mask = (index >= pd.Timestamp(start_date)) & (index <= pd.Timestamp(end_date))
        filtered = df.loc[mask].copy()
        filtered.index = index[mask]
        return filtered

    @staticmethod
    def _position_value(positions: Dict[str, float], prices: pd.Series) -> float:
        """Market value of ``positions`` (ticker -> shares) at ``prices``.

        Tickers missing from ``prices`` contribute nothing.
        """
        return float(
            sum(shares * prices[t] for t, shares in positions.items() if t in prices.index)
        )

    def _compute_target_weights(
        self,
        universe: List[str],
        prices: pd.Series,
        featured_data: Dict[str, pd.DataFrame],
        current_date: pd.Timestamp,
        method: str,
        max_weight: float,
    ) -> Dict[str, float]:
        """Compute target portfolio weights for a rebalance date.

        Only tickers with a positive price on ``current_date`` are eligible.

        * ``"score_weighted"``: weights proportional to the latest non-negative
          ``momentum_10`` feature (0 when unavailable), each capped at
          ``max_weight``. If every score is 0, every weight is 0.
        * ``"equal_weight"`` and any unrecognised method: ``1/n`` each,
          capped at ``max_weight``.

        Capped weight is left in cash, not redistributed.
        """
        valid_tickers = list(
            dict.fromkeys(t for t in universe if t in prices.index and prices[t] > 0)
        )
        if not valid_tickers:
            return {}

        if method == "score_weighted":
            scores = {
                t: self._momentum_score(featured_data.get(t), current_date)
                for t in valid_tickers
            }
            total = sum(scores.values()) or 1.0
            return {t: min(s / total, max_weight) for t, s in scores.items()}

        weight = min(1.0 / len(valid_tickers), max_weight)
        return {t: weight for t in valid_tickers}

    @staticmethod
    def _momentum_score(
        feat_df: Optional[pd.DataFrame],
        current_date: pd.Timestamp,
    ) -> float:
        """Latest ``momentum_10`` on or before ``current_date``, floored at 0.

        Returns 0.0 when the frame, the column, or a non-NaN value is missing.
        """
        if feat_df is None or "momentum_10" not in feat_df.columns:
            return 0.0
        history = feat_df.loc[feat_df.index <= current_date, "momentum_10"]
        if history.empty:
            return 0.0
        latest = history.iloc[-1]
        return max(float(latest), 0.0) if pd.notna(latest) else 0.0

    @staticmethod
    def _get_rebalance_dates(
        dates: List[pd.Timestamp],
        frequency: str,
    ) -> set:
        """Determine which dates are rebalance dates.

        The first date is always included. For ``"weekly"`` / ``"monthly"`` /
        ``"quarterly"``, the first available date of each new ISO week /
        calendar month / calendar quarter is added. Periods are keyed by year
        too, so holiday Mondays and multi-year gaps are handled. ``"daily"``
        returns every date. An unknown frequency yields only the first date.
        """
        if not dates:
            return set()
        if frequency == "daily":
            return set(dates)

        period_keys = {
            "weekly": lambda d: tuple(d.isocalendar()[:2]),
            "monthly": lambda d: (d.year, d.month),
            "quarterly": lambda d: (d.year, (d.month - 1) // 3),
        }
        rebal = {dates[0]}  # Always rebalance on first date
        key_of = period_keys.get(frequency)
        if key_of is None:
            return rebal

        previous_key = key_of(dates[0])
        for d in dates[1:]:
            key = key_of(d)
            if key != previous_key:
                rebal.add(d)
                previous_key = key
        return rebal

    @staticmethod
    def _aligned_benchmark_returns(
        returns: np.ndarray,
        bench_prices: Optional[pd.Series],
    ) -> Optional[tuple]:
        """Pair strategy and benchmark daily returns on the same dates.

        ``bench_prices`` must have one level per entry of ``returns``, that is,
        indexed on the simulation dates with NaN where the benchmark did not
        trade. The function returns ``(strategy, benchmark)`` arrays restricted
        to days where the benchmark return is finite. It returns ``None`` when
        the inputs are not aligned or 10 or fewer pairs remain.
        """
        if bench_prices is None or len(bench_prices) != len(returns):
            return None
        levels = np.asarray(bench_prices, dtype=float)
        with np.errstate(divide="ignore", invalid="ignore"):
            bench_ret = levels[1:] / levels[:-1] - 1
        strategy_ret = returns[1:]  # returns[i] is the move from day i-1 to day i
        usable = np.isfinite(bench_ret)
        if int(usable.sum()) <= 10:
            return None
        return strategy_ret[usable], bench_ret[usable]

    @classmethod
    def _compute_backtest_metrics(
        cls,
        equity_values: List[float],
        daily_returns: List[float],
        trades: List[Dict],
        initial_capital: float,
        total_commission: float,
        total_slippage: float,
        bench_prices: Optional[pd.Series],
        risk_free_rate: float = 0.04,
    ) -> Dict[str, Any]:
        """Compute comprehensive backtest metrics.

        Args:
            equity_values: Portfolio value per simulation day.
            daily_returns: Simple return per simulation day (same length).
            trades: Trade log; entries with a numeric ``pnl`` count toward
                win rate, profit factor and average trade PnL.
            initial_capital: Starting capital; must be positive.
            total_commission: Sum of commissions paid.
            total_slippage: Sum of slippage paid.
            bench_prices: Benchmark levels aligned to the simulation dates, or
                ``None``. When usable, ``beta`` and ``alpha`` are added.
            risk_free_rate: Annual rate subtracted in Sharpe and Sortino.

        Returns:
            Metrics dict, or ``{}`` when inputs are empty or capital is not
            positive.
        """
        if not equity_values or not daily_returns or initial_capital <= 0:
            return {}

        periods = cls.TRADING_DAYS_PER_YEAR
        returns = np.asarray(daily_returns, dtype=float)
        equity = np.asarray(equity_values, dtype=float)

        n_years = len(returns) / periods
        total_return = float(equity[-1]) / initial_capital - 1
        growth = 1 + total_return
        if n_years <= 0:
            ann_return = 0.0
        elif growth <= 0:
            # Capital wiped out; a fractional power of a negative number is complex.
            ann_return = -1.0
        else:
            ann_return = growth ** (1 / n_years) - 1

        ann_vol = float(np.std(returns, ddof=1) * np.sqrt(periods)) if len(returns) > 1 else 0.0
        sharpe = (ann_return - risk_free_rate) / ann_vol if ann_vol > 0 else 0.0

        # Sortino: downside deviation is the RMS of the negative part of every
        # daily return (zero target), annualized.
        down_dev = float(np.sqrt(np.mean(np.minimum(returns, 0.0) ** 2)) * np.sqrt(periods))
        if down_dev == 0:
            down_dev = ann_vol  # no losing days: fall back to total volatility
        sortino = (ann_return - risk_free_rate) / down_dev if down_dev > 0 else 0.0

        # Max drawdown straight from the equity values.
        peak = np.maximum.accumulate(equity)
        with np.errstate(divide="ignore", invalid="ignore"):
            drawdowns = np.where(peak > 0, (equity - peak) / peak, 0.0)
        max_dd = float(drawdowns.min())
        calmar = ann_return / abs(max_dd) if max_dd != 0 else 0.0

        # Win rate and profit factor over trades with realized PnL.
        trade_pnls = [t["pnl"] for t in trades if t.get("pnl") is not None]
        wins = [p for p in trade_pnls if p > 0]
        losses = [p for p in trade_pnls if p < 0]
        win_rate = len(wins) / len(trade_pnls) if trade_pnls else 0.0
        if losses:
            profit_factor = sum(wins) / abs(sum(losses))
        elif wins:
            profit_factor = float("inf")
        else:
            profit_factor = 0.0
        avg_trade_return = float(np.mean(trade_pnls)) if trade_pnls else 0.0

        metrics: Dict[str, Any] = {
            "total_return": round(total_return, 4),
            "annualized_return": round(ann_return, 4),
            "sharpe_ratio": round(sharpe, 4),
            "sortino_ratio": round(sortino, 4),
            "max_drawdown": round(max_dd, 4),
            "volatility": round(ann_vol, 4),
            "calmar_ratio": round(calmar, 4),
            "win_rate": round(win_rate, 4),
            "profit_factor": round(profit_factor, 4) if profit_factor != float("inf") else None,
            "total_trades": len(trades),
            "avg_trade_return": round(avg_trade_return, 2),
            "total_commission": round(total_commission, 2),
            "total_slippage": round(total_slippage, 2),
        }

        # Alpha/beta vs benchmark, from the OLS fit of strategy on benchmark
        # daily returns over the same dates; alpha is the annualized intercept.
        pairs = cls._aligned_benchmark_returns(returns, bench_prices)
        if pairs is not None:
            r, b = pairs
            var_b = np.var(b, ddof=1)
            beta = np.cov(r, b)[0, 1] / var_b if var_b > 0 else 1.0
            alpha = (float(np.mean(r)) - beta * float(np.mean(b))) * periods
            metrics["beta"] = round(float(beta), 4)
            metrics["alpha"] = round(float(alpha), 4)

        return metrics

    @staticmethod
    def _compute_monthly_returns(
        equity_curve: List[Dict[str, Any]],
    ) -> Dict[str, float]:
        """Compute calendar-month returns from the equity curve.

        Each month's return is its last equity value divided by the previous
        month's last value, minus 1. The first month is measured from the
        first equity point. Keys are ``"YYYY-MM"`` (the first 7 characters of
        each point's ``date``). Points are assumed to be in chronological order.
        """
        if not equity_curve:
            return {}

        # Dicts keep first-insertion order, and later assignments overwrite,
        # so this maps each month to its final value in chronological order.
        month_end: Dict[str, float] = {}
        for point in equity_curve:
            month_end[point["date"][:7]] = point["portfolio_value"]

        monthly: Dict[str, float] = {}
        base_value = equity_curve[0]["portfolio_value"]
        for month, value in month_end.items():
            monthly[month] = round(value / base_value - 1, 4) if base_value > 0 else 0.0
            base_value = value
        return monthly

    @staticmethod
    def _annotate_drawdowns(equity_curve: List[Dict[str, Any]]) -> None:
        """Add a ``drawdown`` key to each point, relative to the running peak.

        This is a single pass; points are updated in place.
        """
        running_peak = float("-inf")
        for point in equity_curve:
            value = point["portfolio_value"]
            running_peak = max(running_peak, value)
            point["drawdown"] = (
                round((value - running_peak) / running_peak, 6) if running_peak > 0 else 0
            )
# Module-level shared instance of the engine.
backtest_engine: BacktestEngine = BacktestEngine()