jashdoshi77's picture
QuantHedge: Full deployment with Docker + nginx + uvicorn
9d29748
"""
Backtesting Engine.
Event-driven backtester that simulates strategy execution over historical data:
- Iterates daily bars
- Applies strategy entry/exit logic
- Tracks positions and portfolio value
- Accounts for transaction costs and slippage
- Produces equity curve, trade log, and comprehensive metrics
"""
from __future__ import annotations
import json
import logging
from datetime import date, timedelta
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd
from app.services.data_ingestion.yahoo import yahoo_adapter
from app.services.feature_engineering.pipeline import feature_pipeline
logger = logging.getLogger(__name__)
class BacktestEngine:
"""Event-driven portfolio backtester."""
async def run_backtest(
self,
strategy_config: Dict[str, Any],
start_date: date,
end_date: date,
initial_capital: float = 1_000_000.0,
commission_pct: float = 0.001,
slippage_pct: float = 0.0005,
benchmark_ticker: str = "SPY",
rebalance_frequency: str = "monthly",
) -> Dict[str, Any]:
"""
Run a complete backtest simulation.
Returns:
Dict with metrics, equity curve, trades, and monthly returns.
"""
universe = strategy_config.get("universe", [])
if not universe:
return {"status": "failed", "error": "Empty universe"}
# 1. Fetch historical data
price_frames: Dict[str, pd.DataFrame] = {}
for ticker in universe:
df = await yahoo_adapter.get_price_dataframe(
ticker, period="max"
)
if not df.empty:
# Filter by date range
df.index = pd.to_datetime(df.index)
mask = (df.index >= pd.Timestamp(start_date)) & (
df.index <= pd.Timestamp(end_date)
)
filtered = df.loc[mask]
if not filtered.empty:
price_frames[ticker] = filtered
if not price_frames:
return {"status": "failed", "error": "No price data available for universe"}
# Fetch benchmark
bench_df = await yahoo_adapter.get_price_dataframe(benchmark_ticker, period="max")
bench_prices = None
if not bench_df.empty:
bench_df.index = pd.to_datetime(bench_df.index)
mask = (bench_df.index >= pd.Timestamp(start_date)) & (
bench_df.index <= pd.Timestamp(end_date)
)
bench_filtered = bench_df.loc[mask]
if not bench_filtered.empty:
bench_prices = bench_filtered["Close"]
# 2. Build aligned price matrix
close_prices = pd.DataFrame(
{t: df["Close"] for t, df in price_frames.items()}
).dropna()
if close_prices.empty:
return {"status": "failed", "error": "No overlapping price data"}
# 3. Compute features for signal generation
featured_data: Dict[str, pd.DataFrame] = {}
for ticker, df in price_frames.items():
featured_data[ticker] = feature_pipeline.compute_all_features(df)
# 4. Run simulation
dates = close_prices.index.tolist()
portfolio_value = initial_capital
cash = initial_capital
positions: Dict[str, float] = {} # ticker -> shares
weights: Dict[str, float] = {}
equity_curve: List[Dict[str, Any]] = []
trades: List[Dict[str, Any]] = []
total_commission = 0.0
total_slippage = 0.0
# Determine rebalance dates
rebal_dates = self._get_rebalance_dates(dates, rebalance_frequency)
# Strategy parameters
entry_rules = strategy_config.get("entry_rules", [])
exit_rules = strategy_config.get("exit_rules", [])
sizing_method = strategy_config.get("position_sizing", {}).get("method", "equal_weight")
max_position = strategy_config.get("position_sizing", {}).get("max_position_pct", 0.15)
stop_loss = strategy_config.get("risk_management", {}).get("stop_loss_pct", 0.05)
take_profit = strategy_config.get("risk_management", {}).get("take_profit_pct", 0.20)
entry_prices: Dict[str, float] = {}
for i, dt in enumerate(dates):
day_prices = close_prices.loc[dt]
# Update portfolio value
position_value = sum(
positions.get(t, 0) * day_prices.get(t, 0)
for t in positions
if t in day_prices.index
)
portfolio_value = cash + position_value
# Record equity curve point
prev_value = equity_curve[-1]["portfolio_value"] if equity_curve else initial_capital
daily_ret = (portfolio_value / prev_value - 1) if prev_value > 0 else 0
bench_val = None
if bench_prices is not None and dt in bench_prices.index:
bench_val = float(bench_prices.loc[dt])
equity_curve.append({
"date": dt.strftime("%Y-%m-%d"),
"portfolio_value": round(portfolio_value, 2),
"benchmark_value": bench_val,
"daily_return": round(daily_ret, 6),
})
# Check stop loss / take profit for existing positions
for ticker in list(positions.keys()):
if ticker not in day_prices.index or positions[ticker] == 0:
continue
current_price = day_prices[ticker]
entry_price = entry_prices.get(ticker, current_price)
pnl_pct = (current_price / entry_price - 1) if entry_price > 0 else 0
should_exit = False
exit_reason = ""
if pnl_pct <= -stop_loss:
should_exit = True
exit_reason = "stop_loss"
elif pnl_pct >= take_profit:
should_exit = True
exit_reason = "take_profit"
if should_exit:
shares = positions[ticker]
trade_value = shares * current_price
comm = trade_value * commission_pct
slip = trade_value * slippage_pct
total_commission += comm
total_slippage += slip
cash += trade_value - comm - slip
pnl = (current_price - entry_price) * shares
trades.append({
"date": dt.strftime("%Y-%m-%d"),
"ticker": ticker,
"action": "sell",
"quantity": round(shares, 2),
"price": round(current_price, 2),
"commission": round(comm, 2),
"slippage": round(slip, 2),
"pnl": round(pnl, 2),
"reason": exit_reason,
})
del positions[ticker]
del entry_prices[ticker]
# Rebalance on schedule
if dt in rebal_dates:
# Generate new target weights
target_weights = self._compute_target_weights(
universe, day_prices, featured_data, dt, sizing_method, max_position
)
# Execute trades to reach target
for ticker, target_weight in target_weights.items():
if ticker not in day_prices.index:
continue
price = day_prices[ticker]
if price <= 0:
continue
target_value = portfolio_value * target_weight
current_shares = positions.get(ticker, 0)
current_value = current_shares * price
delta_value = target_value - current_value
if abs(delta_value) < portfolio_value * 0.005:
continue # Skip tiny trades
delta_shares = delta_value / price
trade_value = abs(delta_value)
comm = trade_value * commission_pct
slip = trade_value * slippage_pct
total_commission += comm
total_slippage += slip
if delta_shares > 0:
cash -= (trade_value + comm + slip)
action = "buy"
entry_prices[ticker] = price
else:
cash += (trade_value - comm - slip)
action = "sell"
new_shares = current_shares + delta_shares
if abs(new_shares) < 0.01:
positions.pop(ticker, None)
entry_prices.pop(ticker, None)
else:
positions[ticker] = new_shares
trades.append({
"date": dt.strftime("%Y-%m-%d"),
"ticker": ticker,
"action": action,
"quantity": round(abs(delta_shares), 2),
"price": round(price, 2),
"commission": round(comm, 2),
"slippage": round(slip, 2),
"pnl": None,
})
# 5. Compute final metrics
equity_values = [p["portfolio_value"] for p in equity_curve]
daily_returns = [p["daily_return"] for p in equity_curve if p["daily_return"] is not None]
metrics = self._compute_backtest_metrics(
equity_values,
daily_returns,
trades,
initial_capital,
total_commission,
total_slippage,
bench_prices,
)
# Monthly returns
monthly_returns = self._compute_monthly_returns(equity_curve)
# Drawdown series
for point in equity_curve:
idx = equity_curve.index(point)
peak = max(p["portfolio_value"] for p in equity_curve[: idx + 1])
point["drawdown"] = round((point["portfolio_value"] - peak) / peak, 6) if peak > 0 else 0
return {
"status": "completed",
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat(),
"initial_capital": initial_capital,
"final_value": round(equity_values[-1], 2) if equity_values else initial_capital,
"metrics": metrics,
"equity_curve": equity_curve,
"trades": trades,
"monthly_returns": monthly_returns,
}
def _compute_target_weights(
self,
universe: List[str],
prices: pd.Series,
featured_data: Dict[str, pd.DataFrame],
current_date: pd.Timestamp,
method: str,
max_weight: float,
) -> Dict[str, float]:
"""Compute target portfolio weights for a rebalance date."""
valid_tickers = [t for t in universe if t in prices.index and prices[t] > 0]
if not valid_tickers:
return {}
n = len(valid_tickers)
if method == "equal_weight":
w = min(1.0 / n, max_weight)
return {t: w for t in valid_tickers}
elif method == "score_weighted":
# Use momentum as weight proxy
scores = {}
for t in valid_tickers:
if t in featured_data and "momentum_10" in featured_data[t].columns:
feat_df = featured_data[t]
mask = feat_df.index <= current_date
if mask.any():
mom = feat_df.loc[mask, "momentum_10"].iloc[-1]
scores[t] = max(float(mom) if pd.notna(mom) else 0, 0)
else:
scores[t] = 0
else:
scores[t] = 0
total = sum(scores.values()) or 1.0
return {t: min(s / total, max_weight) for t, s in scores.items()}
else:
w = min(1.0 / n, max_weight)
return {t: w for t in valid_tickers}
@staticmethod
def _get_rebalance_dates(
dates: List[pd.Timestamp],
frequency: str,
) -> set:
"""Determine which dates are rebalance dates."""
if not dates:
return set()
rebal = set()
rebal.add(dates[0]) # Always rebalance on first date
if frequency == "daily":
return set(dates)
elif frequency == "weekly":
for d in dates:
if d.weekday() == 0: # Monday
rebal.add(d)
elif frequency == "monthly":
current_month = dates[0].month
for d in dates:
if d.month != current_month:
rebal.add(d)
current_month = d.month
elif frequency == "quarterly":
current_quarter = (dates[0].month - 1) // 3
for d in dates:
q = (d.month - 1) // 3
if q != current_quarter:
rebal.add(d)
current_quarter = q
return rebal
@staticmethod
def _compute_backtest_metrics(
equity_values: List[float],
daily_returns: List[float],
trades: List[Dict],
initial_capital: float,
total_commission: float,
total_slippage: float,
bench_prices: Optional[pd.Series],
) -> Dict[str, Any]:
"""Compute comprehensive backtest metrics."""
if not equity_values or not daily_returns:
return {}
returns = np.array(daily_returns)
final = equity_values[-1]
n_days = len(returns)
n_years = n_days / 252
total_return = (final / initial_capital - 1)
ann_return = (1 + total_return) ** (1 / n_years) - 1 if n_years > 0 else 0
ann_vol = float(np.std(returns, ddof=1) * np.sqrt(252)) if len(returns) > 1 else 0
sharpe = (ann_return - 0.04) / ann_vol if ann_vol > 0 else 0
# Sortino
downside = returns[returns < 0]
down_dev = float(np.std(downside, ddof=1) * np.sqrt(252)) if len(downside) > 1 else ann_vol
sortino = (ann_return - 0.04) / down_dev if down_dev > 0 else 0
# Max drawdown
cum = np.cumprod(1 + returns)
peak = np.maximum.accumulate(cum)
dd = (cum - peak) / peak
max_dd = float(np.min(dd)) if len(dd) > 0 else 0
# Calmar
calmar = ann_return / abs(max_dd) if max_dd != 0 else 0
# Win rate and profit factor
trade_pnls = [t.get("pnl", 0) for t in trades if t.get("pnl") is not None]
wins = [p for p in trade_pnls if p > 0]
losses = [p for p in trade_pnls if p < 0]
win_rate = len(wins) / len(trade_pnls) if trade_pnls else 0
profit_factor = (
sum(wins) / abs(sum(losses)) if losses else float("inf") if wins else 0
)
avg_trade_return = float(np.mean(trade_pnls)) if trade_pnls else 0
metrics = {
"total_return": round(total_return, 4),
"annualized_return": round(ann_return, 4),
"sharpe_ratio": round(sharpe, 4),
"sortino_ratio": round(sortino, 4),
"max_drawdown": round(max_dd, 4),
"volatility": round(ann_vol, 4),
"calmar_ratio": round(calmar, 4),
"win_rate": round(win_rate, 4),
"profit_factor": round(profit_factor, 4) if profit_factor != float("inf") else None,
"total_trades": len(trades),
"avg_trade_return": round(avg_trade_return, 2),
"total_commission": round(total_commission, 2),
"total_slippage": round(total_slippage, 2),
}
# Alpha/beta vs benchmark
if bench_prices is not None and len(bench_prices) > 10:
bench_ret = bench_prices.pct_change().dropna().values
min_len = min(len(returns), len(bench_ret))
if min_len > 10:
r = returns[:min_len]
b = bench_ret[:min_len]
cov_rb = np.cov(r, b)[0, 1]
var_b = np.var(b, ddof=1)
beta = cov_rb / var_b if var_b > 0 else 1.0
alpha = ann_return - beta * float(np.mean(b) * 252)
metrics["beta"] = round(float(beta), 4)
metrics["alpha"] = round(float(alpha), 4)
return metrics
@staticmethod
def _compute_monthly_returns(
equity_curve: List[Dict[str, Any]],
) -> Dict[str, float]:
"""Compute monthly returns from equity curve."""
if not equity_curve:
return {}
monthly: Dict[str, float] = {}
prev_value = equity_curve[0]["portfolio_value"]
current_month = equity_curve[0]["date"][:7]
for point in equity_curve:
month = point["date"][:7]
if month != current_month:
monthly[current_month] = round(
(point["portfolio_value"] / prev_value - 1), 4
)
prev_value = point["portfolio_value"]
current_month = month
# Final month
if equity_curve:
monthly[current_month] = round(
(equity_curve[-1]["portfolio_value"] / prev_value - 1), 4
)
return monthly
backtest_engine = BacktestEngine()