Riy777 committed on
Commit
bf66206
·
verified ·
1 Parent(s): d01877f

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +145 -181
backtest_engine.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V118.10 - GEM-Architect: Multi-Class Fix)
3
  # ============================================================
4
 
5
  import asyncio
@@ -17,7 +17,7 @@ import traceback
17
  from datetime import datetime, timezone
18
  from typing import Dict, Any, List
19
 
20
- # ✅ استيراد المحركات الأساسية
21
  try:
22
  from ml_engine.processor import MLProcessor, SystemLimits
23
  from ml_engine.data_manager import DataManager
@@ -44,7 +44,6 @@ class HeavyDutyBacktester:
44
  self.force_start_date = None
45
  self.force_end_date = None
46
 
47
- # 🔥 تنظيف الكاش 🔥
48
  if os.path.exists(CACHE_DIR):
49
  files = glob.glob(os.path.join(CACHE_DIR, "*"))
50
  print(f"🧹 [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
@@ -54,7 +53,7 @@ class HeavyDutyBacktester:
54
  else:
55
  os.makedirs(CACHE_DIR)
56
 
57
- print(f"🧪 [Backtest V118.10] Multi-Class Fix Mode. Models: {self._check_models_status()}")
58
 
59
  def _check_models_status(self):
60
  status = []
@@ -68,9 +67,20 @@ class HeavyDutyBacktester:
68
  self.force_start_date = start_str
69
  self.force_end_date = end_str
70
 
71
- # ==============================================================
72
- # FAST DATA DOWNLOADER
73
- # ==============================================================
 
 
 
 
 
 
 
 
 
 
 
74
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
75
  print(f" ⚡ [Network] Downloading {sym}...", flush=True)
76
  limit = 1000
@@ -101,7 +111,6 @@ class HeavyDutyBacktester:
101
  if res: all_candles.extend(res)
102
 
103
  if not all_candles: return None
104
-
105
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
106
  seen = set(); unique_candles = []
107
  for c in filtered:
@@ -112,18 +121,13 @@ class HeavyDutyBacktester:
112
  print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
113
  return unique_candles
114
 
115
- # ==============================================================
116
- # 🏎️ VECTORIZED INDICATORS (Robust)
117
- # ==============================================================
118
  def _calculate_indicators_vectorized(self, df, timeframe='1m'):
119
- for col in ['close', 'high', 'low', 'volume', 'open']:
120
- df[col] = df[col].astype(float)
121
 
122
  df['rsi'] = ta.rsi(df['close'], length=14)
123
  df['ema20'] = ta.ema(df['close'], length=20)
124
  df['ema50'] = ta.ema(df['close'], length=50)
125
  df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
126
-
127
  df['vol_ma50'] = df['volume'].rolling(50).mean()
128
  df['rel_vol'] = df['volume'] / (df['vol_ma50'] + 1e-9)
129
 
@@ -132,11 +136,8 @@ class HeavyDutyBacktester:
132
  std20 = df['close'].rolling(20).std()
133
  df['bb_width'] = ((sma20 + 2*std20) - (sma20 - 2*std20)) / sma20
134
 
135
- vol_mean = df['volume'].rolling(20).mean()
136
- vol_std = df['volume'].rolling(20).std()
137
- df['vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)
138
  df['atr_pct'] = df['atr'] / df['close']
139
-
140
  # L1 Score
141
  rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
142
  l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
@@ -145,11 +146,8 @@ class HeavyDutyBacktester:
145
  if timeframe == '1m':
146
  df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
147
  df['ret'] = df['close'].pct_change()
148
-
149
- roll_max = df['high'].rolling(50).max()
150
  roll_min = df['low'].rolling(50).min()
151
- diff = (roll_max - roll_min).replace(0, 1e-9)
152
- df['fib_pos'] = (df['close'] - roll_min) / diff
153
  df['volatility'] = df['atr'] / df['close']
154
  df['trend_slope'] = (df['ema20'] - df['ema20'].shift(5)) / df['ema20'].shift(5)
155
 
@@ -166,9 +164,6 @@ class HeavyDutyBacktester:
166
  df.fillna(0, inplace=True)
167
  return df
168
 
169
- # ==============================================================
170
- # 🧠 CPU PROCESSING (HYPER-VECTORIZED)
171
- # ==============================================================
172
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
173
  safe_sym = sym.replace('/', '_')
174
  period_suffix = f"{start_ms}_{end_ms}"
@@ -178,164 +173,141 @@ class HeavyDutyBacktester:
178
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
179
  return
180
 
181
- print(f" ⚙️ [CPU] Analyzing {sym} (Hyper-Vectorized Mode)...", flush=True)
182
  t0 = time.time()
183
 
184
- # 1. Data Prep
185
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
186
  df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
187
  df_1m.set_index('datetime', inplace=True)
188
  df_1m = df_1m.sort_index()
189
 
190
  frames = {}
191
- agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
192
-
193
  frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
194
  frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
195
  fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
196
 
197
  numpy_htf = {}
198
  for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
199
- resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
200
  resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
201
  resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
202
  frames[tf_str] = resampled
203
  numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
204
 
205
- # 2. Time Alignment
206
  map_1m_to_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['1h']['timestamp'])-1)
