Riy777 commited on
Commit
70421ff
·
verified ·
1 Parent(s): 669ecf2

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +106 -152
backtest_engine.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V86.0 - GEM-Architect: Parallel Integrity)
3
  # ============================================================
4
 
5
  import asyncio
@@ -10,6 +10,8 @@ import logging
10
  import itertools
11
  import os
12
  import gc
 
 
13
  import concurrent.futures
14
  from datetime import datetime, timezone
15
  from typing import Dict, Any, List
@@ -26,58 +28,77 @@ logging.getLogger('ml_engine').setLevel(logging.WARNING)
26
  CACHE_DIR = "backtest_real_scores"
27
 
28
  # ==============================================================================
29
- # 🚜 ISOLATED WORKER FUNCTION (Top-Level for Multiprocessing)
30
  # ==============================================================================
31
  def run_parallel_chunk(chunk_payload):
32
  """
33
- هذه الدالة تعمل داخل عملية (Process) منفصلة تماماً.
34
- تقوم بإنشاء نسختها الخاصة من DataManager و Processor لتجنب تضارب الذاكرة.
35
  """
36
  symbol, start_ms, end_ms, chunk_id = chunk_payload
37
 
38
- # 1. تهيئة بيئة معزولة (Isolated Environment)
39
- # نمرر None للخدمات غير الضرورية للمحاكاة (مثل R2 و WhaleMonitor) لتخفيف الحمل
40
- local_dm = DataManager(None, None, None)
41
- local_proc = MLProcessor(local_dm)
42
-
43
- # تشغيل التهيئة (Initialize) بشكل متزامن
44
- loop = asyncio.new_event_loop()
45
- asyncio.set_event_loop(loop)
46
 
 
 
47
  try:
48
- loop.run_until_complete(local_dm.initialize())
49
- loop.run_until_complete(local_proc.initialize())
 
 
 
 
50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  # إنشاء نسخة محلية من الباكتستر
52
  local_tester = HeavyDutyBacktester(local_dm, local_proc)
53
 
54
- # طباعة للتبع
55
  dt_start = datetime.fromtimestamp(start_ms/1000, tz=timezone.utc).strftime('%Y-%m-%d')
56
- print(f" [Core {chunk_id}] Processing: {dt_start}...", flush=True)
57
 
58
- # تشغيل المهمة مع لاحقة خاصة للملف (Suffix) لتجنب الكتابة فوق الملفات الأخرى
59
- # نضيف warmup (تحمية) 2000 دقيقة قبل البداية لضمان دقة المؤشرات
60
  warmup_ms = 2000 * 60 * 1000
61
  actual_fetch_start = start_ms - warmup_ms
62
 
63
  success = loop.run_until_complete(
64
  local_tester._process_single_coin_task(
65
  symbol,
66
- actual_fetch_start, # نبدأ الجلب مبكراً
67
  end_ms,
68
  chunk_suffix=f"_part{chunk_id}",
69
- analysis_start_ms=start_ms # لكن نبدأ التحليل وحفظ النتائج من هنا
 
70
  )
71
  )
72
 
 
73
  loop.run_until_complete(local_dm.close())
 
 
 
74
  return (chunk_id, success)
75
 
76
  except Exception as e:
77
- print(f" ❌ [Core {chunk_id}] Failed: {e}")
 
78
  return (chunk_id, False)
79
- finally:
80
- loop.close()
81
 
82
  # ==============================================================================
83
  # 🧠 Main Class
@@ -92,14 +113,12 @@ class HeavyDutyBacktester:
92
  self.TRADING_FEES = 0.001
93
  self.MAX_SLOTS = 4
94
 
95
- self.TARGET_COINS = [
96
- 'SOL/USDT']
97
 
98
  self.force_start_date = None
99
  self.force_end_date = None
100
 
101
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
102
- # print(f"🧪 [Backtest V86.0] Parallel Integrity Mode.")
103
 
104
  def set_date_range(self, start_str, end_str):
105
  self.force_start_date = start_str
@@ -110,38 +129,42 @@ class HeavyDutyBacktester:
110
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
111
 
112
  # ==============================================================
113
- # 🧱 Core Logic: Single Coin Processor (Modified for Chunks)
114
  # ==============================================================
115
- async def _process_single_coin_task(self, sym, start_time_ms, end_time_ms, chunk_suffix="", analysis_start_ms=None):
116
  safe_sym = sym.replace('/', '_')
117
- # إذا لم يتم تحديد analysis_start_ms، فهو نفسه start_time_ms (للتوافق مع الاستدعاء القديم)
118
  if analysis_start_ms is None: analysis_start_ms = start_time_ms
119
 
120
- # اسم الملف يعتمد على الفترة التحليلية الفعلية
121
  period_suffix = f"{analysis_start_ms}_{end_time_ms}{chunk_suffix}"
122
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
123
 
124
  if os.path.exists(scores_file):
125
- # print(f" 📂 [{sym}{chunk_suffix}] Exists.")
126
  return True
127
 
128
  t0 = time.time()
129
-
130
  all_candles_1m = []
131
  df_1m = None
132
  frames = {}
133
 
 
134
  try:
135
  current_since = start_time_ms
 
136
 
137
  while current_since < end_time_ms:
138
  try:
139
- batch = await asyncio.wait_for(
140
- self.dm.exchange.fetch_ohlcv(sym, '1m', since=current_since, limit=1000),
141
- timeout=10.0
142
- )
143
- except:
144
- await asyncio.sleep(1)
 
 
 
 
 
