File size: 4,661 Bytes
9cb5a00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
ML-3m-trader Labeling Engine
==============================
Generates supervised-learning labels by simulating potential trades
at each bar and checking 1:1 RR outcomes over a lookahead window.

Labels
------
0 = DO_NOTHING  (ATR not yet available, or stop distance fails the spread filter)
1 = BUY         (long setup that would hit TP before SL)
2 = SELL        (short setup that would hit TP before SL)
3 = HOLD        (neither BUY nor SELL produced a winner)
"""

import numpy as np
import pandas as pd

import config as cfg


def _compute_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                 period: int) -> np.ndarray:
    """Average True Range with Wilder smoothing, for the labeler.

    The true range of bar 0 is ``high[0] - low[0]``; for every later bar it
    is the largest of ``high - low``, ``|high - prev_close|`` and
    ``|low - prev_close|``.  The ATR is seeded at index ``period - 1`` with
    the plain mean of the first ``period`` true ranges and then updated
    recursively with smoothing factor ``1 / period``.

    Parameters
    ----------
    high, low, close : np.ndarray
        One-dimensional price arrays of equal length.
    period : int
        Smoothing window; must be >= 1.

    Returns
    -------
    np.ndarray of float64
        Same length as the inputs.  Entries before index ``period - 1`` are
        NaN; when fewer than ``period`` bars are supplied (including an
        empty input) the whole array is NaN instead of raising IndexError.
        A NaN in any input propagates into the true range of the affected
        bar and therefore into every later ATR value.

    Raises
    ------
    ValueError
        If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    high = np.asarray(high, dtype=np.float64)
    low = np.asarray(low, dtype=np.float64)
    close = np.asarray(close, dtype=np.float64)

    n = len(high)
    atr = np.full(n, np.nan, dtype=np.float64)
    if n < period:
        # Not enough bars to seed the average: nothing but NaN to report.
        return atr

    # True range, computed for all bars at once.  Bar 0 has no previous
    # close, so it falls back to the plain high-low range.
    tr = np.empty(n, dtype=np.float64)
    tr[0] = high[0] - low[0]
    prev_close = close[:-1]
    tr[1:] = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - prev_close),
                   np.abs(low[1:] - prev_close)),
    )

    # Wilder smoothing is inherently sequential: each value depends on the
    # previous one, so this part stays a loop.
    atr[period - 1] = np.mean(tr[:period])
    alpha = 1.0 / period
    for i in range(period, n):
        atr[i] = atr[i - 1] * (1 - alpha) + tr[i] * alpha
    return atr


def _first_barrier_touch(high: np.ndarray, low: np.ndarray, start: int,
                         stop: int, upper: float, lower: float) -> int:
    """Scan bars ``start .. stop - 1`` for the first touch of either barrier.

    Returns +1 when the first touching bar reaches only the upper barrier,
    -1 when it reaches only the lower barrier, and 0 when no bar touches a
    barrier or the first touching bar reaches both (the order of events
    inside a single bar is unknown, so neither side is counted as a win).
    """
    for j in range(start, stop):
        touched_upper = high[j] >= upper
        touched_lower = low[j] <= lower
        if touched_upper and touched_lower:
            return 0
        if touched_upper:
            return 1
        if touched_lower:
            return -1
    return 0


def generate_labels(df: pd.DataFrame) -> np.ndarray:
    """
    Assign one training label to every bar of ``df``.

    1. Stop distance = ATR(cfg.ATR_PERIOD) * cfg.ATR_SL_MULTIPLIER.
    2. If the stop distance is NaN, non-positive, or smaller than
       spread * cfg.SPREAD_FILTER_MULTIPLIER -> DO_NOTHING.
    3. Otherwise place two symmetric barriers around the bar's close:
       upper = close + stop distance, lower = close - stop distance.
       The upper barrier is the BUY take-profit and the SELL stop-loss;
       the lower barrier is the SELL take-profit and the BUY stop-loss
       (1:1 RR).
    4. Walk forward up to cfg.LABEL_LOOKAHEAD_BARS bars (fewer near the end
       of the data).  The first bar that touches a barrier decides:
       only upper -> BUY, only lower -> SELL.
    5. If that bar touches both barriers, or no barrier is touched inside
       the window -> HOLD.

    Because the barriers are shared, a BUY win and a SELL win can never
    both occur for the same bar, so no tie-break between them is needed.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain: high, low, close, spread

    Returns
    -------
    np.ndarray of int32
        Label array aligned with df index.  Frames shorter than
        cfg.ATR_PERIOD (including empty ones) are labeled DO_NOTHING
        throughout, since no ATR can be computed for them.
    """
    high = df["high"].values.astype(np.float64)
    low = df["low"].values.astype(np.float64)
    close = df["close"].values.astype(np.float64)
    spread_raw = df["spread"].values.astype(np.float64)

    # Spread unit heuristic (behavior kept as-is): a median below 1 is
    # treated as "points" and scaled by the point size assumed for XAUUSDc;
    # otherwise the column is used unchanged as a price-unit spread.
    # NOTE(review): MT5 normally reports spread as a whole number of points,
    # which would land in the *else* branch unscaled -- this test may be
    # inverted; confirm against the actual data feed.
    point = 0.01  # XAUUSDc standard point
    if np.nanmedian(spread_raw) < 1.0:
        spread = spread_raw * point
    else:
        spread = spread_raw

    n = len(close)
    if n < cfg.ATR_PERIOD:
        # Too few bars to seed the ATR: every stop distance is undefined.
        sl_dist = np.full(n, np.nan, dtype=np.float64)
    else:
        atr = _compute_atr(high, low, close, cfg.ATR_PERIOD)
        sl_dist = atr * cfg.ATR_SL_MULTIPLIER

    lookahead = cfg.LABEL_LOOKAHEAD_BARS
    spread_mult = cfg.SPREAD_FILTER_MULTIPLIER

    # A bar is tradable when its stop distance is a positive number and is
    # not smaller than the spread threshold.  NaN stop distances fail the
    # "> 0" test; a NaN spread makes the "<" test False and so passes.
    with np.errstate(invalid="ignore"):
        tradable = (sl_dist > 0) & ~(sl_dist < spread * spread_mult)

    labels = np.full(n, cfg.LABEL_DO_NOTHING, dtype=np.int32)
    labels[tradable] = cfg.LABEL_HOLD

    for i in np.flatnonzero(tradable).tolist():
        entry = close[i]
        sd = sl_dist[i]
        stop = min(i + lookahead + 1, n)
        outcome = _first_barrier_touch(high, low, i + 1, stop,
                                       entry + sd, entry - sd)
        if outcome > 0:
            labels[i] = cfg.LABEL_BUY
        elif outcome < 0:
            labels[i] = cfg.LABEL_SELL
        # outcome == 0 keeps the HOLD assigned above.

    # Summary
    unique, counts = np.unique(labels, return_counts=True)
    dist = {cfg.LABEL_NAMES.get(u, u): int(c) for u, c in zip(unique, counts)}
    print(f"[INFO] Label distribution: {dist}")

    return labels