207
  map_1m_to_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['5m']['timestamp'])-1)
208
  map_1m_to_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['15m']['timestamp'])-1)
209
  map_1m_to_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['4h']['timestamp'])-1)
210
 
211
- # 3. Model Access
212
  oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
213
  sniper_models = getattr(self.proc.sniper, 'models', [])
214
  hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
215
  legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
216
 
217
- # 4. 🔥 Pre-Calc Legacy V2 🔥
218
  global_v2_probs = np.zeros(len(fast_1m['close']))
219
  if legacy_v2:
220
  try:
221
- l_log = fast_1m['log_ret']; l_rsi = fast_1m['rsi'] / 100.0
222
- l_fib = fast_1m['fib_pos']; l_vol = fast_1m['volatility']
223
- l5_log = numpy_htf['5m']['log_ret'][map_1m_to_5m]
224
- l5_rsi = numpy_htf['5m']['rsi'][map_1m_to_5m] / 100.0
225
- l5_fib = numpy_htf['5m']['fib_pos'][map_1m_to_5m]
226
- l5_trd = numpy_htf['5m']['trend_slope'][map_1m_to_5m]
227
- l15_log = numpy_htf['15m']['log_ret'][map_1m_to_15m]
228
- l15_rsi = numpy_htf['15m']['rsi'][map_1m_to_15m] / 100.0
229
- l15_fib618 = numpy_htf['15m']['dist_fib618'][map_1m_to_15m]
230
- l15_trd = numpy_htf['15m']['trend_slope'][map_1m_to_15m]
231
-
232
- lag_cols = []
233
- for lag in [1, 2, 3, 5, 10, 20]:
234
- lag_cols.extend([fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'], fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']])
235
-
236
- X_GLOBAL_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lag_cols])
237
- global_v2_probs = legacy_v2.predict(xgb.DMatrix(X_GLOBAL_V2))
238
-
239
- # ✅ FIX: Handle Multi-Class output if exists
240
- if len(global_v2_probs.shape) > 1:
241
- # Assuming last column is Panic/Crash prob (or index 2)
242
- global_v2_probs = global_v2_probs[:, -1]
243
  except: pass
244
 
245
- # 5. 🔥 Pre-Assemble Hydra Static 🔥
246
  global_hydra_static = None
247
  if hydra_models:
248
  try:
249
- h_rsi_1m = fast_1m['rsi']; h_rsi_5m = numpy_htf['5m']['rsi'][map_1m_to_5m]
250
- h_rsi_15m = numpy_htf['15m']['rsi'][map_1m_to_15m]; h_bb = fast_1m['bb_width']
251
- h_vol = fast_1m['rel_vol']; h_atr = fast_1m['atr']; h_close = fast_1m['close']
252
- global_hydra_static = np.column_stack([h_rsi_1m, h_rsi_5m, h_rsi_15m, h_bb, h_vol, h_atr, h_close])
253
  except: pass
254
 
255
- # 6. Candidate Filtering
 
256
  valid_indices_mask = fast_1m['l1_score'] >= 5.0
257
  valid_indices = np.where(valid_indices_mask)[0]
258
  mask_bounds = (valid_indices > 500) & (valid_indices < len(fast_1m['close']) - 245)
259
  final_valid_indices = valid_indices[mask_bounds]
260
 
261
- print(f" 🎯 Raw Candidates (Score > 5): {len(final_valid_indices)}. Vectorized Scoring...", flush=True)
262
-
263
  num_candidates = len(final_valid_indices)
 
264
  if num_candidates == 0: return
265
 
266
- # TIME VECTOR DEFINED
 
 
 
 
 
 
 
267
  time_vec = np.arange(1, 241)
268
 
269
- # --- A. ORACLE MATRIX CONSTRUCTION ---
270
- oracle_preds = np.full(num_candidates, 0.5)
271
  if oracle_dir_model:
272
  try:
