Riy777 commited on
Commit
6649776
·
verified ·
1 Parent(s): eab27d3

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +178 -172
backtest_engine.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V134.0 - GEM-Architect: Feature Parity Edition)
3
  # ============================================================
4
 
5
  import asyncio
@@ -10,12 +10,14 @@ import time
10
  import logging
11
  import itertools
12
  import os
 
13
  import gc
14
  import sys
15
  import traceback
 
16
  from datetime import datetime, timezone
17
  from typing import Dict, Any, List
18
- from numpy.lib.stride_tricks import sliding_window_view
19
 
20
  try:
21
  from ml_engine.processor import MLProcessor, SystemLimits
@@ -31,6 +33,32 @@ except ImportError:
31
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
32
  CACHE_DIR = "backtest_real_scores"
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  class HeavyDutyBacktester:
35
  def __init__(self, data_manager, processor):
36
  self.dm = data_manager
@@ -56,7 +84,7 @@ class HeavyDutyBacktester:
56
  self.force_end_date = None
57
 
58
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
59
- print(f"🧪 [Backtest V134.0] Feature Parity Mode (Exact Live System Logic).")
60
 
61
  def set_date_range(self, start_str, end_str):
62
  self.force_start_date = start_str
@@ -75,7 +103,7 @@ class HeavyDutyBacktester:
75
  tasks.append(current)
76
  current += duration_per_batch
77
  all_candles = []
78
- sem = asyncio.Semaphore(15)
79
 
80
  async def _fetch_batch(timestamp):
81
  async with sem:
@@ -85,7 +113,7 @@ class HeavyDutyBacktester:
85
  except: await asyncio.sleep(0.5)
86
  return []
87
 
88
- chunk_size = 25
89
  for i in range(0, len(tasks), chunk_size):
90
  chunk_tasks = tasks[i:i + chunk_size]
91
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
@@ -101,29 +129,19 @@ class HeavyDutyBacktester:
101
  print(f" ✅ Downloaded {len(df)} candles.", flush=True)
102
  return df.values.tolist()
103
 
104
- # ==============================================================
105
- # 🏎️ HELPER: Rolling Z-Score (For Sniper)
106
- # ==============================================================
107
- def _z_roll(self, x, w=500):
108
- r = x.rolling(w).mean()
109
- s = x.rolling(w).std().replace(0, np.nan)
110
- return ((x - r) / s).fillna(0)
111
-
112
  # ==============================================================
113
  # 🏎️ VECTORIZED INDICATORS (EXACT MATCH TO LIVE SYSTEM)
114
  # ==============================================================
115
  def _calculate_indicators_vectorized(self, df, timeframe='1m'):
116
  # 1. Clean Types
117
  cols = ['close', 'high', 'low', 'volume', 'open']
118
- for c in cols: df[c] = df[c].astype(np.float64) # Use float64 for precision match
119
 
120
  # ---------------------------------------------------------
121
- # 🧠 PART 1: TITAN FEATURES (Exact Replica of TitanEngine)
122
  # ---------------------------------------------------------
123
- # RSI
124
  df['RSI'] = ta.rsi(df['close'], length=14).fillna(50)
125
 
126
- # MACD
127
  macd = ta.macd(df['close'])
128
  if macd is not None:
129
  df['MACD'] = macd.iloc[:, 0].fillna(0)
@@ -131,35 +149,25 @@ class HeavyDutyBacktester:
131
  else:
132
  df['MACD'] = 0.0; df['MACD_h'] = 0.0
133
 
134
- # CCI
135
  df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20).fillna(0)
136
 
137
- # ADX
138
  adx = ta.adx(df['high'], df['low'], df['close'], length=14)
139
  if adx is not None: df['ADX'] = adx.iloc[:, 0].fillna(0)
140
  else: df['ADX'] = 0.0
141
 
142
- # EMAs & Distances
143
  for p in [9, 21, 50, 200]:
144
  ema = ta.ema(df['close'], length=p)
145
  df[f'EMA_{p}_dist'] = ((df['close'] / ema) - 1).fillna(0)
146
- df[f'ema{p}'] = ema # Keep raw for others
147
 
148
- # Bollinger Bands (Width & %B)
149
  bb = ta.bbands(df['close'], length=20, std=2.0)
150
  if bb is not None:
151
- # Width = (Upper - Lower) / Middle
152
  df['BB_w'] = ((bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]).fillna(0)
153
- # %B = (Price - Lower) / (Upper - Lower)
154
  df['BB_p'] = ((df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])).fillna(0)
155
-
156
- # Helper for Hydra
157
- df['bb_width'] = df['BB_w'] # Alias
158
 
159
- # MFI
160
  df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14).fillna(50)
161
 
162
- # VWAP
163
  vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
164
  if vwap is not None:
165
  df['VWAP_dist'] = ((df['close'] / vwap) - 1).fillna(0)
@@ -168,7 +176,6 @@ class HeavyDutyBacktester:
168
  df['VWAP_dist'] = 0.0
169
  df['vwap'] = df['close']
170
 
171
- # ATR (for others)
172
  df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14).fillna(0)
