| """ |
| ML-3m-trader Labeling Engine |
| ============================== |
| Generates supervised-learning labels by simulating potential trades |
| at each bar and checking 1:1 RR outcomes over a lookahead window. |
| |
| Labels |
| ------ |
| 0 = DO_NOTHING (ATR-based stop distance unavailable or non-positive, or spread filter failed) |
| 1 = BUY (long setup that would hit TP before SL) |
| 2 = SELL (short setup that would hit TP before SL) |
| 3 = HOLD (neither BUY nor SELL produced a winner) |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| import config as cfg |
|
|
|
|
def _compute_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                 period: int) -> np.ndarray:
    """Compute the Average True Range with Wilder smoothing.

    The true range of bar ``i`` is the largest of ``high[i] - low[i]``,
    ``|high[i] - close[i - 1]|`` and ``|low[i] - close[i - 1]|``. The first
    bar has no previous close, so its true range is ``high[0] - low[0]``.

    The ATR is seeded at index ``period - 1`` with the plain mean of the
    first ``period`` true ranges and then updated recursively as
    ``atr[i] = atr[i - 1] * (1 - 1 / period) + tr[i] / period``.

    Parameters
    ----------
    high, low, close : np.ndarray
        One-dimensional price arrays of equal length.
    period : int
        Smoothing window; must be at least 1.

    Returns
    -------
    np.ndarray
        float64 array with the same length as ``high``. Entries before
        index ``period - 1`` are NaN. If fewer than ``period`` bars are
        supplied (including an empty input) every entry is NaN.

    Raises
    ------
    ValueError
        If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    high = np.asarray(high)
    low = np.asarray(low)
    close = np.asarray(close)

    n = len(high)
    atr = np.full(n, np.nan, dtype=np.float64)
    if n < period:
        # Not enough bars to seed the average; this also covers n == 0.
        return atr

    # True range, computed array-wise. From here on n >= period >= 1, so
    # index 0 exists.
    tr = np.empty(n, dtype=np.float64)
    tr[0] = high[0] - low[0]
    prev_close = close[:-1]
    bar_range = high[1:] - low[1:]
    gap_from_high = np.abs(high[1:] - prev_close)
    gap_from_low = np.abs(low[1:] - prev_close)
    # Successive strict-'>' replacements reproduce the built-in
    # max(a, b, c), including how it treats NaN operands (a NaN candidate
    # never replaces the current value, a NaN current value is never
    # replaced).
    widest = np.where(gap_from_high > bar_range, gap_from_high, bar_range)
    widest = np.where(gap_from_low > widest, gap_from_low, widest)
    tr[1:] = widest

    # Seed with the simple mean, then apply Wilder's recursive smoothing.
    # The recursion depends on the previous output, so it stays a loop.
    atr[period - 1] = np.mean(tr[:period])
    alpha = 1.0 / period
    for i in range(period, n):
        atr[i] = atr[i - 1] * (1 - alpha) + tr[i] * alpha
    return atr
|
|
|
|
def generate_labels(df: pd.DataFrame) -> np.ndarray:
    """
    Label every bar by simulating a symmetric 1:1 RR long and short trade.

    For each bar ``i``:

    1. The stop distance is ``ATR(cfg.ATR_PERIOD) * cfg.ATR_SL_MULTIPLIER``.
       If it is NaN (no ATR value for that bar) or not positive
       -> DO_NOTHING.
    2. If the stop distance is smaller than
       ``spread * cfg.SPREAD_FILTER_MULTIPLIER`` -> DO_NOTHING.
    3. Entry is ``close[i]``. The upper level ``entry + sl_dist`` is both
       the BUY take-profit and the SELL stop-loss; the lower level
       ``entry - sl_dist`` is both the BUY stop-loss and the SELL
       take-profit.
    4. Walk forward over at most ``cfg.LABEL_LOOKAHEAD_BARS`` bars (fewer
       near the end of the data) and stop at the first bar whose high
       reaches the upper level or whose low reaches the lower level:

       * only the upper level reached -> BUY
       * only the lower level reached -> SELL
       * both reached within that bar -> HOLD (each trade is counted as
         stopped out, because its stop-loss is checked before its
         take-profit)
    5. If no level is reached inside the window -> HOLD.

    Because both trades share the same two price levels, a BUY win and a
    SELL win can never occur for the same bar, so no tie-break is needed.

    The label distribution is printed to stdout before returning.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain: high, low, close, spread

    Returns
    -------
    np.ndarray of int
        int32 label array aligned with df index. If the frame has fewer
        bars than ``cfg.ATR_PERIOD`` (or is empty) every bar is labeled
        DO_NOTHING.
    """
    high = df["high"].values.astype(np.float64)
    low = df["low"].values.astype(np.float64)
    close = df["close"].values.astype(np.float64)
    spread_raw = df["spread"].values.astype(np.float64)
    n = len(close)

    # Spread scaling heuristic: when the median raw spread is below 1.0 the
    # values are multiplied by a hard-coded point size of 0.01; otherwise
    # they are used unchanged.
    # NOTE(review): a sub-1.0 median looks like it is already in price
    # units, which would make this scaling inverted -- confirm the units of
    # df["spread"] before changing it. Behaviour is kept as-is here.
    point = 0.01
    if n > 0 and np.nanmedian(spread_raw) < 1.0:
        spread = spread_raw * point
    else:
        spread = spread_raw

    period = cfg.ATR_PERIOD
    if n == 0 or n < period:
        # Too few bars to seed the ATR: leave every stop distance undefined
        # so all bars fall through to DO_NOTHING instead of crashing.
        atr = np.full(n, np.nan, dtype=np.float64)
    else:
        atr = _compute_atr(high, low, close, period)
    sl_dist = atr * cfg.ATR_SL_MULTIPLIER

    # HOLD is the default outcome; only resolved bars are overwritten below.
    labels = np.full(n, cfg.LABEL_HOLD, dtype=np.int32)
    lookahead = cfg.LABEL_LOOKAHEAD_BARS
    spread_mult = cfg.SPREAD_FILTER_MULTIPLIER

    # Bars that cannot be traded: undefined / non-positive stop distance, or
    # a stop distance that does not clear the spread filter. Comparisons
    # against NaN are False, so a NaN spread does not trigger the filter.
    with np.errstate(invalid="ignore"):
        no_trade = (
            np.isnan(sl_dist)
            | (sl_dist <= 0)
            | (sl_dist < spread * spread_mult)
        )
    labels[no_trade] = cfg.LABEL_DO_NOTHING

    for i in np.flatnonzero(~no_trade).tolist():
        upper = close[i] + sl_dist[i]  # BUY take-profit == SELL stop-loss
        lower = close[i] - sl_dist[i]  # BUY stop-loss == SELL take-profit

        end = min(i + lookahead + 1, n)
        for j in range(i + 1, end):
            hit_upper = high[j] >= upper
            hit_lower = low[j] <= lower
            if not (hit_upper or hit_lower):
                continue
            # The first bar touching either level settles both trades.
            if not hit_lower:
                labels[i] = cfg.LABEL_BUY
            elif not hit_upper:
                labels[i] = cfg.LABEL_SELL
            # Both levels inside one bar: both trades lose, label stays HOLD.
            break

    unique, counts = np.unique(labels, return_counts=True)
    dist = {cfg.LABEL_NAMES.get(u, u): int(c) for u, c in zip(unique, counts)}
    print(f"[INFO] Label distribution: {dist}")

    return labels
|
|