273
  idx_1h = map_1m_to_1h[final_valid_indices]
274
  idx_15m = map_1m_to_15m[final_valid_indices]
275
  idx_4h = map_1m_to_4h[final_valid_indices]
276
-
277
  titan_scores = np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95)
278
 
279
- oracle_features = []
280
  for col in getattr(self.proc.oracle, 'feature_cols', []):
281
- if col.startswith('1h_'):
282
- c = col[3:]; oracle_features.append(numpy_htf['1h'][c][idx_1h] if c in numpy_htf['1h'] else np.zeros(num_candidates))
283
- elif col.startswith('15m_'):
284
- c = col[4:]; oracle_features.append(numpy_htf['15m'][c][idx_15m] if c in numpy_htf['15m'] else np.zeros(num_candidates))
285
- elif col.startswith('4h_'):
286
- c = col[3:]; oracle_features.append(numpy_htf['4h'][c][idx_4h] if c in numpy_htf['4h'] else np.zeros(num_candidates))
287
- elif col == 'sim_titan_score': oracle_features.append(titan_scores)
288
- elif col == 'sim_mc_score': oracle_features.append(np.full(num_candidates, 0.5))
289
- elif col == 'sim_pattern_score': oracle_features.append(np.full(num_candidates, 0.5))
290
- else: oracle_features.append(np.zeros(num_candidates))
291
-
292
- X_oracle_big = np.column_stack(oracle_features)
293
- preds = oracle_dir_model.predict(X_oracle_big)
294
 
295
- # FIX: Handle Multi-Class (take last column usually)
296
- if len(preds.shape) > 1 and preds.shape[1] > 1: oracle_preds = preds[:, -1]
297
- else: oracle_preds = preds.flatten()
298
  except Exception as e: print(f"Oracle Error: {e}")
299
 
300
- # --- B. SNIPER MATRIX CONSTRUCTION (FIXED) ---
301
- sniper_preds = np.full(num_candidates, 0.5)
302
  if sniper_models:
303
  try:
304
- sniper_features = []
305
  for col in getattr(self.proc.sniper, 'feature_names', []):
306
- if col in fast_1m: sniper_features.append(fast_1m[col][final_valid_indices])
307
- elif col == 'L_score': sniper_features.append(fast_1m.get('vol_zscore_50', np.zeros(len(fast_1m['close'])))[final_valid_indices])
308
- else: sniper_features.append(np.zeros(num_candidates))
309
-
310
- X_sniper_big = np.column_stack(sniper_features)
311
 
312
- # FIX: Extract Positive Class Prob if Multi-Class
313
- batch_preds = []
314
  for m in sniper_models:
315
- raw_p = m.predict(X_sniper_big)
316
- if len(raw_p.shape) > 1 and raw_p.shape[1] > 1:
317
- # Assuming index 2 is Buy (0=Sell, 1=Hold, 2=Buy) or index 1 if binary
318
- # Safest: Take last column
319
- batch_preds.append(raw_p[:, -1])
320
- else:
321
- batch_preds.append(raw_p)
322
-
323
- sniper_preds = np.mean(batch_preds, axis=0)
324
  except Exception as e: print(f"Sniper Error: {e}")
325
 
326
- # --- C. HYDRA MATRIX CONSTRUCTION ---
327
- hydra_risk_preds = np.zeros(num_candidates)
328
- hydra_time_preds = np.zeros(num_candidates, dtype=int)
329
-
330
  if hydra_models and global_hydra_static is not None:
331
  chunk_size = 5000
332
  for i in range(0, num_candidates, chunk_size):
333
- chunk_indices = final_valid_indices[i : i + chunk_size]
334
- batch_X = []; valid_batch_indices = []
 
335
 
336
- for k, idx in enumerate(chunk_indices):
 
337
  start = idx + 1; end = start + 240
338
  sl_static = global_hydra_static[start:end]
 
339
  entry_p = fast_1m['close'][idx]
340
  sl_close = sl_static[:, 6]; sl_atr = sl_static[:, 5]
341
  sl_dist = np.maximum(1.5 * sl_atr, entry_p * 0.015)
@@ -344,87 +316,78 @@ class HeavyDutyBacktester:
344
  sl_max_pnl_r = (sl_cum_max - entry_p) / sl_dist
345
  sl_atr_pct = sl_atr / sl_close
346
  zeros = np.zeros(240); ones = np.ones(240)