173
  df['atr_pct'] = df['atr'] / df['close']
174
 
@@ -181,36 +188,32 @@ class HeavyDutyBacktester:
181
  df['return_5m'] = df['close'].pct_change(5).fillna(0)
182
  df['return_15m'] = df['close'].pct_change(15).fillna(0)
183
 
184
- df['rsi_14'] = df['RSI'] # Alias
185
-
186
- # Sniper specific derivations
187
  df['ema_9_slope'] = ((df['ema9'] - df['ema9'].shift(1)) / df['ema9'].shift(1)).fillna(0)
188
- df['ema_21_dist'] = df['EMA_21_dist'] # Reuse
189
 
190
- # Z-Scores for Sniper
191
  atr_100 = ta.atr(df['high'], df['low'], df['close'], length=100).fillna(0)
192
- df['atr_z'] = self._z_roll(atr_100) # Mapped later
193
 
194
- df['vol_zscore_50'] = self._z_roll(df['volume'], 50)
195
 
196
  rng = (df['high'] - df['low']).replace(0, 1e-9)
197
- df['candle_range'] = self._z_roll(rng, 500)
198
  df['close_pos_in_range'] = ((df['close'] - df['low']) / rng).fillna(0.5)
199
 
200
- # Liquidity Proxies
201
  df['dollar_vol'] = df['close'] * df['volume']
