Riy777 committed on
Commit
c6cfba4
·
verified ·
1 Parent(s): e3314ff

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +39 -235
backtest_engine.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V81.1 - GEM-Architect: Import Fixed)
3
  # ============================================================
4
 
5
  import asyncio
@@ -17,7 +17,6 @@ from typing import Dict, Any, List
17
  try:
18
  from ml_engine.processor import MLProcessor, SystemLimits
19
  from ml_engine.data_manager import DataManager
20
- # ✅ التصحيح هنا: إضافة AdaptiveHub للاستيراد
21
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
22
  from r2 import R2Service
23
  except ImportError:
@@ -53,7 +52,7 @@ class HeavyDutyBacktester:
53
  self.force_end_date = None
54
 
55
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
56
- print(f"🧪 [Backtest V81.1] Time Lord Mode Ready (Import Fixed).")
57
 
58
  def set_date_range(self, start_str, end_str):
59
  self.force_start_date = start_str
@@ -63,15 +62,21 @@ class HeavyDutyBacktester:
63
  if df.empty: return []
64
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
65
 
 
 
 
66
  async def _process_single_coin_task(self, sym, start_time_ms, end_time_ms):
67
  safe_sym = sym.replace('/', '_')
68
  period_suffix = f"{start_time_ms}_{end_time_ms}"
69
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
70
 
 
71
  if os.path.exists(scores_file):
 
72
  return True
73
 
74
- print(f" ⚙️ Simulating {sym}...", end="", flush=True)
 
75
 
76
  all_candles_1m = []
77
  df_1m = None
@@ -79,6 +84,8 @@ class HeavyDutyBacktester:
79
 
80
  try:
81
  current_since = start_time_ms
 
 
82
  while current_since < end_time_ms:
83
  try:
84
  batch = await asyncio.wait_for(
@@ -86,22 +93,38 @@ class HeavyDutyBacktester:
86
  timeout=10.0
87
  )
88
  except:
 
89
  await asyncio.sleep(1)
90
  continue
91
 
92
- if not batch: break
 
 
 
93
  last_ts = batch[-1][0]
94
  if last_ts <= current_since: break
 
95
  all_candles_1m.extend(batch)
96
  current_since = last_ts + 1
 
 
 
 
 
 
97
  await asyncio.sleep(0.01)
98
  if current_since >= end_time_ms: break
99
 
 
100
  all_candles_1m = [c for c in all_candles_1m if c[0] <= end_time_ms]
 
101
  if not all_candles_1m:
102
- print(" No Data.")
103
  return False
104
 
 
 
 
105
  df_1m = pd.DataFrame(all_candles_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
106
  cols = ['open', 'high', 'low', 'close', 'volume']
107
  df_1m[cols] = df_1m[cols].astype('float32')
@@ -124,6 +147,7 @@ class HeavyDutyBacktester:
124
  ai_results = []
125
  valid_indices = frames['5m'].index[500:]
126
 
 
127
  for t_idx in valid_indices:
128
  current_timestamp = int(t_idx.timestamp() * 1000)
129
 
@@ -174,15 +198,17 @@ class HeavyDutyBacktester:
174
  'l1_score': l1_score
175
  })
176
 
 
177
  if ai_results:
178
  pd.DataFrame(ai_results).to_pickle(scores_file)
179
- print(f" Saved ({len(ai_results)}).")
180
  else:
181
- print(" ⚠️ No signals.")
 
182
  return True
183
 
184
  except Exception as e:
185
- print(f" Error: {e}")
186
  return False
187
 
188
  finally:
@@ -191,232 +217,10 @@ class HeavyDutyBacktester:
191
  del frames
192
  gc.collect()
193
 
 
 
 
194
  async def generate_truth_data(self):
195
  if self.force_start_date and self.force_end_date:
196
  dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