347
- row = np.column_stack([
348
- sl_static[:, 0], sl_static[:, 1], sl_static[:, 2],
349
- sl_static[:, 3], sl_static[:, 4],
350
- zeros, sl_atr_pct, sl_norm_pnl, sl_max_pnl_r,
351
- zeros, zeros, time_vec,
352
- zeros, ones*0.6, ones*0.7, ones*3.0
353
- ])
354
- batch_X.append(row)
355
- valid_batch_indices.append(i + k)
356
 
357
  if batch_X:
358
  try:
359
- big_X = np.array(batch_X)
 
 
360
  big_X_flat = big_X.reshape(-1, big_X.shape[-1])
361
  preds_flat = hydra_models['crash'].predict_proba(big_X_flat)[:, 1]
362
  preds_batch = preds_flat.reshape(len(batch_X), 240)
363
- batch_max_risk = np.max(preds_batch, axis=1)
 
364
  over_thresh = preds_batch > 0.6
365
  has_crash = over_thresh.any(axis=1)
366
  crash_times_rel = np.argmax(over_thresh, axis=1)
367
- for j, glob_idx in enumerate(valid_batch_indices):
368
- hydra_risk_preds[glob_idx] = batch_max_risk[j]
369
- if has_crash[j]:
370
- start_t_idx = final_valid_indices[glob_idx] + 1
371
- abs_time = fast_1m['timestamp'][start_t_idx + crash_times_rel[j]]
372
- hydra_time_preds[glob_idx] = abs_time
 
 
 
 
 
 
373
  except Exception: pass
374
 
375
- # --- D. LEGACY V2 MAPPING ---
376
- legacy_risk_preds = np.zeros(num_candidates)
377
- legacy_time_preds = np.zeros(num_candidates, dtype=int)
378
  if legacy_v2:
379
- for k, idx in enumerate(final_valid_indices):
 
 
380
  start = idx + 1
381
  if start + 240 < len(global_v2_probs):
382
- window = global_v2_probs[start : start + 240]
383
- legacy_risk_preds[k] = np.max(window)
384
-
385
- # --- E. FINAL DATAFRAME CONSTRUCTION (Safe Mode) ---
386
- try:
387
- # 1. Gather Arrays
388
- arr_ts = fast_1m['timestamp'][final_valid_indices]
389
- arr_close = fast_1m['close'][final_valid_indices]
390
- arr_l1 = fast_1m['l1_score'][final_valid_indices]
391
- arr_titan = np.clip(arr_l1 / 40.0, 0.1, 0.95)
392
-
393
- # 2. Check Lengths & Flatten
394
- arrays = {
395
- 'timestamp': arr_ts,
396
- 'close': arr_close,
397
- 'real_titan': arr_titan,
398
- 'oracle_conf': oracle_preds,
399
- 'sniper_score': sniper_preds,
400
- 'l1_score': arr_l1,
401
- 'risk_hydra_crash': hydra_risk_preds,
402
- 'time_hydra_crash': hydra_time_preds,
403
- 'risk_legacy_v2': legacy_risk_preds,
404
- 'time_legacy_panic': legacy_time_preds
405
- }
406
-
407
- clean_arrays = {}
408
- for k, v in arrays.items():
409
- flat_v = np.array(v).flatten()
410
- # Safety Truncate
411
- if len(flat_v) > num_candidates: flat_v = flat_v[:num_candidates]
412
- elif len(flat_v) < num_candidates:
413
- print(f"⚠️ PADDING {k}: {len(flat_v)} -> {num_candidates}")
414
- flat_v = np.pad(flat_v, (0, num_candidates - len(flat_v)))
415
- clean_arrays[k] = flat_v
416
-
417
- clean_arrays['symbol'] = sym
418
- ai_df = pd.DataFrame(clean_arrays)
419
-
420
- dt = time.time() - t0
421
- if not ai_df.empty:
422
- ai_df.to_pickle(scores_file)
423
- print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
424
-
425
- except Exception as e:
426
- print(f"❌ DataFrame Construction Error: {e}")
427
- traceback.print_exc()
428
 
429
  del frames, fast_1m, numpy_htf, global_v2_probs, global_hydra_static
430
  gc.collect()
@@ -459,7 +422,7 @@ class HeavyDutyBacktester:
459
  global_df = pd.concat(all_data)
460
  global_df.sort_values('timestamp', inplace=True)
461
 
462
- # 🚀 Numpy Conversion 🚀
463
  arr_ts = global_df['timestamp'].values
464
  arr_close = global_df['close'].values.astype(np.float64)
465
  arr_symbol = global_df['symbol'].values
@@ -559,8 +522,9 @@ class HeavyDutyBacktester:
559
  await self.generate_truth_data()
560
 
