Riy777 commited on
Commit
4f4b3b7
·
verified ·
1 Parent(s): 355c953

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +16 -51
backtest_engine.py CHANGED
@@ -1,9 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V88.0 - GEM-Architect: RAM-Burst Edition)
3
- # ============================================================
4
- # استراتيجية المعماري للمواصفات المحدودة (2 vCPU / 16GB RAM):
5
- # 1. Async I/O Burst: سحب البيانات بالتوازي لأن الشبكة لا تضغط المعالج.
6
- # 2. In-Memory Analysis: المعالجة تتم بعد اكتمال البيانات بالكامل.
7
  # ============================================================
8
 
9
  import asyncio
@@ -24,7 +20,7 @@ try:
24
  from ml_engine.data_manager import DataManager
25
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
26
  from r2 import R2Service
27
- import ccxt.async_support as ccxt # نستخدم النسخة الـ Async حصراً
28
  except ImportError:
29
  pass
30
 
@@ -44,7 +40,7 @@ class HeavyDutyBacktester:
44
  self.force_end_date = None
45
 
46
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
47
- print(f"🧪 [Backtest V88.0] RAM-Burst Edition (High Speed I/O).")
48
 
49
  def set_date_range(self, start_str, end_str):
50
  self.force_start_date = start_str
@@ -58,44 +54,30 @@ class HeavyDutyBacktester:
58
  # ⚡ FAST DATA DOWNLOADER (Async Burst)
59
  # ==============================================================
60
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
61
- """
62
- يقوم بتحميل كل البيانات دفعة واحدة باستخدام اتصالات متزامنة.
63
- يستغل الرام (16GB) لتخزين كل شيء قبل المعالجة.
64
- """
65
  print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
66
 
67
- # تقسيم الفترة إلى دفعات (كل دفعة 1000 شمعة = 60000000 ميلي ثانية)
68
  limit = 1000
69
  duration_per_batch = limit * 60 * 1000
70
 
71
  tasks = []
72
  current = start_ms
73
-
74
- # إنشاء قائمة بالمهمات الزمنية
75
  while current < end_ms:
76
  tasks.append(current)
77
  current += duration_per_batch
78
 
79
  all_candles = []
80
  total_batches = len(tasks)
81
-
82
- # نستخدم Semaphore لمنع حظر الـ IP (مثلاً 10 اتصالات في نفس اللحظة)
83
  sem = asyncio.Semaphore(10)
84
 
85
  async def _fetch_batch(timestamp):
86
  async with sem:
87
- try:
88
- # محاولة 3 مرات في حال الفشل
89
- for _ in range(3):
90
- try:
91
- return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
92
- except Exception:
93
- await asyncio.sleep(1)
94
- return []
95
- except: return []
96
-
97
- # تشغيل التنزيل المتوازي
98
- # نقسم المهام إلى مجموعات (Chunks) لنظهر التقدم
99
  chunk_size = 20
100
  for i in range(0, len(tasks), chunk_size):
101
  chunk_tasks = tasks[i:i + chunk_size]
@@ -105,16 +87,12 @@ class HeavyDutyBacktester:
105
  for res in results:
106
  if res: all_candles.extend(res)
107
 
108
- # طباعة التقدم
109
  progress = min(100, int((i + chunk_size) / total_batches * 100))
110
  print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
111
 
112
- # ترتيب وإزالة التكرار
113
  if not all_candles: return None
114
 
115
- # تصفية ما هو خارج النطاق بدقة
116
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
117
- # إزالة التكرارات بناءً على الوقت (المفتاح 0)
118
  seen = set()
119
  unique_candles = []
120
  for c in filtered:
@@ -122,7 +100,6 @@ class HeavyDutyBacktester:
122
  unique_candles.append(c)
123
  seen.add(c[0])
124
 
125
- # ترتيب نهائي
126
  unique_candles.sort(key=lambda x: x[0])
