Riy777 committed on
Commit
fd2bc94
·
verified ·
1 Parent(s): 4f4b3b7

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +188 -123
backtest_engine.py CHANGED
@@ -1,5 +1,10 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V88.1 - GEM-Architect: RAM-Burst Fix)
 
 
 
 
 
3
  # ============================================================
4
 
5
  import asyncio
@@ -12,6 +17,7 @@ import os
12
  import gc
13
  import sys
14
  import traceback
 
15
  from datetime import datetime, timezone
16
  from typing import Dict, Any, List
17
 
@@ -27,6 +33,127 @@ except ImportError:
27
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
28
  CACHE_DIR = "backtest_real_scores"
29
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  class HeavyDutyBacktester:
31
  def __init__(self, data_manager, processor):
32
  self.dm = data_manager
@@ -40,25 +167,19 @@ class HeavyDutyBacktester:
40
  self.force_end_date = None
41
 
42
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
43
- print(f"🧪 [Backtest V88.1] RAM-Burst Edition (Fix Applied).")
44
 
45
    def set_date_range(self, start_str, end_str):
        """Pin the backtest window to an explicit [start_str, end_str] date range."""
        self.force_start_date = start_str
        self.force_end_date = end_str
48
 
49
- def df_to_list(self, df):
50
- if df.empty: return []
51
- return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
52
-
53
  # ==============================================================
54
- # ⚡ FAST DATA DOWNLOADER (Async Burst)
55
  # ==============================================================
56
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
57
- print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
58
-
59
  limit = 1000
60
  duration_per_batch = limit * 60 * 1000
61
-
62
  tasks = []
63
  current = start_ms
64
  while current < end_ms:
@@ -66,145 +187,87 @@ class HeavyDutyBacktester:
66
  current += duration_per_batch
67
 
68
  all_candles = []
69
- total_batches = len(tasks)
70
- sem = asyncio.Semaphore(10)
71
 
72
  async def _fetch_batch(timestamp):
73
  async with sem:
74
  for _ in range(3):
75
- try:
76
- return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
77
- except Exception:
78
- await asyncio.sleep(1)
79
  return []
80
 
81
- chunk_size = 20
82
  for i in range(0, len(tasks), chunk_size):
83
  chunk_tasks = tasks[i:i + chunk_size]
84
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
85
  results = await asyncio.gather(*futures)
86
-
87
- for res in results:
88
  if res: all_candles.extend(res)
89
-
90
- progress = min(100, int((i + chunk_size) / total_batches * 100))
91
- print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
92
 
93
  if not all_candles: return None
94
 
95
- filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
96
- seen = set()
97
- unique_candles = []
98
- for c in filtered:
99
- if c[0] not in seen:
100
- unique_candles.append(c)
101
- seen.add(c[0])
102
-
103
- unique_candles.sort(key=lambda x: x[0])
104
- return unique_candles
105
 
106
  # ==============================================================
107
- # 🧠 CPU PROCESSING (In-Memory)
108
  # ==============================================================
109
- async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
110
  safe_sym = sym.replace('/', '_')
111
- # ✅ FIX: Use passed arguments directly
112
  period_suffix = f"{start_ms}_{end_ms}"
113
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
114
-
115
  if os.path.exists(scores_file):
116
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
117
  return
118
 
119
- print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
120
- t0 = time.time()
121
-
122
- df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
123
- cols = ['open', 'high', 'low', 'close', 'volume']
124
- df_1m[cols] = df_1m[cols].astype('float32')
125
- df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
126
- df_1m.set_index('datetime', inplace=True)
127
- df_1m = df_1m.sort_index()
128
-
129
- frames = {}
130
- agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
131
- frames['1m'] = df_1m.copy()
132
- frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6
133
-
134
- for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
135
- frames[tf_str] = df_1m.resample(tf_code).agg(agg_dict).dropna()
136
- frames[tf_str]['timestamp'] = frames[tf_str].index.astype(np.int64) // 10**6
137
-
138
- ai_results = []
139
 
140
- start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
141
- valid_indices = frames['5m'].loc[start_analysis_dt:].index
142
 