561
  d = self.GRID_DENSITY
562
- oracle_range = np.linspace(0.45, 0.8, d).tolist()
563
- sniper_range = np.linspace(0.35, 0.7, d).tolist()
 
564
  hydra_range = np.linspace(0.70, 0.95, d).tolist()
565
  l1_range = [10.0, 15.0, 20.0, 25.0]
566
  titan_range = [0.4, 0.6]
@@ -601,7 +565,7 @@ class HeavyDutyBacktester:
601
  return best['config'], best
602
 
603
  async def run_strategic_optimization_task():
604
- print("\n🧪 [STRATEGIC BACKTEST] Dimension Safe Mode...")
605
  r2 = R2Service()
606
  dm = DataManager(None, None, r2)
607
  proc = MLProcessor(dm)
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V119.0 - GEM-Architect: The Synchronizer)
3
  # ============================================================
4
 
5
  import asyncio
 
17
  from datetime import datetime, timezone
18
  from typing import Dict, Any, List
19
 
20
+ # ✅ استيراد المحركات
21
  try:
22
  from ml_engine.processor import MLProcessor, SystemLimits
23
  from ml_engine.data_manager import DataManager
 
44
  self.force_start_date = None
45
  self.force_end_date = None
46
 
 
47
  if os.path.exists(CACHE_DIR):
48
  files = glob.glob(os.path.join(CACHE_DIR, "*"))
49
  print(f"🧹 [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
 
53
  else:
54
  os.makedirs(CACHE_DIR)
55
 
56
+ print(f"🧪 [Backtest V119.0] Synchronized Integrity Mode. Models: {self._check_models_status()}")
57
 
58
  def _check_models_status(self):
59
  status = []
 
67
  self.force_start_date = start_str
68
  self.force_end_date = end_str
69
 
70
+ # --- Helper: Robust Probability Extraction ---
71
+ def _extract_probs(self, raw_preds):
72
+ """Extracts positive class probability regardless of shape (N,), (N,1), (N,3)"""
73
+ if isinstance(raw_preds, list): raw_preds = np.array(raw_preds)
74
+
75
+ if raw_preds.ndim == 1:
76
+ return raw_preds # Already 1D probabilities or regression
77
+ elif raw_preds.ndim == 2:
78
+ cols = raw_preds.shape[1]
79
+ if cols == 1: return raw_preds.flatten()
80
+ if cols == 2: return raw_preds[:, 1] # Binary [Neg, Pos]
81
+ if cols >= 3: return raw_preds[:, -1] # Multi [Sell, Hold, Buy] -> Buy
82
+ return raw_preds.flatten()
83
+
84
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
85
  print(f" ⚡ [Network] Downloading {sym}...", flush=True)
86
  limit = 1000
 
111
  if res: all_candles.extend(res)
112
 
113
  if not all_candles: return None
 
114
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
115
  seen = set(); unique_candles = []
116
  for c in filtered:
 
121
  print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
122
  return unique_candles
123
 
 
 
 
124
  def _calculate_indicators_vectorized(self, df, timeframe='1m'):
125
+ for col in ['close', 'high', 'low', 'volume', 'open']: df[col] = df[col].astype(float)
 
126
 
127
  df['rsi'] = ta.rsi(df['close'], length=14)
128
  df['ema20'] = ta.ema(df['close'], length=20)
129
  df['ema50'] = ta.ema(df['close'], length=50)
130
  df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
 
131
  df['vol_ma50'] = df['volume'].rolling(50).mean()
132
  df['rel_vol'] = df['volume'] / (df['vol_ma50'] + 1e-9)
133
 
 
136
  std20 = df['close'].rolling(20).std()
137
  df['bb_width'] = ((sma20 + 2*std20) - (sma20 - 2*std20)) / sma20
138
 
 
 
 
139
  df['atr_pct'] = df['atr'] / df['close']
140
+
141
  # L1 Score
142
  rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
143
  l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
 
146
  if timeframe == '1m':
147
  df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
148
  df['ret'] = df['close'].pct_change()
 
 
149
  roll_min = df['low'].rolling(50).min()
150
+ df['fib_pos'] = (df['close'] - roll_min) / (df['high'].rolling(50).max() - roll_min + 1e-9)
 
151
  df['volatility'] = df['atr'] / df['close']
152
  df['trend_slope'] = (df['ema20'] - df['ema20'].shift(5)) / df['ema20'].shift(5)
153
 
 
164
  df.fillna(0, inplace=True)
165
  return df
166
 
 
 
 
167
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
168
  safe_sym = sym.replace('/', '_')
169
  period_suffix = f"{start_ms}_{end_ms}"
 
173
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
174
  return
175
 
176
+ print(f" ⚙️ [CPU] Analyzing {sym} (Synchronized Mode)...", flush=True)
177
  t0 = time.time()
178
 
 
179
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
180
  df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
181
  df_1m.set_index('datetime', inplace=True)
182
  df_1m = df_1m.sort_index()
183
 
184
  frames = {}
 
 
185
  frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
186
  frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
187
  fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
188
 
189
  numpy_htf = {}
190
  for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
191
+ resampled = df_1m.resample(tf_code).agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}).dropna()
192
  resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
