""" p170_backtest.py — P170 Signal Backtest Engine ================================================ Regenerates dated signals directly from price history using saved global model. This is more accurate than the predictions parquet (which has no dates). Usage: python p170_backtest.py # full backtest python p170_backtest.py --symbol RTNINDIA # single symbol python p170_backtest.py --walk-forward # year by year python p170_backtest.py --threshold-sweep # optimal confidence python p170_backtest.py --min-confidence 0.45 # tighter filter """ from __future__ import annotations import argparse import json import sys from datetime import datetime from pathlib import Path from typing import Optional import numpy as np import pandas as pd sys.path.insert(0, str(Path(__file__).parent)) # ───────────────────────────────────────────────────────────────────────────── # CONFIG # ───────────────────────────────────────────────────────────────────────────── BASE = Path(".") GDIR = BASE / "data/lake/ml/p170/models/GLOBAL" UNIVERSE_PATH = BASE / "data/lake/ml/p170/p170_signal_universe.csv" OUT_DIR = BASE / "data/lake/ml/p170/backtest" OUT_DIR.mkdir(parents=True, exist_ok=True) HOLD_DAYS = 3 # exit after N trading days STOP_LOSS_PCT = 0.015 # -1.5% TARGET_PCT = 0.030 # +3.0% (2:1 R:R) COST_PCT = 0.002 # 0.2% round-trip INITIAL_CAP = 100_000 # ETF patterns to exclude ETF_PAT = {"LIQUID","GILT","GOLD","SILVER","ETF","BEES","GSEC","CASH", "BBETF","NIFTY","LOWVOL","QUALITY","SENSEX","MOM","GROWW","SETF"} # ───────────────────────────────────────────────────────────────────────────── # LOAD GLOBAL MODEL (cached) # ───────────────────────────────────────────────────────────────────────────── _model_cache: dict = {} def load_global_model() -> tuple: """Load scaler, feature cols and base models. Cached after first call.""" if _model_cache: return (_model_cache["scaler"], _model_cache["fcols"], _model_cache["models"]) import joblib scaler = joblib.load(GDIR / "scaler.joblib") fcols = joblib.load(GDIR / "feature_cols.joblib") models = {} for name in ["xgboost","lightgbm","catboost","random_forest","extra_trees"]: p = GDIR / f"{name}.joblib" if p.exists(): try: models[name] = joblib.load(p) except Exception: pass _model_cache.update({"scaler": scaler, "fcols": fcols, "models": models}) return scaler, fcols, models # ───────────────────────────────────────────────────────────────────────────── # GENERATE DATED SIGNALS FOR ONE SYMBOL # ───────────────────────────────────────────────────────────────────────────── def generate_signals(symbol: str) -> Optional[pd.DataFrame]: """ Run the global model over full price history for symbol. Returns DataFrame with columns: [date, close, pred_class, prob_down, prob_neutral, prob_up, confidence, actual_class] Returns None if data unavailable. """ from p170_max_system import load_candles, engineer_features try: scaler, fcols, models = load_global_model() if not models: return None raw = load_candles(symbol) if raw is None or len(raw) < 60: return None df = engineer_features(raw) df = df.dropna(subset=fcols).reset_index(drop=True) if len(df) < 30: return None X = df[fcols].values.astype("float32") X_s = scaler.transform(X) probas = np.mean( [m.predict_proba(X_s) for m in models.values()], axis=0 ) preds = np.argmax(probas, axis=1) conf = probas.max(axis=1) out = pd.DataFrame({ "date": pd.to_datetime(df["date"].values), "close": df["close"].values.astype(float), "pred_class": preds.astype(int), "prob_down": probas[:, 0], "prob_neutral": probas[:, 1], "prob_up": probas[:, 2], "confidence": conf, "actual_class": df["direction_class"].values.astype(int), }) return out.sort_values("date").reset_index(drop=True) except Exception as e: return None # ───────────────────────────────────────────────────────────────────────────── # SIMULATE TRADES # ───────────────────────────────────────────────────────────────────────────── def simulate_trades( symbol: str, signals_df: pd.DataFrame, min_confidence: float = 0.42, date_from: Optional[str] = None, date_to: Optional[str] = None, ) -> list[dict]: """ Simulate trades from dated signals. Entry: next close after signal day. Exit: HOLD_DAYS later OR stop loss OR target. """ df = signals_df.copy() # Date filter if date_from: df = df[df["date"] >= pd.Timestamp(date_from)] if date_to: df = df[df["date"] <= pd.Timestamp(date_to)] # Filter: directional + above confidence df = df[ (df["confidence"] >= min_confidence) & (df["pred_class"] != 1) ].reset_index(drop=True) if df.empty: return [] trades = [] prices = signals_df.set_index("date")["close"].sort_index() dates = prices.index.tolist() for _, row in df.iterrows(): sig_date = row["date"] direction = "BUY" if row["pred_class"] == 2 else "SELL" conf = float(row["confidence"]) # Entry: next trading day after signal future = [d for d in dates if d > sig_date] if len(future) < 2: continue entry_date = future[0] entry_price = float(prices[entry_date]) exit_price = entry_price exit_reason = "MAX_HOLD" hold = 0 for day_offset in range(1, HOLD_DAYS + 1): if day_offset >= len(future): break curr_date = future[day_offset] curr_price = float(prices[curr_date]) hold = day_offset if direction == "BUY": ret = (curr_price - entry_price) / entry_price if ret <= -STOP_LOSS_PCT: exit_price = entry_price * (1 - STOP_LOSS_PCT) exit_reason = "STOP_LOSS" break if ret >= TARGET_PCT: exit_price = entry_price * (1 + TARGET_PCT) exit_reason = "TARGET" break exit_price = curr_price else: # SELL / short ret = (entry_price - curr_price) / entry_price if ret <= -STOP_LOSS_PCT: exit_price = entry_price * (1 + STOP_LOSS_PCT) exit_reason = "STOP_LOSS" break if ret >= TARGET_PCT: exit_price = entry_price * (1 - TARGET_PCT) exit_reason = "TARGET" break exit_price = curr_price gross = ((exit_price - entry_price) / entry_price if direction == "BUY" else (entry_price - exit_price) / entry_price) net = gross - COST_PCT trades.append({ "symbol": symbol, "direction": direction, "signal_date": str(sig_date.date()), "entry_date": str(entry_date.date()), "entry_price": round(entry_price, 2), "exit_price": round(exit_price, 2), "exit_reason": exit_reason, "hold_days": hold, "confidence": round(conf, 4), "gross_return_pct": round(gross * 100, 3), "net_return_pct": round(net * 100, 3), }) return trades # ───────────────────────────────────────────────────────────────────────────── # METRICS # ───────────────────────────────────────────────────────────────────────────── def compute_metrics(trades: list[dict]) -> dict: if not trades: return {"trade_count": 0, "win_rate_pct": 0, "total_return_pct": 0, "sharpe": 0, "max_drawdown_pct": 0, "profit_factor": 0, "avg_trade_return_pct": 0} rets = [t["net_return_pct"] for t in trades] # Fixed position sizing: each trade uses CAPITAL_PER_TRADE fraction # Equity grows/shrinks by the P&L of each trade, not full compounding POSITION_SIZE = INITIAL_CAP * 0.05 # 5% per trade equity = [INITIAL_CAP] for r in rets: pnl = POSITION_SIZE * (r / 100) equity.append(equity[-1] + pnl) eq_s = pd.Series(equity) dd = eq_s / eq_s.cummax() - 1 max_dd = float(dd.min() * 100) wins = [r for r in rets if r > 0] losses = [r for r in rets if r <= 0] win_r = len(wins) / len(rets) * 100 gw = sum(wins) gl = abs(sum(losses)) pf = gw / gl if gl > 0 else (gw if gw > 0 else 0) s = pd.Series(rets) / 100 sharpe = float(s.mean() / s.std() * np.sqrt(252)) if s.std() > 0 else 0 return { "trade_count": len(trades), "win_rate_pct": round(win_r, 2), "total_return_pct": round((equity[-1] / INITIAL_CAP - 1) * 100, 2), "final_equity": round(equity[-1], 2), "sharpe": round(sharpe, 3), "max_drawdown_pct": round(max_dd, 2), "profit_factor": round(pf, 3), "avg_trade_return_pct": round(float(np.mean(rets)), 3), "avg_hold_days": round(np.mean([t.get("hold_days", 3) for t in trades]), 1), "stop_exits": sum(1 for t in trades if t["exit_reason"] == "STOP_LOSS"), "target_exits": sum(1 for t in trades if t["exit_reason"] == "TARGET"), "hold_exits": sum(1 for t in trades if t["exit_reason"] == "MAX_HOLD"), "buy_trades": sum(1 for t in trades if t["direction"] == "BUY"), "sell_trades": sum(1 for t in trades if t["direction"] == "SELL"), } # ───────────────────────────────────────────────────────────────────────────── # FULL BACKTEST # ───────────────────────────────────────────────────────────────────────────── def run_backtest( min_accuracy: float = 0.55, min_confidence: float = 0.42, symbol: Optional[str] = None, date_from: Optional[str] = None, date_to: Optional[str] = None, verbose: bool = True, top_n: Optional[int] = None, ) -> dict: t0 = datetime.now() # Load universe if not UNIVERSE_PATH.exists(): print("ERROR: Signal universe not found. Run p170_max_system.py first.") return {} universe = pd.read_csv(UNIVERSE_PATH) qualifying = universe[universe["accuracy"] >= min_accuracy]["symbol"].tolist() # Filter ETFs qualifying = [s for s in qualifying if not any(x in s.upper() for x in ETF_PAT)] if symbol: qualifying = [s for s in qualifying if s.upper() == symbol.upper()] if not qualifying: # Allow symbol even if not in universe qualifying = [symbol.upper()] if top_n: qualifying = qualifying[:top_n] if verbose: print(f"\n{'='*60}") print(f" P170 BACKTEST ENGINE") print(f" {len(qualifying)} symbols | " f"min_acc={min_accuracy} conf>={min_confidence}") if date_from or date_to: print(f" Period: {date_from or 'all'} → {date_to or 'all'}") print(f"{'='*60}") print(f" Hold:{HOLD_DAYS}d Stop:{STOP_LOSS_PCT*100:.1f}% " f"Target:{TARGET_PCT*100:.1f}% Cost:{COST_PCT*100:.1f}%\n") all_trades = [] sym_results = [] skipped = 0 for i, sym in enumerate(qualifying): sigs = generate_signals(sym) if sigs is None: skipped += 1 continue trades = simulate_trades(sym, sigs, min_confidence, date_from, date_to) if not trades: skipped += 1 continue m = compute_metrics(trades) sym_results.append({ "symbol": sym, "n_trades": m["trade_count"], "win_rate_pct": m["win_rate_pct"], "total_return_pct": m["total_return_pct"], "sharpe": m["sharpe"], "max_drawdown_pct": m["max_drawdown_pct"], "profit_factor": m["profit_factor"], "avg_trade_ret": m["avg_trade_return_pct"], "buy_trades": m["buy_trades"], "sell_trades": m["sell_trades"], }) all_trades.extend(trades) if verbose and (i + 1) % 25 == 0: done = len(sym_results) avg_wr = np.mean([r["win_rate_pct"] for r in sym_results]) print(f" [{i+1}/{len(qualifying)}] " f"{len(all_trades)} trades | " f"avg win rate={avg_wr:.1f}%") if not all_trades: if verbose: print(" No trades generated.") return {} overall = compute_metrics(all_trades) sym_df = pd.DataFrame(sym_results).sort_values("sharpe", ascending=False) buy_trades = [t for t in all_trades if t["direction"] == "BUY"] sell_trades = [t for t in all_trades if t["direction"] == "SELL"] buy_m = compute_metrics(buy_trades) sell_m = compute_metrics(sell_trades) elapsed = (datetime.now() - t0).total_seconds() if verbose: print(f"\n{'='*60}") print(f" RESULTS ({len(all_trades):,} trades, " f"{len(sym_results)} symbols, {skipped} skipped)") print(f"{'='*60}") print(f" Win rate: {overall['win_rate_pct']:.1f}%") print(f" Total return: {overall['total_return_pct']:.2f}%") print(f" Sharpe: {overall['sharpe']:.3f}") print(f" Max drawdown: {overall['max_drawdown_pct']:.2f}%") print(f" Profit factor: {overall['profit_factor']:.3f}") print(f" Avg trade: {overall['avg_trade_return_pct']:.3f}%") print(f" Stop/Target/Hold: " f"{overall['stop_exits']}/" f"{overall['target_exits']}/" f"{overall['hold_exits']}") print(f"\n BUY {len(buy_trades):5,} trades | " f"win={buy_m['win_rate_pct']:.1f}% | " f"ret={buy_m['total_return_pct']:.2f}% | " f"pf={buy_m['profit_factor']:.3f}") print(f" SELL {len(sell_trades):5,} trades | " f"win={sell_m['win_rate_pct']:.1f}% | " f"ret={sell_m['total_return_pct']:.2f}% | " f"pf={sell_m['profit_factor']:.3f}") print(f"\n Top 15 by Sharpe:") cols = ["symbol","n_trades","win_rate_pct","total_return_pct", "sharpe","max_drawdown_pct","profit_factor"] print(sym_df[cols].head(15).to_string(index=False)) print(f"\n Bottom 10:") print(sym_df[cols].tail(10).to_string(index=False)) # Go/No-Go checks = { "Win rate > 52%": overall["win_rate_pct"] > 52, "Sharpe > 1.0": overall["sharpe"] > 1.0, "Max DD < 20%": overall["max_drawdown_pct"] > -20, "Profit factor > 1.2": overall["profit_factor"] > 1.2, "Avg trade > 0.1%": overall["avg_trade_return_pct"] > 0.1, } all_pass = all(checks.values()) print(f"\n{'='*60}") print(f" GO / NO-GO ASSESSMENT") print(f"{'='*60}") for check, passed in checks.items(): print(f" {'✓' if passed else '✗'} {check}") verdict = "→ READY FOR PAPER TRADING" if all_pass else "→ NOT READY — tune first" print(f"\n {verdict}") print(f" Elapsed: {elapsed:.1f}s") print(f"{'='*60}\n") # Save ts = datetime.now().strftime("%Y%m%d_%H%M%S") pd.DataFrame(all_trades).to_csv(OUT_DIR / f"trades_{ts}.csv", index=False) sym_df.to_csv(OUT_DIR / f"symbol_results_{ts}.csv", index=False) summary = { "timestamp": datetime.now().isoformat(), "config": { "min_accuracy": min_accuracy, "min_confidence": min_confidence, "hold_days": HOLD_DAYS, "stop_loss_pct": STOP_LOSS_PCT * 100, "target_pct": TARGET_PCT * 100, "cost_pct": COST_PCT * 100, "date_from": date_from, "date_to": date_to, }, "overall": overall, "buy_only": buy_m, "sell_only": sell_m, "n_symbols": len(sym_results), "n_skipped": skipped, "go_nogo": all_pass if verbose else None, "top10": sym_df.head(10).to_dict("records"), } out_path = OUT_DIR / f"backtest_summary_{ts}.json" out_path.write_text(json.dumps(summary, indent=2, default=str)) (OUT_DIR / "latest_backtest.json").write_text( json.dumps(summary, indent=2, default=str)) if verbose: print(f" Saved to {OUT_DIR}/") return summary # ───────────────────────────────────────────────────────────────────────────── # WALK-FORWARD # ───────────────────────────────────────────────────────────────────────────── def run_walk_forward( min_accuracy: float = 0.55, min_confidence: float = 0.42, top_n: int = 50, ) -> None: print(f"\n{'#'*60}") print(f" WALK-FORWARD — Year by Year (top {top_n} symbols)") print(f"{'#'*60}\n") years = list(range(2017, datetime.now().year + 1)) rows = [] for year in years: d_from = f"{year}-01-01" d_to = f"{year}-12-31" print(f" {year}...", end=" ", flush=True) r = run_backtest( min_accuracy=min_accuracy, min_confidence=min_confidence, date_from=d_from, date_to=d_to, verbose=False, top_n=top_n, ) if not r: print("no trades") continue m = r["overall"] rows.append({ "year": year, "trades": m["trade_count"], "win_pct": m["win_rate_pct"], "return": m["total_return_pct"], "sharpe": m["sharpe"], "max_dd": m["max_drawdown_pct"], "pf": m["profit_factor"], }) print(f"trades={m['trade_count']} win={m['win_rate_pct']:.1f}% " f"ret={m['total_return_pct']:.2f}% sh={m['sharpe']:.3f} " f"dd={m['max_drawdown_pct']:.2f}%") if rows: df = pd.DataFrame(rows) print(f"\n{'='*60}") print(" WALK-FORWARD SUMMARY") print(f"{'='*60}") print(df.to_string(index=False)) print(f"\n Avg Sharpe: {df['sharpe'].mean():.3f}") print(f" Avg Win Rate: {df['win_pct'].mean():.1f}%") print(f" Profitable years: {(df['return'] > 0).sum()}/{len(df)}") print(f" Worst year: " f"{df.loc[df['return'].idxmin(),'year']} " f"({df['return'].min():.2f}%)") df.to_csv(OUT_DIR / "walk_forward.csv", index=False) print(f"\n Saved: {OUT_DIR}/walk_forward.csv") # ───────────────────────────────────────────────────────────────────────────── # THRESHOLD SWEEP # ───────────────────────────────────────────────────────────────────────────── def threshold_sweep(min_accuracy: float = 0.55, top_n: int = 50) -> None: print(f"\n{'='*60}") print(f" THRESHOLD SWEEP (min_acc={min_accuracy}, top {top_n} symbols)") print(f"{'='*60}") print(f" {'Conf':>6} {'Trades':>7} {'Win%':>6} " f"{'Ret%':>7} {'Sharpe':>7} {'DD%':>7} {'PF':>6}") print(f" {'-'*56}") best_sh = -999 best_conf = 0.42 for conf in [0.38, 0.40, 0.42, 0.44, 0.45, 0.47, 0.50, 0.52, 0.55]: r = run_backtest( min_accuracy=min_accuracy, min_confidence=conf, verbose=False, top_n=top_n, ) if not r: print(f" {conf:.2f} no trades") continue m = r["overall"] if m["trade_count"] < 20: continue mark = " ←" if m["sharpe"] > best_sh else "" if m["sharpe"] > best_sh: best_sh = m["sharpe"] best_conf = conf print(f" {conf:.2f} {m['trade_count']:>7,} " f"{m['win_rate_pct']:>5.1f}% " f"{m['total_return_pct']:>6.2f}% " f"{m['sharpe']:>7.3f} " f"{m['max_drawdown_pct']:>6.2f}% " f"{m['profit_factor']:>5.3f}{mark}") print(f"\n Best: conf={best_conf:.2f} Sharpe={best_sh:.3f}") print(f" Use: python run_daily_signals.py --min-confidence {best_conf:.2f}\n") # ───────────────────────────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────────────────────────── def main(): ap = argparse.ArgumentParser(description="P170 Backtest Engine") ap.add_argument("--symbol", type=str, default=None) ap.add_argument("--min-accuracy", type=float, default=0.55) ap.add_argument("--min-confidence", type=float, default=0.42) ap.add_argument("--date-from", type=str, default=None) ap.add_argument("--date-to", type=str, default=None) ap.add_argument("--top-n", type=int, default=None) ap.add_argument("--walk-forward", action="store_true") ap.add_argument("--threshold-sweep", action="store_true") args = ap.parse_args() if args.walk_forward: run_walk_forward( min_accuracy=args.min_accuracy, min_confidence=args.min_confidence, top_n=args.top_n or 50, ) elif args.threshold_sweep: threshold_sweep( min_accuracy=args.min_accuracy, top_n=args.top_n or 50, ) else: run_backtest( min_accuracy=args.min_accuracy, min_confidence=args.min_confidence, symbol=args.symbol, date_from=args.date_from, date_to=args.date_to, top_n=args.top_n, verbose=True, ) if __name__ == "__main__": main()