197
- dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
198
- start_time_ms = int(dt_start.timestamp() * 1000)
199
- end_time_ms = int(dt_end.timestamp() * 1000)
200
- print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
201
- else:
202
- return
203
-
204
- chunk_size = 4
205
- chunks = [self.TARGET_COINS[i:i + chunk_size] for i in range(0, len(self.TARGET_COINS), chunk_size)]
206
-
207
- for chunk_idx, chunk in enumerate(chunks):
208
- for sym in chunk:
209
- try:
210
- await asyncio.wait_for(
211
- self._process_single_coin_task(sym, start_time_ms, end_time_ms),
212
- timeout=300.0
213
- )
214
- except asyncio.TimeoutError:
215
- print(f" 💀 Killed {sym}. Moving on...")
216
- gc.collect()
217
- gc.collect()
218
- await asyncio.sleep(1.0)
219
-
220
- @staticmethod
221
- def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
222
- results = []
223
- all_data = []
224
-
225
- for fp in scores_files:
226
- try:
227
- df = pd.read_pickle(fp)
228
- if not df.empty: all_data.append(df)
229
- except: pass
230
-
231
- if not all_data: return []
232
-
233
- global_df = pd.concat(all_data)
234
- global_df.sort_values('timestamp', inplace=True)
235
- grouped_by_time = global_df.groupby('timestamp')
236
-
237
- for config in combinations_batch:
238
- wallet = { "balance": initial_capital, "allocated": 0.0, "positions": {}, "trades_history": [] }
239
-
240
- w_titan = config['w_titan']
241
- w_struct = config['w_struct']
242
- entry_thresh = config['thresh']
243
-
244
- for ts, group in grouped_by_time:
245
- active_symbols = list(wallet["positions"].keys())
246
- current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
247
-
248
- for sym in active_symbols:
249
- if sym in current_prices:
250
- curr_p = current_prices[sym]
251
- pos = wallet["positions"][sym]
252
- entry_p = pos['entry_price']
253
- pct_change = (curr_p - entry_p) / entry_p
254
-
255
- if pct_change >= 0.03 or pct_change <= -0.02:
256
- gross_pnl = pos['size_usd'] * pct_change
257
- fees = pos['size_usd'] * fees_pct * 2
258
- net_pnl = gross_pnl - fees
259
- wallet["allocated"] -= pos['size_usd']
260
- wallet["balance"] += net_pnl
261
- del wallet["positions"][sym]
262
- wallet["trades_history"].append({'pnl': net_pnl})
263
-
264
- if len(wallet["positions"]) < max_slots:
265
- free_capital = wallet["balance"] - wallet["allocated"]
266
- slots_left = max_slots - len(wallet["positions"])
267
-
268
- if slots_left > 0 and free_capital > 2.0:
269
- position_size = wallet["balance"] / max_slots
270
- if wallet["balance"] < 20.0: position_size = free_capital / slots_left
271
- position_size = min(position_size, free_capital)
272
-
273
- for _, row in group.iterrows():
274
- sym = row['symbol']
275
- if sym in wallet["positions"]: continue
276
-
277
- sig_type = row['signal_type']
278
- l1_raw_score = row['l1_score']
279
- real_titan = row['real_titan']
280
-
281
- norm_struct = 0.0
282
- if sig_type == 'BREAKOUT': norm_struct = min(1.0, l1_raw_score / 3.0)
283
- elif sig_type == 'REVERSAL': norm_struct = l1_raw_score / 100.0
284
-
285
- score = 0.0
286
- if (w_titan + w_struct) > 0:
287
- score = ((real_titan * w_titan) + (norm_struct * w_struct)) / (w_titan + w_struct)
288
-
289
- if score >= entry_thresh:
290
- wallet["positions"][sym] = {'entry_price': row['close'], 'size_usd': position_size}
291
- wallet["allocated"] += position_size
292
- if len(wallet["positions"]) >= max_slots: break
293
-
294
- if wallet["balance"] < 1.0 and len(wallet["positions"]) == 0: break
295
-
296
- trades = wallet["trades_history"]
297
- if trades:
298
- net_profit = wallet["balance"] - initial_capital
299
- pnls = [t['pnl'] for t in trades]
300
- wins = [p for p in pnls if p > 0]
301
- losses = [p for p in pnls if p <= 0]
302
- win_rate = (len(wins) / len(trades)) * 100
303
-
304
- results.append({
305
- 'config': config,
306
- 'final_balance': wallet["balance"],
307
- 'net_profit': net_profit,
308
- 'total_trades': len(trades),
309
- 'win_rate': win_rate,
310
- })
311
- else:
312
- results.append({'config': config, 'final_balance': initial_capital, 'net_profit': 0.0, 'total_trades': 0, 'win_rate': 0.0})
313
-
314
- return results
315
-
316
- async def run_optimization(self, target_regime="RANGE"):
317
- await self.generate_truth_data()
318
-
319
- start_ts = int(datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() * 1000)
320
- end_ts = int(datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() * 1000)
321
- period_id = f"{start_ts}_{end_ts}"
322
-
323
- current_period_files = []
324
- for f in os.listdir(CACHE_DIR):
325
- if f.endswith('_scores.pkl') and period_id in f:
326
- current_period_files.append(os.path.join(CACHE_DIR, f))
327
-
328
- if not current_period_files:
329
- print(f"❌ No signals for {target_regime}.")
330
- return None
331
-
332
- print(f"\n🧩 [Phase 2] Optimizing for {target_regime}...")
333
- print(f" 💰 Start Capital: ${self.INITIAL_CAPITAL}")
334
-
335
- w_titan_range = np.linspace(0.4, 0.9, num=self.GRID_DENSITY)
336
- w_struct_range = np.linspace(0.1, 0.6, num=self.GRID_DENSITY)
337
- thresh_range = np.linspace(0.20, 0.90, num=self.GRID_DENSITY)
338
-
339
- combinations = []
340
- for wt, ws, th in itertools.product(w_titan_range, w_struct_range, thresh_range):
341
- if 0.9 <= (wt + ws) <= 1.1:
342
- combinations.append({'w_titan': round(wt, 2), 'w_struct': round(ws, 2), 'thresh': round(th, 2)})
343
-
344
- final_results = []
345
- batch_size = max(20, len(combinations) // (os.cpu_count() * 2))
346
- batches = [combinations[i:i+batch_size] for i in range(0, len(combinations), batch_size)]
347
-
348
- with concurrent.futures.ProcessPoolExecutor() as executor:
349
- futures = [executor.submit(self._worker_optimize, batch, current_period_files,
350
- self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
351
- for batch in batches]
352
- for future in concurrent.futures.as_completed(futures):
353
- try: final_results.extend(future.result())
354
- except Exception as e: print(f"Grid Error: {e}")
355
-
356
- if not final_results: return None
357
-
358
- best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
359
-
360
- print("\n" + "="*60)
361
- print(f"🏆 CHAMPION REPORT [{target_regime}]:")
362
- print(f" 📅 Period: {self.force_start_date} -> {self.force_end_date}")
363
- print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
364
- print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
365
- print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
366
- print(f" ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
367
- print("="*60)
368
-
369
- return best['config']
370
-
371
async def run_strategic_optimization_task():
    """Entry point: optimize strategy DNA for every market-regime scenario.

    Boots the data/ML stack, runs the backtest optimizer over four historical
    eras (one per regime), injects each winning config into the matching
    AdaptiveHub strategy, then persists and re-applies the tuned state.
    """
    print("\n🧪 [STRATEGIC BACKTEST] Time Lord Initiated...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)

    await dm.initialize()
    await proc.initialize()

    try:
        hub = AdaptiveHub(r2)
        await hub.initialize()

        # Scenario map: one representative historical era per market regime.
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},   # ETF rally
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},   # crash of '23
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},   # the great boredom
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"},  # recent chop
        ]

        optimizer = HeavyDutyBacktester(dm, proc)

        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])

            best_config = await optimizer.run_optimization(target_regime=target)

            # Inject the champion weights into the matching strategy DNA.
            if best_config and target in hub.strategies:
                print(f"💉 Injecting Optimized DNA into {target}...")
                st = hub.strategies[target]
                st.model_weights['titan'] = best_config['w_titan']
                st.model_weights['structure'] = best_config['w_struct']
                st.filters['l1_min_score'] = best_config['thresh']

        # Persist and re-apply all tuned parameters once every regime is done.
        # NOTE(review): placed after the loop per the "ALL DNA" message —
        # confirm against the original indentation.
        await hub._save_state_to_r2()
        hub._inject_current_parameters()
        print("✅ [System] ALL DNA Updated & Saved Successfully.")  # was a placeholder-free f-string

    finally:
        await dm.close()


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V82.0 - GEM-Architect: Verbose Logger)
3
  # ============================================================