193
  resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
194
  frames[tf_str] = resampled
195
  numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
196
 
197
+ # Time Alignment
198
  map_1m_to_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['1h']['timestamp'])-1)
199
  map_1m_to_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['5m']['timestamp'])-1)
200
  map_1m_to_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['15m']['timestamp'])-1)
201
  map_1m_to_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['4h']['timestamp'])-1)
202
 
203
+ # Model Access
204
  oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
205
  sniper_models = getattr(self.proc.sniper, 'models', [])
206
  hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
207
  legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
208
 
209
+ # Pre-Calc Legacy V2 (Global)
210
  global_v2_probs = np.zeros(len(fast_1m['close']))
211
  if legacy_v2:
212
  try:
213
+ # Optimized construction
214
+ X_GLOBAL_V2 = np.column_stack([
215
+ fast_1m['log_ret'], fast_1m['rsi']/100.0, fast_1m['fib_pos'], fast_1m['volatility'],
216
+ numpy_htf['5m']['log_ret'][map_1m_to_5m], numpy_htf['5m']['rsi'][map_1m_to_5m]/100.0, numpy_htf['5m']['fib_pos'][map_1m_to_5m], numpy_htf['5m']['trend_slope'][map_1m_to_5m],
217
+ numpy_htf['15m']['log_ret'][map_1m_to_15m], numpy_htf['15m']['rsi'][map_1m_to_15m]/100.0, numpy_htf['15m']['dist_fib618'][map_1m_to_15m], numpy_htf['15m']['trend_slope'][map_1m_to_15m],
218
+ *[fast_1m[f'log_ret_lag_{l}'] for l in [1,2,3,5,10,20]],
219
+ *[fast_1m[f'rsi_lag_{l}'] for l in [1,2,3,5,10,20]],
220
+ *[fast_1m[f'fib_pos_lag_{l}'] for l in [1,2,3,5,10,20]],
221
+ *[fast_1m[f'volatility_lag_{l}'] for l in [1,2,3,5,10,20]]
222
+ ])
223
+ raw = legacy_v2.predict(xgb.DMatrix(X_GLOBAL_V2))
224
+ global_v2_probs = self._extract_probs(raw)
 
 
 
 
 
 
 
 
 
 
225
  except: pass
226
 
227
+ # Pre-Assemble Hydra Static
228
  global_hydra_static = None
229
  if hydra_models:
230
  try:
231
+ global_hydra_static = np.column_stack([
232
+ fast_1m['rsi'], numpy_htf['5m']['rsi'][map_1m_to_5m], numpy_htf['15m']['rsi'][map_1m_to_15m],
233
+ fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close']
234
+ ])
235
  except: pass
236
 
237
+ # 🎯 CANDIDATE SELECTION
238
+ # L1 Score > 5.0 (Loose pre-filter)
239
  valid_indices_mask = fast_1m['l1_score'] >= 5.0
240
  valid_indices = np.where(valid_indices_mask)[0]
241
  mask_bounds = (valid_indices > 500) & (valid_indices < len(fast_1m['close']) - 245)
242
  final_valid_indices = valid_indices[mask_bounds]
243
 
 
 
244
  num_candidates = len(final_valid_indices)
245
+ print(f" 🎯 Raw Candidates (Score > 5): {num_candidates}. Calculating Scores...", flush=True)
246
  if num_candidates == 0: return
247
 
248
+ # 🚀 PRE-ALLOCATE ARRAYS (STRICT ALIGNMENT)
249
+ # Using arrays of exact size N guarantees no shifting
250
+ res_oracle = np.full(num_candidates, 0.5, dtype=np.float32)
251
+ res_sniper = np.full(num_candidates, 0.5, dtype=np.float32)
252
+ res_hydra_risk = np.zeros(num_candidates, dtype=np.float32)
253
+ res_hydra_time = np.zeros(num_candidates, dtype=np.int64)
254
+ res_legacy_risk = np.zeros(num_candidates, dtype=np.float32)
255
+
256
  time_vec = np.arange(1, 241)
