| """ |
| p170_backtest.py — P170 Signal Backtest Engine |
| ================================================ |
| Regenerates dated signals directly from price history using saved global model. |
| This is more accurate than the predictions parquet (which has no dates). |
| |
| Usage: |
| python p170_backtest.py # full backtest |
| python p170_backtest.py --symbol RTNINDIA # single symbol |
| python p170_backtest.py --walk-forward # year by year |
| python p170_backtest.py --threshold-sweep # optimal confidence |
| python p170_backtest.py --min-confidence 0.45 # tighter filter |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
# Put this file's directory first on the import path so sibling project
# modules (e.g. p170_max_system, imported lazily further down) resolve when
# the script is launched from another working directory.
sys.path.insert(0, str(Path(__file__).parent))


# --- Filesystem layout (all relative to the current working directory) -----
BASE = Path(".")
GDIR = BASE.joinpath("data", "lake", "ml", "p170", "models", "GLOBAL")
UNIVERSE_PATH = BASE.joinpath(
    "data", "lake", "ml", "p170", "p170_signal_universe.csv"
)
OUT_DIR = BASE.joinpath("data", "lake", "ml", "p170", "backtest")
# Created at import time so result files can always be written later.
OUT_DIR.mkdir(parents=True, exist_ok=True)


# --- Trade simulation parameters -------------------------------------------
HOLD_DAYS = 3            # maximum number of bars a position is held
STOP_LOSS_PCT = 0.015    # adverse move (fraction of entry) that stops a trade
TARGET_PCT = 0.030       # favourable move (fraction of entry) that closes it
COST_PCT = 0.002         # cost (fraction) subtracted once from each trade
INITIAL_CAP = 100_000    # starting equity used for the equity curve


# Substrings that mark a symbol as an ETF / fund-like instrument; any symbol
# containing one of them is excluded from the backtest universe.
ETF_PAT = {
    "LIQUID",
    "GILT",
    "GOLD",
    "SILVER",
    "ETF",
    "BEES",
    "GSEC",
    "CASH",
    "BBETF",
    "NIFTY",
    "LOWVOL",
    "QUALITY",
    "SENSEX",
    "MOM",
    "GROWW",
    "SETF",
}
|
|
|
|
| |
| |
| |
# Process-wide cache filled by load_global_model() on its first successful
# call; keys are "scaler", "fcols" and "models".
_model_cache: dict = {}


def load_global_model() -> tuple:
    """Load the scaler, feature-column list and base models from ``GDIR``.

    The artefacts are read once and cached in ``_model_cache``; later calls
    return the cached objects without touching the disk.

    Returns:
        ``(scaler, fcols, models)`` where ``models`` maps a model name to the
        loaded estimator. Model files that are missing are skipped; model
        files that fail to load are skipped with a warning on stderr, so
        ``models`` may be empty.

    Raises:
        Whatever ``joblib.load`` raises if the scaler or the feature-column
        file cannot be read -- those two artefacts are mandatory.
    """
    if _model_cache:
        return (
            _model_cache["scaler"],
            _model_cache["fcols"],
            _model_cache["models"],
        )

    # Imported lazily so the module can be imported without joblib installed.
    # NOTE: joblib.load unpickles arbitrary objects -- only point GDIR at
    # trusted model files.
    import joblib

    scaler = joblib.load(GDIR / "scaler.joblib")
    fcols = joblib.load(GDIR / "feature_cols.joblib")

    models = {}
    for name in ("xgboost", "lightgbm", "catboost", "random_forest", "extra_trees"):
        path = GDIR / f"{name}.joblib"
        if not path.exists():
            continue
        try:
            models[name] = joblib.load(path)
        except Exception as exc:
            # Best effort: one broken model must not abort the run, but the
            # ensemble silently shrinking would skew results, so say so.
            print(
                f"WARNING: could not load model '{name}' from {path}: "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )

    _model_cache.update({"scaler": scaler, "fcols": fcols, "models": models})
    return scaler, fcols, models
|
|
|
|
| |
| |
| |
|
|
def generate_signals(symbol: str) -> Optional[pd.DataFrame]:
    """Score the full price history of ``symbol`` with the global ensemble.

    Candles are loaded and feature-engineered through ``p170_max_system``,
    scaled with the saved scaler, and every loaded base model's
    ``predict_proba`` output is averaged into one probability matrix.

    Returns:
        A DataFrame sorted by date with the columns
        ``[date, close, pred_class, prob_down, prob_neutral, prob_up,
        confidence, actual_class]``. ``pred_class`` is the arg-max of the
        averaged probabilities (0 = down, 1 = neutral, 2 = up, following the
        probability column order) and ``confidence`` is that maximum
        probability.

        ``None`` when no model is loaded, fewer than 60 candles are
        available, fewer than 30 rows survive the NaN filter, or any step
        fails (the failure is reported on stderr).

    NOTE(review): if the saved model was trained on this same history, the
    predictions are in-sample -- confirm before trusting backtest numbers.
    """
    from p170_max_system import load_candles, engineer_features

    try:
        scaler, fcols, models = load_global_model()
        if not models:
            return None

        raw = load_candles(symbol)
        if raw is None or len(raw) < 60:
            return None

        # Only rows with a complete feature vector can be scored.
        df = engineer_features(raw)
        df = df.dropna(subset=fcols).reset_index(drop=True)
        if len(df) < 30:
            return None

        X = df[fcols].values.astype("float32")
        X_s = scaler.transform(X)

        # Soft-voting ensemble: mean of the per-model class probabilities.
        probas = np.mean(
            [model.predict_proba(X_s) for model in models.values()], axis=0
        )
        preds = np.argmax(probas, axis=1)
        conf = probas.max(axis=1)

        out = pd.DataFrame({
            "date": pd.to_datetime(df["date"].values),
            "close": df["close"].values.astype(float),
            "pred_class": preds.astype(int),
            "prob_down": probas[:, 0],
            "prob_neutral": probas[:, 1],
            "prob_up": probas[:, 2],
            "confidence": conf,
            "actual_class": df["direction_class"].values.astype(int),
        })
        return out.sort_values("date").reset_index(drop=True)

    except Exception as exc:
        # A single bad symbol must not stop a universe-wide run, but the
        # reason is reported instead of being swallowed silently.
        print(
            f"WARNING: signal generation failed for {symbol}: "
            f"{type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
        return None
|
|
|
|
| |
| |
| |
|
|
def simulate_trades(
    symbol: str,
    signals_df: pd.DataFrame,
    min_confidence: float = 0.42,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> list[dict]:
    """Simulate one trade per qualifying signal in ``signals_df``.

    A signal qualifies when its date lies inside ``[date_from, date_to]``
    (both optional, inclusive), its ``confidence`` is at least
    ``min_confidence`` and its ``pred_class`` is not 1 (neutral).
    ``pred_class == 2`` opens a BUY, anything else a SELL.

    Mechanics (closing prices only):
      * Entry at the close of the first bar after the signal date.
      * On each of the next ``HOLD_DAYS`` bars the return versus entry is
        checked: a move of ``-STOP_LOSS_PCT`` or worse exits at exactly the
        stop level, a move of ``TARGET_PCT`` or better exits at exactly the
        target level (stop is tested first); otherwise the trade closes at
        the last bar reached.
      * ``COST_PCT`` is subtracted once from the gross return.

    The date filter applies to signal dates only: prices come from the
    whole of ``signals_df``, so a trade may exit after ``date_to``. Signals
    with fewer than two later bars are skipped.

    Args:
        symbol: Label copied into every trade record.
        signals_df: Frame with ``date``, ``close``, ``pred_class`` and
            ``confidence`` columns.
        min_confidence: Minimum ensemble confidence for a signal to trade.
        date_from: Earliest signal date to consider.
        date_to: Latest signal date to consider.

    Returns:
        A list of trade dicts in signal order (empty if nothing qualifies).
    """
    candidates = signals_df
    if date_from:
        candidates = candidates[candidates["date"] >= pd.Timestamp(date_from)]
    if date_to:
        candidates = candidates[candidates["date"] <= pd.Timestamp(date_to)]
    candidates = candidates[
        (candidates["confidence"] >= min_confidence)
        & (candidates["pred_class"] != 1)
    ]
    if candidates.empty:
        return []

    # Sorted price history as parallel arrays so every signal is located by
    # binary search instead of rescanning the full date list.
    prices = signals_df.set_index("date")["close"].sort_index()
    dates = prices.index
    closes = prices.to_numpy(dtype=float)
    n_bars = len(closes)

    trades: list[dict] = []
    for sig_date, pred_class, confidence in zip(
        candidates["date"], candidates["pred_class"], candidates["confidence"]
    ):
        # Position of the first bar strictly after the signal date.
        entry_idx = int(dates.searchsorted(sig_date, side="right"))
        if n_bars - entry_idx < 2:
            continue  # need an entry bar plus at least one bar to exit on

        is_buy = pred_class == 2
        direction = "BUY" if is_buy else "SELL"
        sign = 1.0 if is_buy else -1.0  # turns price moves into trade returns

        entry_date = dates[entry_idx]
        entry_price = float(closes[entry_idx])

        exit_price = entry_price
        exit_reason = "MAX_HOLD"
        hold = 0

        last_idx = min(entry_idx + HOLD_DAYS, n_bars - 1)
        for idx in range(entry_idx + 1, last_idx + 1):
            hold = idx - entry_idx
            curr_price = float(closes[idx])
            ret = sign * (curr_price - entry_price) / entry_price
            if ret <= -STOP_LOSS_PCT:
                exit_price = entry_price * (1 - sign * STOP_LOSS_PCT)
                exit_reason = "STOP_LOSS"
                break
            if ret >= TARGET_PCT:
                exit_price = entry_price * (1 + sign * TARGET_PCT)
                exit_reason = "TARGET"
                break
            exit_price = curr_price

        gross = sign * (exit_price - entry_price) / entry_price
        net = gross - COST_PCT

        trades.append({
            "symbol": symbol,
            "direction": direction,
            "signal_date": str(sig_date.date()),
            "entry_date": str(entry_date.date()),
            "entry_price": round(entry_price, 2),
            "exit_price": round(exit_price, 2),
            "exit_reason": exit_reason,
            "hold_days": hold,
            "confidence": round(float(confidence), 4),
            "gross_return_pct": round(gross * 100, 3),
            "net_return_pct": round(net * 100, 3),
        })

    return trades
|
|
|
|
| |
| |
| |
|
|
def compute_metrics(trades: list[dict]) -> dict:
    """Aggregate performance statistics for a list of trade records.

    Each trade must provide ``net_return_pct``, ``exit_reason`` and
    ``direction``; ``hold_days`` is optional (3 is assumed when absent).

    The equity curve starts at ``INITIAL_CAP`` and books every trade, in list
    order, with a fixed position of 5% of ``INITIAL_CAP`` (no compounding).
    Sharpe is the mean over the sample standard deviation of per-trade net
    returns scaled by ``sqrt(252)``; it is 0 when the deviation is zero or
    undefined (e.g. a single trade). Profit factor is gross wins over gross
    losses; with no losses it falls back to the gross win sum.

    Returns:
        A dict with the same keys for empty and non-empty input; an empty
        trade list yields all-zero statistics and ``final_equity`` equal to
        ``INITIAL_CAP``.
    """
    if not trades:
        return {
            "trade_count": 0,
            "win_rate_pct": 0,
            "total_return_pct": 0,
            "final_equity": float(INITIAL_CAP),
            "sharpe": 0,
            "max_drawdown_pct": 0,
            "profit_factor": 0,
            "avg_trade_return_pct": 0,
            "avg_hold_days": 0,
            "stop_exits": 0,
            "target_exits": 0,
            "hold_exits": 0,
            "buy_trades": 0,
            "sell_trades": 0,
        }

    rets = [t["net_return_pct"] for t in trades]

    # Fixed-size positions: every trade risks the same 5% slice of the
    # starting capital.
    position_size = INITIAL_CAP * 0.05
    equity = [INITIAL_CAP]
    for r in rets:
        equity.append(equity[-1] + position_size * (r / 100))

    curve = pd.Series(equity)
    drawdown = curve / curve.cummax() - 1
    max_dd = float(drawdown.min() * 100)

    wins = [r for r in rets if r > 0]
    losses = [r for r in rets if r <= 0]
    win_rate = len(wins) / len(rets) * 100

    gross_win = sum(wins)
    gross_loss = abs(sum(losses))
    if gross_loss > 0:
        profit_factor = gross_win / gross_loss
    else:
        profit_factor = gross_win if gross_win > 0 else 0

    frac = pd.Series(rets) / 100
    spread = frac.std()
    # NaN (single trade) compares False, which deliberately yields 0.
    sharpe = float(frac.mean() / spread * np.sqrt(252)) if spread > 0 else 0

    def _count(field: str, value: str) -> int:
        """Number of trades whose ``field`` equals ``value``."""
        return sum(1 for t in trades if t[field] == value)

    return {
        "trade_count": len(trades),
        "win_rate_pct": round(win_rate, 2),
        "total_return_pct": round((equity[-1] / INITIAL_CAP - 1) * 100, 2),
        "final_equity": round(equity[-1], 2),
        "sharpe": round(sharpe, 3),
        "max_drawdown_pct": round(max_dd, 2),
        "profit_factor": round(profit_factor, 3),
        "avg_trade_return_pct": round(float(np.mean(rets)), 3),
        "avg_hold_days": round(
            float(np.mean([t.get("hold_days", 3) for t in trades])), 1
        ),
        "stop_exits": _count("exit_reason", "STOP_LOSS"),
        "target_exits": _count("exit_reason", "TARGET"),
        "hold_exits": _count("exit_reason", "MAX_HOLD"),
        "buy_trades": _count("direction", "BUY"),
        "sell_trades": _count("direction", "SELL"),
    }
|
|
|
|
| |
| |
| |
|
|
def _select_symbols(
    universe: pd.DataFrame,
    min_accuracy: float,
    symbol: Optional[str],
    top_n: Optional[int],
) -> list[str]:
    """Return the symbols to backtest, preserving the universe file order."""
    symbols = universe[universe["accuracy"] >= min_accuracy]["symbol"].tolist()

    # Drop ETF / fund-like instruments by substring match.
    symbols = [s for s in symbols
               if not any(pat in s.upper() for pat in ETF_PAT)]

    if symbol:
        wanted = symbol.upper()
        # An explicitly requested symbol is tested even when it is not part
        # of the qualifying universe.
        symbols = [s for s in symbols if s.upper() == wanted] or [wanted]

    if top_n:
        # Takes the first N rows of the file, not a ranking.
        symbols = symbols[:top_n]
    return symbols


def _go_nogo_checks(overall: dict) -> dict:
    """Map each readiness criterion label to whether ``overall`` meets it."""
    return {
        "Win rate > 52%": overall["win_rate_pct"] > 52,
        "Sharpe > 1.0": overall["sharpe"] > 1.0,
        # Drawdown is stored as a negative percentage.
        "Max DD < 20%": overall["max_drawdown_pct"] > -20,
        "Profit factor > 1.2": overall["profit_factor"] > 1.2,
        "Avg trade > 0.1%": overall["avg_trade_return_pct"] > 0.1,
    }


def _print_results(
    overall: dict,
    buy_m: dict,
    sell_m: dict,
    n_buy: int,
    n_sell: int,
    sym_df: pd.DataFrame,
    n_trades: int,
    skipped: int,
) -> None:
    """Print the aggregate, per-direction and per-symbol result tables."""
    print(f"\n{'='*60}")
    print(f" RESULTS ({n_trades:,} trades, "
          f"{len(sym_df)} symbols, {skipped} skipped)")
    print(f"{'='*60}")
    print(f" Win rate: {overall['win_rate_pct']:.1f}%")
    print(f" Total return: {overall['total_return_pct']:.2f}%")
    print(f" Sharpe: {overall['sharpe']:.3f}")
    print(f" Max drawdown: {overall['max_drawdown_pct']:.2f}%")
    print(f" Profit factor: {overall['profit_factor']:.3f}")
    print(f" Avg trade: {overall['avg_trade_return_pct']:.3f}%")
    print(f" Stop/Target/Hold: "
          f"{overall['stop_exits']}/"
          f"{overall['target_exits']}/"
          f"{overall['hold_exits']}")
    print(f"\n BUY {n_buy:5,} trades | "
          f"win={buy_m['win_rate_pct']:.1f}% | "
          f"ret={buy_m['total_return_pct']:.2f}% | "
          f"pf={buy_m['profit_factor']:.3f}")
    print(f" SELL {n_sell:5,} trades | "
          f"win={sell_m['win_rate_pct']:.1f}% | "
          f"ret={sell_m['total_return_pct']:.2f}% | "
          f"pf={sell_m['profit_factor']:.3f}")
    cols = ["symbol", "n_trades", "win_rate_pct", "total_return_pct",
            "sharpe", "max_drawdown_pct", "profit_factor"]
    print("\n Top 15 by Sharpe:")
    print(sym_df[cols].head(15).to_string(index=False))
    print("\n Bottom 10:")
    print(sym_df[cols].tail(10).to_string(index=False))


def run_backtest(
    min_accuracy: float = 0.55,
    min_confidence: float = 0.42,
    symbol: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    verbose: bool = True,
    top_n: Optional[int] = None,
) -> dict:
    """Backtest the signal universe and persist the results.

    Symbols are read from ``UNIVERSE_PATH``, kept when their ``accuracy`` is
    at least ``min_accuracy`` and their name contains no ``ETF_PAT``
    substring, optionally narrowed to ``symbol`` and truncated to the first
    ``top_n``. For each one, signals are generated, trades simulated and
    metrics computed; symbols without signals or trades count as skipped.

    Side effects: writes ``trades_<ts>.csv``, ``symbol_results_<ts>.csv``,
    ``backtest_summary_<ts>.json`` and ``latest_backtest.json`` into
    ``OUT_DIR``; prints a report when ``verbose`` is true.

    Args:
        min_accuracy: Minimum universe accuracy for a symbol to qualify.
        min_confidence: Minimum signal confidence passed to the simulator.
        symbol: Restrict the run to this symbol (case-insensitive).
        date_from: Earliest signal date, inclusive.
        date_to: Latest signal date, inclusive.
        verbose: Print progress, results and the go/no-go assessment.
        top_n: Keep only the first N qualifying symbols.

    Returns:
        The summary dict that is also written to JSON, or ``{}`` when the
        universe file is missing or no trades were generated. ``go_nogo`` is
        always a bool, independent of ``verbose``.
    """
    started = datetime.now()

    if not UNIVERSE_PATH.exists():
        print("ERROR: Signal universe not found. Run p170_max_system.py first.")
        return {}

    universe = pd.read_csv(UNIVERSE_PATH)
    qualifying = _select_symbols(universe, min_accuracy, symbol, top_n)

    if verbose:
        print(f"\n{'='*60}")
        print(" P170 BACKTEST ENGINE")
        print(f" {len(qualifying)} symbols | "
              f"min_acc={min_accuracy} conf>={min_confidence}")
        if date_from or date_to:
            print(f" Period: {date_from or 'all'} -> {date_to or 'all'}")
        print(f"{'='*60}")
        print(f" Hold:{HOLD_DAYS}d Stop:{STOP_LOSS_PCT*100:.1f}% "
              f"Target:{TARGET_PCT*100:.1f}% Cost:{COST_PCT*100:.1f}%\n")

    all_trades = []
    sym_results = []
    skipped = 0

    for done, sym in enumerate(qualifying, start=1):
        sigs = generate_signals(sym)
        trades = (
            simulate_trades(sym, sigs, min_confidence, date_from, date_to)
            if sigs is not None else []
        )
        if trades:
            m = compute_metrics(trades)
            sym_results.append({
                "symbol": sym,
                "n_trades": m["trade_count"],
                "win_rate_pct": m["win_rate_pct"],
                "total_return_pct": m["total_return_pct"],
                "sharpe": m["sharpe"],
                "max_drawdown_pct": m["max_drawdown_pct"],
                "profit_factor": m["profit_factor"],
                "avg_trade_ret": m["avg_trade_return_pct"],
                "buy_trades": m["buy_trades"],
                "sell_trades": m["sell_trades"],
            })
            all_trades.extend(trades)
        else:
            skipped += 1

        # Progress is reported every 25 symbols even when the 25th one was
        # skipped; it needs at least one result to average over.
        if verbose and done % 25 == 0 and sym_results:
            avg_wr = np.mean([r["win_rate_pct"] for r in sym_results])
            print(f" [{done}/{len(qualifying)}] "
                  f"{len(all_trades)} trades | "
                  f"avg win rate={avg_wr:.1f}%")

    if not all_trades:
        if verbose:
            print(" No trades generated.")
        return {}

    overall = compute_metrics(all_trades)
    sym_df = pd.DataFrame(sym_results).sort_values("sharpe", ascending=False)

    buy_trades = [t for t in all_trades if t["direction"] == "BUY"]
    sell_trades = [t for t in all_trades if t["direction"] == "SELL"]
    buy_m = compute_metrics(buy_trades)
    sell_m = compute_metrics(sell_trades)

    # Evaluated unconditionally so the saved summary always carries a verdict.
    checks = _go_nogo_checks(overall)
    all_pass = all(checks.values())

    elapsed = (datetime.now() - started).total_seconds()

    if verbose:
        _print_results(overall, buy_m, sell_m, len(buy_trades),
                       len(sell_trades), sym_df, len(all_trades), skipped)
        print(f"\n{'='*60}")
        print(" GO / NO-GO ASSESSMENT")
        print(f"{'='*60}")
        for label, passed in checks.items():
            # Distinct ASCII markers: pass and fail must be tellable apart.
            print(f" {'[PASS]' if passed else '[FAIL]'} {label}")
        verdict = ("READY FOR PAPER TRADING" if all_pass
                   else "NOT READY - tune first")
        print(f"\n {verdict}")
        print(f" Elapsed: {elapsed:.1f}s")
        print(f"{'='*60}\n")

    finished = datetime.now()
    ts = finished.strftime("%Y%m%d_%H%M%S")
    pd.DataFrame(all_trades).to_csv(OUT_DIR / f"trades_{ts}.csv", index=False)
    sym_df.to_csv(OUT_DIR / f"symbol_results_{ts}.csv", index=False)

    summary = {
        "timestamp": finished.isoformat(),
        "config": {
            "min_accuracy": min_accuracy,
            "min_confidence": min_confidence,
            "hold_days": HOLD_DAYS,
            "stop_loss_pct": STOP_LOSS_PCT * 100,
            "target_pct": TARGET_PCT * 100,
            "cost_pct": COST_PCT * 100,
            "date_from": date_from,
            "date_to": date_to,
        },
        "overall": overall,
        "buy_only": buy_m,
        "sell_only": sell_m,
        "n_symbols": len(sym_results),
        "n_skipped": skipped,
        "go_nogo": all_pass,
        "top10": sym_df.head(10).to_dict("records"),
    }
    payload = json.dumps(summary, indent=2, default=str)
    (OUT_DIR / f"backtest_summary_{ts}.json").write_text(
        payload, encoding="utf-8")
    (OUT_DIR / "latest_backtest.json").write_text(payload, encoding="utf-8")

    if verbose:
        print(f" Saved to {OUT_DIR}/")

    return summary
|
|
|
|
| |
| |
| |
|
|
def run_walk_forward(
    min_accuracy: float = 0.55,
    min_confidence: float = 0.42,
    top_n: int = 50,
    start_year: int = 2017,
) -> None:
    """Run one backtest per calendar year and print a year-by-year table.

    Every year from ``start_year`` through the current year is backtested
    with signals dated 1 Jan to 31 Dec of that year. Years that produce no
    result are reported as "no trades" and left out of the summary. When at
    least one year has results, the table is printed and saved to
    ``walk_forward.csv`` in ``OUT_DIR``.

    Each yearly run is a full ``run_backtest`` call, so it also writes that
    function's own output files and overwrites ``latest_backtest.json``.

    Args:
        min_accuracy: Minimum universe accuracy, forwarded to the backtest.
        min_confidence: Minimum signal confidence, forwarded to the backtest.
        top_n: Number of leading universe symbols to include.
        start_year: First calendar year to test (default 2017).
    """
    print(f"\n{'#'*60}")
    print(f" WALK-FORWARD - Year by Year (top {top_n} symbols)")
    print(f"{'#'*60}\n")

    rows = []
    for year in range(start_year, datetime.now().year + 1):
        print(f" {year}...", end=" ", flush=True)
        result = run_backtest(
            min_accuracy=min_accuracy,
            min_confidence=min_confidence,
            date_from=f"{year}-01-01",
            date_to=f"{year}-12-31",
            verbose=False,
            top_n=top_n,
        )
        if not result:
            print("no trades")
            continue

        m = result["overall"]
        rows.append({
            "year": year,
            "trades": m["trade_count"],
            "win_pct": m["win_rate_pct"],
            "return": m["total_return_pct"],
            "sharpe": m["sharpe"],
            "max_dd": m["max_drawdown_pct"],
            "pf": m["profit_factor"],
        })
        print(f"trades={m['trade_count']} win={m['win_rate_pct']:.1f}% "
              f"ret={m['total_return_pct']:.2f}% sh={m['sharpe']:.3f} "
              f"dd={m['max_drawdown_pct']:.2f}%")

    if not rows:
        return

    df = pd.DataFrame(rows)
    worst = df.loc[df["return"].idxmin()]
    print(f"\n{'='*60}")
    print(" WALK-FORWARD SUMMARY")
    print(f"{'='*60}")
    print(df.to_string(index=False))
    print(f"\n Avg Sharpe: {df['sharpe'].mean():.3f}")
    print(f" Avg Win Rate: {df['win_pct'].mean():.1f}%")
    print(f" Profitable years: {(df['return'] > 0).sum()}/{len(df)}")
    print(f" Worst year: "
          f"{int(worst['year'])} "
          f"({worst['return']:.2f}%)")
    df.to_csv(OUT_DIR / "walk_forward.csv", index=False)
    print(f"\n Saved: {OUT_DIR}/walk_forward.csv")
|
|
|
|
| |
| |
| |
|
|
def threshold_sweep(
    min_accuracy: float = 0.55,
    top_n: int = 50,
    min_trades: int = 20,
) -> None:
    """Backtest a fixed grid of confidence thresholds and report the best.

    One non-verbose backtest is run per threshold. A threshold competes for
    "best" only when it produced at least ``min_trades`` trades; the best is
    the one with the highest overall Sharpe. Rows that set a new best so far
    are flagged with ``*``.

    Each threshold is a full ``run_backtest`` call, so it also writes that
    function's own output files and overwrites ``latest_backtest.json``.

    Args:
        min_accuracy: Minimum universe accuracy, forwarded to the backtest.
        top_n: Number of leading universe symbols to include.
        min_trades: Minimum trade count for a threshold to be ranked.
    """
    print(f"\n{'='*60}")
    print(f" THRESHOLD SWEEP (min_acc={min_accuracy}, top {top_n} symbols)")
    print(f"{'='*60}")
    print(f" {'Conf':>6} {'Trades':>7} {'Win%':>6} "
          f"{'Ret%':>7} {'Sharpe':>7} {'DD%':>7} {'PF':>6}")
    print(f" {'-'*56}")

    # None until a threshold with enough trades has been seen, so a sweep
    # without any usable row cannot report a made-up optimum.
    best_sh = None
    best_conf = None

    for conf in (0.38, 0.40, 0.42, 0.44, 0.45, 0.47, 0.50, 0.52, 0.55):
        result = run_backtest(
            min_accuracy=min_accuracy,
            min_confidence=conf,
            verbose=False,
            top_n=top_n,
        )
        if not result:
            print(f" {conf:.2f} no trades")
            continue

        m = result["overall"]
        if m["trade_count"] < min_trades:
            print(f" {conf:.2f} too few trades ({m['trade_count']})")
            continue

        is_new_best = best_sh is None or m["sharpe"] > best_sh
        if is_new_best:
            best_sh = m["sharpe"]
            best_conf = conf
        mark = " *" if is_new_best else ""
        print(f" {conf:.2f} {m['trade_count']:>7,} "
              f"{m['win_rate_pct']:>5.1f}% "
              f"{m['total_return_pct']:>6.2f}% "
              f"{m['sharpe']:>7.3f} "
              f"{m['max_drawdown_pct']:>6.2f}% "
              f"{m['profit_factor']:>5.3f}{mark}")

    if best_conf is None:
        print(f"\n No threshold produced at least {min_trades} trades.\n")
        return

    print(f"\n Best: conf={best_conf:.2f} Sharpe={best_sh:.3f}")
    print(f" Use: python run_daily_signals.py --min-confidence {best_conf:.2f}\n")
|
|
|
|
| |
| |
| |
|
|
def main():
    """Parse the command line and dispatch to the selected run mode.

    ``--walk-forward`` takes precedence over ``--threshold-sweep``; with
    neither flag a single verbose backtest is run. Both special modes fall
    back to 50 symbols when ``--top-n`` is not given.
    """
    parser = argparse.ArgumentParser(description="P170 Backtest Engine")

    # (flag, value type, default) for every valued option, in help order.
    valued_options = (
        ("--symbol", str, None),
        ("--min-accuracy", float, 0.55),
        ("--min-confidence", float, 0.42),
        ("--date-from", str, None),
        ("--date-to", str, None),
        ("--top-n", int, None),
    )
    for flag, value_type, fallback in valued_options:
        parser.add_argument(flag, type=value_type, default=fallback)
    for switch in ("--walk-forward", "--threshold-sweep"):
        parser.add_argument(switch, action="store_true")

    opts = parser.parse_args()

    if opts.walk_forward:
        run_walk_forward(
            min_accuracy=opts.min_accuracy,
            min_confidence=opts.min_confidence,
            top_n=opts.top_n or 50,
        )
        return

    if opts.threshold_sweep:
        threshold_sweep(
            min_accuracy=opts.min_accuracy,
            top_n=opts.top_n or 50,
        )
        return

    run_backtest(
        min_accuracy=opts.min_accuracy,
        min_confidence=opts.min_confidence,
        symbol=opts.symbol,
        date_from=opts.date_from,
        date_to=opts.date_to,
        top_n=opts.top_n,
        verbose=True,
    )


if __name__ == "__main__":
    main()
|
|