"""Leakage-free signal diagnostic.

Answers: given only features known at buy_time (with symbol/expiry/weekday
dropped), is there any predictable separation between 1:1 winners and losers?

Passes implemented in this file:
  1) Univariate separation per feature (AUC of feature vs label)
  2) Walk-forward gradient boosting with cost-aware net-PnL threshold
     selection. The function and the console output say "LightGBM" for
     historical reasons; the model actually fitted is scikit-learn's
     HistGradientBoostingClassifier.

The original outline also lists a third pass (walk-forward logistic baseline
as a sanity check); no such pass exists in this file.
"""
from pathlib import Path
import warnings

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore")

BASE_DIR = Path(__file__).resolve().parent
OUT_DIR = BASE_DIR / "outputs"
DATA_PATH = OUT_DIR / "ml_dataset_exact_all_v2_2026-01-01_to_2026-04-01_merged.csv"

# Columns that are either identifiers, ground-truth times, or leaky post-entry info
LEAKY_OR_ID_COLS = [
    "trade_key", "trade_date", "weekday", "symbol", "option_symbol", "strike",
    "expiry", "trade_side", "variant", "mode", "call_put",
    "bt_buy_signal_time", "bt_sell_signal_time", "bt_buy_time", "bt_buy_price",
    "bt_stop_loss", "bt_target_1", "bt_target_2", "bt_qty_per_lot",
    "bt_capital_per_lot", "bt_stop_loss_amt_per_lot",
    "signal_time", "confirmation_time", "indication_time", "buy_time",
    "replay_t1_time", "replay_t2_time",
    "label_1to1", "label_1to2",
    "net_pnl_per_lot",  # outcome-derived, never a feature
]

TARGET = "label_1to1"

BROKERAGE = 40.0
STT_RATE = 0.001
TXN_RATE = 0.0003503
SEBI_RATE = 0.000001
STAMP_RATE = 0.00003
GST_RATE = 0.18


def round_trip_charges(buy_price, exit_price, qty):
    """Return the total charges for buying and then selling ``qty`` units.

    The total is the flat ``BROKERAGE`` plus STT on the sell turnover,
    transaction and SEBI fees on the combined turnover, stamp duty on the buy
    turnover, and GST applied to brokerage + transaction + SEBI fees.
    """
    buy_turnover = buy_price * qty
    sell_turnover = exit_price * qty
    turnover = buy_turnover + sell_turnover

    brokerage = BROKERAGE
    stt = STT_RATE * sell_turnover
    txn = TXN_RATE * turnover
    sebi = SEBI_RATE * turnover
    stamp = STAMP_RATE * buy_turnover
    gst = GST_RATE * (brokerage + txn + sebi)
    return brokerage + stt + txn + sebi + stamp + gst


def per_trade_net_pnl(row):
    """Net PnL for one lot based on 1:1 outcome.

    ``row`` is any mapping-like object supporting ``[]`` and ``.get`` (a
    DataFrame row or a plain dict). A winner (``row[TARGET] == 1``) exits at
    ``target_1``; anything else exits at ``stop_loss``. Rows with a missing,
    non-finite or non-positive entry price / lot size, or a non-finite exit
    price, yield ``0.0`` so they can never poison downstream sums with NaN.
    """
    buy = float(row["option_entry_price"])
    qty = float(row.get("lot_size") or 0)
    # NaN is truthy, so `nan or 0` stays NaN and would slip past `<= 0`.
    if not (np.isfinite(buy) and np.isfinite(qty)) or qty <= 0 or buy <= 0:
        return 0.0

    won = int(row[TARGET]) == 1
    exit_price = float(row["target_1"] if won else row["stop_loss"])
    if not np.isfinite(exit_price):
        return 0.0

    gross = (exit_price - buy) * qty
    return gross - round_trip_charges(buy, exit_price, qty)


def build_features(df):
    """Build the numeric feature matrix from ``df``.

    Drops every column named in ``LEAKY_OR_ID_COLS``, one-hot encodes the
    remaining text columns (missing values become the ``UNKNOWN`` category),
    coerces everything else to numeric, and imputes NaN with the column median
    (falling back to 0.0 when a column has no median).

    Returns ``(X, feat_cols)`` where ``feat_cols`` lists the retained source
    columns *before* one-hot expansion.
    """
    excluded = set(LEAKY_OR_ID_COLS)
    feat_cols = [c for c in df.columns if c not in excluded]
    X = df[feat_cols].copy()

    # Text columns: object dtype, plus pandas' dedicated string dtypes which a
    # plain `dtype == object` comparison would miss.
    cat_cols = [
        c for c in X.columns
        if pd.api.types.is_object_dtype(X[c]) or pd.api.types.is_string_dtype(X[c])
    ]
    if cat_cols:
        for c in cat_cols:
            X[c] = X[c].fillna("UNKNOWN").astype(str)
        X = pd.get_dummies(X, columns=cat_cols, dummy_na=False)

    # Coerce remaining to numeric, fill NaN.
    X = X.apply(pd.to_numeric, errors="coerce")
    # NOTE(review): medians are taken over the whole frame passed in. main()
    # passes the full dataset, so imputed values see rows that later act as
    # walk-forward test data — a mild look-ahead; confirm this is acceptable.
    X = X.fillna(X.median(numeric_only=True)).fillna(0.0)
    return X, feat_cols