202
  amihud_raw = (df['return_1m'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
203
- df['amihud'] = self._z_roll(amihud_raw)
204
 
205
  dp = df['close'].diff()
206
  roll_cov = dp.rolling(64).cov(dp.shift(1))
207
  roll_spread_raw = (2 * np.sqrt(np.maximum(0, -roll_cov)))
208
- df['roll_spread'] = self._z_roll(roll_spread_raw)
209
 
210
  sign = np.sign(df['close'].diff()).fillna(0)
211
  signed_vol = sign * df['volume']
212
  ofi_raw = signed_vol.rolling(30).sum()
213
- df['ofi'] = self._z_roll(ofi_raw)
214
 
215
  buy_vol = (sign > 0) * df['volume']
216
  sell_vol = (sign < 0) * df['volume']
@@ -220,10 +223,10 @@ class HeavyDutyBacktester:
220
 
221
  vwap_win = 20
222
  v_short = (df['dollar_vol'].rolling(vwap_win).sum() / df['volume'].rolling(vwap_win).sum().replace(0, np.nan)).fillna(df['close'])
223
- df['vwap_dev'] = self._z_roll(df['close'] - v_short)
224
 
225
  rv_gk = ((np.log(df['high'] / df['low'])**2) / 2) - ((2 * np.log(2) - 1) * (np.log(df['close'] / df['open'])**2))
226
- df['rv_gk'] = self._z_roll(rv_gk)
227
 
228
  # L_Score approximation
229
  df['L_score'] = (df['vol_zscore_50'] - df['amihud'] - df['roll_spread'] - df['rv_gk'].abs() - df['vwap_dev'].abs() + df['ofi']).fillna(0)
@@ -249,7 +252,10 @@ class HeavyDutyBacktester:
249
  fib618 = roll_max - (diff * 0.382)
250
  df['dist_fib618'] = ((df['close'] - fib618) / df['close']).fillna(0)
251
 
252
- # Legacy Lags
 
 
 
253
  if timeframe == '1m':
254
  for lag in [1, 2, 3, 5, 10, 20]:
255
  df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
@@ -261,7 +267,7 @@ class HeavyDutyBacktester:
261
  return df
262
 
263
  # ==============================================================
264
- # 🧠 CPU PROCESSING (GLOBAL INFERENCE)
265
  # ==============================================================
266
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
267
  safe_sym = sym.replace('/', '_')
@@ -272,7 +278,7 @@ class HeavyDutyBacktester:
272
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
273
  return
274
 
275
- print(f" ⚙️ [CPU] Analyzing {sym} (Global Inference)...", flush=True)
276
  t0 = time.time()
277
 
278
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
@@ -283,7 +289,7 @@ class HeavyDutyBacktester:
283
  frames = {}
284
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
285
 
286
- # 1. Calc 1m (Base)
287
  frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
288
  frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
289
  fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
@@ -299,15 +305,10 @@ class HeavyDutyBacktester:
299
 
300
  # 3. Global Index Maps
301
  arr_ts_1m = fast_1m['timestamp']
302
- map_5m = np.searchsorted(numpy_htf['5m']['timestamp'], arr_ts_1m)
303
- map_15m = np.searchsorted(numpy_htf['15m']['timestamp'], arr_ts_1m)
304
- map_1h = np.searchsorted(numpy_htf['1h']['timestamp'], arr_ts_1m)
305
- map_4h = np.searchsorted(numpy_htf['4h']['timestamp'], arr_ts_1m)
306
-
307
- map_5m = np.clip(map_5m, 0, len(numpy_htf['5m']['timestamp']) - 1)
308
- map_15m = np.clip(map_15m, 0, len(numpy_htf['15m']['timestamp']) - 1)
309
- map_1h = np.clip(map_1h, 0, len(numpy_htf['1h']['timestamp']) - 1)
310
- map_4h = np.clip(map_4h, 0, len(numpy_htf['4h']['timestamp']) - 1)
311
 
312
  # 4. Load Models
313
  hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
@@ -332,17 +333,8 @@ class HeavyDutyBacktester:
332
  if titan_model and titan_cols:
333
  print(" 🚀 Running Global Titan...", flush=True)
334
  try:
335
- # Titan needs 5m features aligned to 1m
336
- # Build feature matrix from numpy_htf['5m'] using map_5m
337
  t_vecs = []
338
  for col in titan_cols:
339
- # Titan features usually have no prefix in the pickle list,
340
- # but in htf dict we have raw names.
341
- # Need to verify if titan_cols expects "RSI" or "5m_RSI"??
342
- # Usually Titan is trained on ONE timeframe (5m).
343
- # So we just pull the raw column from numpy_htf['5m'].
344
-
345
- # Fix: Clean name (e.g. if trained as 'RSI', grab 'RSI')
346
  if col in numpy_htf['5m']:
347
  t_vecs.append(numpy_htf['5m'][col][map_5m])
348
  else:
@@ -350,7 +342,7 @@ class HeavyDutyBacktester:
350
 
351
  X_TITAN = np.column_stack(t_vecs)
352
  preds_t = titan_model.predict(xgb.DMatrix(X_TITAN))
353
- global_titan_scores = preds_t
354
  except Exception as e: print(f"Titan Error: {e}")
355
 
356
  # B. SNIPER (1m Direct)
@@ -361,13 +353,12 @@ class HeavyDutyBacktester:
361
  s_vecs = []
362
  for col in sniper_cols:
363
  if col in fast_1m: s_vecs.append(fast_1m[col])
364
- # Fix mapping for 'atr' -> 'atr_z' if needed
365
  elif col == 'atr' and 'atr_z' in fast_1m: s_vecs.append(fast_1m['atr_z'])
366
  else: s_vecs.append(np.zeros(len(arr_ts_1m)))
367
 
368
  X_SNIPER = np.column_stack(s_vecs)
369
  preds_list = [m.predict(X_SNIPER) for m in sniper_models]
370
- global_sniper_scores = np.mean(preds_list, axis=0)
371
  except Exception as e: print(f"Sniper Error: {e}")
372
 
373
  # C. ORACLE (HTF Mix)
@@ -387,9 +378,8 @@ class HeavyDutyBacktester:
387
 
388
  X_ORACLE = np.column_stack(o_vecs)
389
  preds_o = oracle_dir.predict(X_ORACLE)
390
- global_oracle_scores = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0]
391
- # Adjust if binary (assuming 0=Long, 1=Short or vice versa, check training)
392
- # Usually we want Confidence > 0.6. Assuming output is Long Prob.
393
  except Exception as e: print(f"Oracle Error: {e}")
394
 
395
  # D. LEGACY V2 (Global)
@@ -434,11 +424,6 @@ class HeavyDutyBacktester:
434
  except: pass
435
 
436
  # --- 5. Filtering Candidates ---
437
- # Using Oracle and Sniper to filter BEFORE loop
438
- # This saves simulating trades that would never be entered
439
-
440
- # Valid: (Titan > 0.5) & (Oracle > 0.5) & (Sniper > 0.3) & (RSI < 70)
441
- # This reduces loop count drastically
442
  is_candidate = (
443
  (numpy_htf['1h']['RSI'][map_1h] <= 70) &
444
  (global_titan_scores > 0.4) &
@@ -447,7 +432,6 @@ class HeavyDutyBacktester:
447
 
448
  candidate_indices = np.where(is_candidate)[0]
449
 
450
- # Date Filter
451
  start_ts_val = frames['1m'].index[0] + pd.Timedelta(minutes=500)
452
  start_idx_offset = np.searchsorted(arr_ts_1m, int(start_ts_val.timestamp()*1000))
453
  candidate_indices = candidate_indices[candidate_indices >= start_idx_offset]
@@ -497,7 +481,6 @@ class HeavyDutyBacktester:
497
  l2_arr = np.full(240, 0.7)
498
  tgt_arr = np.full(240, 3.0)
499
 
500
- # [rsi1, rsi5, rsi15, bb, vol, dist_ema, atr_p, norm, max, dists, time, entry, oracle, l2, target]
501
  X_H = np.column_stack([
502
  sl_st[:,0], sl_st[:,1], sl_st[:,2], sl_st[:,3], sl_st[:,4],
503
  zeros, atr_pct, norm_pnl, max_pnl_r,
@@ -537,120 +520,126 @@ class HeavyDutyBacktester:
537
  gc.collect()
538
 
539
  # ==============================================================
540
- # PHASE 1 & 2 (Standard Optimization)
541
  # ==============================================================
542
  async def generate_truth_data(self):
543
- if self.force_start_date and self.force_end_date:
544
- dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
545
- dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
546
- start_time_ms = int(dt_start.timestamp() * 1000)
547
- end_time_ms = int(dt_end.timestamp() * 1000)
548
  print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
549
- else: return
550
-
551
- for sym in self.TARGET_COINS:
552
- try:
553
- candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
554
- if candles: await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
555
- except Exception as e: print(f" ❌ SKIP {sym}: {e}", flush=True)
556
- gc.collect()
557
 
 
 
 
558
  @staticmethod
559
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
560
- results = []
561
- all_data = []
562
- for fp in scores_files:
563
- try:
564
- df = pd.read_pickle(fp)
565
- if not df.empty: all_data.append(df)
566
  except: pass
567
- if not all_data: return []
568
- global_df = pd.concat(all_data)
569
- global_df.sort_values('timestamp', inplace=True)
570
- grouped_by_time = global_df.groupby('timestamp')
 
 
 
 
 
 
 
571
 
572
- for config in combinations_batch:
573
- wallet = { "balance": initial_capital, "allocated": 0.0, "positions": {}, "trades_history": [] }
574
- w_titan = config['w_titan']; oracle_thresh = config.get('oracle_thresh', 0.6)
575
- sniper_thresh = config.get('sniper_thresh', 0.4); hydra_thresh = config['hydra_thresh']
576
- peak_balance = initial_capital; max_drawdown = 0.0
577
-
578
- for ts, group in grouped_by_time:
579
- active = list(wallet["positions"].keys())
580
- current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
581
- for sym in active:
582
- if sym in current_prices:
583
- curr = current_prices[sym]
584
- pos = wallet["positions"][sym]
585
- h_risk = pos.get('risk_hydra_crash', 0)
586
- h_time = pos.get('time_hydra_crash', 0)
587
- is_crash = (h_risk > hydra_thresh) and (h_time > 0) and (ts >= h_time)
588
- pnl = (curr - pos['entry']) / pos['entry']
589
- if is_crash or pnl > 0.04 or pnl < -0.02:
590
- wallet['balance'] += pos['size'] * (1 + pnl - (fees_pct*2))
591
- wallet['allocated'] -= pos['size']
592
- del wallet['positions'][sym]
593
- wallet['trades_history'].append({'pnl': pnl})
594
-
595
- total_eq = wallet['balance'] + wallet['allocated']
596
- if total_eq > peak_balance: peak_balance = total_eq
597
- dd = (peak_balance - total_eq) / peak_balance
598
- if dd > max_drawdown: max_drawdown = dd
599
-
600
- if len(wallet['positions']) < max_slots:
601
- for _, row in group.iterrows():
602
- if row['symbol'] in wallet['positions']: continue
603
- if row['oracle_conf'] < oracle_thresh: continue
604
- if row['sniper_score'] < sniper_thresh: continue
605
- if row['real_titan'] < w_titan: continue # Titan Check
606
-
607
- size = 10.0
608
- if wallet['balance'] >= size:
609
- wallet['positions'][row['symbol']] = {
610
- 'entry': row['close'], 'size': size,
611
- 'risk_hydra_crash': row['risk_hydra_crash'],
612
- 'time_hydra_crash': row['time_hydra_crash']
613
- }
614
- wallet['balance'] -= size
615
- wallet['allocated'] += size
616
 
617
- final_bal = wallet['balance'] + wallet['allocated']
618
- net_profit = final_bal - initial_capital
619
- trades = wallet['trades_history']
620
- total_t = len(trades)
621
- win_count = len([t for t in trades if t['pnl'] > 0])
622
- loss_count = len([t for t in trades if t['pnl'] <= 0])
623
- win_rate = (win_count / total_t * 100) if total_t > 0 else 0
624
- max_win = max([t['pnl'] for t in trades]) if trades else 0
625
- max_loss = min([t['pnl'] for t in trades]) if trades else 0
626
 
627
- max_win_streak = 0; max_loss_streak = 0; curr_w = 0; curr_l = 0
628
- for t in trades:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
629
  if t['pnl'] > 0:
630
  curr_w += 1; curr_l = 0
631
- if curr_w > max_win_streak: max_win_streak = curr_w
632
  else:
633
  curr_l += 1; curr_w = 0
634
- if curr_l > max_loss_streak: max_loss_streak = curr_l
635
-
636
- results.append({
637
- 'config': config, 'final_balance': final_bal, 'net_profit': net_profit,
638
- 'total_trades': total_t, 'win_count': win_count, 'loss_count': loss_count,
639
- 'win_rate': win_rate, 'max_single_win': max_win, 'max_single_loss': max_loss,
640
- 'max_drawdown': max_drawdown * 100
 
 
 
 
 
 
 
 
 
 
 
641
  })
642
-
643
- return results
644
 
645
  async def run_optimization(self, target_regime="RANGE"):
646
  await self.generate_truth_data()
647
  oracle_r = np.linspace(0.4, 0.7, 3); sniper_r = np.linspace(0.4, 0.7, 3)
648
- hydra_r = [0.85, 0.95]
649
 
650
  combos = []
651
- for o, s, h in itertools.product(oracle_r, sniper_r, hydra_r):
652
  combos.append({
653
- 'w_titan': 0.5, 'w_struct': 0.3, 'thresh': 0.5, 'l1_thresh': 50.0,
654
  'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
655
  })
656
 
@@ -661,6 +650,14 @@ class HeavyDutyBacktester:
661
  results_list.sort(key=lambda x: x['net_profit'], reverse=True)
662
  best = results_list[0]
663
 
 
 
 
 
 
 
 
 
664
  print("\n" + "="*60)
665
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
666
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
@@ -668,13 +665,22 @@ class HeavyDutyBacktester:
668
  print("-" * 60)
669
  print(f" 📊 Total Trades: {best['total_trades']}")
670
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
 
 
 
 
 
 
 
 
671
  print("-" * 60)
 
672
  print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
673
  print("="*60)
674
  return best['config'], best
675
 
676
  async def run_strategic_optimization_task():
677
- print("\n🧪 [STRATEGIC BACKTEST] Feature Parity Mode...")
678
  r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
679
  try:
680
  await dm.initialize(); await proc.initialize()
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V135.0 - GEM-Architect: Feature Parity + Full Diagnostics)
3
  # ============================================================
4
 
5
  import asyncio
 
10
  import logging
11
  import itertools
12
  import os
13
+ import glob
14
  import gc
15
  import sys
16
  import traceback
17
+ from numpy.lib.stride_tricks import sliding_window_view
18
  from datetime import datetime, timezone
19
  from typing import Dict, Any, List
20
+ from scipy.special import expit # Sigmoid
21
 
22
  try:
23
  from ml_engine.processor import MLProcessor, SystemLimits
 
33
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
34
  CACHE_DIR = "backtest_real_scores"
35
 
36
+ # ============================================================
37
+ # 🛡️ GLOBAL HELPERS
38
+ # ============================================================
39
+ def sanitize_features(df):
40
+ if df is None or df.empty: return df
41
+ return df.replace([np.inf, -np.inf], np.nan).fillna(0.0)
42
+
43
+ def _z_roll(x, w=500):
44
+ r = x.rolling(w).mean()
45
+ s = x.rolling(w).std().replace(0, np.nan)
46
+ return ((x - r) / s).fillna(0)
47
+
48
+ def _revive_score_distribution(scores):
49
+ """Normalize flattened scores to 0-1 range if they are compressed"""
50
+ scores = np.array(scores, dtype=np.float32)
51
+ if len(scores) < 10: return scores
52
+ std = np.std(scores)
53
+ if std < 0.05:
54
+ mean = np.mean(scores)
55
+ z = (scores - mean) / (std + 1e-9)
56
+ return expit(z)
57
+ return scores
58
+
59
+ # ============================================================
60
+ # 🧪 THE BACKTESTER CLASS
61
+ # ============================================================
62
  class HeavyDutyBacktester:
63
  def __init__(self, data_manager, processor):
64
  self.dm = data_manager
 
84
  self.force_end_date = None
85
 
86
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
87
+ print(f"🧪 [Backtest V135.0] Feature Parity + Full Diagnostics + Speed.")
88
 
89
  def set_date_range(self, start_str, end_str):
90
  self.force_start_date = start_str
 
103
  tasks.append(current)
104
  current += duration_per_batch
105
  all_candles = []
106
+ sem = asyncio.Semaphore(20)
107
 
108
  async def _fetch_batch(timestamp):
109
  async with sem:
 
113
  except: await asyncio.sleep(0.5)
114
  return []
115
 
116
+ chunk_size = 50
117
  for i in range(0, len(tasks), chunk_size):
118
  chunk_tasks = tasks[i:i + chunk_size]
119
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
 
129
  print(f" ✅ Downloaded {len(df)} candles.", flush=True)
130
  return df.values.tolist()
131
 
 
 
 
 
 
 
 
 
132
  # ==============================================================
133
  # 🏎️ VECTORIZED INDICATORS (EXACT MATCH TO LIVE SYSTEM)
134
  # ==============================================================
135
  def _calculate_indicators_vectorized(self, df, timeframe='1m'):
136
  # 1. Clean Types
137
  cols = ['close', 'high', 'low', 'volume', 'open']
138
+ for c in cols: df[c] = df[c].astype(np.float64)
139
 
140
  # ---------------------------------------------------------
141
+ # 🧠 PART 1: TITAN FEATURES
142
  # ---------------------------------------------------------
 
143
  df['RSI'] = ta.rsi(df['close'], length=14).fillna(50)
144
 
 
145
  macd = ta.macd(df['close'])
146
  if macd is not None:
147
  df['MACD'] = macd.iloc[:, 0].fillna(0)
 
149
  else:
150
  df['MACD'] = 0.0; df['MACD_h'] = 0.0
151
 
 
152
  df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20).fillna(0)
153
 
 
154
  adx = ta.adx(df['high'], df['low'], df['close'], length=14)
155
  if adx is not None: df['ADX'] = adx.iloc[:, 0].fillna(0)
156
  else: df['ADX'] = 0.0
157
 
 
158
  for p in [9, 21, 50, 200]:
159
  ema = ta.ema(df['close'], length=p)
160
  df[f'EMA_{p}_dist'] = ((df['close'] / ema) - 1).fillna(0)
161
+ df[f'ema{p}'] = ema
162
 
 
163
  bb = ta.bbands(df['close'], length=20, std=2.0)
164
  if bb is not None:
 
165
  df['BB_w'] = ((bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]).fillna(0)
 
166
  df['BB_p'] = ((df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])).fillna(0)
167
+ df['bb_width'] = df['BB_w']
 
 
168
 
 
169
  df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14).fillna(50)
