DHAN_AI_DATA / p170_backtest.py
addyAIMLprojects's picture
P170 backtest + paper tracker — 2026-05-15
4124dd5 verified
"""
p170_backtest.py β€” P170 Signal Backtest Engine
================================================
Regenerates dated signals directly from price history using saved global model.
This is more accurate than the predictions parquet (which has no dates).
Usage:
python p170_backtest.py # full backtest
python p170_backtest.py --symbol RTNINDIA # single symbol
python p170_backtest.py --walk-forward # year by year
python p170_backtest.py --threshold-sweep # optimal confidence
python p170_backtest.py --min-confidence 0.45 # tighter filter
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent))
# ─────────────────────────────────────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────────────────────────────────────
# All paths below are relative to the current working directory, not to this file.
BASE = Path(".")
# Directory holding scaler.joblib, feature_cols.joblib and the per-model .joblib files.
GDIR = BASE / "data/lake/ml/p170/models/GLOBAL"
# CSV read by run_backtest(); its "symbol" and "accuracy" columns are used.
UNIVERSE_PATH = BASE / "data/lake/ml/p170/p170_signal_universe.csv"
# Destination for the trade log, per-symbol results and JSON summaries.
OUT_DIR = BASE / "data/lake/ml/p170/backtest"
# NOTE: this runs at import time, so merely importing the module creates the directory.
OUT_DIR.mkdir(parents=True, exist_ok=True)
HOLD_DAYS = 3 # maximum number of price rows a trade stays open after entry
STOP_LOSS_PCT = 0.015 # adverse move (fraction of entry price) that triggers a stop exit: 1.5%
TARGET_PCT = 0.030 # favourable move (fraction of entry price) that triggers a target exit: 3.0% (2:1 reward:risk)
COST_PCT = 0.002 # round-trip cost subtracted from every trade's gross return: 0.2%
INITIAL_CAP = 100_000 # starting equity used by compute_metrics() for the equity curve
# Substrings that mark a symbol as an ETF/index product; run_backtest() drops any
# symbol whose upper-cased name CONTAINS one of these (substring match, not equality).
ETF_PAT = {"LIQUID","GILT","GOLD","SILVER","ETF","BEES","GSEC","CASH",
"BBETF","NIFTY","LOWVOL","QUALITY","SENSEX","MOM","GROWW","SETF"}
# ─────────────────────────────────────────────────────────────────────────────
# LOAD GLOBAL MODEL (cached)
# ─────────────────────────────────────────────────────────────────────────────
# Process-wide memo for load_global_model(); empty until the first successful load.
_model_cache: dict = {}


def load_global_model() -> tuple:
    """Load the scaler, feature column list and base models from ``GDIR``.

    The result is stored in the module-level ``_model_cache`` so the joblib
    files are read at most once per process.

    Returns:
        ``(scaler, fcols, models)``. ``models`` maps a model name to the
        object loaded from ``GDIR/<name>.joblib``. A model whose file is
        missing is skipped silently; one whose file exists but fails to load
        is skipped with a ``RuntimeWarning``. ``models`` may therefore be
        empty.

    Raises:
        Whatever ``joblib.load`` raises for ``scaler.joblib`` or
        ``feature_cols.joblib``; those two files are mandatory.
    """
    if _model_cache:
        return (_model_cache["scaler"], _model_cache["fcols"],
                _model_cache["models"])

    import joblib

    scaler = joblib.load(GDIR / "scaler.joblib")
    fcols = joblib.load(GDIR / "feature_cols.joblib")

    models = {}
    for name in ("xgboost", "lightgbm", "catboost", "random_forest", "extra_trees"):
        path = GDIR / f"{name}.joblib"
        if not path.exists():
            continue
        try:
            models[name] = joblib.load(path)
        except Exception as exc:
            # Stay best-effort (the ensemble averages whatever loaded), but
            # say so: a silently smaller ensemble changes every prediction.
            import warnings
            warnings.warn(
                f"could not load model {name!r} from {path}: "
                f"{type(exc).__name__}: {exc}",
                RuntimeWarning,
                stacklevel=2,
            )

    _model_cache.update({"scaler": scaler, "fcols": fcols, "models": models})
    return scaler, fcols, models
# ─────────────────────────────────────────────────────────────────────────────
# GENERATE DATED SIGNALS FOR ONE SYMBOL
# ─────────────────────────────────────────────────────────────────────────────
def generate_signals(symbol: str) -> Optional[pd.DataFrame]:
    """Run the saved global ensemble over the full price history of *symbol*.

    Candles are loaded and turned into features by ``p170_max_system``; rows
    with a missing value in any model feature are dropped. Class
    probabilities are the unweighted mean of ``predict_proba`` over every
    loaded model, and the predicted class is the arg-max of that mean.

    Returns:
        A DataFrame sorted by date with columns
        ``[date, close, pred_class, prob_down, prob_neutral, prob_up,
        confidence, actual_class]``, where probability columns 0/1/2 are
        mapped to down/neutral/up and ``confidence`` is the largest of the
        three. ``None`` when no model is loaded, fewer than 60 raw candles
        or fewer than 30 usable feature rows exist, or any step fails (a
        ``RuntimeWarning`` describing the failure is emitted in that case).
    """
    from p170_max_system import load_candles, engineer_features

    try:
        scaler, fcols, models = load_global_model()
        if not models:
            return None

        raw = load_candles(symbol)
        if raw is None or len(raw) < 60:
            return None

        df = engineer_features(raw)
        df = df.dropna(subset=fcols).reset_index(drop=True)
        if len(df) < 30:
            return None

        X = df[fcols].values.astype("float32")
        X_s = scaler.transform(X)

        # Simple soft-voting ensemble: average the per-class probabilities.
        probas = np.mean(
            [m.predict_proba(X_s) for m in models.values()], axis=0
        )
        preds = np.argmax(probas, axis=1)
        conf = probas.max(axis=1)

        out = pd.DataFrame({
            "date": pd.to_datetime(df["date"].values),
            "close": df["close"].values.astype(float),
            "pred_class": preds.astype(int),
            "prob_down": probas[:, 0],
            "prob_neutral": probas[:, 1],
            "prob_up": probas[:, 2],
            "confidence": conf,
            "actual_class": df["direction_class"].values.astype(int),
        })
        return out.sort_values("date").reset_index(drop=True)
    except Exception as exc:
        # One bad symbol must not abort a whole-universe run, but the reason
        # has to be visible: a missing column or a model/feature mismatch
        # would otherwise look exactly like "no data".
        import warnings
        warnings.warn(
            f"generate_signals({symbol!r}) failed: "
            f"{type(exc).__name__}: {exc}",
            RuntimeWarning,
            stacklevel=2,
        )
        return None
# ─────────────────────────────────────────────────────────────────────────────
# SIMULATE TRADES
# ─────────────────────────────────────────────────────────────────────────────
def simulate_trades(
    symbol: str,
    signals_df: pd.DataFrame,
    min_confidence: float = 0.42,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> list[dict]:
    """Simulate one trade per qualifying signal in *signals_df*.

    A signal qualifies when its date lies inside ``[date_from, date_to]``
    (both optional, inclusive), its ``confidence`` is at least
    *min_confidence* and its ``pred_class`` is not 1 (neutral). Class 2
    opens a BUY, anything else a SELL (short).

    Entry is at the close of the first row dated after the signal. The
    position is then checked against each of the next ``HOLD_DAYS`` closes:
    a move of ``-STOP_LOSS_PCT`` or worse exits as ``STOP_LOSS``, a move of
    ``+TARGET_PCT`` or better exits as ``TARGET``, otherwise the trade exits
    at the last close examined (``MAX_HOLD``). ``COST_PCT`` is subtracted
    from every gross return. Signals with fewer than two later rows are
    skipped. Prices come from the unfiltered *signals_df*, so a trade opened
    near *date_to* may exit after it.

    NOTE(review): stops and targets are evaluated on closes only and filled
    at exactly the stop/target level, even when the close went through it.
    For stops this is more favourable than the observed close; confirm that
    is the intended fill model.

    Args:
        symbol: Ticker copied into every trade record.
        signals_df: Frame with at least ``date`` (datetime), ``close``,
            ``pred_class`` and ``confidence`` columns.
        min_confidence: Minimum ensemble confidence for a signal to trade.
        date_from: Earliest signal date to consider.
        date_to: Latest signal date to consider.

    Returns:
        Trade dicts in the row order of *signals_df*; empty if nothing
        qualifies.
    """
    picked = signals_df
    if date_from:
        picked = picked[picked["date"] >= pd.Timestamp(date_from)]
    if date_to:
        picked = picked[picked["date"] <= pd.Timestamp(date_to)]
    picked = picked[
        (picked["confidence"] >= min_confidence) & (picked["pred_class"] != 1)
    ]
    if picked.empty:
        return []

    prices = signals_df.set_index("date")["close"].sort_index()
    dates = prices.index
    closes = prices.to_numpy(dtype=float)
    n_bars = len(closes)

    trades = []
    for sig_date, pred, conf in zip(
        picked["date"], picked["pred_class"], picked["confidence"]
    ):
        # Position of the first row strictly after the signal date. A binary
        # search on the sorted index replaces a full scan per signal.
        entry_idx = int(dates.searchsorted(sig_date, side="right"))
        if n_bars - entry_idx < 2:
            continue

        is_long = pred == 2
        direction = "BUY" if is_long else "SELL"
        sign = 1 if is_long else -1  # +1 long, -1 short

        entry_date = dates[entry_idx]
        entry_price = float(closes[entry_idx])
        exit_price = entry_price
        exit_reason = "MAX_HOLD"
        hold = 0

        last_idx = min(entry_idx + HOLD_DAYS, n_bars - 1)
        for idx in range(entry_idx + 1, last_idx + 1):
            curr_price = float(closes[idx])
            hold = idx - entry_idx
            ret = sign * (curr_price - entry_price) / entry_price
            if ret <= -STOP_LOSS_PCT:
                exit_price = entry_price * (1 - sign * STOP_LOSS_PCT)
                exit_reason = "STOP_LOSS"
                break
            if ret >= TARGET_PCT:
                exit_price = entry_price * (1 + sign * TARGET_PCT)
                exit_reason = "TARGET"
                break
            exit_price = curr_price

        gross = sign * (exit_price - entry_price) / entry_price
        net = gross - COST_PCT
        trades.append({
            "symbol": symbol,
            "direction": direction,
            "signal_date": str(sig_date.date()),
            "entry_date": str(entry_date.date()),
            "entry_price": round(entry_price, 2),
            "exit_price": round(exit_price, 2),
            "exit_reason": exit_reason,
            "hold_days": hold,
            "confidence": round(float(conf), 4),
            "gross_return_pct": round(gross * 100, 3),
            "net_return_pct": round(net * 100, 3),
        })
    return trades
# ─────────────────────────────────────────────────────────────────────────────
# METRICS
# ─────────────────────────────────────────────────────────────────────────────
def compute_metrics(trades: list[dict]) -> dict:
    """Summarise a list of trade records into performance metrics.

    Each trade is sized at a fixed 5% of ``INITIAL_CAP``; equity changes by
    that trade's P&L and position size does not compound. Trades are
    processed in list order, which therefore determines the drawdown.

    The Sharpe figure is the mean over the sample standard deviation of
    per-trade net returns, scaled by ``sqrt(252)``.
    NOTE(review): that scaling treats each trade like one daily return;
    confirm this is the intended annualisation.

    Args:
        trades: Records carrying ``net_return_pct``, ``exit_reason`` and
            ``direction``; ``hold_days`` is optional (3 is assumed).

    Returns:
        A dict with the same keys whether or not *trades* is empty. For an
        empty list every metric is 0 and ``final_equity`` is
        ``INITIAL_CAP``.
    """
    if not trades:
        return {
            "trade_count": 0,
            "win_rate_pct": 0,
            "total_return_pct": 0,
            "final_equity": float(INITIAL_CAP),
            "sharpe": 0,
            "max_drawdown_pct": 0,
            "profit_factor": 0,
            "avg_trade_return_pct": 0,
            "avg_hold_days": 0,
            "stop_exits": 0,
            "target_exits": 0,
            "hold_exits": 0,
            "buy_trades": 0,
            "sell_trades": 0,
        }

    rets = [t["net_return_pct"] for t in trades]

    # Equity curve: starting capital followed by the running sum of fixed-size P&L.
    position_size = INITIAL_CAP * 0.05
    pnl = position_size * (np.asarray(rets, dtype=float) / 100)
    equity = np.cumsum(np.concatenate(([float(INITIAL_CAP)], pnl)))
    final_equity = float(equity[-1])

    eq_s = pd.Series(equity)
    drawdown = eq_s / eq_s.cummax() - 1
    max_dd = float(drawdown.min() * 100)

    wins = [r for r in rets if r > 0]
    losses = [r for r in rets if r <= 0]
    win_r = len(wins) / len(rets) * 100
    gross_win = sum(wins)
    gross_loss = abs(sum(losses))
    # With no losing return to divide by, fall back to the summed wins (or 0).
    pf = gross_win / gross_loss if gross_loss > 0 else (gross_win if gross_win > 0 else 0)

    s = pd.Series(rets) / 100
    spread = s.std()  # NaN for a single trade, which fails the > 0 test below
    sharpe = float(s.mean() / spread * np.sqrt(252)) if spread > 0 else 0

    def count(key: str, value: str) -> int:
        return sum(1 for t in trades if t[key] == value)

    return {
        "trade_count": len(trades),
        "win_rate_pct": round(win_r, 2),
        "total_return_pct": round((final_equity / INITIAL_CAP - 1) * 100, 2),
        "final_equity": round(final_equity, 2),
        "sharpe": round(sharpe, 3),
        "max_drawdown_pct": round(max_dd, 2),
        "profit_factor": round(pf, 3),
        "avg_trade_return_pct": round(float(np.mean(rets)), 3),
        "avg_hold_days": round(float(np.mean([t.get("hold_days", 3) for t in trades])), 1),
        "stop_exits": count("exit_reason", "STOP_LOSS"),
        "target_exits": count("exit_reason", "TARGET"),
        "hold_exits": count("exit_reason", "MAX_HOLD"),
        "buy_trades": count("direction", "BUY"),
        "sell_trades": count("direction", "SELL"),
    }
# ─────────────────────────────────────────────────────────────────────────────
# FULL BACKTEST
# ─────────────────────────────────────────────────────────────────────────────
def _go_nogo_checks(overall: dict) -> dict:
    """Evaluate the named go/no-go gates against the overall metrics dict."""
    return {
        "Win rate > 52%": overall["win_rate_pct"] > 52,
        "Sharpe > 1.0": overall["sharpe"] > 1.0,
        # max_drawdown_pct is negative, so "under 20% drawdown" means > -20.
        "Max DD < 20%": overall["max_drawdown_pct"] > -20,
        "Profit factor > 1.2": overall["profit_factor"] > 1.2,
        "Avg trade > 0.1%": overall["avg_trade_return_pct"] > 0.1,
    }


def run_backtest(
    min_accuracy: float = 0.55,
    min_confidence: float = 0.42,
    symbol: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    verbose: bool = True,
    top_n: Optional[int] = None,
) -> dict:
    """Backtest the signal universe and persist the results under ``OUT_DIR``.

    Symbols are taken from ``UNIVERSE_PATH`` where ``accuracy`` is at least
    *min_accuracy*, minus anything whose name contains an ``ETF_PAT``
    substring. Signals are regenerated per symbol, trades simulated, and
    metrics computed per symbol, overall, and for BUY and SELL separately.

    Every run that produces trades writes ``trades_<ts>.csv``,
    ``symbol_results_<ts>.csv``, ``backtest_summary_<ts>.json`` and
    overwrites ``latest_backtest.json``, regardless of *verbose*.

    Args:
        min_accuracy: Minimum universe accuracy for a symbol to be included.
        min_confidence: Minimum signal confidence for a trade.
        symbol: Restrict the run to this ticker (case-insensitive). If it is
            not among the qualifying symbols it is backtested anyway.
        date_from: Earliest signal date, inclusive.
        date_to: Latest signal date, inclusive.
        verbose: Print progress, the results report and the go/no-go table.
        top_n: Keep only the first *top_n* qualifying symbols, in the order
            they appear in the universe file.

    Returns:
        The summary dict that was written to JSON, or ``{}`` when the
        universe file is missing or no trades were generated. ``go_nogo`` in
        the summary is always a bool, including when *verbose* is False.
    """
    t0 = datetime.now()

    if not UNIVERSE_PATH.exists():
        print("ERROR: Signal universe not found. Run p170_max_system.py first.")
        return {}
    universe = pd.read_csv(UNIVERSE_PATH)
    qualifying = universe[universe["accuracy"] >= min_accuracy]["symbol"].tolist()
    # Filter ETFs (substring match against the upper-cased symbol).
    qualifying = [s for s in qualifying
                  if not any(x in s.upper() for x in ETF_PAT)]
    if symbol:
        qualifying = [s for s in qualifying if s.upper() == symbol.upper()]
        if not qualifying:
            # Allow an explicitly requested symbol even if it is not in the universe.
            qualifying = [symbol.upper()]
    if top_n:
        qualifying = qualifying[:top_n]

    if verbose:
        print(f"\n{'='*60}")
        print(f" P170 BACKTEST ENGINE")
        print(f" {len(qualifying)} symbols | "
              f"min_acc={min_accuracy} conf>={min_confidence}")
        if date_from or date_to:
            print(f" Period: {date_from or 'all'} → {date_to or 'all'}")
        print(f"{'='*60}")
        print(f" Hold:{HOLD_DAYS}d Stop:{STOP_LOSS_PCT*100:.1f}% "
              f"Target:{TARGET_PCT*100:.1f}% Cost:{COST_PCT*100:.1f}%\n")

    all_trades = []
    sym_results = []
    skipped = 0
    for i, sym in enumerate(qualifying):
        sigs = generate_signals(sym)
        if sigs is None:
            skipped += 1
            continue
        trades = simulate_trades(sym, sigs, min_confidence, date_from, date_to)
        if not trades:
            skipped += 1
            continue
        m = compute_metrics(trades)
        sym_results.append({
            "symbol": sym,
            "n_trades": m["trade_count"],
            "win_rate_pct": m["win_rate_pct"],
            "total_return_pct": m["total_return_pct"],
            "sharpe": m["sharpe"],
            "max_drawdown_pct": m["max_drawdown_pct"],
            "profit_factor": m["profit_factor"],
            "avg_trade_ret": m["avg_trade_return_pct"],
            "buy_trades": m["buy_trades"],
            "sell_trades": m["sell_trades"],
        })
        all_trades.extend(trades)
        # Progress is only reported when the 25th/50th/... symbol produced trades.
        if verbose and (i + 1) % 25 == 0:
            avg_wr = np.mean([r["win_rate_pct"] for r in sym_results])
            print(f" [{i+1}/{len(qualifying)}] "
                  f"{len(all_trades)} trades | "
                  f"avg win rate={avg_wr:.1f}%")

    if not all_trades:
        if verbose:
            print(" No trades generated.")
        return {}

    overall = compute_metrics(all_trades)
    sym_df = pd.DataFrame(sym_results).sort_values("sharpe", ascending=False)
    buy_trades = [t for t in all_trades if t["direction"] == "BUY"]
    sell_trades = [t for t in all_trades if t["direction"] == "SELL"]
    buy_m = compute_metrics(buy_trades)
    sell_m = compute_metrics(sell_trades)
    elapsed = (datetime.now() - t0).total_seconds()

    # Evaluated unconditionally so the saved summary carries the verdict even
    # for quiet runs; only the printing depends on `verbose`.
    checks = _go_nogo_checks(overall)
    all_pass = all(checks.values())

    if verbose:
        print(f"\n{'='*60}")
        print(f" RESULTS ({len(all_trades):,} trades, "
              f"{len(sym_results)} symbols, {skipped} skipped)")
        print(f"{'='*60}")
        print(f" Win rate: {overall['win_rate_pct']:.1f}%")
        print(f" Total return: {overall['total_return_pct']:.2f}%")
        print(f" Sharpe: {overall['sharpe']:.3f}")
        print(f" Max drawdown: {overall['max_drawdown_pct']:.2f}%")
        print(f" Profit factor: {overall['profit_factor']:.3f}")
        print(f" Avg trade: {overall['avg_trade_return_pct']:.3f}%")
        print(f" Stop/Target/Hold: "
              f"{overall['stop_exits']}/"
              f"{overall['target_exits']}/"
              f"{overall['hold_exits']}")
        print(f"\n BUY {len(buy_trades):5,} trades | "
              f"win={buy_m['win_rate_pct']:.1f}% | "
              f"ret={buy_m['total_return_pct']:.2f}% | "
              f"pf={buy_m['profit_factor']:.3f}")
        print(f" SELL {len(sell_trades):5,} trades | "
              f"win={sell_m['win_rate_pct']:.1f}% | "
              f"ret={sell_m['total_return_pct']:.2f}% | "
              f"pf={sell_m['profit_factor']:.3f}")
        print(f"\n Top 15 by Sharpe:")
        cols = ["symbol", "n_trades", "win_rate_pct", "total_return_pct",
                "sharpe", "max_drawdown_pct", "profit_factor"]
        print(sym_df[cols].head(15).to_string(index=False))
        print(f"\n Bottom 10:")
        print(sym_df[cols].tail(10).to_string(index=False))

        # Go/No-Go
        print(f"\n{'='*60}")
        print(f" GO / NO-GO ASSESSMENT")
        print(f"{'='*60}")
        for check, passed in checks.items():
            print(f" {'✓' if passed else '✗'} {check}")
        verdict = "→ READY FOR PAPER TRADING" if all_pass else "→ NOT READY — tune first"
        print(f"\n {verdict}")
        print(f" Elapsed: {elapsed:.1f}s")
        print(f"{'='*60}\n")

    # Save
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    pd.DataFrame(all_trades).to_csv(OUT_DIR / f"trades_{ts}.csv", index=False)
    sym_df.to_csv(OUT_DIR / f"symbol_results_{ts}.csv", index=False)
    summary = {
        "timestamp": datetime.now().isoformat(),
        "config": {
            "min_accuracy": min_accuracy,
            "min_confidence": min_confidence,
            "hold_days": HOLD_DAYS,
            "stop_loss_pct": STOP_LOSS_PCT * 100,
            "target_pct": TARGET_PCT * 100,
            "cost_pct": COST_PCT * 100,
            "date_from": date_from,
            "date_to": date_to,
        },
        "overall": overall,
        "buy_only": buy_m,
        "sell_only": sell_m,
        "n_symbols": len(sym_results),
        "n_skipped": skipped,
        "go_nogo": all_pass,
        "top10": sym_df.head(10).to_dict("records"),
    }
    payload = json.dumps(summary, indent=2, default=str)
    (OUT_DIR / f"backtest_summary_{ts}.json").write_text(payload)
    (OUT_DIR / "latest_backtest.json").write_text(payload)
    if verbose:
        print(f" Saved to {OUT_DIR}/")
    return summary
# ─────────────────────────────────────────────────────────────────────────────
# WALK-FORWARD
# ─────────────────────────────────────────────────────────────────────────────
def run_walk_forward(
    min_accuracy: float = 0.55,
    min_confidence: float = 0.42,
    top_n: int = 50,
    start_year: int = 2017,
) -> None:
    """Run one quiet backtest per calendar year and print a yearly table.

    Years run from *start_year* through the current year. Each year calls
    ``run_backtest`` with ``date_from``/``date_to`` set to that year's first
    and last day, so each year also writes its own result files. Years with
    no trades are reported and left out of the summary. The table is saved
    to ``OUT_DIR/walk_forward.csv`` when at least one year produced trades.

    NOTE(review): every year is scored with the same saved global model
    (``run_backtest`` regenerates signals from it). Whether a given year is
    out-of-sample depends on how that model was trained; confirm before
    reading this as a true walk-forward test.

    Args:
        min_accuracy: Passed through to ``run_backtest``.
        min_confidence: Passed through to ``run_backtest``.
        top_n: Number of universe symbols to include per year.
        start_year: First calendar year to evaluate.
    """
    print(f"\n{'#'*60}")
    print(f" WALK-FORWARD — Year by Year (top {top_n} symbols)")
    print(f"{'#'*60}\n")

    rows = []
    for year in range(start_year, datetime.now().year + 1):
        print(f" {year}...", end=" ", flush=True)
        r = run_backtest(
            min_accuracy=min_accuracy,
            min_confidence=min_confidence,
            date_from=f"{year}-01-01",
            date_to=f"{year}-12-31",
            verbose=False,
            top_n=top_n,
        )
        if not r:
            print("no trades")
            continue
        m = r["overall"]
        rows.append({
            "year": year,
            "trades": m["trade_count"],
            "win_pct": m["win_rate_pct"],
            "return": m["total_return_pct"],
            "sharpe": m["sharpe"],
            "max_dd": m["max_drawdown_pct"],
            "pf": m["profit_factor"],
        })
        print(f"trades={m['trade_count']} win={m['win_rate_pct']:.1f}% "
              f"ret={m['total_return_pct']:.2f}% sh={m['sharpe']:.3f} "
              f"dd={m['max_drawdown_pct']:.2f}%")

    if rows:
        df = pd.DataFrame(rows)
        print(f"\n{'='*60}")
        print(" WALK-FORWARD SUMMARY")
        print(f"{'='*60}")
        print(df.to_string(index=False))
        print(f"\n Avg Sharpe: {df['sharpe'].mean():.3f}")
        print(f" Avg Win Rate: {df['win_pct'].mean():.1f}%")
        print(f" Profitable years: {(df['return'] > 0).sum()}/{len(df)}")
        print(f" Worst year: "
              f"{df.loc[df['return'].idxmin(),'year']} "
              f"({df['return'].min():.2f}%)")
        df.to_csv(OUT_DIR / "walk_forward.csv", index=False)
        print(f"\n Saved: {OUT_DIR}/walk_forward.csv")
# ─────────────────────────────────────────────────────────────────────────────
# THRESHOLD SWEEP
# ─────────────────────────────────────────────────────────────────────────────
def threshold_sweep(min_accuracy: float = 0.55, top_n: int = 50) -> None:
    """Backtest a fixed grid of confidence thresholds and report the best Sharpe.

    Each threshold triggers a full quiet ``run_backtest`` (which also writes
    its usual result files). Thresholds that yield no trades, or fewer than
    20, get a one-line note and are excluded from the comparison. If no
    threshold qualifies, that is stated instead of printing a recommendation.

    Args:
        min_accuracy: Passed through to ``run_backtest``.
        top_n: Number of universe symbols to include in every run.
    """
    min_trades = 20  # below this a Sharpe figure is not compared

    print(f"\n{'='*60}")
    print(f" THRESHOLD SWEEP (min_acc={min_accuracy}, top {top_n} symbols)")
    print(f"{'='*60}")
    print(f" {'Conf':>6} {'Trades':>7} {'Win%':>6} "
          f"{'Ret%':>7} {'Sharpe':>7} {'DD%':>7} {'PF':>6}")
    print(f" {'-'*56}")

    best_sh = None
    best_conf = None
    for conf in (0.38, 0.40, 0.42, 0.44, 0.45, 0.47, 0.50, 0.52, 0.55):
        r = run_backtest(
            min_accuracy=min_accuracy,
            min_confidence=conf,
            verbose=False,
            top_n=top_n,
        )
        if not r:
            print(f" {conf:.2f} no trades")
            continue
        m = r["overall"]
        if m["trade_count"] < min_trades:
            print(f" {conf:.2f} too few trades ({m['trade_count']})")
            continue

        # The arrow marks every row that improves on the best Sharpe so far.
        is_best = best_sh is None or m["sharpe"] > best_sh
        if is_best:
            best_sh = m["sharpe"]
            best_conf = conf
        mark = " ←" if is_best else ""
        print(f" {conf:.2f} {m['trade_count']:>7,} "
              f"{m['win_rate_pct']:>5.1f}% "
              f"{m['total_return_pct']:>6.2f}% "
              f"{m['sharpe']:>7.3f} "
              f"{m['max_drawdown_pct']:>6.2f}% "
              f"{m['profit_factor']:>5.3f}{mark}")

    if best_conf is None:
        print(f"\n No threshold produced at least {min_trades} trades; "
              f"nothing to recommend.\n")
        return
    print(f"\n Best: conf={best_conf:.2f} Sharpe={best_sh:.3f}")
    print(f" Use: python run_daily_signals.py --min-confidence {best_conf:.2f}\n")
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Parse the command line and dispatch to one of the three run modes.

    ``--walk-forward`` takes precedence over ``--threshold-sweep``; with
    neither flag a single full backtest is run. Both special modes fall back
    to 50 symbols when ``--top-n`` is absent (or 0), and neither uses
    ``--symbol`` or the date options.
    """
    parser = argparse.ArgumentParser(description="P170 Backtest Engine")

    # Valued options, registered in the order they appear in --help.
    valued_options = (
        ("--symbol", str, None),
        ("--min-accuracy", float, 0.55),
        ("--min-confidence", float, 0.42),
        ("--date-from", str, None),
        ("--date-to", str, None),
        ("--top-n", int, None),
    )
    for flag, kind, fallback in valued_options:
        parser.add_argument(flag, type=kind, default=fallback)

    # Mode switches.
    for flag in ("--walk-forward", "--threshold-sweep"):
        parser.add_argument(flag, action="store_true")

    opts = parser.parse_args()

    if opts.walk_forward:
        run_walk_forward(
            min_accuracy=opts.min_accuracy,
            min_confidence=opts.min_confidence,
            top_n=opts.top_n or 50,
        )
        return

    if opts.threshold_sweep:
        threshold_sweep(
            min_accuracy=opts.min_accuracy,
            top_n=opts.top_n or 50,
        )
        return

    run_backtest(
        min_accuracy=opts.min_accuracy,
        min_confidence=opts.min_confidence,
        symbol=opts.symbol,
        date_from=opts.date_from,
        date_to=opts.date_to,
        top_n=opts.top_n,
        verbose=True,
    )


if __name__ == "__main__":
    main()