4
 
5
  import asyncio
 
17
  try:
18
  from ml_engine.processor import MLProcessor, SystemLimits
19
  from ml_engine.data_manager import DataManager
 
20
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
21
  from r2 import R2Service
22
  except ImportError:
 
52
  self.force_end_date = None
53
 
54
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
55
+ print(f"🧪 [Backtest V82.0] Verbose Logger Mode Ready.")
56
 
57
  def set_date_range(self, start_str, end_str):
58
  self.force_start_date = start_str
 
62
  if df.empty: return []
63
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
64
 
65
+ # ==============================================================
66
+ # 🧱 Core Logic: Single Coin Processor (VERBOSE)
67
+ # ==============================================================
68
  async def _process_single_coin_task(self, sym, start_time_ms, end_time_ms):
69
  safe_sym = sym.replace('/', '_')
70
  period_suffix = f"{start_time_ms}_{end_time_ms}"
71
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
72
 
73
+ # ✅ طباعة فورية إذا الملف موجود
74
  if os.path.exists(scores_file):
75
+ print(f" 📂 [{sym}] Data Exists -> Skipping fetch.")
76
  return True
77
 
78
+ print(f" [{sym}] Starting FETCH sequence...", flush=True)
79
+ t0 = time.time()
80
 
81
  all_candles_1m = []
