Spaces:
Running
Running
| """ | |
| Leakage-free signal diagnostic. | |
| Answers: given only features known at buy_time (with symbol/expiry/weekday | |
| dropped), is there any predictable separation between 1:1 winners and losers? | |
| Runs three passes: | |
| 1) Univariate separation per feature (AUC of feature vs label) | |
| 2) Walk-forward gradient boosting (scikit-learn HistGradientBoostingClassifier, a LightGBM-style model) with cost-aware net-PnL threshold selection | |
| 3) Walk-forward logistic baseline (sanity check) | |
| """ | |
| from pathlib import Path | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
# This is a diagnostic script: silence every library warning so the printed
# report stays readable.
warnings.filterwarnings("ignore")

# Input and output locations, all resolved relative to this file.
BASE_DIR = Path(__file__).resolve().parent
OUT_DIR = BASE_DIR.joinpath("outputs")
DATA_PATH = OUT_DIR.joinpath(
    "ml_dataset_exact_all_v2_2026-01-01_to_2026-04-01_merged.csv"
)

# Columns that are either identifiers, ground-truth times, or leaky post-entry
# info. None of them may ever be used as a model feature.
LEAKY_OR_ID_COLS = [
    # trade / instrument identifiers
    "trade_key",
    "trade_date",
    "weekday",
    "symbol",
    "option_symbol",
    "strike",
    "expiry",
    "trade_side",
    "variant",
    "mode",
    "call_put",
    # backtest ("bt_") bookkeeping
    "bt_buy_signal_time",
    "bt_sell_signal_time",
    "bt_buy_time",
    "bt_buy_price",
    "bt_stop_loss",
    "bt_target_1",
    "bt_target_2",
    "bt_qty_per_lot",
    "bt_capital_per_lot",
    "bt_stop_loss_amt_per_lot",
    # timestamps
    "signal_time",
    "confirmation_time",
    "indication_time",
    "buy_time",
    "replay_t1_time",
    "replay_t2_time",
    # labels
    "label_1to1",
    "label_1to2",
    # outcome-derived, never a feature
    "net_pnl_per_lot",
]

# Binary label the whole diagnostic is run against.
TARGET = "label_1to1"
# Fee schedule used by round_trip_charges().
BROKERAGE = 40.0        # flat amount charged once per round trip
STT_RATE = 0.001        # applied to the sell-side turnover only
TXN_RATE = 0.0003503    # applied to buy + sell turnover
SEBI_RATE = 0.000001    # applied to buy + sell turnover
STAMP_RATE = 0.00003    # applied to the buy-side turnover only
GST_RATE = 0.18         # applied to brokerage + transaction + SEBI charges


def round_trip_charges(buy_price, exit_price, qty):
    """Return the total charges for buying and then selling ``qty`` units.

    Args:
        buy_price: Entry price per unit.
        exit_price: Exit price per unit.
        qty: Number of units traded on each leg.

    Returns:
        Sum of brokerage, STT, transaction charge, SEBI fee, stamp duty and
        GST, in the same currency unit as the prices.
    """
    buy_turnover = buy_price * qty
    sell_turnover = exit_price * qty
    total_turnover = buy_turnover + sell_turnover

    stt_fee = STT_RATE * sell_turnover
    txn_fee = TXN_RATE * total_turnover
    sebi_fee = SEBI_RATE * total_turnover
    stamp_fee = STAMP_RATE * buy_turnover
    # GST is levied on the service components only, not on STT or stamp duty.
    gst_fee = GST_RATE * (BROKERAGE + txn_fee + sebi_fee)

    return BROKERAGE + stt_fee + txn_fee + sebi_fee + stamp_fee + gst_fee
def per_trade_net_pnl(row):
    """Net PnL for one lot based on the 1:1 outcome.

    A row whose label is 1 is treated as exiting at ``target_1``; any other
    label is treated as exiting at ``stop_loss``. Round-trip charges are
    subtracted from the gross move.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) with
            ``option_entry_price``, ``stop_loss``, ``target_1``, ``lot_size``
            and the ``TARGET`` label.

    Returns:
        Net PnL as a float. Returns 0.0 when the lot size or entry price is
        missing, non-finite or non-positive, or when the relevant exit price
        is non-finite, so one bad row cannot turn downstream sums into NaN.
    """
    buy = float(row["option_entry_price"])
    sl = float(row["stop_loss"])
    t1 = float(row["target_1"])
    qty = float(row.get("lot_size") or 0)

    # NaN compares False against everything, so `qty <= 0` alone would let a
    # missing value through; test finiteness explicitly.
    if not (np.isfinite(qty) and np.isfinite(buy)) or qty <= 0 or buy <= 0:
        return 0.0

    exit_price = t1 if int(row[TARGET]) == 1 else sl
    if not np.isfinite(exit_price):
        return 0.0

    # Positive for a winner (t1 > buy), negative for a loser (sl < buy).
    gross = (exit_price - buy) * qty
    return gross - round_trip_charges(buy, exit_price, qty)