127
  return unique_candles
128
 
@@ -131,15 +108,17 @@ class HeavyDutyBacktester:
131
  # ==============================================================
132
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
133
  safe_sym = sym.replace('/', '_')
134
- period_suffix = f"{start_time_ms}_{end_time_ms}" # سيتم تعريفه لاحقاً
135
- # لكن هن�� سنستخدم معرف الفترة الممرر
136
  period_suffix = f"{start_ms}_{end_ms}"
137
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
138
 
 
 
 
 
139
  print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
140
  t0 = time.time()
141
 
142
- # تحويل سريع لـ Pandas
143
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
144
  cols = ['open', 'high', 'low', 'close', 'volume']
145
  df_1m[cols] = df_1m[cols].astype('float32')
@@ -147,7 +126,6 @@ class HeavyDutyBacktester:
147
  df_1m.set_index('datetime', inplace=True)
148
  df_1m = df_1m.sort_index()
149
 
150
- # Resampling
151
  frames = {}
152
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
153
  frames['1m'] = df_1m.copy()
@@ -159,14 +137,12 @@ class HeavyDutyBacktester:
159
 
160
  ai_results = []
161
 
162
- # نبدأ التحليل بعد فترة كافية للمؤشرات
163
  start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
164
  valid_indices = frames['5m'].loc[start_analysis_dt:].index
165
 
166
  total_steps = len(valid_indices)
167
  step_count = 0
168
 
169
- # حلقة المعالجة السريعة (بدون انتظار شبكة)
170
  for t_idx in valid_indices:
171
  step_count += 1
172
  if step_count % 2000 == 0:
@@ -175,7 +151,6 @@ class HeavyDutyBacktester:
175
 
176
  ohlcv_data = {}
177
  try:
178
- # Slicing from RAM is fast
179
  cutoff = t_idx
180
  ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
181
  ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
@@ -188,7 +163,6 @@ class HeavyDutyBacktester:
188
  if len(ohlcv_data['1h']) < 60: continue
189
  current_price = frames['5m'].loc[t_idx]['close']
190
 
191
- # L1 Logic
192
  logic_packet = {
193
  'symbol': sym,
194
  'ohlcv_1h': ohlcv_data['1h'][-60:],
@@ -206,7 +180,6 @@ class HeavyDutyBacktester:
206
  signal_type = logic_result.get('type', 'NONE')
207
  l1_score = logic_result.get('score', 0.0)
208
 
209
- # L2 AI Execution (Only on L1 Signals)
210
  real_titan = 0.5
211
  if signal_type in ['BREAKOUT', 'REVERSAL']:
212
  raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
@@ -248,14 +221,6 @@ class HeavyDutyBacktester:
248
  return
249
 
250
  for sym in self.TARGET_COINS:
251
- safe_sym = sym.replace('/', '_')
252
- period_suffix = f"{start_time_ms}_{end_time_ms}"
253
- scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
254
-
255
- if os.path.exists(scores_file):
256
- print(f" 📂 [{sym}] Data Exists -> Skipping.")
257
- continue
258
-
259
  # 1. Download Phase (Async Burst)
260
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
261
 
@@ -268,7 +233,7 @@ class HeavyDutyBacktester:
268
  gc.collect()
269
 
270
  # ==============================================================
271
- # PHASE 2: Portfolio Digital Twin Engine (Standard)
272
  # ==============================================================
273
  @staticmethod
274
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V88.1 - GEM-Architect: RAM-Burst Fix)
 
 
 
 
3
  # ============================================================
4
 
5
  import asyncio
 
20
  from ml_engine.data_manager import DataManager
21
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
22
  from r2 import R2Service
23
+ import ccxt.async_support as ccxt
24
  except ImportError:
25
  pass
26
 
 
40
  self.force_end_date = None
41
 
42
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
43
+ print(f"🧪 [Backtest V88.1] RAM-Burst Edition (Fix Applied).")
44
 