257
 
258
+ # --- A. ORACLE BATCHING ---
 
259
  if oracle_dir_model:
260
  try:
261
  idx_1h = map_1m_to_1h[final_valid_indices]
262
  idx_15m = map_1m_to_15m[final_valid_indices]
263
  idx_4h = map_1m_to_4h[final_valid_indices]
 
264
  titan_scores = np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95)
265
 
266
+ features = []
267
  for col in getattr(self.proc.oracle, 'feature_cols', []):
268
+ if col.startswith('1h_'): features.append(numpy_htf['1h'].get(col[3:], np.zeros(len(idx_1h)))[idx_1h])
269
+ elif col.startswith('15m_'): features.append(numpy_htf['15m'].get(col[4:], np.zeros(len(idx_15m)))[idx_15m])
270
+ elif col.startswith('4h_'): features.append(numpy_htf['4h'].get(col[3:], np.zeros(len(idx_4h)))[idx_4h])
271
+ elif col == 'sim_titan_score': features.append(titan_scores)
272
+ elif col == 'sim_mc_score': features.append(np.full(num_candidates, 0.5))
273
+ elif col == 'sim_pattern_score': features.append(np.full(num_candidates, 0.5))
274
+ else: features.append(np.zeros(num_candidates))
 
 
 
 
 
 
275
 
276
+ X_oracle = np.column_stack(features)
277
+ preds = oracle_dir_model.predict(X_oracle)
278
+ res_oracle = self._extract_probs(preds)
279
  except Exception as e: print(f"Oracle Error: {e}")
280
 
281
+ # --- B. SNIPER BATCHING ---
 
282
  if sniper_models:
283
  try:
284
+ features = []
285
  for col in getattr(self.proc.sniper, 'feature_names', []):
286
+ if col in fast_1m: features.append(fast_1m[col][final_valid_indices])
287
+ elif col == 'L_score': features.append(fast_1m.get('vol_zscore_50', np.zeros(len(fast_1m['close'])))[final_valid_indices])
288
+ else: features.append(np.zeros(num_candidates))
 
 
289
 
290
+ X_sniper = np.column_stack(features)
291
+ preds_list = []
292
  for m in sniper_models:
293
+ raw = m.predict(X_sniper)
294
+ preds_list.append(self._extract_probs(raw))
295
+ res_sniper = np.mean(preds_list, axis=0)
 
 
 
 
 
 
296
  except Exception as e: print(f"Sniper Error: {e}")
297
 
298
+ # --- C. HYDRA BATCHING (Optimized Loop) ---
 
 
 
299
  if hydra_models and global_hydra_static is not None:
300
  chunk_size = 5000
301
  for i in range(0, num_candidates, chunk_size):
302
+ # Indices inside 'final_valid_indices'
303
+ chunk_range = range(i, min(i + chunk_size, num_candidates))
304
+ global_indices = final_valid_indices[chunk_range]
305
 
306
+ batch_X = []
307
+ for idx in global_indices:
308
  start = idx + 1; end = start + 240
309
  sl_static = global_hydra_static[start:end]
310
+
311
  entry_p = fast_1m['close'][idx]
312
  sl_close = sl_static[:, 6]; sl_atr = sl_static[:, 5]
313
  sl_dist = np.maximum(1.5 * sl_atr, entry_p * 0.015)
 
316
  sl_max_pnl_r = (sl_cum_max - entry_p) / sl_dist
317
  sl_atr_pct = sl_atr / sl_close
318
  zeros = np.zeros(240); ones = np.ones(240)
319
+
320
+ # 16 Features exact
321
+ batch_X.append(np.column_stack([
322
+ sl_static[:, 0], sl_static[:, 1], sl_static[:, 2], # 3 RSIs
323
+ sl_static[:, 3], sl_static[:, 4], # BB, Vol
324
+ zeros, sl_atr_pct, sl_norm_pnl, sl_max_pnl_r, # 4 dynamics
325
+ zeros, zeros, time_vec, # 3 static
326
+ zeros, ones*0.6, ones*0.7, ones*3.0 # 4 placeholders
327
+ ]))
328
 
329
  if batch_X:
330
  try:
331
+ big_X = np.array(batch_X) # (B, 240, 16)
332
+ # Flatten for model if needed (Assuming Hydra takes 2D)
333
+ # NOTE: Verify if Hydra takes 3D or 2D. Assuming 2D stacked:
334
  big_X_flat = big_X.reshape(-1, big_X.shape[-1])