def _is_text_like_dtype(dtype):
    """Return True for dtypes that must be one-hot encoded rather than parsed as numbers."""
    return (
        pd.api.types.is_object_dtype(dtype)
        or pd.api.types.is_string_dtype(dtype)
        or isinstance(dtype, pd.CategoricalDtype)
    )


def build_features(df):
    """Build the numeric feature matrix from the raw dataset.

    Steps: drop every column listed in ``LEAKY_OR_ID_COLS``, one-hot encode
    all text-like columns (missing values become the ``"UNKNOWN"`` level),
    coerce everything else to numeric, then fill NaN with the column median
    and finally with 0.0.

    NOTE: the medians are computed over every row of ``df``. When the result
    is later split by date, imputed values are therefore influenced by rows
    from other splits.

    Args:
        df: Raw dataset, one row per trade.

    Returns:
        ``(X, feat_cols)``, where ``X`` is the all-numeric feature frame and
        ``feat_cols`` is the list of source columns kept before one-hot
        expansion.
    """
    excluded = set(LEAKY_OR_ID_COLS)
    feat_cols = [c for c in df.columns if c not in excluded]
    X = df[feat_cols].copy()

    # One-hot encode every text-like column. Checking only `dtype == object`
    # would miss pandas string and categorical dtypes, which would then be
    # coerced to NaN below instead of being encoded.
    cat_cols = [c for c in X.columns if _is_text_like_dtype(X[c].dtype)]
    for c in cat_cols:
        # Go through object dtype first so fillna also works on categoricals
        # that do not already contain an "UNKNOWN" category.
        X[c] = X[c].astype(object).fillna("UNKNOWN").astype(str)
    X = pd.get_dummies(X, columns=cat_cols, dummy_na=False)

    # Coerce remaining to numeric, fill NaN
    for c in X.columns:
        X[c] = pd.to_numeric(X[c], errors="coerce")
    X = X.fillna(X.median(numeric_only=True)).fillna(0.0)
    return X, feat_cols
def univariate_auc(X, y):
    """Rank each feature by how well it alone separates the two classes.

    Each column's raw values are scored against ``y`` with ROC AUC, and the
    result is folded to ``max(auc, 1 - auc)`` so that direction does not
    matter. Constant columns, and columns for which the AUC cannot be
    computed, are left out.

    Args:
        X: Numeric feature frame.
        y: Binary labels aligned with ``X``.

    Returns:
        DataFrame with columns ``feature`` and ``auc``, sorted by ``auc``
        descending.
    """
    from sklearn.metrics import roc_auc_score

    scored = []
    for name in X.columns:
        column = X[name].values
        # A constant column cannot separate anything.
        if len(np.unique(column)) < 2:
            continue
        try:
            raw_auc = roc_auc_score(y, column)
        except Exception:
            continue
        # symmetric — we just care about separation
        scored.append((name, max(raw_auc, 1 - raw_auc)))

    table = pd.DataFrame(scored, columns=["feature", "auc"])
    return table.sort_values("auc", ascending=False)
def _best_pnl_threshold(prob, pnl, thresholds, min_picked=5):
    """Find the cutoff whose selection ``prob >= cutoff`` has the largest summed PnL.

    Cutoffs that select fewer than ``min_picked`` rows are ignored. On ties
    the first (lowest) cutoff wins. Returns ``(None, -1e18)`` when no cutoff
    qualifies.
    """
    best_thr, best_total = None, -1e18
    for thr in thresholds:
        picked = prob >= thr
        if picked.sum() < min_picked:
            continue
        total = pnl[picked].sum()
        if total > best_total:
            best_thr, best_total = thr, total
    return best_thr, best_total