45
  def set_date_range(self, start_str, end_str):
46
  self.force_start_date = start_str
 
54
  # ⚡ FAST DATA DOWNLOADER (Async Burst)
55
  # ==============================================================
56
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
 
 
 
 
57
  print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
58
 
 
59
  limit = 1000
60
  duration_per_batch = limit * 60 * 1000
61
 
62
  tasks = []
63
  current = start_ms
 
 
64
  while current < end_ms:
65
  tasks.append(current)
66
  current += duration_per_batch
67
 
68
  all_candles = []
69
  total_batches = len(tasks)
 
 
70
  sem = asyncio.Semaphore(10)
71
 
72
  async def _fetch_batch(timestamp):
73
  async with sem:
74
+ for _ in range(3):
75
+ try:
76
+ return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
77
+ except Exception:
78
+ await asyncio.sleep(1)
79
+ return []
80
+
 
 
 
 
 
81
  chunk_size = 20
82
  for i in range(0, len(tasks), chunk_size):
83
  chunk_tasks = tasks[i:i + chunk_size]
 
87
  for res in results:
88
  if res: all_candles.extend(res)
89
 
 
90
  progress = min(100, int((i + chunk_size) / total_batches * 100))
91
  print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
92
 
 
93
  if not all_candles: return None
94
 
 
95
  filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
 
96
  seen = set()
97
  unique_candles = []
98
  for c in filtered:
 
100
  unique_candles.append(c)
101
  seen.add(c[0])
102
 
 
103
  unique_candles.sort(key=lambda x: x[0])
104
  return unique_candles
105
 
 
108
  # ==============================================================
109
  async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
110
  safe_sym = sym.replace('/', '_')
111
+ # FIX: Use passed arguments directly
 
112
  period_suffix = f"{start_ms}_{end_ms}"
113
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
114
 
115
+ if os.path.exists(scores_file):
116
+ print(f" 📂 [{sym}] Data Exists -> Skipping.")
117
+ return
118
+
119
  print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
120
  t0 = time.time()
121
 
 
122
  df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
123
  cols = ['open', 'high', 'low', 'close', 'volume']
124
  df_1m[cols] = df_1m[cols].astype('float32')
 
126
  df_1m.set_index('datetime', inplace=True)
127
  df_1m = df_1m.sort_index()
128
 
 
129
  frames = {}
130
  agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
131
  frames['1m'] = df_1m.copy()
 
137
 
138
  ai_results = []
139
 
 
140
  start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
141
  valid_indices = frames['5m'].loc[start_analysis_dt:].index
142
 
143
  total_steps = len(valid_indices)
144
  step_count = 0
145
 
 
146
  for t_idx in valid_indices:
147
  step_count += 1
148
  if step_count % 2000 == 0:
 
151
 
152
  ohlcv_data = {}
153
  try:
 
154
  cutoff = t_idx
155
  ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
156
  ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
 
163
  if len(ohlcv_data['1h']) < 60: continue
164
  current_price = frames['5m'].loc[t_idx]['close']
165
 
 
166
  logic_packet = {
167
  'symbol': sym,
168
  'ohlcv_1h': ohlcv_data['1h'][-60:],
 
180
  signal_type = logic_result.get('type', 'NONE')
181
  l1_score = logic_result.get('score', 0.0)
182
 
 
183
  real_titan = 0.5
184
  if signal_type in ['BREAKOUT', 'REVERSAL']:
185
  raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
 
221
  return
222
 
223
  for sym in self.TARGET_COINS:
 
 
 
 
 
 
 
 
224
  # 1. Download Phase (Async Burst)
225
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
226
 
 
233
  gc.collect()
234
 
235
  # ==============================================================
236
+ # PHASE 2: Portfolio Digital Twin Engine
237
  # ==============================================================
238
  @staticmethod
239
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):