"""
ML-3m-trader Labeling Engine
==============================
Generates supervised-learning labels by simulating potential trades
at each bar and checking 1:1 RR outcomes over a lookahead window.
Labels
------
0 = DO_NOTHING (spread filter failed, or the ATR-based SL distance is undefined / non-positive)
1 = BUY (long setup that would hit TP before SL)
2 = SELL (short setup that would hit TP before SL)
3 = HOLD (neither BUY nor SELL produced a winner)
"""
import numpy as np
import pandas as pd
import config as cfg
def _compute_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                 period: int) -> np.ndarray:
    """Compute Wilder's Average True Range (ATR) for the labeler.

    The true range of bar ``i`` is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``; the first
    bar has no previous close, so its true range is simply ``high - low``.
    The ATR is seeded at index ``period - 1`` with the plain mean of the
    first ``period`` true ranges and then smoothed recursively with
    ``alpha = 1 / period``.

    Parameters
    ----------
    high, low, close : np.ndarray
        Equal-length 1-D price arrays.
    period : int
        Smoothing window; must be >= 1.

    Returns
    -------
    np.ndarray of float64
        ATR aligned with the inputs. The first ``period - 1`` entries are
        NaN. If fewer than ``period`` bars are supplied (including an empty
        input) the whole array is NaN instead of raising.

    Raises
    ------
    ValueError
        If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    high = np.asarray(high, dtype=np.float64)
    low = np.asarray(low, dtype=np.float64)
    close = np.asarray(close, dtype=np.float64)

    n = len(high)
    atr = np.full(n, np.nan, dtype=np.float64)
    if n < period:
        # Not enough history to seed the average: nothing is defined yet.
        return atr

    # True range, computed for all bars at once.
    tr = np.empty(n, dtype=np.float64)
    tr[0] = high[0] - low[0]
    if n > 1:
        prev_close = close[:-1]
        tr[1:] = np.maximum(
            high[1:] - low[1:],
            np.maximum(np.abs(high[1:] - prev_close),
                       np.abs(low[1:] - prev_close)),
        )

    # Seed with a simple mean, then apply the recursive smoothing. The
    # recursion depends on the previous value, so it stays a loop.
    atr[period - 1] = np.mean(tr[:period])
    alpha = 1.0 / period
    for i in range(period, n):
        atr[i] = atr[i - 1] * (1 - alpha) + tr[i] * alpha
    return atr
def generate_labels(df: pd.DataFrame) -> np.ndarray:
    """
    Label every bar by simulating a 1:1 risk/reward long and short trade.

    For each bar:
    1. Compute the stop-loss distance as ATR * ATR_SL_MULTIPLIER.
    2. If that distance is NaN or <= 0, or smaller than
       spread * SPREAD_FILTER_MULTIPLIER -> DO_NOTHING.
    3. Place two barriers around entry=close: upper = close + sl_dist and
       lower = close - sl_dist. The upper barrier is the BUY take-profit
       and the SELL stop-loss; the lower barrier is the BUY stop-loss and
       the SELL take-profit.
    4. Walk forward up to LABEL_LOOKAHEAD_BARS and stop at the first bar
       that touches either barrier:
         * only the upper barrier touched -> BUY
         * only the lower barrier touched -> SELL
         * both touched in the same bar   -> HOLD (a stop-loss hit is
           assumed to come first for both directions)
    5. If no barrier is touched inside the window -> HOLD.

    Because the two trades share the same barriers, they always resolve on
    the same bar and can never both win, so no tie-break is needed.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain: high, low, close, spread

    Returns
    -------
    np.ndarray of int
        Label array (int32) aligned with df index. Frames with fewer than
        ATR_PERIOD rows (including empty frames) yield all DO_NOTHING.
    """
    high = df["high"].values.astype(np.float64)
    low = df["low"].values.astype(np.float64)
    close = df["close"].values.astype(np.float64)
    spread_raw = df["spread"].values.astype(np.float64)
    n = len(close)

    # Spread unit heuristic: a median below 1 is treated as "points" and
    # scaled by the point size; anything else is used as a price distance.
    # NOTE(review): MT5 point-denominated spreads are usually integers well
    # above 1, which this test would treat as price units -- confirm the
    # direction of this heuristic against real data before relying on it.
    point = 0.01  # XAUUSDc standard point
    if n > 0 and np.nanmedian(spread_raw) < 1.0:
        spread = spread_raw * point
    else:
        spread = spread_raw

    if n < cfg.ATR_PERIOD:
        # Too little history for an ATR: leave every distance undefined so
        # each bar falls through to DO_NOTHING below instead of crashing.
        sl_dist = np.full(n, np.nan, dtype=np.float64)
    else:
        atr = _compute_atr(high, low, close, cfg.ATR_PERIOD)
        sl_dist = atr * cfg.ATR_SL_MULTIPLIER

    # HOLD is the default; bars are overwritten only when they resolve.
    labels = np.full(n, cfg.LABEL_HOLD, dtype=np.int32)
    lookahead = cfg.LABEL_LOOKAHEAD_BARS
    spread_mult = cfg.SPREAD_FILTER_MULTIPLIER
    label_do_nothing = cfg.LABEL_DO_NOTHING
    label_buy = cfg.LABEL_BUY
    label_sell = cfg.LABEL_SELL

    for i in range(n):
        sd = sl_dist[i]
        if np.isnan(sd) or sd <= 0:
            labels[i] = label_do_nothing
            continue

        # Spread filter: the stop must be wide enough relative to spread.
        if sd < spread[i] * spread_mult:
            labels[i] = label_do_nothing
            continue

        upper = close[i] + sd  # BUY take-profit == SELL stop-loss
        lower = close[i] - sd  # BUY stop-loss  == SELL take-profit

        end = min(i + lookahead + 1, n)
        for j in range(i + 1, end):
            hit_upper = high[j] >= upper
            hit_lower = low[j] <= lower
            if not (hit_upper or hit_lower):
                continue
            if not hit_lower:
                labels[i] = label_buy
            elif not hit_upper:
                labels[i] = label_sell
            # Both touched in one bar: ambiguous order, stays HOLD.
            break

    # Summary
    unique, counts = np.unique(labels, return_counts=True)
    dist = {cfg.LABEL_NAMES.get(u, u): int(c) for u, c in zip(unique, counts)}
    print(f"[INFO] Label distribution: {dist}")
    return labels
|