Riy777 commited on
Commit
e490e3d
·
verified ·
1 Parent(s): 95f813e

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +66 -36
backtest_engine.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V88.1 - GEM-Architect: RAM-Burst Fix)
3
  # ============================================================
4
 
5
  import asyncio
@@ -35,12 +35,25 @@ class HeavyDutyBacktester:
35
  self.INITIAL_CAPITAL = 10.0
36
  self.TRADING_FEES = 0.001
37
  self.MAX_SLOTS = 4
38
- self.TARGET_COINS = ['SOL/USDT']
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  self.force_start_date = None
40
  self.force_end_date = None
41
 
42
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
43
- print(f"🧪 [Backtest V88.1] RAM-Burst Edition (Fix Applied).")
44
 
45
  def set_date_range(self, start_str, end_str):
46
  self.force_start_date = start_str
@@ -51,10 +64,10 @@ class HeavyDutyBacktester:
51
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
52
 
53
  # ==============================================================
54
- # ⚡ FAST DATA DOWNLOADER (Async Burst)
55
  # ==============================================================
56
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
57
- print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
58
 
59
  limit = 1000
60
  duration_per_batch = limit * 60 * 1000
@@ -66,32 +79,31 @@ class HeavyDutyBacktester:
66
  current += duration_per_batch
67
 
68
  all_candles = []
69
- total_batches = len(tasks)
70
- sem = asyncio.Semaphore(10)
71
 
72
  async def _fetch_batch(timestamp):
73
  async with sem:
74
- for _ in range(3):
75
  try:
76
  return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
77
  except Exception:
78
  await asyncio.sleep(1)
79
  return []
80
 
 
81
  chunk_size = 20
82
  for i in range(0, len(tasks), chunk_size):
83
  chunk_tasks = tasks[i:i + chunk_size]
84
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
85
  results = await asyncio.gather(*futures)
86
-
87
  for res in results:
88
  if res: all_candles.extend(res)
89
 
90
- progress = min(100, int((i + chunk_size) / total_batches * 100))
91
- print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
92
 
93
  if not all_candles: return None
94
 
 
95
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
96
  seen = set()
97
  unique_candles = []
@@ -101,14 +113,14 @@ class HeavyDutyBacktester:
101
  seen.add(c[0])
102
 
103
  unique_candles.sort(key=lambda x: x[0])
 
104
  return unique_candles
105
 
106
  # ==============================================================
107
- # 🧠 CPU PROCESSING (In-Memory)
108
  # ==============================================================
109
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
110
  safe_sym = sym.replace('/', '_')
111
- # ✅ FIX: Use passed arguments directly
112
  period_suffix = f"{start_ms}_{end_ms}"
113
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
114
 
@@ -116,7 +128,7 @@ class HeavyDutyBacktester:
116
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
117
  return
118
 
119
- print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
120
  t0 = time.time()
121
 
122
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
@@ -137,18 +149,13 @@ class HeavyDutyBacktester:
137
 
138
  ai_results = []
139
 
 
140
  start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
141
  valid_indices = frames['5m'].loc[start_analysis_dt:].index
142
 
143
- total_steps = len(valid_indices)
144
- step_count = 0
145
 
146
  for t_idx in valid_indices:
147
- step_count += 1
148
- if step_count % 2000 == 0:
149
- pct = int((step_count / total_steps) * 100)
150
- print(f" 🧠 AI Analysis: {pct}%...", flush=True)
151
-
152
  ohlcv_data = {}
153
  try:
154
  cutoff = t_idx
@@ -200,15 +207,15 @@ class HeavyDutyBacktester:
200
  dt = time.time() - t0
201
  if ai_results:
202
  pd.DataFrame(ai_results).to_pickle(scores_file)
203
- print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Compute Time: {dt:.1f}s)")
204
  else:
205
- print(f" ⚠️ [{sym}] No signals found.")
206
 
207
  del frames, df_1m, candles
208
  gc.collect()
209
 
210
  # ==============================================================
211
- # PHASE 1: Main Loop
212
  # ==============================================================
213
  async def generate_truth_data(self):
214
  if self.force_start_date and self.force_end_date:
@@ -216,24 +223,33 @@ class HeavyDutyBacktester:
216
  dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
217
  start_time_ms = int(dt_start.timestamp() * 1000)
218
  end_time_ms = int(dt_end.timestamp() * 1000)