def walk_forward_lgbm(df, X, y, pnl, *, train_days=30, test_days=5,
                      min_train_rows=500, min_test_rows=50):
    """Walk-forward evaluation of a gradient-boosted classifier with PnL-based thresholding.

    Despite the name, the estimator is scikit-learn's
    ``HistGradientBoostingClassifier``.

    For each window the distinct trade dates are split into ``train_days``
    training days followed by ``test_days`` test days, and the window then
    advances by ``test_days``. Within the training days:

    * the first 80% (inner train) fit the model;
    * the last 20% (holdout) choose the probability threshold that maximises
      summed net PnL.

    That threshold is then applied, unchanged, to the test days. An "oracle"
    threshold chosen directly on the test days is also recorded, as an upper
    bound only.

    Args:
        df: Dataset containing a ``trade_date`` column; rows align
            positionally with ``X``, ``y`` and ``pnl``.
        X: Numeric feature frame.
        y: Binary labels.
        pnl: Per-trade net PnL.
        train_days: Number of distinct trade dates per training window.
        test_days: Number of distinct trade dates per test window (also the
            step size).
        min_train_rows: Windows with fewer training rows are skipped.
        min_test_rows: Windows with fewer test rows are skipped.

    Returns:
        DataFrame with one row per evaluated window (empty if none qualified).
        ``n_train`` counts all rows in the training window, although the model
        is fitted on the inner-train part only.

    Raises:
        ValueError: If ``train_days`` or ``test_days`` is less than 1.
    """
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.metrics import roc_auc_score

    if train_days < 1 or test_days < 1:
        raise ValueError("train_days and test_days must both be >= 1")

    X = X.reset_index(drop=True)
    y_all = np.asarray(y)
    pnl_all = np.asarray(pnl)

    # Compute the calendar date of every row once; all masks reuse it.
    trade_day = pd.to_datetime(df["trade_date"]).dt.date
    all_days = sorted(trade_day.unique())

    thresholds = np.arange(0.30, 0.91, 0.02)
    results = []

    for i in range(train_days, len(all_days) - test_days + 1, test_days):
        train_day_list = all_days[i - train_days:i]
        test_day_list = all_days[i:i + test_days]

        tr_mask = trade_day.isin(set(train_day_list)).values
        te_mask = trade_day.isin(set(test_day_list)).values
        if tr_mask.sum() < min_train_rows or te_mask.sum() < min_test_rows:
            continue

        # Split train window into inner-train (first 80%) and holdout (last 20%)
        cut = max(1, int(len(train_day_list) * 0.8))
        itr_mask = trade_day.isin(set(train_day_list[:cut])).values
        ho_mask = trade_day.isin(set(train_day_list[cut:])).values

        model = HistGradientBoostingClassifier(
            max_iter=200,
            max_depth=6,
            learning_rate=0.05,
            min_samples_leaf=30,
            l2_regularization=1.0,
            random_state=42,
        )
        model.fit(X.iloc[itr_mask], y_all[itr_mask])

        # An empty holdout slice cannot be scored; treat it as "no threshold".
        if ho_mask.any():
            prob_ho = model.predict_proba(X.iloc[ho_mask])[:, 1]
        else:
            prob_ho = np.empty(0)
        prob_te = model.predict_proba(X.iloc[te_mask])[:, 1]

        te_y = y_all[te_mask]
        te_pnl = pnl_all[te_mask]
        ho_pnl = pnl_all[ho_mask]

        try:
            auc = roc_auc_score(te_y, prob_te)
        except ValueError:
            # Raised when the test window contains a single class.
            auc = np.nan

        # Pick threshold on HOLDOUT (never looks at test), apply to test
        best_thr, _ = _best_pnl_threshold(prob_ho, ho_pnl, thresholds)
        # Also track oracle (test-picked) for reference on how much headroom exists
        oracle_thr, oracle_pnl = _best_pnl_threshold(prob_te, te_pnl, thresholds)

        if best_thr is not None:
            picked_te = prob_te >= best_thr
            out_n = int(picked_te.sum())
            out_thr_pnl = float(te_pnl[picked_te].sum())
            out_win = float(te_y[picked_te].mean()) if out_n > 0 else float("nan")
        else:
            out_n = 0
            out_thr_pnl = 0.0
            out_win = float("nan")

        results.append({
            "train_start": str(train_day_list[0]),
            "test_start": str(test_day_list[0]),
            "test_end": str(test_day_list[-1]),
            "n_train": int(tr_mask.sum()),
            "n_test": len(te_y),
            "auc": auc,
            "base_rate": te_y.mean(),
            "base_total_net_pnl": float(te_pnl.sum()),
            "thr_holdout": best_thr,
            "n_picked_oos": out_n,
            "win_rate_oos": out_win,
            "pnl_oos": out_thr_pnl,
            "thr_oracle": oracle_thr,
            "pnl_oracle": float(oracle_pnl) if oracle_thr is not None else 0.0,
        })

    return pd.DataFrame(results)