335
  preds_flat = hydra_models['crash'].predict_proba(big_X_flat)[:, 1]
336
  preds_batch = preds_flat.reshape(len(batch_X), 240)
337
+
338
+ max_risks = np.max(preds_batch, axis=1)
339
  over_thresh = preds_batch > 0.6
340
  has_crash = over_thresh.any(axis=1)
341
  crash_times_rel = np.argmax(over_thresh, axis=1)
342
+
343
+ # Direct Assignment by Slice
344
+ res_hydra_risk[chunk_range] = max_risks
345
+
346
+ # Calculate absolute times
347
+ crash_abs_times = np.zeros(len(batch_X), dtype=np.int64)
348
+ for j, has in enumerate(has_crash):
349
+ if has:
350
+ t_idx = global_indices[j] + 1 + crash_times_rel[j]
351
+ crash_abs_times[j] = fast_1m['timestamp'][t_idx]
352
+ res_hydra_time[chunk_range] = crash_abs_times
353
+
354
  except Exception: pass
355
 
356
+ # --- D. LEGACY MAPPING ---
 
 
357
  if legacy_v2:
358
+ # Vectorized Look-ahead max? Hard. Loop is safest for correctness.
359
+ # Optimized scalar loop
360
+ for i, idx in enumerate(final_valid_indices):
361
  start = idx + 1
362
  if start + 240 < len(global_v2_probs):
363
+ # We can't vector slice variable windows efficiently in numpy without stride tricks
364
+ # Simple loop is fine for 1D array
365
+ res_legacy_risk[i] = np.max(global_v2_probs[start : start + 240])
366
+
367
+ # 📊 MANDATORY DIAGNOSTICS
368
+ print(f" 📊 [Stats] Oracle: Min={res_oracle.min():.2f} Max={res_oracle.max():.2f} Mean={res_oracle.mean():.2f}")
369
+ print(f" 📊 [Stats] Sniper: Min={res_sniper.min():.2f} Max={res_sniper.max():.2f} Mean={res_sniper.mean():.2f}")
370
+ print(f" 📊 [Stats] L1 Score: Min={fast_1m['l1_score'][final_valid_indices].min():.1f} Max={fast_1m['l1_score'][final_valid_indices].max():.1f}")
371
+
372
+ # --- E. CONSTRUCT DF ---
373
+ ai_df = pd.DataFrame({
374
+ 'timestamp': fast_1m['timestamp'][final_valid_indices],
375
+ 'symbol': sym,
376
+ 'close': fast_1m['close'][final_valid_indices],
377
+ 'real_titan': np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95),
378
+ 'oracle_conf': res_oracle,
379
+ 'sniper_score': res_sniper,
380
+ 'l1_score': fast_1m['l1_score'][final_valid_indices],
381
+ 'risk_hydra_crash': res_hydra_risk,
382
+ 'time_hydra_crash': res_hydra_time,
383
+ 'risk_legacy_v2': res_legacy_risk,
384
+ 'time_legacy_panic': np.zeros(num_candidates, dtype=int) # Placeholder
385
+ })
386
+
387
+ dt = time.time() - t0
388
+ if not ai_df.empty:
389
+ ai_df.to_pickle(scores_file)
390
+ print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
391
 
392
  del frames, fast_1m, numpy_htf, global_v2_probs, global_hydra_static
393
  gc.collect()
 
422
  global_df = pd.concat(all_data)
423
  global_df.sort_values('timestamp', inplace=True)
424
 
425
+ # Arrays
426
  arr_ts = global_df['timestamp'].values
427
  arr_close = global_df['close'].values.astype(np.float64)
428
  arr_symbol = global_df['symbol'].values
 
522
  await self.generate_truth_data()
523
 
524
  d = self.GRID_DENSITY
525
+ # Lowered Floors to Catch Signals
526
+ oracle_range = np.linspace(0.40, 0.8, d).tolist() # Lowered floor to 0.40
527
+ sniper_range = np.linspace(0.30, 0.7, d).tolist() # Lowered floor to 0.30
528
  hydra_range = np.linspace(0.70, 0.95, d).tolist()
529
  l1_range = [10.0, 15.0, 20.0, 25.0]
530
  titan_range = [0.4, 0.6]
 
565
  return best['config'], best
566
 
567
  async def run_strategic_optimization_task():
568
+ print("\n🧪 [STRATEGIC BACKTEST] Synchronized Integrity Mode...")
569
  r2 = R2Service()
570
  dm = DataManager(None, None, r2)
571
  proc = MLProcessor(dm)