145
  continue
146
 
147
  if not batch: break
@@ -152,17 +175,20 @@ class HeavyDutyBacktester:
152
  all_candles_1m.extend(batch)
153
  current_since = last_ts + 1
154
 
155
- await asyncio.sleep(0.01)
 
 
156
  if current_since >= end_time_ms: break
157
 
158
- # تصفية البيانات التي تتجاوز الوقت النهائي (للدقة)
159
  all_candles_1m = [c for c in all_candles_1m if c[0] <= end_time_ms]
160
 
161
  if not all_candles_1m:
 
162
  return False
163
 
164
- # print(f" 📥 [{chunk_suffix}] Downloaded {len(all_candles_1m)} candles.", flush=True)
165
 
 
166
  df_1m = pd.DataFrame(all_candles_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
167
  cols = ['open', 'high', 'low', 'close', 'volume']
168
  df_1m[cols] = df_1m[cols].astype('float32')
@@ -171,7 +197,6 @@ class HeavyDutyBacktester:
171
  df_1m = df_1m.sort_index()
172
 
173
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
174
-
175
  df_1m_ready = df_1m.copy()
176
  df_1m_ready['timestamp'] = df_1m_ready.index.astype(np.int64) // 10**6
177
  frames['1m'] = df_1m_ready
@@ -183,21 +208,24 @@ class HeavyDutyBacktester:
183
  frames[tf_str] = resampled
184
 
185
  ai_results = []
186
-
187
- # 🔥 المنطق المهم: نبدأ التكرار فقط من analysis_start_ms (لتجاوز فترة التحمية)
188
  analysis_start_dt = pd.to_datetime(analysis_start_ms, unit='ms')
189
  valid_indices = frames['5m'].loc[analysis_start_dt:].index
190
 
 
 
 
 
191
  for t_idx in valid_indices:
192
- # تأكد أننا لا نتجاوز الوقت النهائي المطلوب لهذا الـ Chunk
193
  if t_idx.timestamp() * 1000 > end_time_ms: break
 
 
 
 
 
194
 
195
  current_timestamp = int(t_idx.timestamp() * 1000)
196
-
197
  ohlcv_data = {}
198
  try:
199
- # نستخدم loc للوصول للبيانات حتى اللحظة الحالية (t_idx)
200
- # هذا يضمن أن المؤشرات تحسب بناء على الماضي (بما في ذلك فترة التحمية)
201
  current_slice_1m = frames['1m'].loc[:t_idx]
202
  current_slice_5m = frames['5m'].loc[:t_idx]
203
  current_slice_15m = frames['15m'].loc[:t_idx]
@@ -253,14 +281,13 @@ class HeavyDutyBacktester:
253
  dt = time.time() - t0
254
  if ai_results:
255
  pd.DataFrame(ai_results).to_pickle(scores_file)
256
- # print(f" 💾 [{sym}{chunk_suffix}] Saved {len(ai_results)} signals. (Time: {dt:.1f}s)")
257
- else:
258
- pass # Silent for empty chunks
259
 
260
  return True
261
 
262
  except Exception as e:
263
- print(f" ❌ [{sym}{chunk_suffix}] CRASH: {e}")
 
264
  return False
265
 
266
  finally:
@@ -270,7 +297,7 @@ class HeavyDutyBacktester:
270
  gc.collect()
271
 
272
  # ==============================================================
273
- # PHASE 1: Main Loop (Updated for Multiprocessing)
274
  # ==============================================================
275
  async def generate_truth_data(self):
276
  if self.force_start_date and self.force_end_date:
@@ -279,14 +306,13 @@ class HeavyDutyBacktester:
279
  start_time_ms = int(dt_start.timestamp() * 1000)
280
  end_time_ms = int(dt_end.timestamp() * 1000)
281
  print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
282
- print(f" 🚀 Turbo Mode: Engaging all CPU cores...")
283
  else:
284
  return
285
 
286
- # 1. تحديد عدد العمليات (Tasks)
287
- # نترك نواة واحدة للنظام
288
  cpu_count = os.cpu_count() or 4
289
- workers_count = max(1, cpu_count - 1)
 
290
 
291
  total_duration = end_time_ms - start_time_ms
292
  chunk_size = total_duration // workers_count
@@ -299,98 +325,76 @@ class HeavyDutyBacktester:
299
  print(f" 📂 [{sym}] Full Data Exists -> Skipping.")
300
  continue
301
 
302
- # إعداد المهام
303
  tasks_payload = []
304
  for i in range(workers_count):
305
  c_start = start_time_ms + (i * chunk_size)
306
  c_end = start_time_ms + ((i + 1) * chunk_size)
307
- if i == workers_count - 1: c_end = end_time_ms # التأكد من تغطية النهاية
308
-
309
  tasks_payload.append((sym, c_start, c_end, i))
310
 
311
  print(f" ⚡ Splitting {sym} into {workers_count} chunks...")
312
 
313
- # تشغيل الـ ProcessPoolExecutor
314
  loop = asyncio.get_running_loop()
315
  with concurrent.futures.ProcessPoolExecutor(max_workers=workers_count) as executor:
316
- # استخدام run_in_executor لتشغيل الدالة المتزامنة التي تدير العملية الفرعية
317
  futures = [loop.run_in_executor(executor, run_parallel_chunk, task) for task in tasks_payload]
318
  results = await asyncio.gather(*futures)
319
 
320
- # تجميع النتائج (Merge)
321
  print(f" 🧩 Merging results for {sym}...")
322
  all_dfs = []
323
  for chunk_id, success in results:
324
  if not success: continue
325
- # استنتاج اسم الملف الجزئي
326
- task = tasks_payload[chunk_id] # (sym, start, end, id)
327
- part_start = task[1]
328
- part_end = task[2]
329
  part_file = f"{CACHE_DIR}/{safe_sym}_{part_start}_{part_end}_part{chunk_id}_scores.pkl"
330
 
331
  if os.path.exists(part_file):
332
  try:
333
  df_part = pd.read_pickle(part_file)
334
- if not df_part.empty:
335
- all_dfs.append(df_part)
336
- # تنظيف الملفات الجزئية
337
  os.remove(part_file)
338
  except Exception as e:
339
  print(f" ⚠️ Merge Error (Part {chunk_id}): {e}")
340
 
341
  if all_dfs:
342
- final_df = pd.concat(all_dfs)
343
- # إزالة التكرارات وترتيب زمني
344
- final_df = final_df.drop_duplicates(subset=['timestamp']).sort_values('timestamp')
345
  final_df.to_pickle(final_full_file)
346
  print(f" 💾 [{sym}] FINAL SAVE: {len(final_df)} signals.")
347
  else:
348
- print(f" ⚠️ [{sym}] No signals generated in any chunk.")
349
 
350
  gc.collect()
351
 
352
  # ==============================================================
353
- # PHASE 2: Portfolio Digital Twin Engine (EXACT COPY)
354
  # ==============================================================
355
  @staticmethod
356
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
357
  results = []
358
  all_data = []
359
-
360
  for fp in scores_files:
361
  try:
362
  df = pd.read_pickle(fp)
363
  if not df.empty: all_data.append(df)
364
  except: pass
365
-
366
  if not all_data: return []
367
-
368
- global_df = pd.concat(all_data)
369
- global_df.sort_values('timestamp', inplace=True)
370
  grouped_by_time = global_df.groupby('timestamp')
371
 
372
  for config in combinations_batch:
373
  wallet = { "balance": initial_capital, "allocated": 0.0, "positions": {}, "trades_history": [] }
374
- w_titan = config['w_titan']
375
- w_struct = config['w_struct']
376
- entry_thresh = config['thresh']
377
-
378
- # Tracking Drawdown
379
- peak_balance = initial_capital
380
- max_drawdown = 0.0
381
 
382
  for ts, group in grouped_by_time:
383
  active_symbols = list(wallet["positions"].keys())
384
  current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
385
-
386
- # Check Exits
387
  for sym in active_symbols:
388
  if sym in current_prices:
389
  curr_p = current_prices[sym]
390
  pos = wallet["positions"][sym]
391
  entry_p = pos['entry_price']
392
  pct_change = (curr_p - entry_p) / entry_p
393
-
394
  if pct_change >= 0.03 or pct_change <= -0.02:
395
  gross_pnl = pos['size_usd'] * pct_change
396
  fees = pos['size_usd'] * fees_pct * 2
@@ -399,57 +403,45 @@ class HeavyDutyBacktester:
399
  wallet["balance"] += net_pnl
400
  del wallet["positions"][sym]
401
  wallet["trades_history"].append({'pnl': net_pnl})
402
-
403
- current_total_equity = wallet["balance"]
404
  if current_total_equity > peak_balance: peak_balance = current_total_equity
405
  dd = (peak_balance - current_total_equity) / peak_balance
406
  if dd > max_drawdown: max_drawdown = dd
407
 
408
- # Check Entries
409
  if len(wallet["positions"]) < max_slots:
410
- free_capital = wallet["balance"] - wallet["allocated"]
411
  slots_left = max_slots - len(wallet["positions"])
412
-
413
  if slots_left > 0 and free_capital > 2.0:
414
  position_size = wallet["balance"] / max_slots
415
  if wallet["balance"] < 20.0: position_size = free_capital / slots_left
416
  position_size = min(position_size, free_capital)
417
-
418
  for _, row in group.iterrows():
419
  sym = row['symbol']
420
  if sym in wallet["positions"]: continue
421
-
422
  sig_type = row['signal_type']
423
  l1_raw_score = row['l1_score']
424
  real_titan = row['real_titan']
425
-
426
  norm_struct = 0.0
427
  if sig_type == 'BREAKOUT': norm_struct = min(1.0, l1_raw_score / 3.0)
428
  elif sig_type == 'REVERSAL': norm_struct = l1_raw_score / 100.0
429
-
430
  score = 0.0
431
  if (w_titan + w_struct) > 0:
432
  score = ((real_titan * w_titan) + (norm_struct * w_struct)) / (w_titan + w_struct)
433
-
434
  if score >= entry_thresh:
435
  wallet["positions"][sym] = {'entry_price': row['close'], 'size_usd': position_size}
436
  wallet["allocated"] += position_size
 
437
  if len(wallet["positions"]) >= max_slots: break
438
-
439
  if wallet["balance"] < 1.0 and len(wallet["positions"]) == 0: break
440
 
441
  trades = wallet["trades_history"]
442
  if trades:
443
- net_profit = wallet["balance"] - initial_capital
444
  pnls = [t['pnl'] for t in trades]
445
- wins = [p for p in pnls if p > 0]
446
- losses = [p for p in pnls if p <= 0]
447
-
448
- win_count = len(wins); loss_count = len(losses)
449
- total_trades = len(trades)
450
- win_rate = (win_count / total_trades) * 100 if total_trades > 0 else 0
451
- max_single_win = max(pnls) if pnls else 0.0
452
- max_single_loss = min(pnls) if pnls else 0.0
453
 
454
  current_win_streak = 0; max_win_streak = 0
455
  current_loss_streak = 0; max_loss_streak = 0
@@ -462,12 +454,9 @@ class HeavyDutyBacktester:
462
  if current_loss_streak > max_loss_streak: max_loss_streak = current_loss_streak
463
 
464
  results.append({
465
- 'config': config,
466
- 'final_balance': wallet["balance"],
467
- 'net_profit': net_profit,
468
- 'total_trades': total_trades,
469
- 'win_count': win_count, 'loss_count': loss_count,
470
- 'win_rate': win_rate,
471
  'max_single_win': max_single_win, 'max_single_loss': max_single_loss,
472
  'max_win_streak': max_win_streak, 'max_loss_streak': max_loss_streak,
473
  'max_drawdown': max_drawdown * 100
@@ -479,7 +468,6 @@ class HeavyDutyBacktester:
479
  'max_single_win': 0.0, 'max_single_loss': 0.0, 'max_win_streak': 0,
480
  'max_loss_streak': 0, 'max_drawdown': 0.0
481
  })
482
-
483
  return results
484
 
485
  async def run_optimization(self, target_regime="RANGE"):
@@ -489,19 +477,16 @@ class HeavyDutyBacktester:
489
  end_ts = int(datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() * 1000)
490
  period_id = f"{start_ts}_{end_ts}"
491
 
492
- # البحث عن الملف النهائي الكامل (وليس الأجزاء partX)
493
  current_period_files = []
494
  for f in os.listdir(CACHE_DIR):
495
  if f.endswith('_scores.pkl') and period_id in f and "_part" not in f:
496
  current_period_files.append(os.path.join(CACHE_DIR, f))
497
 
498
  if not current_period_files:
499
- print(f"❌ No signals for {target_regime}.")
500
  return None, None
501
 
502
  print(f"\n🧩 [Phase 2] Optimizing for {target_regime}...")
503
- print(f" 💰 Start Capital: ${self.INITIAL_CAPITAL}")
504
-
505
  w_titan_range = np.linspace(0.4, 0.9, num=self.GRID_DENSITY)
506
  w_struct_range = np.linspace(0.1, 0.6, num=self.GRID_DENSITY)
507
  thresh_range = np.linspace(0.20, 0.60, num=self.GRID_DENSITY)
@@ -515,7 +500,6 @@ class HeavyDutyBacktester:
515
  batch_size = max(20, len(combinations) // (os.cpu_count() * 2))
516
  batches = [combinations[i:i+batch_size] for i in range(0, len(combinations), batch_size)]
517
 
518
- # المرحلة الثانية تستخدم ProcessPoolExecutor أيضاً
519
  with concurrent.futures.ProcessPoolExecutor() as executor:
520
  futures = [executor.submit(self._worker_optimize, batch, current_period_files,
521
  self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
@@ -525,74 +509,44 @@ class HeavyDutyBacktester:
525
  except Exception as e: print(f"Grid Error: {e}")
526
 
527
  if not final_results: return None, None
528
-
529
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
530
 
531
- # 🔥🔥🔥 Full Report (EXACTLY AS ORIGINAL) 🔥🔥🔥
532
  print("\n" + "="*60)
533
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
534
- print(f" 📅 Period: {self.force_start_date} -> {self.force_end_date}")
535
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
536
- print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
537
- print("-" * 60)
538
- print(f" 📊 Total Trades: {best['total_trades']}")
539
- print(f" ✅ Winning Trades: {best['win_count']}")
540
- print(f" ❌ Losing Trades: {best['loss_count']}")
541
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
542
- print("-" * 60)
543
- print(f" 🟢 Max Single Win: ${best['max_single_win']:.2f}")
544
- print(f" 🔴 Max Single Loss: ${best['max_single_loss']:.2f}")
545
- print(f" 🔥 Max Win Streak: {best['max_win_streak']} trades")
546
- print(f" 🧊 Max Loss Streak: {best['max_loss_streak']} trades")
547
- print(f" 📉 Max Drawdown: {best['max_drawdown']:.1f}%")
548
- print("-" * 60)
549
  print(f" ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
550
  print("="*60)
551
-
552
  return best['config'], best
553
 
554
  async def run_strategic_optimization_task():
555
  print("\n🧪 [STRATEGIC BACKTEST] Time Lord Initiated (Parallel Turbo)...")
556
-
557
- # لا نقوم بتهيئة الـ dm/proc هنا بشكل كامل، لأن العمال (Workers) سيقومون بذلك
558
- # ولكن نحتاج لتمريرهم للكائن لغرض الهيكلة فقط
559
  r2 = R2Service()
560
  dm = DataManager(None, None, r2)
561
  proc = MLProcessor(dm)
562
-
563
- # لن نشغل تهيئة الـ dm هنا لأننا سنعتمد على العمال، لكن نحتاج لتهيئة r2 للـ hub
564
-
565
  try:
566
  hub = AdaptiveHub(r2)
567
  await hub.initialize()
568
-
569
  scenarios = [
570
  {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
571
  {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
572
  {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
573
  {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
574
  ]
575
-
576
  optimizer = HeavyDutyBacktester(dm, proc)
577
-
578
  for scen in scenarios:
579
  target = scen["regime"]
580
  optimizer.set_date_range(scen["start"], scen["end"])
581
-
582
  best_config, best_stats = await optimizer.run_optimization(target_regime=target)
583
-
584
  if best_config and best_stats:
585
  hub.submit_challenger(target, best_config, best_stats)
586
-
587
  await hub._save_state_to_r2()
588
  hub._inject_current_parameters()
589
  print(f"✅ [System] ALL DNA Updated & Saved Successfully.")
590
-
591
  finally:
592
  await dm.close()
593
 
594
  if __name__ == "__main__":
595
- # Windows/macOS support for multiprocessing
596
  import multiprocessing
597
  multiprocessing.freeze_support()
598
  asyncio.run(run_strategic_optimization_task())
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V86.1 - GEM-Architect: Debug Turbo)
3
  # ============================================================
4
 
5
  import asyncio
 
10
  import itertools
11
  import os
12
  import gc
13
+ import sys
14
+ import traceback
15
  import concurrent.futures
16
  from datetime import datetime, timezone
17
  from typing import Dict, Any, List
 
28
  CACHE_DIR = "backtest_real_scores"
29
 
30
  # ==============================================================================
31
+ # 🚜 ISOLATED WORKER FUNCTION (With Staggered Start & Debug Logs)
32
  # ==============================================================================
33
  def run_parallel_chunk(chunk_payload):
34
  """
35
+ عامل مستقل مع إقلاع متدرج لتخفيف الضغط على الشبكة والمعالج.
 
36
  """
37
  symbol, start_ms, end_ms, chunk_id = chunk_payload
38
 
39
+ # 1. الإقلاع المتدرج (Staggered Start)
40
+ # كل عامل ينتظر (chunk_id * 2) ثانية. هذا يمنع 15 عملية من ضرب الـ API في نفس اللحظة.
41
+ wait_time = chunk_id * 1.5
42
+ if chunk_id > 0:
43
+ time.sleep(wait_time)
 
 
 
44
 
45
+ print(f" ⚡ [Core {chunk_id}] Init... (Waited {wait_time}s)", flush=True)
46
+
47
  try:
48
+ # 2. تهيئة بيئة خفيفة
49
+ # نمرر None لتجنب الاتصالات الثقيلة غير الضرورية، لكن DataManager يحتاج لتهيئة
50
+ local_dm = DataManager(None, None, None)
51
+
52
+ # حيلة: لا نستدعي initialize الكاملة لـ DM إذا كانت تتصل بالإنترنت لجلب الأسواق
53
+ # فقط نهيئ http_client إذا لزم الأمر يدوياً داخل المهمة
54
 
55
+ # تهيئة المعالج
56
+ local_proc = MLProcessor(local_dm)
57
+
58
+ # تشغيل حلقة الأحداث الخاصة بهذه العملية
59
+ loop = asyncio.new_event_loop()
60
+ asyncio.set_event_loop(loop)
61
+
62
+ # تشغيل التهيئة
63
+ # ملاحظة: MLProcessor قد يستغرق وقتاً لتحميل النماذج
64
+ loop.run_until_complete(local_proc.initialize())
65
+ # local_dm.initialize() عادة يتصل بالمنصة، سنقوم بتهيئته يدوياً داخل المهمة لتجنب التعليق
66
+ if not local_dm.http_client:
67
+ import httpx
68
+ local_dm.http_client = httpx.AsyncClient(timeout=30.0)
69
+
70
  # إنشاء نسخة محلية من الباكتستر
71
  local_tester = HeavyDutyBacktester(local_dm, local_proc)
72
 
 
73
  dt_start = datetime.fromtimestamp(start_ms/1000, tz=timezone.utc).strftime('%Y-%m-%d')
74
+ print(f" 📥 [Core {chunk_id}] Downloading Data: {dt_start}...", flush=True)
75
 
76
+ # فترة التحمية
 
77
  warmup_ms = 2000 * 60 * 1000
78
  actual_fetch_start = start_ms - warmup_ms
79
 
80
  success = loop.run_until_complete(
81
  local_tester._process_single_coin_task(
82
  symbol,
83
+ actual_fetch_start,
84
  end_ms,
85
  chunk_suffix=f"_part{chunk_id}",
86
+ analysis_start_ms=start_ms,
87
+ worker_id=chunk_id
88
  )
89
  )
90
 
91
+ # تنظيف
92
  loop.run_until_complete(local_dm.close())
93
+ loop.close()
94
+
95
+ print(f" ✅ [Core {chunk_id}] Finished Chunk.", flush=True)
96
  return (chunk_id, success)
97
 
98
  except Exception as e:
99
+ print(f" ❌ [Core {chunk_id}] CRASHED: {e}", flush=True)
100
+ traceback.print_exc() # طباعة تفاصيل الخطأ
101
  return (chunk_id, False)
 
 
102
 
103
  # ==============================================================================
104
  # 🧠 Main Class
 
113
  self.TRADING_FEES = 0.001
114
  self.MAX_SLOTS = 4
115
 
116
+ self.TARGET_COINS = ['SOL/USDT']
 
117
 
118
  self.force_start_date = None
119
  self.force_end_date = None
120
 
121
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
 
122
 
123
  def set_date_range(self, start_str, end_str):
124
  self.force_start_date = start_str
 
129
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
130
 
131
  # ==============================================================
132
+ # 🧱 Core Logic: Single Coin Processor (With Progress Logs)
133
  # ==============================================================
134
+ async def _process_single_coin_task(self, sym, start_time_ms, end_time_ms, chunk_suffix="", analysis_start_ms=None, worker_id=0):
135
  safe_sym = sym.replace('/', '_')
 
136
  if analysis_start_ms is None: analysis_start_ms = start_time_ms
137
 
 
138
  period_suffix = f"{analysis_start_ms}_{end_time_ms}{chunk_suffix}"
139
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
140
 
141
  if os.path.exists(scores_file):
142
+ print(f" 📂 [Core {worker_id}] File Exists -> Skipping.", flush=True)
143
  return True
144
 
145
  t0 = time.time()
 
146
  all_candles_1m = []
147
  df_1m = None
148
  frames = {}
149
 
150
+ # 1. تنزيل البيانات (الحلقة التي كانت تتوقف)
151
  try:
152
  current_since = start_time_ms
153
+ req_count = 0
154
 
155
  while current_since < end_time_ms:
156
  try:
157
+ # إضافة مهلة وتكرار في حال الفشل
158
+ batch = await self.dm.exchange.fetch_ohlcv(sym, '1m', since=current_since, limit=1000)
159
+ req_count += 1
160
+
161
+ # طباعة تقدم كل 5 طلبات لنعرف أن العملية حية
162
+ if req_count % 5 == 0:
163
+ print(f" ⏳ [Core {worker_id}] Fetched batch {req_count}...", flush=True)
164
+
165
+ except Exception as net_err:
166
+ print(f" ⚠️ [Core {worker_id}] Network hiccup: {net_err}. Retrying in 5s...", flush=True)
167
+ await asyncio.sleep(5)
168
  continue
169
 
170
  if not batch: break
 
175
  all_candles_1m.extend(batch)
176
  current_since = last_ts + 1
177
 
178
+ # تخفيف الضغط قليلاً على الـ API
179
+ await asyncio.sleep(0.1)
180
+
181
  if current_since >= end_time_ms: break
182
 
 
183
  all_candles_1m = [c for c in all_candles_1m if c[0] <= end_time_ms]
184
 
185
  if not all_candles_1m:
186
+ print(f" ⚠️ [Core {worker_id}] No candles found.", flush=True)
187
  return False
188
 
189
+ print(f" ⚙️ [Core {worker_id}] Processing {len(all_candles_1m)} candles...", flush=True)
190
 
191
+ # معالجة البيانات (Pandas)
192
  df_1m = pd.DataFrame(all_candles_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
193
  cols = ['open', 'high', 'low', 'close', 'volume']
194
  df_1m[cols] = df_1m[cols].astype('float32')
 
197
  df_1m = df_1m.sort_index()
198
 
199
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
 
200
  df_1m_ready = df_1m.copy()
201
  df_1m_ready['timestamp'] = df_1m_ready.index.astype(np.int64) // 10**6
202
  frames['1m'] = df_1m_ready
 
208
  frames[tf_str] = resampled
209
 
210
  ai_results = []
 
 
211
  analysis_start_dt = pd.to_datetime(analysis_start_ms, unit='ms')
212
  valid_indices = frames['5m'].loc[analysis_start_dt:].index
213
 
214
+ # حلقة الذكاء الاصطناعي
215
+ total_steps = len(valid_indices)
216
+ step_count = 0
217
+
218
  for t_idx in valid_indices:
 
219
  if t_idx.timestamp() * 1000 > end_time_ms: break
220
+ step_count += 1
221
+
222
+ # طباعة تقدم المعالجة كل 20%
223
+ if step_count % max(1, (total_steps // 5)) == 0:
224
+ print(f" 🧠 [Core {worker_id}] AI Analyzing: {step_count}/{total_steps}...", flush=True)
225
 
226
  current_timestamp = int(t_idx.timestamp() * 1000)
 
227
  ohlcv_data = {}
228
  try:
 
 
229
  current_slice_1m = frames['1m'].loc[:t_idx]
230
  current_slice_5m = frames['5m'].loc[:t_idx]
231
  current_slice_15m = frames['15m'].loc[:t_idx]
 
281
  dt = time.time() - t0
282
  if ai_results:
283
  pd.DataFrame(ai_results).to_pickle(scores_file)
284
+ print(f" 💾 [Core {worker_id}] Saved {len(ai_results)} signals. (Time: {dt:.1f}s)", flush=True)
 
 
285
 
286
  return True
287
 
288
  except Exception as e:
289
+ print(f" ❌ [Core {worker_id}] CRASH: {e}", flush=True)
290
+ traceback.print_exc()
291
  return False
292
 
293
  finally:
 
297
  gc.collect()
298
 
299
  # ==============================================================
300
+ # PHASE 1: Main Loop
301
  # ==============================================================
302
  async def generate_truth_data(self):
303
  if self.force_start_date and self.force_end_date:
 
306
  start_time_ms = int(dt_start.timestamp() * 1000)
307
  end_time_ms = int(dt_end.timestamp() * 1000)
308
  print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
309
+ print(f" 🚀 Turbo Mode: Engaging all CPU cores (Staggered Start)...")
310
  else:
311
  return
312
 
 
 
313
  cpu_count = os.cpu_count() or 4
314
+ # نستخدم عدد عمال معقول لتجنب حظر الـ API (مثلاً 10 عمال كحد أقصى)
315
+ workers_count = min(10, max(1, cpu_count - 1))
316
 
317
  total_duration = end_time_ms - start_time_ms
318
  chunk_size = total_duration // workers_count
 
325
  print(f" 📂 [{sym}] Full Data Exists -> Skipping.")
326
  continue
327
 
 
328
  tasks_payload = []
329
  for i in range(workers_count):
330
  c_start = start_time_ms + (i * chunk_size)
331
  c_end = start_time_ms + ((i + 1) * chunk_size)
332
+ if i == workers_count - 1: c_end = end_time_ms
 
333
  tasks_payload.append((sym, c_start, c_end, i))
334
 
335
  print(f" ⚡ Splitting {sym} into {workers_count} chunks...")
336
 
 
337
  loop = asyncio.get_running_loop()
338
  with concurrent.futures.ProcessPoolExecutor(max_workers=workers_count) as executor:
 
339
  futures = [loop.run_in_executor(executor, run_parallel_chunk, task) for task in tasks_payload]
340
  results = await asyncio.gather(*futures)
341
 
 
342
  print(f" 🧩 Merging results for {sym}...")
343
  all_dfs = []
344
  for chunk_id, success in results:
345
  if not success: continue
346
+ task = tasks_payload[chunk_id]
347
+ part_start = task[1]; part_end = task[2]
 
 
348
  part_file = f"{CACHE_DIR}/{safe_sym}_{part_start}_{part_end}_part{chunk_id}_scores.pkl"
349
 
350
  if os.path.exists(part_file):
351
  try:
352
  df_part = pd.read_pickle(part_file)
353
+ if not df_part.empty: all_dfs.append(df_part)
 
 
354
  os.remove(part_file)
355
  except Exception as e:
356
  print(f" ⚠️ Merge Error (Part {chunk_id}): {e}")
357
 
358
  if all_dfs:
359
+ final_df = pd.concat(all_dfs).drop_duplicates(subset=['timestamp']).sort_values('timestamp')
 
 
360
  final_df.to_pickle(final_full_file)
361
  print(f" 💾 [{sym}] FINAL SAVE: {len(final_df)} signals.")
362
  else:
363
+ print(f" ⚠️ [{sym}] No signals generated.")
364
 
365
  gc.collect()
366
 
367
  # ==============================================================
368
+ # PHASE 2: Portfolio Digital Twin Engine (Unchanged Logic)
369
  # ==============================================================
370
  @staticmethod
371
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
372
  results = []
373
  all_data = []
 
374
  for fp in scores_files:
375
  try:
376
  df = pd.read_pickle(fp)
377
  if not df.empty: all_data.append(df)
378
  except: pass
 
379
  if not all_data: return []
380
+ global_df = pd.concat(all_data).drop_duplicates(subset=['timestamp']).sort_values('timestamp')
 
 
381
  grouped_by_time = global_df.groupby('timestamp')
382
 
383
  for config in combinations_batch:
384
  wallet = { "balance": initial_capital, "allocated": 0.0, "positions": {}, "trades_history": [] }
385
+ w_titan = config['w_titan']; w_struct = config['w_struct']; entry_thresh = config['thresh']
386
+ peak_balance = initial_capital; max_drawdown = 0.0
 
 
 
 
 
387
 
388
  for ts, group in grouped_by_time:
389
  active_symbols = list(wallet["positions"].keys())
390
  current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
391
+ # Exits
 
392
  for sym in active_symbols:
393
  if sym in current_prices:
394
  curr_p = current_prices[sym]
395
  pos = wallet["positions"][sym]
396
  entry_p = pos['entry_price']
397
  pct_change = (curr_p - entry_p) / entry_p
 
398
  if pct_change >= 0.03 or pct_change <= -0.02:
399
  gross_pnl = pos['size_usd'] * pct_change
400
  fees = pos['size_usd'] * fees_pct * 2
 
403
  wallet["balance"] += net_pnl
404
  del wallet["positions"][sym]
405
  wallet["trades_history"].append({'pnl': net_pnl})
406
+ # Entries
407
+ current_total_equity = wallet["balance"] + wallet["allocated"]
408
  if current_total_equity > peak_balance: peak_balance = current_total_equity
409
  dd = (peak_balance - current_total_equity) / peak_balance
410
  if dd > max_drawdown: max_drawdown = dd
411
 
 
412
  if len(wallet["positions"]) < max_slots:
413
+ free_capital = wallet["balance"]
414
  slots_left = max_slots - len(wallet["positions"])
 
415
  if slots_left > 0 and free_capital > 2.0:
416
  position_size = wallet["balance"] / max_slots
417
  if wallet["balance"] < 20.0: position_size = free_capital / slots_left
418
  position_size = min(position_size, free_capital)
 
419
  for _, row in group.iterrows():
420
  sym = row['symbol']
421
  if sym in wallet["positions"]: continue
 
422
  sig_type = row['signal_type']
423
  l1_raw_score = row['l1_score']
424
  real_titan = row['real_titan']
 
425
  norm_struct = 0.0
426
  if sig_type == 'BREAKOUT': norm_struct = min(1.0, l1_raw_score / 3.0)
427
  elif sig_type == 'REVERSAL': norm_struct = l1_raw_score / 100.0
 
428
  score = 0.0
429
  if (w_titan + w_struct) > 0:
430
  score = ((real_titan * w_titan) + (norm_struct * w_struct)) / (w_titan + w_struct)
 
431
  if score >= entry_thresh:
432
  wallet["positions"][sym] = {'entry_price': row['close'], 'size_usd': position_size}
433
  wallet["allocated"] += position_size
434
+ wallet["balance"] -= position_size
435
  if len(wallet["positions"]) >= max_slots: break
 
436
  if wallet["balance"] < 1.0 and len(wallet["positions"]) == 0: break
437
 
438
  trades = wallet["trades_history"]
439
  if trades:
440
+ net_profit = wallet["balance"] - initial_capital + wallet["allocated"]
441
  pnls = [t['pnl'] for t in trades]
442
+ win_count = len([p for p in pnls if p > 0]); loss_count = len([p for p in pnls if p <= 0])
443
+ win_rate = (win_count / len(trades)) * 100
444
+ max_single_win = max(pnls) if pnls else 0.0; max_single_loss = min(pnls) if pnls else 0.0
 
 
 
 
 
445
 
446
  current_win_streak = 0; max_win_streak = 0
447
  current_loss_streak = 0; max_loss_streak = 0
 
454
  if current_loss_streak > max_loss_streak: max_loss_streak = current_loss_streak
455
 
456
  results.append({
457
+ 'config': config, 'final_balance': wallet["balance"] + wallet["allocated"],
458
+ 'net_profit': net_profit, 'total_trades': len(trades),
459
+ 'win_count': win_count, 'loss_count': loss_count, 'win_rate': win_rate,
 
 
 
460
  'max_single_win': max_single_win, 'max_single_loss': max_single_loss,
461
  'max_win_streak': max_win_streak, 'max_loss_streak': max_loss_streak,
462
  'max_drawdown': max_drawdown * 100
 
468
  'max_single_win': 0.0, 'max_single_loss': 0.0, 'max_win_streak': 0,
469
  'max_loss_streak': 0, 'max_drawdown': 0.0
470
  })
 
471
  return results
472
 
473
  async def run_optimization(self, target_regime="RANGE"):
 
477
  end_ts = int(datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() * 1000)
478
  period_id = f"{start_ts}_{end_ts}"
479
 
 
480
  current_period_files = []
481
  for f in os.listdir(CACHE_DIR):
482
  if f.endswith('_scores.pkl') and period_id in f and "_part" not in f:
483
  current_period_files.append(os.path.join(CACHE_DIR, f))
484
 
485
  if not current_period_files:
486
+ print(f"❌ No combined signal data found for {target_regime}.")
487
  return None, None
488
 
489
  print(f"\n🧩 [Phase 2] Optimizing for {target_regime}...")
 
 
490
  w_titan_range = np.linspace(0.4, 0.9, num=self.GRID_DENSITY)
491
  w_struct_range = np.linspace(0.1, 0.6, num=self.GRID_DENSITY)
492
  thresh_range = np.linspace(0.20, 0.60, num=self.GRID_DENSITY)
 
500
  batch_size = max(20, len(combinations) // (os.cpu_count() * 2))
501
  batches = [combinations[i:i+batch_size] for i in range(0, len(combinations), batch_size)]
502
 
 
503
  with concurrent.futures.ProcessPoolExecutor() as executor:
504
  futures = [executor.submit(self._worker_optimize, batch, current_period_files,
505
  self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
 
509
  except Exception as e: print(f"Grid Error: {e}")
510
 
511
  if not final_results: return None, None
 
512
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
513
 
 
514
  print("\n" + "="*60)
515
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
 
516
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
 
 
 
 
 
517
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
 
 
 
 
 
 
 
518
  print(f" ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
519
  print("="*60)
 
520
  return best['config'], best
521
 
async def run_strategic_optimization_task():
    """
    Top-level "Time Lord" entry point: run the strategic backtest optimizer
    across four historical market-regime windows and persist the winners.

    For each scenario it optimizes the scoring weights for that regime,
    submits the champion to the AdaptiveHub as a challenger, then saves the
    hub state to R2 and re-injects the current live parameters.

    The DataManager is always closed on exit; exceptions propagate to the
    caller after cleanup (try/finally with no except is deliberate here).
    """
    print("\n🧪 [STRATEGIC BACKTEST] Time Lord Initiated (Parallel Turbo)...")
    r2 = R2Service()
    # LiquidityHunter/WhaleMonitor slots are None: not needed for simulation.
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    try:
        hub = AdaptiveHub(r2)
        await hub.initialize()
        # One representative historical window per regime the bot must master.
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        optimizer = HeavyDutyBacktester(dm, proc)
        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])
            best_config, best_stats = await optimizer.run_optimization(target_regime=target)
            if best_config and best_stats:
                hub.submit_challenger(target, best_config, best_stats)
        # Persist all challengers at once, then refresh the live parameters.
        await hub._save_state_to_r2()
        hub._inject_current_parameters()
        print(f"✅ [System] ALL DNA Updated & Saved Successfully.")
    finally:
        await dm.close()
549
  if __name__ == "__main__":
 
550
  import multiprocessing
551
  multiprocessing.freeze_support()
552
  asyncio.run(run_strategic_optimization_task())