def univariate_auc(X, y):
    """Rank features by how well each one alone separates the label.

    For every non-constant column of ``X`` the ROC AUC against ``y`` is
    computed and folded to ``max(auc, 1 - auc)`` so direction does not matter.
    Columns for which the AUC cannot be computed are skipped. Returns a
    DataFrame with columns ``feature`` and ``auc``, sorted by ``auc``
    descending.
    """
    from sklearn.metrics import roc_auc_score

    rows = []
    for c in X.columns:
        vals = X[c].values
        if len(np.unique(vals)) < 2:
            continue
        try:
            auc = roc_auc_score(y, vals)
        except (ValueError, TypeError):
            # Undefined AUC (e.g. a single class in y) or non-numeric values.
            continue
        # symmetric — we just care about separation
        rows.append((c, max(auc, 1 - auc)))
    return pd.DataFrame(rows, columns=["feature", "auc"]).sort_values("auc", ascending=False)


def _best_threshold(prob, pnl_values, min_picked=5):
    """Return ``(threshold, total_pnl)`` maximising summed PnL of picked trades.

    Sweeps thresholds 0.30..0.90 in steps of 0.02 and ignores any threshold
    selecting fewer than ``min_picked`` trades. Returns ``(None, -inf)`` when
    no threshold qualifies.
    """
    best_thr, best_total = None, -np.inf
    for thr in np.arange(0.30, 0.91, 0.02):
        picked = prob >= thr
        if picked.sum() < min_picked:
            continue
        total = pnl_values[picked].sum()
        if total > best_total:
            best_thr, best_total = thr, total
    return best_thr, best_total


def walk_forward_lgbm(df, X, y, pnl, train_days=30, test_days=5,
                      min_train_rows=500, min_test_rows=50):
    """Walk-forward evaluation of a gradient-boosted classifier.

    Despite the name, the model is scikit-learn's
    ``HistGradientBoostingClassifier``. The distinct trade dates are walked in
    steps of ``test_days``: each window trains on the first 80% of the
    preceding ``train_days`` dates, picks a probability threshold on the
    remaining 20% (holdout) by maximising summed net PnL, and applies that
    threshold to the next ``test_days`` dates. An "oracle" threshold chosen on
    the test window itself is recorded as an upper bound.

    Args:
        df: frame with a ``trade_date`` column, row-aligned with ``X``/``y``/``pnl``.
        X: feature matrix.
        y: binary label Series.
        pnl: per-trade net PnL Series.
        train_days, test_days: window lengths in distinct trade dates.
        min_train_rows, min_test_rows: windows with fewer rows are skipped.

    Returns:
        DataFrame with one row per evaluated window (empty if none qualified).

    Raises:
        ValueError: if ``train_days`` or ``test_days`` is smaller than 1.
    """
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.metrics import roc_auc_score

    if train_days < 1 or test_days < 1:
        raise ValueError("train_days and test_days must both be >= 1")

    df = df.reset_index(drop=True)
    X = X.reset_index(drop=True)
    y = y.reset_index(drop=True)
    pnl = pnl.reset_index(drop=True)

    # Computed once; every window mask below is derived from it.
    trade_dates = pd.to_datetime(df["trade_date"]).dt.date
    all_days = sorted(trade_dates.unique())

    results = []
    for i in range(train_days, len(all_days) - test_days + 1, test_days):
        train_day_list = all_days[i - train_days:i]  # already in date order
        test_day_list = all_days[i:i + test_days]

        tr_mask = trade_dates.isin(train_day_list).values
        te_mask = trade_dates.isin(test_day_list).values
        if tr_mask.sum() < min_train_rows or te_mask.sum() < min_test_rows:
            continue

        # Split train window into inner-train (first 80%) and holdout (last 20%)
        cut = max(1, int(len(train_day_list) * 0.8))
        itr_mask = trade_dates.isin(train_day_list[:cut]).values
        ho_mask = trade_dates.isin(train_day_list[cut:]).values

        y_inner = y.iloc[itr_mask]
        if y_inner.nunique() < 2:
            # Degenerate slice: with a single class there is nothing to separate.
            continue

        model = HistGradientBoostingClassifier(
            max_iter=200,
            max_depth=6,
            learning_rate=0.05,
            min_samples_leaf=30,
            l2_regularization=1.0,
            random_state=42,
        )
        model.fit(X.iloc[itr_mask], y_inner)

        # A tiny train window can leave the holdout empty; then no threshold
        # can be chosen and the window records zero picks.
        if ho_mask.any():
            prob_ho = model.predict_proba(X.iloc[ho_mask])[:, 1]
        else:
            prob_ho = np.empty(0)
        prob_te = model.predict_proba(X.iloc[te_mask])[:, 1]

        te_y = y.iloc[te_mask].values
        te_pnl = pnl.iloc[te_mask].values
        ho_pnl = pnl.iloc[ho_mask].values

        try:
            auc = roc_auc_score(te_y, prob_te)
        except ValueError:
            # AUC is undefined, e.g. when the test window holds a single class.
            auc = np.nan

        # Pick threshold on HOLDOUT (never looks at test), apply to test
        best_thr, _ = _best_threshold(prob_ho, ho_pnl)
        # Also track oracle (test-picked) for reference on how much headroom exists
        oracle_thr, oracle_pnl = _best_threshold(prob_te, te_pnl)

        if best_thr is not None:
            picked_te = prob_te >= best_thr
            out_n = int(picked_te.sum())
            out_thr_pnl = float(te_pnl[picked_te].sum())
            out_win = float(te_y[picked_te].mean()) if out_n > 0 else float("nan")
        else:
            out_thr_pnl = 0.0
            out_n = 0
            out_win = float("nan")

        results.append({
            "train_start": str(train_day_list[0]),
            "test_start": str(test_day_list[0]),
            "test_end": str(test_day_list[-1]),
            "n_train": int(tr_mask.sum()),
            "n_test": len(te_y),
            "auc": auc,
            "base_rate": te_y.mean(),
            "base_total_net_pnl": float(te_pnl.sum()),
            "thr_holdout": best_thr,
            "n_picked_oos": out_n,
            "win_rate_oos": out_win,
            "pnl_oos": out_thr_pnl,
            "thr_oracle": oracle_thr,
            "pnl_oracle": float(oracle_pnl) if oracle_thr is not None else 0.0,
        })

    return pd.DataFrame(results)