219
- print(f"\n🚜 [Phase 1] Era: {self.force_start_date} -> {self.force_end_date}")
220
  else:
221
  return
222
 
223
  for sym in self.TARGET_COINS:
224
- # 1. Download Phase (Async Burst)
225
- candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
226
-
227
- if candles:
228
- # 2. Processing Phase (Sequential CPU)
229
- await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
230
- else:
231
- print(f" ❌ Failed to download data for {sym}")
232
-
 
 
 
 
 
 
 
 
 
233
  gc.collect()
234
 
235
  # ==============================================================
236
- # PHASE 2: Portfolio Digital Twin Engine
237
  # ==============================================================
238
  @staticmethod
239
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
@@ -374,16 +390,30 @@ class HeavyDutyBacktester:
374
  if not final_results: return None, None
375
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
376
 
 
377
  print("\n" + "="*60)
378
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
 
379
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
 
 
 
 
 
380
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
 
 
 
 
 
 
 
381
  print(f" ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
382
  print("="*60)
383
  return best['config'], best
384
 
385
  async def run_strategic_optimization_task():
386
- print("\n🧪 [STRATEGIC BACKTEST] RAM-Burst Mode Initiated...")
387
  r2 = R2Service()
388
  dm = DataManager(None, None, r2)
389
  proc = MLProcessor(dm)
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V89.0 - GEM-Architect: Mass-Scale Edition)
3
  # ============================================================
4
 
5
  import asyncio
 
35
  self.INITIAL_CAPITAL = 10.0
36
  self.TRADING_FEES = 0.001
37
  self.MAX_SLOTS = 4
38
+
39
+ # ✅ القائمة الكاملة (50 عملة)
40
+ self.TARGET_COINS = [
41
+ 'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT',
42
+ 'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT',
43
+ 'SEI/USDT', 'TIA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'PYTH/USDT',
44
+ 'WIF/USDT', 'PEPE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT',
45
+ 'ONDO/USDT', 'ENA/USDT', 'HBAR/USDT', 'XLM/USDT', 'TAO/USDT', 'ZK/USDT',
46
+ 'ZRO/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT',
47
+ 'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'JTO/USDT',
48
+ 'STRK/USDT', 'BLUR/USDT', 'ALT/USDT', 'JUP/USDT', 'PENDLE/USDT', 'ETHFI/USDT',
49
+ 'MEME/USDT', 'ATOM/USDT'
50
+ ]
51
+
52
  self.force_start_date = None
53
  self.force_end_date = None
54
 
55
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
56
+ print(f"🧪 [Backtest V89.0] Mass-Scale Edition (50+ Coins | Fault Tolerant).")
57
 
58
  def set_date_range(self, start_str, end_str):
59
  self.force_start_date = start_str
 
64
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
65
 
66
  # ==============================================================
67
+ # ⚡ FAST DATA DOWNLOADER (Silent Burst)
68
  # ==============================================================
69
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
70
+ print(f" ⚡ [Network] Downloading {sym}...", flush=True)
71
 
72
  limit = 1000
73
  duration_per_batch = limit * 60 * 1000
 
79
  current += duration_per_batch
80
 
81
  all_candles = []
82
+ sem = asyncio.Semaphore(10) # 10 اتصالات متزامنة
 
83
 
84
  async def _fetch_batch(timestamp):
85
  async with sem:
86
+ for _ in range(3): # 3 محاولات
87
  try:
88
  return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
89
  except Exception:
90
  await asyncio.sleep(1)
91
  return []
92
 
93
+ # تقسيم المهام وتشغيلها
94
  chunk_size = 20
95
  for i in range(0, len(tasks), chunk_size):
96
  chunk_tasks = tasks[i:i + chunk_size]
97
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
98
  results = await asyncio.gather(*futures)
 
99
  for res in results:
100
  if res: all_candles.extend(res)
101
 
102
+ # 🚫 تم حذف طباعة النسبة المئوية حسب الطلب
 
103
 
104
  if not all_candles: return None
105
 
106
+ # تنظيف البيانات
107
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
108
  seen = set()
109
  unique_candles = []
 
113
  seen.add(c[0])
114
 
115
  unique_candles.sort(key=lambda x: x[0])
116
+ print(f" ✅ Downloaded {len(unique_candles)} candles for {sym}.", flush=True)
117
  return unique_candles
