| import logging |
| import time |
| import pandas as pd |
| import numpy as np |
| from datetime import datetime |
|
|
| from backend.agents.signal_agent import detect_signals |
| from backend.agents.regime_agent import detect_regime, filter_signals_by_regime |
| from backend.agents.risk_agent import evaluate_risk |
| from backend.agents.portfolio_agent import size_position |
| from backend.agents.alpha_scorer import compute_alpha_score |
| from backend.features.feature_store import compute_universe_features |
| from config import BACKTEST_PARAMS as BP |
| from config import RISK_PARAMS as RP |
|
|
| logger = logging.getLogger(__name__) |
|
|
| def run_portfolio_backtest(tickers: list[str], period: str = "2y", initial_capital: float = 10_00_000): |
| """ |
| Run a walk-forward portfolio backtest. |
| Simulates the daily scan pipeline over historical data. |
| """ |
| start_time = time.time() |
| logger.info(f"🚀 Starting Walk-Forward Backtest: {len(tickers)} tickers, period={period}") |
|
|
| |
| |
| |
| feature_results = compute_universe_features(tickers, period=period) |
| if not feature_results: |
| return {"error": "No data found for backtest"} |
|
|
| |
| all_dates = [] |
| for r in feature_results: |
| all_dates.extend(r["df"].index.tolist()) |
| |
| unique_dates = sorted(list(set(all_dates))) |
| |
| test_dates = unique_dates[100:] |
| |
| logger.info(f"Backtesting over {len(test_dates)} trading days...") |
|
|
| |
| capital = initial_capital |
| portfolio_value = initial_capital |
| active_trades = [] |
| equity_curve = [] |
| trade_log = [] |
|
|
| |
| for i, current_date in enumerate(test_dates): |
| |
| current_equity = capital |
| still_active = [] |
| |
| for trade in active_trades: |
| ticker = trade["ticker"] |
| |
| ticker_data = next((r["df"] for r in feature_results if r["ticker"] == ticker), None) |
| if ticker_data is not None and current_date in ticker_data.index: |
| current_price = ticker_data.loc[current_date, "Close"] |
| high_price = ticker_data.loc[current_date, "High"] |
| low_price = ticker_data.loc[current_date, "Low"] |
| |
| |
| if low_price <= trade["stop_loss"]: |
| |
| exit_price = trade["stop_loss"] |
| pnl = (exit_price - trade["entry_price"]) * trade["quantity"] |
| capital += (trade["quantity"] * exit_price) |
| trade_log.append({**trade, "exit_date": current_date, "exit_price": exit_price, "pnl": pnl, "status": "SL"}) |
| elif high_price >= trade["target_1"]: |
| |
| |
| exit_price = trade["target_1"] |
| pnl = (exit_price - trade["entry_price"]) * trade["quantity"] |
| capital += (trade["quantity"] * exit_price) |
| trade_log.append({**trade, "exit_date": current_date, "exit_price": exit_price, "pnl": pnl, "status": "TP1"}) |
| else: |
| |
| current_equity += (trade["quantity"] * current_price) |
| still_active.append(trade) |
| else: |
| |
| still_active.append(trade) |
| |
| active_trades = still_active |
| portfolio_value = current_equity |
| equity_curve.append({"date": current_date, "equity": portfolio_value}) |
|
|
| |
| |
| daily_signals = [] |
| for r in feature_results: |
| ticker = r["ticker"] |
| df_until_now = r["df"].loc[:current_date] |
| if len(df_until_now) < 2 or current_date not in df_until_now.index: |
| continue |
| |
| latest_row = df_until_now.iloc[-1].to_dict() |
| |
| signals = detect_signals(ticker, latest_row, df_until_now) |
| daily_signals.extend(signals) |
|
|
| if not daily_signals: |
| continue |
|
|
| |
| |
| |
| regime = {"regime": "trend_up", "confidence": 0.8, "valid_signals": ["momentum_breakout", "golden_cross", "macd_bullish_cross", "volume_breakout"]} |
|
|
| |
| |
| scored_signals = [] |
| for s in daily_signals: |
| ticker = s["ticker"] |
| r = next((res for res in feature_results if res["ticker"] == ticker), None) |
| latest_features = r["df"].loc[:current_date].iloc[-1].to_dict() |
| |
| |
| risk = evaluate_risk(s, latest_features, active_trades) |
| if risk["decision"] in ("approve", "reduce"): |
| s["alpha_score"] = compute_alpha_score(s, latest_features) |
| scored_signals.append(s) |
|
|
| scored_signals.sort(key=lambda x: x["alpha_score"], reverse=True) |
|
|
| |
| for s in scored_signals[:3]: |
| if any(t["ticker"] == s["ticker"] for t in active_trades): |
| continue |
| |
| |
| s = size_position(s, portfolio_value, active_trades) |
| cost = s["quantity"] * s["entry_price"] |
| |
| if cost > 0 and capital >= cost: |
| capital -= cost |
| active_trades.append({**s, "entry_date": current_date}) |
|
|
| |
| duration = time.time() - start_time |
| performance = _calculate_metrics(equity_curve, trade_log, initial_capital) |
| |
| logger.info(f"✅ Backtest Complete in {duration:.1f}s. Final Equity: {portfolio_value:,.0f}") |
| |
| return { |
| "metrics": performance, |
| "equity_curve": equity_curve, |
| "trades": trade_log, |
| "duration_seconds": duration |
| } |
|
|
| def _calculate_metrics(equity_curve, trade_log, initial_capital): |
| if not equity_curve: |
| return {} |
| |
| df_equity = pd.DataFrame(equity_curve) |
| final_equity = df_equity["equity"].iloc[-1] |
| total_return_pct = (final_equity / initial_capital - 1) * 100 |
| |
| |
| df_equity["peak"] = df_equity["equity"].cummax() |
| df_equity["drawdown"] = (df_equity["equity"] - df_equity["peak"]) / df_equity["peak"] * 100 |
| max_drawdown = df_equity["drawdown"].min() |
| |
| |
| df_trades = pd.DataFrame(trade_log) |
| win_rate = 0 |
| if not df_trades.empty: |
| win_rate = (df_trades["pnl"] > 0).mean() * 100 |
| |
| return { |
| "total_return_pct": round(total_return_pct, 2), |
| "max_drawdown_pct": round(max_drawdown, 2), |
| "win_rate_pct": round(win_rate, 1), |
| "total_trades": len(trade_log), |
| "final_equity": round(final_equity, 2) |
| } |
|
|