170
 
 
171
  vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
172
  if vwap is not None:
173
  df['VWAP_dist'] = ((df['close'] / vwap) - 1).fillna(0)
 
176
  df['VWAP_dist'] = 0.0
177
  df['vwap'] = df['close']
178
 
 
179
  df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14).fillna(0)
180
  df['atr_pct'] = df['atr'] / df['close']
181
 
 
188
  df['return_5m'] = df['close'].pct_change(5).fillna(0)
189
  df['return_15m'] = df['close'].pct_change(15).fillna(0)
190
 
191
+ df['rsi_14'] = df['RSI']
 
 
192
  df['ema_9_slope'] = ((df['ema9'] - df['ema9'].shift(1)) / df['ema9'].shift(1)).fillna(0)
193
+ df['ema_21_dist'] = df['EMA_21_dist']
194
 
 
195
  atr_100 = ta.atr(df['high'], df['low'], df['close'], length=100).fillna(0)
196
+ df['atr_z'] = _z_roll(atr_100)
197
 
198
+ df['vol_zscore_50'] = _z_roll(df['volume'], 50)
199
 
200
  rng = (df['high'] - df['low']).replace(0, 1e-9)
201
+ df['candle_range'] = _z_roll(rng, 500)
202
  df['close_pos_in_range'] = ((df['close'] - df['low']) / rng).fillna(0.5)
