# nse-bot-backend / diagnose_filter_potential.py
# ash001's picture
# Deploy from GitHub Actions to nse-bot-backend
# 9cfff9e verified
"""
Leakage-free signal diagnostic.
Answers: given only features known at buy_time (with symbol/expiry/weekday
dropped), is there any predictable separation between 1:1 winners and losers?
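Runs the following passes:
1) Univariate separation per feature (AUC of feature vs label)
2) Walk-forward gradient boosting (scikit-learn HistGradientBoostingClassifier,
   a LightGBM-style model) with cost-aware net-PnL threshold selection
3) Walk-forward logistic baseline (sanity check) -- NOTE: not implemented in
   this script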
"""
from pathlib import Path
import warnings
import numpy as np
import pandas as pd
# Silence every warning category for the rest of the process.
warnings.filterwarnings("ignore")

# Filesystem layout: the dataset CSV lives in ./outputs next to this script.
BASE_DIR = Path(__file__).resolve().parent
OUT_DIR = BASE_DIR.joinpath("outputs")
DATA_PATH = OUT_DIR.joinpath(
    "ml_dataset_exact_all_v2_2026-01-01_to_2026-04-01_merged.csv"
)

# Columns that must never reach the model: identifiers, ground-truth
# timestamps, and anything derived from what happened after entry.
LEAKY_OR_ID_COLS = [
    # trade / instrument identifiers
    "trade_key",
    "trade_date",
    "weekday",
    "symbol",
    "option_symbol",
    "strike",
    "expiry",
    "trade_side",
    "variant",
    "mode",
    "call_put",
    # bt_* columns
    "bt_buy_signal_time",
    "bt_sell_signal_time",
    "bt_buy_time",
    "bt_buy_price",
    "bt_stop_loss",
    "bt_target_1",
    "bt_target_2",
    "bt_qty_per_lot",
    "bt_capital_per_lot",
    "bt_stop_loss_amt_per_lot",
    # event timestamps
    "signal_time",
    "confirmation_time",
    "indication_time",
    "buy_time",
    "replay_t1_time",
    "replay_t2_time",
    # labels
    "label_1to1",
    "label_1to2",
    # outcome-derived, never a feature
    "net_pnl_per_lot",
]

# Binary label column used as the prediction target throughout the script.
TARGET = "label_1to1"
# Cost-model parameters; the comments describe how round_trip_charges uses them.
BROKERAGE = 40.0       # flat amount, added once per round trip
STT_RATE = 0.001       # multiplied by the sell-side turnover
TXN_RATE = 0.0003503   # multiplied by buy + sell turnover
SEBI_RATE = 0.000001   # multiplied by buy + sell turnover
STAMP_RATE = 0.00003   # multiplied by the buy-side turnover
GST_RATE = 0.18        # multiplied by (brokerage + txn + sebi)


def round_trip_charges(buy_price, exit_price, qty):
    """Return the total charges for buying and then exiting ``qty`` units.

    Args:
        buy_price: Entry price per unit.
        exit_price: Exit price per unit.
        qty: Number of units traded on each leg.

    Returns:
        Sum of brokerage, STT, transaction, SEBI, stamp and GST components.
    """
    entry_value = buy_price * qty
    exit_value = exit_price * qty
    turnover = entry_value + exit_value

    txn_fee = TXN_RATE * turnover
    sebi_fee = SEBI_RATE * turnover
    # GST is levied on the fee components only, not on STT or stamp duty.
    gst = GST_RATE * (BROKERAGE + txn_fee + sebi_fee)

    return (
        BROKERAGE
        + STT_RATE * exit_value
        + txn_fee
        + sebi_fee
        + STAMP_RATE * entry_value
        + gst
    )
def per_trade_net_pnl(row):
    """Net PnL for one lot based on the 1:1 outcome.

    Args:
        row: Mapping-like record (dict or pandas Series) providing
            ``option_entry_price``, ``stop_loss``, ``target_1``, the TARGET
            label and, optionally, ``lot_size``.

    Returns:
        Gross PnL minus round-trip charges. The exit is at ``target_1`` when
        the label equals 1, otherwise at ``stop_loss``. Returns 0.0 when the
        lot size or entry price is missing, NaN, zero or negative.
    """
    buy = float(row["option_entry_price"])
    sl = float(row["stop_loss"])
    t1 = float(row["target_1"])
    qty = float(row.get("lot_size") or 0)

    # Written as a positive test on purpose: every comparison with NaN is
    # False, so `qty <= 0 or buy <= 0` lets NaN through and the resulting NaN
    # PnL then poisons downstream numpy sums.
    if not (qty > 0 and buy > 0):
        return 0.0

    exit_price = t1 if int(row[TARGET]) == 1 else sl
    gross = (exit_price - buy) * qty
    return gross - round_trip_charges(buy, exit_price, qty)
def build_features(df):
    """Build the numeric feature matrix from a raw trade frame.

    Columns listed in LEAKY_OR_ID_COLS are dropped, text columns are one-hot
    encoded (missing values become the category "UNKNOWN"), everything else is
    coerced to numeric, and remaining NaNs are filled with the column median
    (0.0 when the median itself is undefined).

    Args:
        df: Raw dataset, one row per trade.

    Returns:
        ``(X, feat_cols)`` where ``X`` is the encoded, NaN-free feature frame
        and ``feat_cols`` is the list of source columns kept before encoding.
    """
    feat_cols = [c for c in df.columns if c not in LEAKY_OR_ID_COLS]
    X = df[feat_cols].copy()

    # Detect text columns by dtype family rather than `dtype == object`:
    # with pandas' dedicated string dtype that equality is False, and the
    # column would fall through to to_numeric and be wiped to NaN -> 0.
    is_object = pd.api.types.is_object_dtype
    is_string = pd.api.types.is_string_dtype
    cat_cols = [
        c for c in X.columns
        if is_object(X[c].dtype) or is_string(X[c].dtype)
    ]
    for c in cat_cols:
        X[c] = X[c].fillna("UNKNOWN").astype(str)
    X = pd.get_dummies(X, columns=cat_cols, dummy_na=False)

    # Coerce remaining to numeric; unparseable values become NaN.
    for c in X.columns:
        X[c] = pd.to_numeric(X[c], errors="coerce")

    # NOTE(review): the median is taken over every row passed in, so when the
    # caller later splits X by date the fill value has seen the test rows.
    X = X.fillna(X.median(numeric_only=True)).fillna(0.0)
    return X, feat_cols
def univariate_auc(X, y):
    """Score each feature on its own by the ROC AUC it achieves against ``y``.

    The AUC is folded around 0.5 (``max(auc, 1 - auc)``) so that a feature
    that separates the classes in either direction ranks high. Constant
    columns, and columns for which the score cannot be computed, are left out.

    Args:
        X: Feature frame; each column is scored independently.
        y: Labels aligned with the rows of ``X``.

    Returns:
        DataFrame with columns ``feature`` and ``auc``, sorted by ``auc``
        in descending order.
    """
    from sklearn.metrics import roc_auc_score

    scored = []
    for name in X.columns:
        column = X[name].values
        if len(np.unique(column)) >= 2:
            try:
                raw = roc_auc_score(y, column)
                # symmetric: only the amount of separation matters
                folded = max(raw, 1 - raw)
            except Exception:
                continue
            scored.append((name, folded))

    table = pd.DataFrame(scored, columns=["feature", "auc"])
    return table.sort_values("auc", ascending=False)
def _best_pnl_threshold(prob, pnl, thresholds, min_picks=5):
    """Pick the probability cut-off whose selected rows have the largest summed PnL.

    Cut-offs selecting fewer than ``min_picks`` rows are ignored. Returns
    ``(threshold, total_pnl)``, or ``(None, None)`` when no cut-off qualifies.
    The comparison is strict, so on ties the lowest cut-off is kept.
    """
    best_thr, best_total = None, -np.inf
    for thr in thresholds:
        picked = prob >= thr
        if picked.sum() < min_picks:
            continue
        total = pnl[picked].sum()
        if total > best_total:
            best_thr, best_total = thr, total
    if best_thr is None:
        return None, None
    return best_thr, best_total


def walk_forward_lgbm(df, X, y, pnl, train_days=30, test_days=5,
                      min_train_rows=500, min_test_rows=50):
    """Walk-forward evaluation of a gradient-boosted classifier as a trade filter.

    Despite the name, the model is scikit-learn's HistGradientBoostingClassifier.
    For each step the last ``train_days`` trading days form the train window and
    the next ``test_days`` days the test window. The train window is split by
    day into inner-train (first 80%) and holdout (rest); the model is fit on
    inner-train, a probability threshold maximising summed net PnL is chosen on
    the holdout, and that threshold is applied to the test window. An "oracle"
    threshold chosen directly on the test window is reported as an upper bound.

    Args:
        df: Trade frame; only its ``trade_date`` column is read.
        X: Feature frame, row-aligned with ``df``.
        y: Binary labels (Series), row-aligned with ``df``.
        pnl: Per-trade net PnL (Series), row-aligned with ``df``.
        train_days: Number of distinct trading days per train window.
        test_days: Number of distinct trading days per test window; also the
            step between consecutive windows.
        min_train_rows: Windows with fewer train rows are skipped.
        min_test_rows: Windows with fewer test rows are skipped.

    Returns:
        DataFrame with one row per evaluated window (empty when no window
        qualifies).

    Raises:
        ValueError: If ``train_days`` or ``test_days`` is smaller than 1.
    """
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.metrics import roc_auc_score

    if train_days < 1 or test_days < 1:
        # test_days is the loop step, so a non-positive value would never advance.
        raise ValueError("train_days and test_days must both be >= 1")

    X = X.reset_index(drop=True)
    y = y.reset_index(drop=True)
    pnl = pnl.reset_index(drop=True)

    # Calendar date per row, computed once: `.dt.date` builds a Python object
    # per row and is far too slow to repeat for every mask in every window.
    row_dates = pd.to_datetime(df["trade_date"]).reset_index(drop=True).dt.date
    all_days = sorted(row_dates.unique())
    thresholds = np.arange(0.30, 0.91, 0.02)

    results = []
    for start in range(train_days, len(all_days) - test_days + 1, test_days):
        train_window = all_days[start - train_days:start]
        test_window = all_days[start:start + test_days]
        tr_mask = row_dates.isin(set(train_window)).values
        te_mask = row_dates.isin(set(test_window)).values
        if tr_mask.sum() < min_train_rows or te_mask.sum() < min_test_rows:
            continue

        # Split train window into inner-train (first 80% of days) and holdout.
        cut = max(1, int(len(train_window) * 0.8))
        itr_mask = row_dates.isin(set(train_window[:cut])).values
        ho_mask = row_dates.isin(set(train_window[cut:])).values

        model = HistGradientBoostingClassifier(
            max_iter=200,
            max_depth=6,
            learning_rate=0.05,
            min_samples_leaf=30,
            l2_regularization=1.0,
            random_state=42,
        )
        model.fit(X.iloc[itr_mask], y.iloc[itr_mask])

        prob_te = model.predict_proba(X.iloc[te_mask])[:, 1]
        te_y = y.iloc[te_mask].values
        te_pnl = pnl.iloc[te_mask].values

        try:
            auc = roc_auc_score(te_y, prob_te)
        except ValueError:
            # e.g. the test window contains a single class
            auc = np.nan

        # Pick threshold on HOLDOUT (never looks at test), apply to test.
        # A very small train_days can leave the holdout empty; then no
        # threshold is chosen and nothing is picked out of sample.
        if ho_mask.any():
            prob_ho = model.predict_proba(X.iloc[ho_mask])[:, 1]
            best_thr, _ = _best_pnl_threshold(
                prob_ho, pnl.iloc[ho_mask].values, thresholds
            )
        else:
            best_thr = None

        # Oracle (test-picked) threshold: reference for how much headroom exists.
        oracle_thr, oracle_pnl = _best_pnl_threshold(prob_te, te_pnl, thresholds)

        if best_thr is not None:
            picked_te = prob_te >= best_thr
            out_n = int(picked_te.sum())
            out_thr_pnl = float(te_pnl[picked_te].sum())
            out_win = float(te_y[picked_te].mean()) if out_n > 0 else float("nan")
        else:
            out_thr_pnl = 0.0
            out_n = 0
            out_win = float("nan")

        results.append({
            "train_start": str(train_window[0]),
            "test_start": str(test_window[0]),
            "test_end": str(test_window[-1]),
            # Row count of the whole train window (inner-train + holdout).
            "n_train": int(tr_mask.sum()),
            "n_test": len(te_y),
            "auc": auc,
            "base_rate": te_y.mean(),
            "base_total_net_pnl": float(te_pnl.sum()),
            "thr_holdout": best_thr,
            "n_picked_oos": out_n,
            "win_rate_oos": out_win,
            "pnl_oos": out_thr_pnl,
            "thr_oracle": oracle_thr,
            "pnl_oracle": float(oracle_pnl) if oracle_thr is not None else 0.0,
        })

    return pd.DataFrame(results)
def main():
    """Run the diagnostic end to end and print a report.

    Loads the dataset at DATA_PATH, keeps rows whose TARGET is 0 or 1,
    computes per-trade net PnL, prints take-all statistics and the univariate
    AUC ranking, runs the walk-forward evaluation, and writes the univariate
    and walk-forward tables as CSV files into OUT_DIR. The univariate table is
    written even when there is not enough data for any walk-forward window.
    """
    print(f"Loading {DATA_PATH.name}...")
    df = pd.read_csv(DATA_PATH)
    df = df[df[TARGET].isin([0, 1])].copy()
    df["trade_date"] = pd.to_datetime(df["trade_date"])
    print(f"Rows: {len(df)} Date range: {df['trade_date'].min().date()} -> {df['trade_date'].max().date()}")
    print(f"Base rate label_1to1: {df[TARGET].mean():.4f}")

    print("\nComputing per-trade net PnL (1 lot)...")
    pnl = df.apply(per_trade_net_pnl, axis=1)
    df["net_pnl_per_lot"] = pnl
    win_mean = pnl[df[TARGET] == 1].mean()
    loss_mean = pnl[df[TARGET] == 0].mean()
    print(f"Mean net PnL per trade (take-all): {pnl.mean():.2f}")
    print(f"Total net PnL (take-all): {pnl.sum():.0f}")
    print(f"Winners mean net PnL: {win_mean:.2f}")
    print(f"Losers mean net PnL: {loss_mean:.2f}")
    # Win rate p at which p * win_mean + (1 - p) * loss_mean == 0.
    breakeven_rate = (-loss_mean) / (win_mean - loss_mean)
    print(f"Implied breakeven win rate: {breakeven_rate:.4f}")

    print("\nBuilding features (dropping symbol/expiry/weekday/ids)...")
    X, feat_cols = build_features(df)
    y = df[TARGET].astype(int)
    print(f"Feature cols after drops: {len(X.columns)}")

    print("\n--- Univariate AUC per feature (top 20) ---")
    uni = univariate_auc(X, y)
    print(uni.head(20).to_string(index=False))
    uni_path = OUT_DIR / "diagnose_filter_univariate_auc.csv"

    print("\n--- Walk-forward LightGBM (30d train, 5d test) ---")
    wf = walk_forward_lgbm(df, X, y, pnl)
    if wf.empty:
        print("Not enough data for walk-forward")
        # The univariate table is already computed; do not throw it away.
        uni.to_csv(uni_path, index=False)
        print("Wrote: outputs/diagnose_filter_univariate_auc.csv")
        return

    print(wf[[
        "test_start", "test_end", "n_test", "auc",
        "base_rate", "base_total_net_pnl",
        "thr_holdout", "n_picked_oos", "win_rate_oos", "pnl_oos",
        "thr_oracle", "pnl_oracle",
    ]].round(4).to_string(index=False))

    print("\n--- Summary ---")
    print(f"Mean test AUC: {wf['auc'].mean():.4f} (0.5 = no signal)")
    print(f"Median test AUC: {wf['auc'].median():.4f}")
    print(f"Take-all total net PnL across all test windows: {wf['base_total_net_pnl'].sum():.0f}")
    print(f"OOS (threshold picked on holdout) total PnL: {wf['pnl_oos'].sum():.0f}")
    print(f"ORACLE (threshold picked on test) total PnL: {wf['pnl_oracle'].sum():.0f} [upper bound]")
    print(f"OOS windows profitable: {(wf['pnl_oos'] > 0).sum()}/{len(wf)}")
    print(f"OOS windows beating take-all: {(wf['pnl_oos'] > wf['base_total_net_pnl']).sum()}/{len(wf)}")

    wf.to_csv(OUT_DIR / "diagnose_filter_walkforward.csv", index=False)
    uni.to_csv(uni_path, index=False)
    print("\nWrote: outputs/diagnose_filter_walkforward.csv")
    print("Wrote: outputs/diagnose_filter_univariate_auc.csv")


if __name__ == "__main__":
    main()