143
- total_steps = len(valid_indices)
144
- step_count = 0
145
-
146
- for t_idx in valid_indices:
147
- step_count += 1
148
- if step_count % 2000 == 0:
149
- pct = int((step_count / total_steps) * 100)
150
- print(f" 🧠 AI Analysis: {pct}%...", flush=True)
151
-
152
- ohlcv_data = {}
153
- try:
154
- cutoff = t_idx
155
- ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
156
- ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
157
- ohlcv_data['15m'] = self.df_to_list(frames['15m'].loc[:cutoff].tail(200))
158
- ohlcv_data['1h'] = self.df_to_list(frames['1h'].loc[:cutoff].tail(200))
159
- ohlcv_data['4h'] = self.df_to_list(frames['4h'].loc[:cutoff].tail(100))
160
- ohlcv_data['1d'] = self.df_to_list(frames['1d'].loc[:cutoff].tail(50))
161
- except: continue
162
-
163
- if len(ohlcv_data['1h']) < 60: continue
164
- current_price = frames['5m'].loc[t_idx]['close']
165
-
166
- logic_packet = {
167
- 'symbol': sym,
168
- 'ohlcv_1h': ohlcv_data['1h'][-60:],
169
- 'ohlcv_15m': ohlcv_data['15m'][-60:],
170
- 'change_24h': 0.0
171
- }
172
- try:
173
- if len(ohlcv_data['1h']) >= 24:
174
- p_now = ohlcv_data['1h'][-1][4]
175
- p_old = ohlcv_data['1h'][-24][4]
176
- logic_packet['change_24h'] = ((p_now - p_old) / p_old) * 100
177
- except: pass
178
-
179
- logic_result = self.dm._apply_logic_tree(logic_packet)
180
- signal_type = logic_result.get('type', 'NONE')
181
- l1_score = logic_result.get('score', 0.0)
182
 
183
- real_titan = 0.5
184
- if signal_type in ['BREAKOUT', 'REVERSAL']:
185
- raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
186
- try:
187
- proc_res = await self.proc.process_compound_signal(raw_data_for_proc)
188
- if proc_res: real_titan = proc_res.get('titan_score', 0.5)
189
- except: pass
190
-
191
- ai_results.append({
192
- 'timestamp': int(t_idx.timestamp() * 1000),
193
- 'symbol': sym,
194
- 'close': current_price,
195
- 'real_titan': real_titan,
196
- 'signal_type': signal_type,
197
- 'l1_score': l1_score
198
- })
199
 
 
 
 
 
 
 
200
  dt = time.time() - t0
201
- if ai_results:
202
- pd.DataFrame(ai_results).to_pickle(scores_file)
203
- print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Compute Time: {dt:.1f}s)")
 
 
 
 
204
  else:
205
  print(f" ⚠️ [{sym}] No signals found.")
206
-
207
- del frames, df_1m, candles
208
  gc.collect()
209
 
210
  # ==============================================================
@@ -221,19 +284,19 @@ class HeavyDutyBacktester:
221
  return
222
 
223
  for sym in self.TARGET_COINS:
224
- # 1. Download Phase (Async Burst)
225
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
226
 
227
  if candles:
228
- # 2. Processing Phase (Sequential CPU)
229
- await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
230
  else:
231
  print(f" ❌ Failed to download data for {sym}")
232
 
233
  gc.collect()
234
 
235
  # ==============================================================
236
- # PHASE 2: Portfolio Digital Twin Engine
237
  # ==============================================================
238
  @staticmethod
239
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
@@ -383,7 +446,7 @@ class HeavyDutyBacktester:
383
  return best['config'], best
384
 
385
  async def run_strategic_optimization_task():
386
- print("\n🧪 [STRATEGIC BACKTEST] RAM-Burst Mode Initiated...")
387
  r2 = R2Service()
388
  dm = DataManager(None, None, r2)
389
  proc = MLProcessor(dm)
@@ -414,4 +477,6 @@ async def run_strategic_optimization_task():
414
  await dm.close()
415
 
416
  if __name__ == "__main__":
 
 