203
 
 
204
  df['dollar_vol'] = df['close'] * df['volume']
205
  amihud_raw = (df['return_1m'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
206
+ df['amihud'] = _z_roll(amihud_raw)
207
 
208
  dp = df['close'].diff()
209
  roll_cov = dp.rolling(64).cov(dp.shift(1))
210
  roll_spread_raw = (2 * np.sqrt(np.maximum(0, -roll_cov)))
211
+ df['roll_spread'] = _z_roll(roll_spread_raw)
212
 
213
  sign = np.sign(df['close'].diff()).fillna(0)
214
  signed_vol = sign * df['volume']
215
  ofi_raw = signed_vol.rolling(30).sum()
216
+ df['ofi'] = _z_roll(ofi_raw)
217
 
218
  buy_vol = (sign > 0) * df['volume']
219
  sell_vol = (sign < 0) * df['volume']
 
223
 
224
  vwap_win = 20
225
  v_short = (df['dollar_vol'].rolling(vwap_win).sum() / df['volume'].rolling(vwap_win).sum().replace(0, np.nan)).fillna(df['close'])
226
+ df['vwap_dev'] = _z_roll(df['close'] - v_short)
227
 
228
  rv_gk = ((np.log(df['high'] / df['low'])**2) / 2) - ((2 * np.log(2) - 1) * (np.log(df['close'] / df['open'])**2))
229
+ df['rv_gk'] = _z_roll(rv_gk)
230
 
231
  # L_Score approximation
232
  df['L_score'] = (df['vol_zscore_50'] - df['amihud'] - df['roll_spread'] - df['rv_gk'].abs() - df['vwap_dev'].abs() + df['ofi']).fillna(0)
 
252
  fib618 = roll_max - (diff * 0.382)
253
  df['dist_fib618'] = ((df['close'] - fib618) / df['close']).fillna(0)
254
 
255
+ df['dist_ema50'] = (df['close'] - df['ema50']) / df['close']
256
+ df['ema200'] = ta.ema(df['close'], length=200)
257
+ df['dist_ema200'] = (df['close'] - df['ema200']) / df['close']
258
+
259
  if timeframe == '1m':
260
  for lag in [1, 2, 3, 5, 10, 20]:
261
  df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
 
267
  return df
268
 
269
  # ==============================================================
270
+ # 🧠 CPU PROCESSING (GLOBAL INFERENCE + FULL FEATURE PARITY)
271
  # ==============================================================
272
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
273
  safe_sym = sym.replace('/', '_')
 
278
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
279
  return
280
 
281
+ print(f" ⚙️ [CPU] Analyzing {sym} (Full Stack / High Fidelity)...", flush=True)
282
  t0 = time.time()
283
 
284
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
 
289
  frames = {}
290
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
291
 
292
+ # 1. Calc 1m
293
  frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
294
  frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
295
  fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
 
305
 
306
  # 3. Global Index Maps
307
  arr_ts_1m = fast_1m['timestamp']
308
+ map_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['5m']['timestamp']) - 1)
309
+ map_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['15m']['timestamp']) - 1)
310
+ map_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['1h']['timestamp']) - 1)
311
+ map_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['4h']['timestamp']) - 1)
 
 
 
 
 
