"""
Leakage-free signal diagnostic.
Answers: given only features known at buy_time (with symbol/expiry/weekday
dropped), is there any predictable separation between 1:1 winners and losers?
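Runs the following passes:
1) Univariate separation per feature (AUC of feature vs label)
2) Walk-forward gradient boosting (scikit-learn HistGradientBoostingClassifier,
   a LightGBM-style model) with cost-aware net-PnL threshold selection
Note: a third pass, a walk-forward logistic baseline (sanity check), is listed
for this diagnostic but is not run by main() in this file.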
"""
from pathlib import Path
import warnings
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
# Directory containing this script; every path below is anchored to it so the
# script does not depend on the current working directory.
BASE_DIR = Path(__file__).resolve().parent
# Folder that holds the input dataset and receives the CSV reports from main().
OUT_DIR = BASE_DIR.joinpath("outputs")
# Merged trade-level dataset loaded by main().
DATA_PATH = OUT_DIR.joinpath(
    "ml_dataset_exact_all_v2_2026-01-01_to_2026-04-01_merged.csv"
)
# Columns excluded from the feature matrix by build_features(): identifiers,
# ground-truth timestamps, and anything derived from post-entry information.
LEAKY_OR_ID_COLS = [
    # trade / contract identifiers and calendar fields
    "trade_key",
    "trade_date",
    "weekday",
    "symbol",
    "option_symbol",
    "strike",
    "expiry",
    "trade_side",
    "variant",
    "mode",
    "call_put",
    # bt_* columns
    "bt_buy_signal_time",
    "bt_sell_signal_time",
    "bt_buy_time",
    "bt_buy_price",
    "bt_stop_loss",
    "bt_target_1",
    "bt_target_2",
    "bt_qty_per_lot",
    "bt_capital_per_lot",
    "bt_stop_loss_amt_per_lot",
    # event timestamps
    "signal_time",
    "confirmation_time",
    "indication_time",
    "buy_time",
    "replay_t1_time",
    "replay_t2_time",
    # labels
    "label_1to1",
    "label_1to2",
    # outcome-derived, never a feature
    "net_pnl_per_lot",
]
# Label column the diagnostic predicts; main() keeps only rows where it is 0/1.
TARGET = "label_1to1"

# Cost-model parameters consumed by round_trip_charges().
BROKERAGE = 40.0     # flat amount, charged once per round trip
STT_RATE = 1e-3      # applied to sell-side turnover
TXN_RATE = 3.503e-4  # applied to buy + sell turnover
SEBI_RATE = 1e-6     # applied to buy + sell turnover
STAMP_RATE = 3e-5    # applied to buy-side turnover
GST_RATE = 0.18      # applied to brokerage + transaction + SEBI charges
def round_trip_charges(buy_price, exit_price, qty):
    """Return the total charges for buying and then selling ``qty`` units.

    Args:
        buy_price: Per-unit entry price.
        exit_price: Per-unit exit price.
        qty: Number of units traded on each leg.

    Returns:
        Sum of flat brokerage, STT on the sell leg, transaction and SEBI
        charges on total turnover, stamp duty on the buy leg, and GST on
        brokerage + transaction + SEBI charges.
    """
    buy_turnover = buy_price * qty
    sell_turnover = exit_price * qty
    total_turnover = buy_turnover + sell_turnover

    txn_fee = TXN_RATE * total_turnover
    sebi_fee = SEBI_RATE * total_turnover
    # GST is levied on the service-type charges only, not on STT or stamp duty.
    gst_base = BROKERAGE + txn_fee + sebi_fee

    return (
        BROKERAGE
        + STT_RATE * sell_turnover
        + txn_fee
        + sebi_fee
        + STAMP_RATE * buy_turnover
        + GST_RATE * gst_base
    )
def per_trade_net_pnl(row):
    """Return the net PnL of one lot for a trade resolved by its 1:1 label.

    A row whose ``TARGET`` label equals 1 is treated as exiting at
    ``target_1``; any other label exits at ``stop_loss``. Round-trip charges
    are subtracted from the gross price move.

    Args:
        row: Mapping-like record (for example a DataFrame row) providing
            ``option_entry_price``, ``stop_loss``, ``target_1``, the ``TARGET``
            label and, optionally, ``lot_size``.

    Returns:
        Net PnL as a float. ``0.0`` is returned when the entry price or lot
        size is missing, non-finite or non-positive, or when the exit price is
        non-finite, so unusable rows never inject NaN into downstream sums.
    """
    buy = float(row["option_entry_price"])
    sl = float(row["stop_loss"])
    t1 = float(row["target_1"])
    # ``NaN or 0`` evaluates to NaN (NaN is truthy), so the ``or`` fallback
    # only covers None/0; missing CSV values need the explicit finite check.
    qty = float(row.get("lot_size") or 0)
    if not (np.isfinite(buy) and np.isfinite(qty)) or qty <= 0 or buy <= 0:
        return 0.0

    exit_price = t1 if int(row[TARGET]) == 1 else sl
    if not np.isfinite(exit_price):
        return 0.0

    # Positive for a target exit, negative for a stop-loss exit.
    gross = (exit_price - buy) * qty
    return gross - round_trip_charges(buy, exit_price, qty)
def build_features(df):
    """Turn the raw trade frame into an all-numeric feature matrix.

    Columns named in ``LEAKY_OR_ID_COLS`` are dropped. Every object-dtype
    column is one-hot encoded, with missing values mapped to an ``UNKNOWN``
    category. All columns are then coerced to numeric, and remaining NaNs are
    filled with the column median, falling back to ``0.0``.

    Args:
        df: Trade-level DataFrame.

    Returns:
        ``(X, feat_cols)``: the encoded feature frame and the list of source
        column names that were kept before encoding.
    """
    feat_cols = [name for name in df.columns if name not in LEAKY_OR_ID_COLS]
    X = df[feat_cols].copy()

    # One-hot encode every text column; NaN becomes its own explicit category
    # so no separate NaN indicator column is requested from get_dummies.
    text_cols = [name for name in X.columns if X[name].dtype == object]
    for name in text_cols:
        filled = X[name].fillna("UNKNOWN")
        X[name] = filled.astype(str)
    X = pd.get_dummies(X, columns=text_cols, dummy_na=False)

    # Force everything to numeric; unparsable entries turn into NaN.
    for name in X.columns:
        X[name] = pd.to_numeric(X[name], errors="coerce")

    # Median imputation first; columns with no median at all fall back to 0.
    medians = X.median(numeric_only=True)
    X = X.fillna(medians)
    X = X.fillna(0.0)
    return X, feat_cols
def univariate_auc(X, y):
    """Rank features by how well each one alone separates the label.

    For every column of ``X`` with at least two distinct values, the ROC AUC
    of the raw feature value against ``y`` is computed and folded to
    ``max(auc, 1 - auc)`` so the direction of the relationship is ignored.
    Columns for which the score cannot be computed are skipped.

    Args:
        X: Numeric feature DataFrame.
        y: Binary label aligned with ``X``.

    Returns:
        DataFrame with columns ``feature`` and ``auc``, sorted by ``auc`` in
        descending order.
    """
    from sklearn.metrics import roc_auc_score

    scored = []
    for name in X.columns:
        column = X[name].values
        if len(np.unique(column)) < 2:
            # A constant column cannot rank anything.
            continue
        try:
            raw = roc_auc_score(y, column)
        except Exception:
            # Best-effort diagnostic: skip features the scorer rejects.
            continue
        # symmetric: only the amount of separation matters, not its sign
        scored.append((name, max(raw, 1 - raw)))

    table = pd.DataFrame(scored, columns=["feature", "auc"])
    return table.sort_values("auc", ascending=False)
def _best_threshold(prob, pnl, thresholds, min_picked=5):
    """Return ``(threshold, total_pnl)`` maximizing summed PnL of picked rows.

    A row is picked when its probability is ``>= threshold``. Thresholds that
    pick fewer than ``min_picked`` rows are ignored. ``(None, 0.0)`` is
    returned when no threshold qualifies.
    """
    best_thr, best_total = None, -np.inf
    for thr in thresholds:
        picked = prob >= thr
        if picked.sum() < min_picked:
            continue
        total = pnl[picked].sum()
        if total > best_total:
            best_thr, best_total = thr, total
    if best_thr is None:
        return None, 0.0
    return best_thr, float(best_total)


def walk_forward_lgbm(df, X, y, pnl):
    """Walk-forward evaluation of a gradient-boosting filter on net PnL.

    Despite the name, the model is scikit-learn's
    ``HistGradientBoostingClassifier``. Trading days are walked in steps of 5:
    each window trains on the previous 30 trading days and tests on the next 5.
    Inside the training window the model is fit on the first 80% of days; the
    remaining days are a holdout used only to choose the probability threshold
    that maximizes total net PnL. That threshold is then applied to the test
    days. An "oracle" threshold chosen directly on the test days is reported as
    an upper bound.

    Windows are skipped when the training window has fewer than 500 rows, the
    test window has fewer than 50 rows, the holdout is empty, or the inner
    training labels contain fewer than two classes.

    Args:
        df: Trade frame; only its ``trade_date`` column is read.
        X: Feature frame, row-aligned with ``df``.
        y: Binary label Series, row-aligned with ``df``.
        pnl: Per-trade net PnL Series, row-aligned with ``df``.

    Returns:
        DataFrame with one row per evaluated window (empty when no window could
        be evaluated). ``n_train`` counts rows in the full 30-day training
        window, not only the inner 80% the model is fit on.
    """
    train_days = 30
    test_days = 5

    df = df.reset_index(drop=True)
    X = X.reset_index(drop=True)
    y = y.reset_index(drop=True)
    pnl = pnl.reset_index(drop=True)

    # Calendar date of every row, computed once instead of for every mask.
    trade_day = pd.to_datetime(df["trade_date"]).dt.date
    all_days = sorted(trade_day.unique())
    if len(all_days) < train_days + test_days:
        # Not enough history for a single window; nothing to fit.
        return pd.DataFrame([])

    # Imported lazily so the heavy dependency is only loaded when needed.
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.metrics import roc_auc_score

    thresholds = np.arange(0.30, 0.91, 0.02)
    results = []
    for start in range(train_days, len(all_days) - test_days + 1, test_days):
        train_day_list = all_days[start - train_days:start]
        test_day_list = all_days[start:start + test_days]

        tr_mask = trade_day.isin(train_day_list).values
        te_mask = trade_day.isin(test_day_list).values
        if tr_mask.sum() < 500 or te_mask.sum() < 50:
            continue

        # Split train window into inner-train (first 80%) and holdout (last 20%)
        cut = max(1, int(len(train_day_list) * 0.8))
        itr_mask = trade_day.isin(train_day_list[:cut]).values
        ho_mask = trade_day.isin(train_day_list[cut:]).values

        y_inner = y.iloc[itr_mask]
        if ho_mask.sum() == 0 or y_inner.nunique() < 2:
            # No holdout rows to tune on, or no positive-class column to
            # predict: the window cannot be evaluated.
            continue

        model = HistGradientBoostingClassifier(
            max_iter=200,
            max_depth=6,
            learning_rate=0.05,
            min_samples_leaf=30,
            l2_regularization=1.0,
            random_state=42,
        )
        model.fit(X.iloc[itr_mask], y_inner)
        prob_ho = model.predict_proba(X.iloc[ho_mask])[:, 1]
        prob_te = model.predict_proba(X.iloc[te_mask])[:, 1]

        te_y = y.iloc[te_mask].values
        te_pnl = pnl.iloc[te_mask].values
        ho_pnl = pnl.iloc[ho_mask].values

        try:
            auc = roc_auc_score(te_y, prob_te)
        except ValueError:
            # Raised when the test labels contain a single class.
            auc = np.nan

        # Pick threshold on HOLDOUT (never looks at test), apply to test
        best_thr, _ = _best_threshold(prob_ho, ho_pnl, thresholds)
        # Oracle (test-picked) threshold, reported only to show the headroom
        oracle_thr, oracle_pnl = _best_threshold(prob_te, te_pnl, thresholds)

        if best_thr is not None:
            picked_te = prob_te >= best_thr
            out_n = int(picked_te.sum())
            out_thr_pnl = float(te_pnl[picked_te].sum())
            out_win = float(te_y[picked_te].mean()) if out_n > 0 else float("nan")
        else:
            out_thr_pnl = 0.0
            out_n = 0
            out_win = float("nan")

        results.append({
            "train_start": str(train_day_list[0]),
            "test_start": str(test_day_list[0]),
            "test_end": str(test_day_list[-1]),
            "n_train": int(tr_mask.sum()),
            "n_test": len(te_y),
            "auc": auc,
            "base_rate": te_y.mean(),
            "base_total_net_pnl": float(te_pnl.sum()),
            "thr_holdout": best_thr,
            "n_picked_oos": out_n,
            "win_rate_oos": out_win,
            "pnl_oos": out_thr_pnl,
            "thr_oracle": oracle_thr,
            "pnl_oracle": oracle_pnl,
        })
    return pd.DataFrame(results)
def main():
    """Run the diagnostic end to end and write both CSV reports.

    Loads ``DATA_PATH``, keeps rows whose ``TARGET`` is 0 or 1, computes the
    per-trade net PnL, prints take-all statistics and the implied breakeven
    win rate, prints the top univariate AUCs, runs the walk-forward
    evaluation, and writes the walk-forward and univariate tables to
    ``OUT_DIR``. Returns early, without writing files, when the walk-forward
    pass yields no windows.
    """
    print(f"Loading {DATA_PATH.name}...")
    df = pd.read_csv(DATA_PATH)
    df = df[df[TARGET].isin([0, 1])].copy()
    df["trade_date"] = pd.to_datetime(df["trade_date"])
    first_day = df["trade_date"].min().date()
    last_day = df["trade_date"].max().date()
    print(f"Rows: {len(df)} Date range: {first_day} -> {last_day}")
    print(f"Base rate label_1to1: {df[TARGET].mean():.4f}")

    print("\nComputing per-trade net PnL (1 lot)...")
    pnl = df.apply(per_trade_net_pnl, axis=1)
    df["net_pnl_per_lot"] = pnl
    win_mean = pnl[df[TARGET] == 1].mean()
    loss_mean = pnl[df[TARGET] == 0].mean()
    print(f"Mean net PnL per trade (take-all): {pnl.mean():.2f}")
    print(f"Total net PnL (take-all): {pnl.sum():.0f}")
    print(f"Winners mean net PnL: {win_mean:.2f}")
    print(f"Losers mean net PnL: {loss_mean:.2f}")
    # Win rate p at which p * win_mean + (1 - p) * loss_mean equals zero.
    breakeven_rate = (-loss_mean) / (win_mean - loss_mean)
    print(f"Implied breakeven win rate: {breakeven_rate:.4f}")

    print("\nBuilding features (dropping symbol/expiry/weekday/ids)...")
    X, feat_cols = build_features(df)
    y = df[TARGET].astype(int)
    print(f"Feature cols after drops: {len(X.columns)}")

    print("\n--- Univariate AUC per feature (top 20) ---")
    uni = univariate_auc(X, y)
    print(uni.head(20).to_string(index=False))

    print("\n--- Walk-forward LightGBM (30d train, 5d test) ---")
    wf = walk_forward_lgbm(df, X, y, pnl)
    if wf.empty:
        print("Not enough data for walk-forward")
        return

    report_cols = [
        "test_start", "test_end", "n_test", "auc",
        "base_rate", "base_total_net_pnl",
        "thr_holdout", "n_picked_oos", "win_rate_oos", "pnl_oos",
        "thr_oracle", "pnl_oracle",
    ]
    print(wf[report_cols].round(4).to_string(index=False))

    n_windows = len(wf)
    n_profitable = (wf['pnl_oos'] > 0).sum()
    n_beating = (wf['pnl_oos'] > wf['base_total_net_pnl']).sum()
    print("\n--- Summary ---")
    print(f"Mean test AUC: {wf['auc'].mean():.4f} (0.5 = no signal)")
    print(f"Median test AUC: {wf['auc'].median():.4f}")
    print(f"Take-all total net PnL across all test windows: {wf['base_total_net_pnl'].sum():.0f}")
    print(f"OOS (threshold picked on holdout) total PnL: {wf['pnl_oos'].sum():.0f}")
    print(f"ORACLE (threshold picked on test) total PnL: {wf['pnl_oracle'].sum():.0f} [upper bound]")
    print(f"OOS windows profitable: {n_profitable}/{n_windows}")
    print(f"OOS windows beating take-all: {n_beating}/{n_windows}")

    wf.to_csv(OUT_DIR / "diagnose_filter_walkforward.csv", index=False)
    uni.to_csv(OUT_DIR / "diagnose_filter_univariate_auc.csv", index=False)
    print(f"\nWrote: outputs/diagnose_filter_walkforward.csv")
    print(f"Wrote: outputs/diagnose_filter_univariate_auc.csv")
# Run the diagnostic only when executed as a script, not when imported.
if __name__ == "__main__":
    main()