417
  asyncio.run(run_strategic_optimization_task())
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V89.0 - GEM-Architect: Dual-Core Reactor)
3
+ # ============================================================
4
+ # التغيير الجذري:
5
+ # 1. التحميل: Async (شبكة فقط).
6
+ # 2. المعالجة: Multiprocessing على البيانات الموجودة في الرام.
7
+ # النتيجة: 100% CPU Usage على 2 vCPUs.
8
  # ============================================================
9
 
10
  import asyncio
 
17
  import gc
18
  import sys
19
  import traceback
20
+ import concurrent.futures
21
  from datetime import datetime, timezone
22
  from typing import Dict, Any, List
23
 
 
33
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
34
  CACHE_DIR = "backtest_real_scores"
35
 
36
# ==============================================================================
# 🔥 PURE CPU WORKER (No Network, No I/O, Just Math)
# ==============================================================================
def cpu_crunch_worker(payload):
    """Score one pre-downloaded slice of 1m candles on a single CPU core.

    Runs inside a ProcessPoolExecutor and is fully isolated from the
    network so a stalled connection can never block the pool.

    Args:
        payload: tuple ``(worker_id, candles, symbol)`` where ``candles`` is a
            list of 1m OHLCV rows ``[timestamp_ms, open, high, low, close,
            volume]`` (overlap with neighbouring slices is allowed; the
            dispatcher deduplicates by timestamp after the merge).

    Returns:
        List of signal dicts with keys ``timestamp``, ``symbol``, ``close``,
        ``real_titan``, ``signal_type``, ``l1_score`` — possibly empty.
    """
    worker_id, candles, symbol = payload

    print(f" 🔥 [Core {worker_id}] Crunching {len(candles)} candles...", flush=True)

    # Rebuild lightweight service objects inside this process. None is passed
    # for the networked dependencies because only the local logic-tree / ML
    # code paths are exercised here.
    local_dm = DataManager(None, None, None)
    local_proc = MLProcessor(local_dm)

    # The MLProcessor API is async, so give this worker process its own loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Load the ML models; network failures during init are tolerated —
        # only the locally available models matter for scoring.
        loop.run_until_complete(local_proc.initialize())
    except Exception:
        pass

    try:
        # --- Build the 1m base frame -------------------------------------
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        cols = ['open', 'high', 'low', 'close', 'volume']
        df_1m[cols] = df_1m[cols].astype('float32')
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()

        frames = {}
        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        frames['1m'] = df_1m.copy()
        frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6

        # Resample to the higher timeframes. '5min'/'15min' replace the
        # deprecated 'T' offset aliases (removed in pandas 3.0).
        for tf_str, tf_code in [('5m', '5min'), ('15m', '15min'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
            frames[tf_str] = df_1m.resample(tf_code).agg(agg_dict).dropna()
            frames[tf_str]['timestamp'] = frames[tf_str].index.astype(np.int64) // 10**6

        # --- Main analysis loop ------------------------------------------
        ai_results = []

        # Slices arrive with warm-up overlap from the dispatcher, so every
        # 5m bar in this slice is scored; overlap duplicates are dropped by
        # the dispatcher after the merge.
        valid_indices = frames['5m'].index

        def df_to_list(df):
            # Local helper: OHLCV DataFrame -> list-of-lists packet format.
            if df.empty:
                return []
            return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()

        count = 0
        total = len(valid_indices)

        for t_idx in valid_indices:
            count += 1
            if count % 5000 == 0:
                print(f" 🔥 [Core {worker_id}] Progress: {int(count/total*100)}%", flush=True)

            current_timestamp = int(t_idx.timestamp() * 1000)

            # Point-in-time slices: only data up to t_idx is visible to the
            # models, preventing look-ahead bias.
            ohlcv_data = {}
            try:
                cutoff = t_idx
                ohlcv_data['1m'] = df_to_list(frames['1m'].loc[:cutoff].tail(500))
                ohlcv_data['5m'] = df_to_list(frames['5m'].loc[:cutoff].tail(200))
                ohlcv_data['15m'] = df_to_list(frames['15m'].loc[:cutoff].tail(200))
                ohlcv_data['1h'] = df_to_list(frames['1h'].loc[:cutoff].tail(200))
                ohlcv_data['4h'] = df_to_list(frames['4h'].loc[:cutoff].tail(100))
                ohlcv_data['1d'] = df_to_list(frames['1d'].loc[:cutoff].tail(50))
            except Exception:
                continue

            if len(ohlcv_data['1h']) < 60:
                continue  # not enough 1h warm-up history for the indicators
            current_price = frames['5m'].loc[t_idx]['close']

            # --- Level-1 logic tree --------------------------------------
            logic_packet = {
                'symbol': symbol,
                'ohlcv_1h': ohlcv_data['1h'][-60:],
                'ohlcv_15m': ohlcv_data['15m'][-60:],
                'change_24h': 0.0
            }
            # Restore the 24h-change input the V88 engine fed the logic tree
            # (close price is column 4 of each OHLCV row).
            try:
                if len(ohlcv_data['1h']) >= 24:
                    p_now = ohlcv_data['1h'][-1][4]
                    p_old = ohlcv_data['1h'][-24][4]
                    logic_packet['change_24h'] = ((p_now - p_old) / p_old) * 100
            except Exception:
                pass

            logic_result = local_dm._apply_logic_tree(logic_packet)
            signal_type = logic_result.get('type', 'NONE')
            l1_score = logic_result.get('score', 0.0)

            # Heavy AI scoring only for candidate signals.
            real_titan = 0.5
            if signal_type in ['BREAKOUT', 'REVERSAL']:
                raw_data_for_proc = {'symbol': symbol, 'ohlcv': ohlcv_data, 'current_price': current_price}
                try:
                    proc_res = loop.run_until_complete(local_proc.process_compound_signal(raw_data_for_proc))
                    if proc_res:
                        real_titan = proc_res.get('titan_score', 0.5)
                except Exception:
                    pass

            ai_results.append({
                'timestamp': current_timestamp,
                'symbol': symbol,
                'close': current_price,
                'real_titan': real_titan,
                'signal_type': signal_type,
                'l1_score': l1_score
            })
    finally:
        # Always release the per-process event loop, even if scoring raises.
        loop.close()

    return ai_results
153
+
154
+ # ==============================================================================
155
+ # 🧠 Main Class
156
+ # ==============================================================================
157
  class HeavyDutyBacktester:
158
  def __init__(self, data_manager, processor):
159
  self.dm = data_manager
 
167
  self.force_end_date = None
168
 
169
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
170
+ print(f"🧪 [Backtest V89.0] Dual-Core Reactor (100% CPU Target).")
171
 
172
  def set_date_range(self, start_str, end_str):
173
  self.force_start_date = start_str
174
  self.force_end_date = end_str
175
 
 
 
 
 
176
  # ==============================================================
177
+ # ⚡ STEP 1: FAST DOWNLOAD
178
  # ==============================================================
179
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
180
+ print(f" ⚡ [Network] Burst-Downloading {sym}...", flush=True)
 
181
  limit = 1000
182
  duration_per_batch = limit * 60 * 1000
 
183
  tasks = []
184
  current = start_ms
185
  while current < end_ms:
 
187
  current += duration_per_batch
188
 
189
  all_candles = []
190
+ sem = asyncio.Semaphore(15)
 
191
 
192
  async def _fetch_batch(timestamp):
193
  async with sem:
194
  for _ in range(3):
195
+ try: return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
196
+ except: await asyncio.sleep(1)
 
 
197
  return []
198
 
199
+ chunk_size = 25
200
  for i in range(0, len(tasks), chunk_size):
201
  chunk_tasks = tasks[i:i + chunk_size]
202
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
203
  results = await asyncio.gather(*futures)
204
+ for res in results:
 
205
  if res: all_candles.extend(res)
206
+ print(f" 📥 Downloaded {int((i+chunk_size)/len(tasks)*100)}%...", flush=True)
 
 
207
 
208
  if not all_candles: return None
209
 
210
+ # تصفية وترتيب
211
+ unique = {c[0]: c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms}
212
+ final_candles = sorted(unique.values(), key=lambda x: x[0])
213
+ return final_candles
 
 
 
 
 
 
214
 
215
  # ==============================================================
216
+ # 🔥 STEP 2: PARALLEL CPU CRUNCHING
217
  # ==============================================================
218
+ async def _dispatch_to_cores(self, sym, candles, start_ms, end_ms):
219
  safe_sym = sym.replace('/', '_')
 
220
  period_suffix = f"{start_ms}_{end_ms}"
221
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
222
+
223
  if os.path.exists(scores_file):
224
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
225
  return
226
 
227
+ # 1. تقسيم البيانات (Splitting)
228
+ cpu_count = os.cpu_count() or 2
229
+ # نضيف تداخل (Overlap) بسيط لضمان استمرارية المؤشرات عند نقطة القطع
230
+ # سنقسم القائمة ببساطة، العمال سيعيدون حساب المؤشرات
231
+ chunk_size = len(candles) // cpu_count
232
+ chunks = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
 
234
+ print(f" ⚙️ [CPU] Splitting {len(candles)} candles into {cpu_count} cores for 100% Load...", flush=True)
 
235
 
236
+ for i in range(cpu_count):
237
+ start_idx = i * chunk_size
238
+ # للإتقان: نحتاج لتداخل، لكن للتبسيط والسرعة سنقسم مباشرة
239
+ # العامل الأول يأخذ من البداية، الثاني يأخذ من (بداية - 500) لضمان الـ Warmup
240
+ actual_start = max(0, start_idx - 1000) if i > 0 else 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
241
 
242
+ end_idx = (i + 1) * chunk_size if i < cpu_count - 1 else len(candles)
243
+ chunk_data = candles[actual_start:end_idx]
244
+
245
+ chunks.append((i, chunk_data, sym))
246
+
247
+ t0 = time.time()
248
+
249
+ # 2. تشغيل المفاعل (Reactor)
250
+ loop = asyncio.get_running_loop()
251
+ final_results = []
 
 
 
 
 
 
252
 
253
+ with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
254
+ futures = [loop.run_in_executor(executor, cpu_crunch_worker, chunk) for chunk in chunks]
255
+ results = await asyncio.gather(*futures)
256
+ for res in results:
257
+ final_results.extend(res)
258
+
259
  dt = time.time() - t0
260
+
261
+ # 3. الحفظ
262
+ if final_results:
263
+ # إزالة التكرارات الناتجة عن الـ Overlap
264
+ df_res = pd.DataFrame(final_results).drop_duplicates(subset=['timestamp']).sort_values('timestamp')
265
+ df_res.to_pickle(scores_file)
266
+ print(f" 💾 [{sym}] SAVED {len(df_res)} signals. (Compute Time: {dt:.1f}s)")
267
  else:
268
  print(f" ⚠️ [{sym}] No signals found.")
269
+
270
+ del candles, chunks, results
271
  gc.collect()
272
 
273
  # ==============================================================
 
284
  return
285
 
286
  for sym in self.TARGET_COINS:
287
+ # 1. Download to RAM
288
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
289
 
290
  if candles:
291
+ # 2. Burn CPU
292
+ await self._dispatch_to_cores(sym, candles, start_time_ms, end_time_ms)
293
  else:
294
  print(f" ❌ Failed to download data for {sym}")
295
 
296
  gc.collect()
297
 
298
  # ==============================================================
299
+ # PHASE 2: Portfolio Digital Twin Engine (Unchanged)
300
  # ==============================================================
301
  @staticmethod
302
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
 
446
  return best['config'], best
447
 
448
  async def run_strategic_optimization_task():
449
+ print("\n🧪 [STRATEGIC BACKTEST] Dual-Core Reactor Initiated...")
450
  r2 = R2Service()
451
  dm = DataManager(None, None, r2)
452
  proc = MLProcessor(dm)
 
477
  await dm.close()
478
 
479
if __name__ == "__main__":
    # freeze_support() is a no-op everywhere except frozen Windows
    # executables, where it must run before any worker process is spawned.
    import multiprocessing
    multiprocessing.freeze_support()
    asyncio.run(run_strategic_optimization_task())