312
 
313
  # 4. Load Models
314
  hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
 
333
  if titan_model and titan_cols:
334
  print(" 🚀 Running Global Titan...", flush=True)
335
  try:
 
 
336
  t_vecs = []
337
  for col in titan_cols:
 
 
 
 
 
 
 
338
  if col in numpy_htf['5m']:
339
  t_vecs.append(numpy_htf['5m'][col][map_5m])
340
  else:
 
342
 
343
  X_TITAN = np.column_stack(t_vecs)
344
  preds_t = titan_model.predict(xgb.DMatrix(X_TITAN))
345
+ global_titan_scores = _revive_score_distribution(preds_t)
346
  except Exception as e: print(f"Titan Error: {e}")
347
 
348
  # B. SNIPER (1m Direct)
 
353
  s_vecs = []
354
  for col in sniper_cols:
355
  if col in fast_1m: s_vecs.append(fast_1m[col])
 
356
  elif col == 'atr' and 'atr_z' in fast_1m: s_vecs.append(fast_1m['atr_z'])
357
  else: s_vecs.append(np.zeros(len(arr_ts_1m)))
358
 
359
  X_SNIPER = np.column_stack(s_vecs)
360
  preds_list = [m.predict(X_SNIPER) for m in sniper_models]
361
+ global_sniper_scores = _revive_score_distribution(np.mean(preds_list, axis=0))
362
  except Exception as e: print(f"Sniper Error: {e}")