def main():
    """Load the dataset, run both diagnostic passes, print and save results."""
    print(f"Loading {DATA_PATH.name}...")
    df = pd.read_csv(DATA_PATH)
    df = df[df[TARGET].isin([0, 1])].copy()
    df["trade_date"] = pd.to_datetime(df["trade_date"])
    print(f"Rows: {len(df)} Date range: {df['trade_date'].min().date()} -> {df['trade_date'].max().date()}")
    print(f"Base rate label_1to1: {df[TARGET].mean():.4f}")

    print("\nComputing per-trade net PnL (1 lot)...")
    pnl = df.apply(per_trade_net_pnl, axis=1)
    # Stored for reference only; the column is listed in LEAKY_OR_ID_COLS so
    # build_features never uses it as a feature.
    df["net_pnl_per_lot"] = pnl
    win_mean = pnl[df[TARGET] == 1].mean()
    loss_mean = pnl[df[TARGET] == 0].mean()
    print(f"Mean net PnL per trade (take-all): {pnl.mean():.2f}")
    print(f"Total net PnL (take-all): {pnl.sum():.0f}")
    print(f"Winners mean net PnL: {win_mean:.2f}")
    print(f"Losers mean net PnL: {loss_mean:.2f}")
    # Win rate p at which p * win_mean + (1 - p) * loss_mean == 0.
    breakeven_rate = (-loss_mean) / (win_mean - loss_mean)
    print(f"Implied breakeven win rate: {breakeven_rate:.4f}")

    print("\nBuilding features (dropping symbol/expiry/weekday/ids)...")
    X, feat_cols = build_features(df)
    y = df[TARGET].astype(int)
    print(f"Feature cols after drops: {len(X.columns)}")

    print("\n--- Univariate AUC per feature (top 20) ---")
    uni = univariate_auc(X, y)
    print(uni.head(20).to_string(index=False))

    print("\n--- Walk-forward LightGBM (30d train, 5d test) ---")
    wf = walk_forward_lgbm(df, X, y, pnl)
    if wf.empty:
        print("Not enough data for walk-forward")
        return
    print(wf[[
        "test_start", "test_end", "n_test", "auc", "base_rate",
        "base_total_net_pnl", "thr_holdout", "n_picked_oos", "win_rate_oos",
        "pnl_oos", "thr_oracle", "pnl_oracle",
    ]].round(4).to_string(index=False))

    print("\n--- Summary ---")
    print(f"Mean test AUC: {wf['auc'].mean():.4f} (0.5 = no signal)")
    print(f"Median test AUC: {wf['auc'].median():.4f}")
    print(f"Take-all total net PnL across all test windows: {wf['base_total_net_pnl'].sum():.0f}")
    print(f"OOS (threshold picked on holdout) total PnL: {wf['pnl_oos'].sum():.0f}")
    print(f"ORACLE (threshold picked on test) total PnL: {wf['pnl_oracle'].sum():.0f} [upper bound]")
    print(f"OOS windows profitable: {(wf['pnl_oos'] > 0).sum()}/{len(wf)}")
    print(f"OOS windows beating take-all: {(wf['pnl_oos'] > wf['base_total_net_pnl']).sum()}/{len(wf)}")

    wf.to_csv(OUT_DIR / "diagnose_filter_walkforward.csv", index=False)
    uni.to_csv(OUT_DIR / "diagnose_filter_univariate_auc.csv", index=False)
    print("\nWrote: outputs/diagnose_filter_walkforward.csv")
    print("Wrote: outputs/diagnose_filter_univariate_auc.csv")


if __name__ == "__main__":
    main()