82
  df_1m = None
 
84
 
85
  try:
86
  current_since = start_time_ms
87
+ fetch_count = 0
88
+
89
  while current_since < end_time_ms:
90
  try:
91
  batch = await asyncio.wait_for(
 
93
  timeout=10.0
94
  )
95
  except:
96
+ print(f" ⚠️ [{sym}] Net Retry...", flush=True)
97
  await asyncio.sleep(1)
98
  continue
99
 
100
+ if not batch:
101
+ print(f" ⚠️ [{sym}] No more data from exchange.", flush=True)
102
+ break
103
+
104
  last_ts = batch[-1][0]
105
  if last_ts <= current_since: break
106
+
107
  all_candles_1m.extend(batch)
108
  current_since = last_ts + 1
109
+
110
+ fetch_count += 1
111
+ # ✅ طباعة التقدم كل 5 دفعات (5000 شمعة) لطمأنة المستخدم
112
+ if fetch_count % 5 == 0:
113
+ print(f" -> [{sym}] Fetched {len(all_candles_1m)} candles...", flush=True)
114
+
115
  await asyncio.sleep(0.01)
116
  if current_since >= end_time_ms: break
117
 
118
+ # فلترة النطاق الزمني بدقة
119
  all_candles_1m = [c for c in all_candles_1m if c[0] <= end_time_ms]
120
+
121
  if not all_candles_1m:
122
+ print(f" [{sym}] FAILED: No data retrieved.")
123
  return False
124
 
125
+ print(f" ✅ [{sym}] Download Complete ({len(all_candles_1m)} candles). Processing...", flush=True)
126
+
127
+ # تحويل البيانات (Vectorization)
128
  df_1m = pd.DataFrame(all_candles_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
129
  cols = ['open', 'high', 'low', 'close', 'volume']
130
  df_1m[cols] = df_1m[cols].astype('float32')
 
147
  ai_results = []
148
  valid_indices = frames['5m'].index[500:]
149
 
150
+ # محاكاة
151
  for t_idx in valid_indices:
152
  current_timestamp = int(t_idx.timestamp() * 1000)
153
 
 
198
  'l1_score': l1_score
199
  })
200
 
201
+ dt = time.time() - t0
202
  if ai_results:
203
  pd.DataFrame(ai_results).to_pickle(scores_file)
204
+ print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Time: {dt:.1f}s)")
205
  else:
206
+ print(f" ⚠️ [{sym}] No signals found. (Time: {dt:.1f}s)")
207
+
208
  return True
209
 
210
  except Exception as e:
211
+ print(f" [{sym}] CRASH: {e}")
212
  return False
213
 
214
  finally:
 
217
  del frames
218
  gc.collect()
219
 
220
+ # ==============================================================
221
+ # PHASE 1: Main Loop
222
+ # ==============================================================
223
  async def generate_truth_data(self):
224
  if self.force_start_date and self.force_end_date:
225
  dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
226
+ dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").