363
 
364
  # C. ORACLE (HTF Mix)
 
378
 
379
  X_ORACLE = np.column_stack(o_vecs)
380
  preds_o = oracle_dir.predict(X_ORACLE)
381
+ preds_o = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0]
382
+ global_oracle_scores = _revive_score_distribution(preds_o)
 
383
  except Exception as e: print(f"Oracle Error: {e}")
384
 
385
  # D. LEGACY V2 (Global)
 
424
  except: pass
425
 
426
  # --- 5. Filtering Candidates ---
 
 
 
 
 
427
  is_candidate = (
428
  (numpy_htf['1h']['RSI'][map_1h] <= 70) &
429
  (global_titan_scores > 0.4) &
 
432
 
433
  candidate_indices = np.where(is_candidate)[0]
434
 
 
435
  start_ts_val = frames['1m'].index[0] + pd.Timedelta(minutes=500)
436
  start_idx_offset = np.searchsorted(arr_ts_1m, int(start_ts_val.timestamp()*1000))
437
  candidate_indices = candidate_indices[candidate_indices >= start_idx_offset]
 
481
  l2_arr = np.full(240, 0.7)
482
  tgt_arr = np.full(240, 3.0)
483
 
 
484
  X_H = np.column_stack([
485
  sl_st[:,0], sl_st[:,1], sl_st[:,2], sl_st[:,3], sl_st[:,4],
486
  zeros, atr_pct, norm_pnl, max_pnl_r,
 
520
  gc.collect()
521
 
522
  # ==============================================================
523
+ # PHASE 1: Truth Data
524
  # ==============================================================
525
  async def generate_truth_data(self):
526
+ if self.force_start_date:
527
+ dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
528
+ dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
529
+ ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000)
 
530
  print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
531
+ for sym in self.TARGET_COINS:
532
+ c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
533
+ if c: await self._process_data_in_memory(sym, c, ms_s, ms_e)
 
 
 
 
 
534
 
535
+ # ==============================================================
536
+ # PHASE 2: Optimization (Detailed Stats)
537
+ # ==============================================================
538
  @staticmethod
539
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
540
+ print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
541
+ data = []
542
+ for f in scores_files:
543
+ try: data.append(pd.read_pickle(f))
 
 
544
  except: pass
545
+ if not data: return []
546
+ df = pd.concat(data).sort_values('timestamp')
547
+
548
+ ts = df['timestamp'].values; close = df['close'].values.astype(float)
549
+ sym = df['symbol'].values; sym_map = {s:i for i,s in enumerate(np.unique(sym))}
550
+ sym_id = np.array([sym_map[s] for s in sym])
551
+
552
+ oracle = df['oracle_conf'].values; sniper = df['sniper_score'].values
553
+ hydra = df['risk_hydra_crash'].values; titan = df['real_titan'].values
554
+ l1 = df['l1_score'].values
555
+ legacy_v2 = df['risk_legacy_v2'].values
556
 
557
+ N = len(ts)
558
+ print(f" 🚀 [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True)
559
+
560
+ res = []
561
+ for cfg in combinations_batch:
562
+ pos = {}; log = []
563
+ bal = initial_capital; alloc = 0.0
564
+ mask = (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh']) & (sniper >= cfg['sniper_thresh']) & (titan >= 0.55)
565
+
566
+ for i in range(N):
567
+ s = sym_id[i]; p = close[i]
568
+ if s in pos:
569
+ entry = pos[s][0]; h_r = pos[s][1]; titan_entry = pos[s][3]
570
+ crash_hydra = (h_r > cfg['hydra_thresh'])
571
+ panic_legacy = (legacy_v2[i] > cfg['legacy_thresh'])
572
+ pnl = (p - entry)/entry
573
+
574
+ if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02:
575
+ realized = pnl - fees_pct*2
576
+ bal += pos[s][2] * (1 + realized)
577
+ alloc -= pos[s][2]
578
+ is_consensus = (titan_entry > 0.55)
579
+ log.append({'pnl': realized, 'consensus': is_consensus})
580
+ del pos[s]
581
+
582
+ if len(pos) < max_slots and mask[i]:
583
+ if s not in pos and bal >= 5.0:
584
+ size = min(10.0, bal * 0.98)
585
+ pos[s] = (p, hydra[i], size, titan[i])
586
+ bal -= size; alloc += size
 
 
 
 
 
 
 
 
 
 
 
 
 
 
587
 
588
+ final_bal = bal + alloc
589
+ profit = final_bal - initial_capital
 
 
 
 
 
 
 
590
 
591
+ # Detailed Stats
592
+ tot = len(log)
593
+ winning = [x for x in log if x['pnl'] > 0]
594
+ losing = [x for x in log if x['pnl'] <= 0]
595
+
596
+ win_count = len(winning); loss_count = len(losing)
597
+ win_rate = (win_count/tot*100) if tot else 0
598
+
599
+ avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0
600
+ avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0
601
+
602
+ gross_p = sum([x['pnl'] for x in winning])
603
+ gross_l = abs(sum([x['pnl'] for x in losing]))
604
+ profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9
605
+
606
+ max_win_s = 0; max_loss_s = 0; curr_w = 0; curr_l = 0
607
+ for t in log:
608
  if t['pnl'] > 0:
609
  curr_w += 1; curr_l = 0
610
+ if curr_w > max_win_s: max_win_s = curr_w
611
  else:
612
  curr_l += 1; curr_w = 0
613
+ if curr_l > max_loss_s: max_loss_s = curr_l
614
+
615
+ cons_trades = [x for x in log if x['consensus']]
616
+ n_cons = len(cons_trades)
617
+ agree_rate = (n_cons/tot*100) if tot else 0
618
+ cons_win_rate = (sum(1 for x in cons_trades if x['pnl']>0)/n_cons*100) if n_cons else 0
619
+ cons_avg_pnl = (sum(x['pnl'] for x in cons_trades)/n_cons*100) if n_cons else 0
620
+
621
+ res.append({
622
+ 'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
623
+ 'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0,
624
+ 'win_count': win_count, 'loss_count': loss_count,
625
+ 'avg_win': avg_win, 'avg_loss': avg_loss,
626
+ 'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s,
627
+ 'profit_factor': profit_factor,
628
+ 'consensus_agreement_rate': agree_rate,
629
+ 'high_consensus_win_rate': cons_win_rate,
630
+ 'high_consensus_avg_pnl': cons_avg_pnl
631
  })
632
+ return res
 
633
 
634
  async def run_optimization(self, target_regime="RANGE"):
635
  await self.generate_truth_data()
636
  oracle_r = np.linspace(0.4, 0.7, 3); sniper_r = np.linspace(0.4, 0.7, 3)
637
+ hydra_r = [0.85, 0.95]; l1_r = [10.0]
638
 
639
  combos = []
640
+ for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r):
641
  combos.append({
642
+ 'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1,
643
  'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
644
  })
645
 
 
650
  results_list.sort(key=lambda x: x['net_profit'], reverse=True)
651
  best = results_list[0]
652
 
653
+ # Auto-Diagnosis
654
+ diag = []
655
+ if best['total_trades'] > 2000 and best['net_profit'] < 10: diag.append("⚠️ Overtrading")
656
+ if best['win_rate'] > 55 and best['net_profit'] < 0: diag.append("⚠️ Fee Burn")
657
+ if abs(best['avg_loss']) > best['avg_win']: diag.append("⚠️ Risk/Reward Inversion")
658
+ if best['max_loss_streak'] > 10: diag.append("⚠️ Consecutive Loss Risk")
659
+ if not diag: diag.append("✅ System Healthy")
660
+
661
  print("\n" + "="*60)
662
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
663
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
 
665
  print("-" * 60)
666
  print(f" 📊 Total Trades: {best['total_trades']}")
667
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
668
+ print(f" ✅ Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)")
669
+ print(f" ❌ Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)")
670
+ print(f" 🌊 Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}")
671
+ print(f" ⚖️ Profit Factor: {best['profit_factor']:.2f}")
672
+ print("-" * 60)
673
+ print(f" 🧠 CONSENSUS ANALYTICS:")
674
+ print(f" 🤝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
675
+ print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
676
  print("-" * 60)
677
+ print(f" 🩺 DIAGNOSIS: {' '.join(diag)}")
678
  print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
679
  print("="*60)
680
  return best['config'], best
681
 
682
  async def run_strategic_optimization_task():
683
+ print("\n🧪 [STRATEGIC BACKTEST] Full Spectrum Mode...")
684
  r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
685
  try:
686
  await dm.initialize(); await proc.initialize()