118
 
119
  # ==============================================================
120
+ # 🧠 CPU PROCESSING (In-Memory & Silent)
121
  # ==============================================================
122
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
123
  safe_sym = sym.replace('/', '_')
 
124
  period_suffix = f"{start_ms}_{end_ms}"
125
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
126
 
 
128
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
129
  return
130
 
131
+ print(f" ⚙️ [CPU] Processing {sym}...", flush=True)
132
  t0 = time.time()
133
 
134
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
 
149
 
150
  ai_results = []
151
 
152
+ # نبدأ التحليل بعد فترة التحمية
153
  start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
154
  valid_indices = frames['5m'].loc[start_analysis_dt:].index
155
 
156
+ # 🚫 تم حذف حلقة طباعة التقدم %
 
157
 
158
  for t_idx in valid_indices:
 
 
 
 
 
159
  ohlcv_data = {}
160
  try:
161
  cutoff = t_idx
 
207
  dt = time.time() - t0
208
  if ai_results:
209
  pd.DataFrame(ai_results).to_pickle(scores_file)
210
+ print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Processed in {dt:.1f}s)", flush=True)
211
  else:
212
+ print(f" ⚠️ [{sym}] No signals found.", flush=True)
213
 
214
  del frames, df_1m, candles
215
  gc.collect()
216
 
217
  # ==============================================================
218
+ # PHASE 1: Main Loop (Fault Tolerant)
219
  # ==============================================================
220
  async def generate_truth_data(self):
221
  if self.force_start_date and self.force_end_date:
 
223
  dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
224
  start_time_ms = int(dt_start.timestamp() * 1000)
225
  end_time_ms = int(dt_end.timestamp() * 1000)
226
+ print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
227
  else:
228
  return
229
 
230
  for sym in self.TARGET_COINS:
231
+ # 🛡️ حماية شاملة: لن يتوقف السكربت أبداً بسبب عملة واحدة
232
+ try:
233
+ # 1. Download Phase
234
+ candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
235
+
236
+ if candles:
237
+ # 2. Processing Phase
238
+ await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
239
+ else:
240
+ print(f" ❌ Failed/Empty data for {sym}. Continuing...", flush=True)
241
+
242
+ except Exception as e:
243
+ # طباعة الخطأ والمتابعة فوراً
244
+ print(f" ❌ SKIP: Error processing {sym}: {e}", flush=True)
245
+ # traceback.print_exc() # يمكنك تفعيلها إذا أردت التفاصيل
246
+ continue
247
+
248
+ # تنظيف إضافي بعد كل عملة
249
  gc.collect()
250
 
251
  # ==============================================================
252
+ # PHASE 2: Portfolio Digital Twin Engine (Full Stats Restored)
253
  # ==============================================================
254
  @staticmethod
255
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
 
390
  if not final_results: return None, None
391
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
392
 
393
+ # ✅ الطباعة الكاملة للإحصائيات (كما في الملف الأصلي)
394
  print("\n" + "="*60)
395
  print(f"🏆 CHAMPION REPORT [{target_regime}]:")
396
+ print(f" 📅 Period: {self.force_start_date} -> {self.force_end_date}")
397
  print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
398
+ print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
399
+ print("-" * 60)
400
+ print(f" 📊 Total Trades: {best['total_trades']}")
401
+ print(f" ✅ Winning Trades: {best['win_count']}")
402
+ print(f" ❌ Losing Trades: {best['loss_count']}")
403
  print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
404
+ print("-" * 60)
405
+ print(f" 🟢 Max Single Win: ${best['max_single_win']:.2f}")
406
+ print(f" 🔴 Max Single Loss: ${best['max_single_loss']:.2f}")
407
+ print(f" 🔥 Max Win Streak: {best['max_win_streak']} trades")
408
+ print(f" 🧊 Max Loss Streak: {best['max_loss_streak']} trades")
409
+ print(f" 📉 Max Drawdown: {best['max_drawdown']:.1f}%")
410
+ print("-" * 60)
411
  print(f" ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
412
  print("="*60)
413
  return best['config'], best
414
 
415
  async def run_strategic_optimization_task():
416
+ print("\n🧪 [STRATEGIC BACKTEST] Mass-Scale Edition Initiated...")
417
  r2 = R2Service()
418
  dm = DataManager(None, None, r2)
419
  proc = MLProcessor(dm)