def main():
    """Run the full diagnostic: load data, report PnL stats, score features, walk forward.

    Reads ``DATA_PATH``, prints the report to stdout and writes two CSV files
    into ``OUT_DIR``. Returns early, before anything is written, when the
    walk-forward produces no windows.
    """
    print(f"Loading {DATA_PATH.name}...")
    df = pd.read_csv(DATA_PATH)
    # Keep only rows with a resolved binary label.
    df = df[df[TARGET].isin([0, 1])].copy()
    df["trade_date"] = pd.to_datetime(df["trade_date"])
    first_day = df["trade_date"].min().date()
    last_day = df["trade_date"].max().date()
    print(f"Rows: {len(df)} Date range: {first_day} -> {last_day}")
    print(f"Base rate label_1to1: {df[TARGET].mean():.4f}")

    print("\nComputing per-trade net PnL (1 lot)...")
    pnl = df.apply(per_trade_net_pnl, axis=1)
    df["net_pnl_per_lot"] = pnl
    winner_mean = pnl[df[TARGET] == 1].mean()
    loser_mean = pnl[df[TARGET] == 0].mean()
    print(f"Mean net PnL per trade (take-all): {pnl.mean():.2f}")
    print(f"Total net PnL (take-all): {pnl.sum():.0f}")
    print(f"Winners mean net PnL: {winner_mean:.2f}")
    print(f"Losers mean net PnL: {loser_mean:.2f}")
    # Win rate w at which w * winner_mean + (1 - w) * loser_mean == 0.
    breakeven_rate = (-loser_mean) / (winner_mean - loser_mean)
    print(f"Implied breakeven win rate: {breakeven_rate:.4f}")

    print("\nBuilding features (dropping symbol/expiry/weekday/ids)...")
    X, feat_cols = build_features(df)
    y = df[TARGET].astype(int)
    print(f"Feature cols after drops: {len(X.columns)}")

    print("\n--- Univariate AUC per feature (top 20) ---")
    uni = univariate_auc(X, y)
    print(uni.head(20).to_string(index=False))

    print("\n--- Walk-forward LightGBM (30d train, 5d test) ---")
    wf = walk_forward_lgbm(df, X, y, pnl)
    if wf.empty:
        print("Not enough data for walk-forward")
        return

    report_cols = [
        "test_start", "test_end", "n_test", "auc",
        "base_rate", "base_total_net_pnl",
        "thr_holdout", "n_picked_oos", "win_rate_oos", "pnl_oos",
        "thr_oracle", "pnl_oracle",
    ]
    print(wf[report_cols].round(4).to_string(index=False))

    n_windows = len(wf)
    n_profitable = (wf["pnl_oos"] > 0).sum()
    n_beating = (wf["pnl_oos"] > wf["base_total_net_pnl"]).sum()
    print("\n--- Summary ---")
    print(f"Mean test AUC: {wf['auc'].mean():.4f} (0.5 = no signal)")
    print(f"Median test AUC: {wf['auc'].median():.4f}")
    print(f"Take-all total net PnL across all test windows: {wf['base_total_net_pnl'].sum():.0f}")
    print(f"OOS (threshold picked on holdout) total PnL: {wf['pnl_oos'].sum():.0f}")
    print(f"ORACLE (threshold picked on test) total PnL: {wf['pnl_oracle'].sum():.0f} [upper bound]")
    print(f"OOS windows profitable: {n_profitable}/{n_windows}")
    print(f"OOS windows beating take-all: {n_beating}/{n_windows}")

    wf.to_csv(OUT_DIR / "diagnose_filter_walkforward.csv", index=False)
    uni.to_csv(OUT_DIR / "diagnose_filter_univariate_auc.csv", index=False)
    print("\nWrote: outputs/diagnose_filter_walkforward.csv")
    print("Wrote: outputs/diagnose_filter_univariate_auc.csv")


if __name__ == "__main__":
    main()