File size: 7,425 Bytes
9cb5a00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""
ML-3m-trader Backtesting Engine
=================================
Walk-forward backtester with realistic execution modeling:
- Per-trade position sizing by balance and SL distance
- Random slippage (0-2 XAUUSDc units)
- Spread filter (skip if SL < spread * 10)
- 1:1 Risk-Reward enforcement
"""

import numpy as np
import pandas as pd

import config as cfg


def run_backtest(
    df: pd.DataFrame,
    predictions: np.ndarray,
    starting_balance: float = cfg.STARTING_BALANCE,
    bet_pct: float = cfg.DEFAULT_BET_PCT,
    seed: int = cfg.RANDOM_SEED,
) -> dict:
    """
    Walk-forward backtest on the test set.

    Parameters
    ----------
    df : pd.DataFrame
        Test-set DataFrame with OHLCV + spread + features.
    predictions : np.ndarray
        Model predictions aligned with df (BUY=1, SELL=2, HOLD=3, DO_NOTHING=0).
    starting_balance : float
        Initial account balance in USD.
    bet_pct : float
        Fraction of balance to risk per trade.
    seed : int
        RNG seed for slippage.

    Returns
    -------
    dict with keys:
        trades       : list of dicts (individual trade records)
        equity_curve : np.ndarray (balance after each bar)
        final_balance: float
    """
    rng = np.random.RandomState(seed)

    high = df["high"].values.astype(np.float64)
    low = df["low"].values.astype(np.float64)
    close = df["close"].values.astype(np.float64)

    # Spread handling (same heuristic as labeler)
    spread_raw = df["spread"].values.astype(np.float64)
    point = 0.01
    if np.nanmedian(spread_raw) < 1.0:
        spread = spread_raw * point
    else:
        spread = spread_raw

    # Pre-compute ATR for SL distance
    atr = _compute_atr_vec(high, low, close, cfg.ATR_PERIOD)

    n = len(close)
    balance = starting_balance
    equity_curve = np.full(n, starting_balance, dtype=np.float64)
    trades = []

    in_trade = False
    trade_dir = 0  # 1=long, -1=short
    entry_price = 0.0
    sl_price = 0.0
    tp_price = 0.0
    trade_entry_bar = 0
    risk_amount = 0.0

    for i in range(n):
        if in_trade:
            # Check exit
            hit_sl = False
            hit_tp = False

            if trade_dir == 1:  # Long
                if low[i] <= sl_price:
                    hit_sl = True
                if high[i] >= tp_price:
                    hit_tp = True
            else:  # Short
                if high[i] >= sl_price:
                    hit_sl = True
                if low[i] <= tp_price:
                    hit_tp = True

            if hit_tp and hit_sl:
                # Ambiguous bar — assume SL hit (conservative)
                hit_tp = False

            if hit_sl:
                pnl = -risk_amount
                balance += pnl
                trades.append({
                    "entry_bar": trade_entry_bar,
                    "exit_bar": i,
                    "direction": "BUY" if trade_dir == 1 else "SELL",
                    "entry_price": entry_price,
                    "sl_price": sl_price,
                    "tp_price": tp_price,
                    "exit_price": sl_price,
                    "pnl": pnl,
                    "result": "SL",
                    "balance_after": balance,
                    "time_entry": df["time"].iloc[trade_entry_bar] if "time" in df.columns else None,
                    "time_exit": df["time"].iloc[i] if "time" in df.columns else None,
                })
                in_trade = False

            elif hit_tp:
                pnl = risk_amount  # 1:1 RR
                balance += pnl
                trades.append({
                    "entry_bar": trade_entry_bar,
                    "exit_bar": i,
                    "direction": "BUY" if trade_dir == 1 else "SELL",
                    "entry_price": entry_price,
                    "sl_price": sl_price,
                    "tp_price": tp_price,
                    "exit_price": tp_price,
                    "pnl": pnl,
                    "result": "TP",
                    "balance_after": balance,
                    "time_entry": df["time"].iloc[trade_entry_bar] if "time" in df.columns else None,
                    "time_exit": df["time"].iloc[i] if "time" in df.columns else None,
                })
                in_trade = False

        # Try to open new trade if not in one
        if not in_trade and predictions[i] in (cfg.LABEL_BUY, cfg.LABEL_SELL):
            if np.isnan(atr[i]) or atr[i] <= 0:
                equity_curve[i] = balance
                continue

            sl_dist = atr[i] * cfg.ATR_SL_MULTIPLIER

            # Spread filter
            if sl_dist < spread[i] * cfg.SPREAD_FILTER_MULTIPLIER:
                equity_curve[i] = balance
                continue

            # Random slippage
            slippage = rng.uniform(cfg.SLIPPAGE_MIN, cfg.SLIPPAGE_MAX)

            if predictions[i] == cfg.LABEL_BUY:
                entry_price = close[i] + slippage  # buy at worse price
                sl_price = entry_price - sl_dist
                tp_price = entry_price + sl_dist
                trade_dir = 1
            else:  # SELL
                entry_price = close[i] - slippage  # sell at worse price
                sl_price = entry_price + sl_dist
                tp_price = entry_price - sl_dist
                trade_dir = -1

            # Position sizing: risk bet_pct of balance
            risk_amount = balance * bet_pct
            trade_entry_bar = i
            in_trade = True

        equity_curve[i] = balance

    # Close any open trade at last bar's close
    if in_trade:
        if trade_dir == 1:
            pnl_pts = close[-1] - entry_price
        else:
            pnl_pts = entry_price - close[-1]
        # Scale PnL proportionally
        sl_dist_trade = abs(entry_price - sl_price)
        if sl_dist_trade > 0:
            pnl = risk_amount * (pnl_pts / sl_dist_trade)
        else:
            pnl = 0.0
        balance += pnl
        trades.append({
            "entry_bar": trade_entry_bar,
            "exit_bar": len(close) - 1,
            "direction": "BUY" if trade_dir == 1 else "SELL",
            "entry_price": entry_price,
            "sl_price": sl_price,
            "tp_price": tp_price,
            "exit_price": close[-1],
            "pnl": pnl,
            "result": "OPEN_CLOSE",
            "balance_after": balance,
            "time_entry": df["time"].iloc[trade_entry_bar] if "time" in df.columns else None,
            "time_exit": df["time"].iloc[-1] if "time" in df.columns else None,
        })
        equity_curve[-1] = balance

    print(f"[INFO] Backtest complete: {len(trades)} trades, "
          f"final balance: ${balance:,.2f}")

    return {
        "trades": trades,
        "equity_curve": equity_curve,
        "final_balance": balance,
    }


def _compute_atr_vec(high, low, close, period):
    """Vectorized ATR for the backtester."""
    n = len(high)
    tr = np.empty(n, dtype=np.float64)
    tr[0] = high[0] - low[0]
    for i in range(1, n):
        tr[i] = max(high[i] - low[i],
                     abs(high[i] - close[i - 1]),
                     abs(low[i] - close[i - 1]))
    atr = np.empty(n, dtype=np.float64)
    atr[:] = np.nan
    if period <= n:
        atr[period - 1] = np.mean(tr[:period])
        alpha = 1.0 / period
        for i in range(period, n):
            atr[i] = atr[i - 1] * (1 - alpha) + tr[i] * alpha
    return atr