Riy777 committed on
Commit
95f813e
·
verified ·
1 Parent(s): fd2bc94

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +123 -188
backtest_engine.py CHANGED
@@ -1,10 +1,5 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V89.0 - GEM-Architect: Dual-Core Reactor)
3
- # ============================================================
4
- # التغيير الجذري:
5
- # 1. التحميل: Async (شبكة فقط).
6
- # 2. المعالجة: Multiprocessing على البيانات الموجودة في الرام.
7
- # النتيجة: 100% CPU Usage على 2 vCPUs.
8
  # ============================================================
9
 
10
  import asyncio
@@ -17,7 +12,6 @@ import os
17
  import gc
18
  import sys
19
  import traceback
20
- import concurrent.futures
21
  from datetime import datetime, timezone
22
  from typing import Dict, Any, List
23
 
@@ -33,127 +27,6 @@ except ImportError:
33
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
34
  CACHE_DIR = "backtest_real_scores"
35
 
36
- # ==============================================================================
37
- # 🔥 PURE CPU WORKER (No Network, No I/O, Just Math)
38
- # ==============================================================================
39
def cpu_crunch_worker(payload):
    """
    Pure-CPU worker: receives pre-downloaded candle data and performs only
    the math — fully isolated from the network so it can never stall on I/O.
    """
    worker_id, candles, symbol = payload

    print(f" 🔥 [Core {worker_id}] Crunching {len(candles)} candles...", flush=True)

    # Rebuild lightweight objects inside the process (no connections):
    # the services are passed as None because no network access is needed here.
    local_dm = DataManager(None, None, None)
    local_proc = MLProcessor(local_dm)

    # Trick: run the ML Processor initialization (model loading) inside the
    # worker process.  A private event loop is used because the methods are async.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(local_proc.initialize())
    except: pass  # network errors are ignored here; only the models matter

    # Data conversion: raw candle lists -> indexed 1-minute DataFrame
    df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    cols = ['open', 'high', 'low', 'close', 'volume']
    df_1m[cols] = df_1m[cols].astype('float32')
    df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
    df_1m.set_index('datetime', inplace=True)
    df_1m = df_1m.sort_index()

    frames = {}
    agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
    frames['1m'] = df_1m.copy()
    frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6

    # Resampling 1m candles up to the higher timeframes
    for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
        frames[tf_str] = df_1m.resample(tf_code).agg(agg_dict).dropna()
        frames[tf_str]['timestamp'] = frames[tf_str].index.astype(np.int64) // 10**6

    # Main Analysis Loop
    ai_results = []

    # Analysis should start after a sufficient warm-up (e.g. 500 candles) for
    # indicator accuracy.  Since the data is split across workers, each chunk
    # would need an overlap margin; for simplicity everything is processed.
    valid_indices = frames['5m'].index

    # Helper to avoid recreating object
    def df_to_list(df):
        if df.empty: return []
        return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()

    local_proc_instance = local_proc  # Cache reference

    count = 0
    total = len(valid_indices)

    # The worker loop itself runs synchronously for speed, but MLProcessor is
    # async, so loop.run_until_complete is used where necessary.

    for t_idx in valid_indices:
        count += 1
        if count % 5000 == 0:
            print(f" 🔥 [Core {worker_id}] Progress: {int(count/total*100)}%", flush=True)

        current_timestamp = int(t_idx.timestamp() * 1000)

        # Point-in-time slicing per timeframe (no data past the cutoff)
        ohlcv_data = {}
        try:
            cutoff = t_idx
            ohlcv_data['1m'] = df_to_list(frames['1m'].loc[:cutoff].tail(500))
            ohlcv_data['5m'] = df_to_list(frames['5m'].loc[:cutoff].tail(200))
            ohlcv_data['15m'] = df_to_list(frames['15m'].loc[:cutoff].tail(200))
            ohlcv_data['1h'] = df_to_list(frames['1h'].loc[:cutoff].tail(200))
            ohlcv_data['4h'] = df_to_list(frames['4h'].loc[:cutoff].tail(100))
            ohlcv_data['1d'] = df_to_list(frames['1d'].loc[:cutoff].tail(50))
        except: continue

        if len(ohlcv_data['1h']) < 60: continue
        current_price = frames['5m'].loc[t_idx]['close']

        # Logic Tree (fast L1 signal classification)
        logic_packet = {
            'symbol': symbol,
            'ohlcv_1h': ohlcv_data['1h'][-60:],
            'ohlcv_15m': ohlcv_data['15m'][-60:],
            'change_24h': 0.0
        }

        logic_result = local_dm._apply_logic_tree(logic_packet)
        signal_type = logic_result.get('type', 'NONE')
        l1_score = logic_result.get('score', 0.0)

        real_titan = 0.5
        if signal_type in ['BREAKOUT', 'REVERSAL']:
            raw_data_for_proc = {'symbol': symbol, 'ohlcv': ohlcv_data, 'current_price': current_price}
            try:
                # Run the AI scorer only on candidate signals
                proc_res = loop.run_until_complete(local_proc_instance.process_compound_signal(raw_data_for_proc))
                if proc_res: real_titan = proc_res.get('titan_score', 0.5)
            except: pass

        ai_results.append({
            'timestamp': current_timestamp,
            'symbol': symbol,
            'close': current_price,
            'real_titan': real_titan,
            'signal_type': signal_type,
            'l1_score': l1_score
        })

    loop.close()
    return ai_results
153
-
154
- # ==============================================================================
155
- # 🧠 Main Class
156
- # ==============================================================================
157
  class HeavyDutyBacktester:
158
  def __init__(self, data_manager, processor):
159
  self.dm = data_manager
@@ -167,19 +40,25 @@ class HeavyDutyBacktester:
167
  self.force_end_date = None
168
 
169
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
170
- print(f"🧪 [Backtest V89.0] Dual-Core Reactor (100% CPU Target).")
171
 
172
  def set_date_range(self, start_str, end_str):
173
  self.force_start_date = start_str
174
  self.force_end_date = end_str
175
 
 
 
 
 
176
  # ==============================================================
177
- # ⚡ STEP 1: FAST DOWNLOAD
178
  # ==============================================================
179
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
180
- print(f" ⚡ [Network] Burst-Downloading {sym}...", flush=True)
 
181
  limit = 1000
182
  duration_per_batch = limit * 60 * 1000
 
183
  tasks = []
184
  current = start_ms
185
  while current < end_ms:
@@ -187,87 +66,145 @@ class HeavyDutyBacktester:
187
  current += duration_per_batch
188
 
189
  all_candles = []
190
- sem = asyncio.Semaphore(15)
 
191
 
192
  async def _fetch_batch(timestamp):
193
  async with sem:
194
  for _ in range(3):
195
- try: return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
196
- except: await asyncio.sleep(1)
 
 
197
  return []
198
 
199
- chunk_size = 25
200
  for i in range(0, len(tasks), chunk_size):
201
  chunk_tasks = tasks[i:i + chunk_size]
202
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
203
  results = await asyncio.gather(*futures)
204
- for res in results:
 
205
  if res: all_candles.extend(res)
206
- print(f" 📥 Downloaded {int((i+chunk_size)/len(tasks)*100)}%...", flush=True)
 
 
207
 
208
  if not all_candles: return None
209
 
210
- # تصفية وترتيب
211
- unique = {c[0]: c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms}
212
- final_candles = sorted(unique.values(), key=lambda x: x[0])
213
- return final_candles
 
 
 
 
 
 
214
 
215
  # ==============================================================
216
- # 🔥 STEP 2: PARALLEL CPU CRUNCHING
217
  # ==============================================================
218
- async def _dispatch_to_cores(self, sym, candles, start_ms, end_ms):
219
  safe_sym = sym.replace('/', '_')
 
220
  period_suffix = f"{start_ms}_{end_ms}"
221
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
222
-
223
  if os.path.exists(scores_file):
224
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
225
  return
226
 
227
- # 1. تقسيم البيانات (Splitting)
228
- cpu_count = os.cpu_count() or 2
229
- # نضيف تداخل (Overlap) بسيط لضمان استمرارية المؤشرات عند نقطة القطع
230
- # سنقسم القائمة ببساطة، العمال سيعيدون حساب المؤشرات
231
- chunk_size = len(candles) // cpu_count
232
- chunks = []
233
-
234
- print(f" ⚙️ [CPU] Splitting {len(candles)} candles into {cpu_count} cores for 100% Load...", flush=True)
235
-
236
- for i in range(cpu_count):
237
- start_idx = i * chunk_size
238
- # للإتقان: نحتاج لتداخل، لكن للتبسيط والسرعة سنقسم مباشرة
239
- # العامل الأول يأخذ من البداية، الثاني يأخذ من (بداية - 500) لضمان الـ Warmup
240
- actual_start = max(0, start_idx - 1000) if i > 0 else 0
241
-
242
- end_idx = (i + 1) * chunk_size if i < cpu_count - 1 else len(candles)
243
- chunk_data = candles[actual_start:end_idx]
244
-
245
- chunks.append((i, chunk_data, sym))
246
-
247
  t0 = time.time()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
248
 
249
- # 2. تشغيل المفاعل (Reactor)
250
- loop = asyncio.get_running_loop()
251
- final_results = []
252
 
253
- with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
254
- futures = [loop.run_in_executor(executor, cpu_crunch_worker, chunk) for chunk in chunks]
255
- results = await asyncio.gather(*futures)
256
- for res in results:
257
- final_results.extend(res)
 
 
 
258
 
259
- dt = time.time() - t0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
260
 
261
- # 3. الحفظ
262
- if final_results:
263
- # إزالة التكرارات الناتجة عن الـ Overlap
264
- df_res = pd.DataFrame(final_results).drop_duplicates(subset=['timestamp']).sort_values('timestamp')
265
- df_res.to_pickle(scores_file)
266
- print(f" 💾 [{sym}] SAVED {len(df_res)} signals. (Compute Time: {dt:.1f}s)")
267
  else:
268
  print(f" ⚠️ [{sym}] No signals found.")
269
-
270
- del candles, chunks, results
271
  gc.collect()
272
 
273
  # ==============================================================
@@ -284,19 +221,19 @@ class HeavyDutyBacktester:
284
  return
285
 
286
  for sym in self.TARGET_COINS:
287
- # 1. Download to RAM
288
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
289
 
290
  if candles:
291
- # 2. Burn CPU
292
- await self._dispatch_to_cores(sym, candles, start_time_ms, end_time_ms)
293
  else:
294
  print(f" ❌ Failed to download data for {sym}")
295
 
296
  gc.collect()
297
 
298
  # ==============================================================
299
- # PHASE 2: Portfolio Digital Twin Engine (Unchanged)
300
  # ==============================================================
301
  @staticmethod
302
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
@@ -446,7 +383,7 @@ class HeavyDutyBacktester:
446
  return best['config'], best
447
 
448
  async def run_strategic_optimization_task():
449
- print("\n🧪 [STRATEGIC BACKTEST] Dual-Core Reactor Initiated...")
450
  r2 = R2Service()
451
  dm = DataManager(None, None, r2)
452
  proc = MLProcessor(dm)
@@ -477,6 +414,4 @@ async def run_strategic_optimization_task():
477
  await dm.close()
478
 
479
  if __name__ == "__main__":
480
- import multiprocessing
481
- multiprocessing.freeze_support()
482
  asyncio.run(run_strategic_optimization_task())
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V88.1 - GEM-Architect: RAM-Burst Fix)
 
 
 
 
 
3
  # ============================================================
4
 
5
  import asyncio
 
12
  import gc
13
  import sys
14
  import traceback
 
15
  from datetime import datetime, timezone
16
  from typing import Dict, Any, List
17
 
 
27
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
28
  CACHE_DIR = "backtest_real_scores"
29
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  class HeavyDutyBacktester:
31
  def __init__(self, data_manager, processor):
32
  self.dm = data_manager
 
40
  self.force_end_date = None
41
 
42
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
43
+ print(f"🧪 [Backtest V88.1] RAM-Burst Edition (Fix Applied).")
44
 
45
  def set_date_range(self, start_str, end_str):
46
  self.force_start_date = start_str
47
  self.force_end_date = end_str
48
 
49
+ def df_to_list(self, df):
50
+ if df.empty: return []
51
+ return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
52
+
53
  # ==============================================================
54
+ # ⚡ FAST DATA DOWNLOADER (Async Burst)
55
  # ==============================================================
56
  async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
57
+ print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
58
+
59
  limit = 1000
60
  duration_per_batch = limit * 60 * 1000
61
+
62
  tasks = []
63
  current = start_ms
64
  while current < end_ms:
 
66
  current += duration_per_batch
67
 
68
  all_candles = []
69
+ total_batches = len(tasks)
70
+ sem = asyncio.Semaphore(10)
71
 
72
  async def _fetch_batch(timestamp):
73
  async with sem:
74
  for _ in range(3):
75
+ try:
76
+ return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
77
+ except Exception:
78
+ await asyncio.sleep(1)
79
  return []
80
 
81
+ chunk_size = 20
82
  for i in range(0, len(tasks), chunk_size):
83
  chunk_tasks = tasks[i:i + chunk_size]
84
  futures = [_fetch_batch(ts) for ts in chunk_tasks]
85
  results = await asyncio.gather(*futures)
86
+
87
+ for res in results:
88
  if res: all_candles.extend(res)
89
+
90
+ progress = min(100, int((i + chunk_size) / total_batches * 100))
91
+ print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
92
 
93
  if not all_candles: return None
94
 
95
+ filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
96
+ seen = set()
97
+ unique_candles = []
98
+ for c in filtered:
99
+ if c[0] not in seen:
100
+ unique_candles.append(c)
101
+ seen.add(c[0])
102
+
103
+ unique_candles.sort(key=lambda x: x[0])
104
+ return unique_candles
105
 
106
  # ==============================================================
107
+ # 🧠 CPU PROCESSING (In-Memory)
108
  # ==============================================================
109
+ async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
110
  safe_sym = sym.replace('/', '_')
111
+ # ✅ FIX: Use passed arguments directly
112
  period_suffix = f"{start_ms}_{end_ms}"
113
  scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
114
+
115
  if os.path.exists(scores_file):
116
  print(f" 📂 [{sym}] Data Exists -> Skipping.")
117
  return
118
 
119
+ print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  t0 = time.time()
121
+
122
+ df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
123
+ cols = ['open', 'high', 'low', 'close', 'volume']
124
+ df_1m[cols] = df_1m[cols].astype('float32')
125
+ df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
126
+ df_1m.set_index('datetime', inplace=True)
127
+ df_1m = df_1m.sort_index()
128
+
129
+ frames = {}
130
+ agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
131
+ frames['1m'] = df_1m.copy()
132
+ frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6
133
+
134
+ for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
135
+ frames[tf_str] = df_1m.resample(tf_code).agg(agg_dict).dropna()
136
+ frames[tf_str]['timestamp'] = frames[tf_str].index.astype(np.int64) // 10**6
137
+
138
+ ai_results = []
139
 
140
+ start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
141
+ valid_indices = frames['5m'].loc[start_analysis_dt:].index
 
142
 
143
+ total_steps = len(valid_indices)
144
+ step_count = 0
145
+
146
+ for t_idx in valid_indices:
147
+ step_count += 1
148
+ if step_count % 2000 == 0:
149
+ pct = int((step_count / total_steps) * 100)
150
+ print(f" 🧠 AI Analysis: {pct}%...", flush=True)
151
 
152
+ ohlcv_data = {}
153
+ try:
154
+ cutoff = t_idx
155
+ ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
156
+ ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
157
+ ohlcv_data['15m'] = self.df_to_list(frames['15m'].loc[:cutoff].tail(200))
158
+ ohlcv_data['1h'] = self.df_to_list(frames['1h'].loc[:cutoff].tail(200))
159
+ ohlcv_data['4h'] = self.df_to_list(frames['4h'].loc[:cutoff].tail(100))
160
+ ohlcv_data['1d'] = self.df_to_list(frames['1d'].loc[:cutoff].tail(50))
161
+ except: continue
162
+
163
+ if len(ohlcv_data['1h']) < 60: continue
164
+ current_price = frames['5m'].loc[t_idx]['close']
165
+
166
+ logic_packet = {
167
+ 'symbol': sym,
168
+ 'ohlcv_1h': ohlcv_data['1h'][-60:],
169
+ 'ohlcv_15m': ohlcv_data['15m'][-60:],
170
+ 'change_24h': 0.0
171
+ }
172
+ try:
173
+ if len(ohlcv_data['1h']) >= 24:
174
+ p_now = ohlcv_data['1h'][-1][4]
175
+ p_old = ohlcv_data['1h'][-24][4]
176
+ logic_packet['change_24h'] = ((p_now - p_old) / p_old) * 100
177
+ except: pass
178
+
179
+ logic_result = self.dm._apply_logic_tree(logic_packet)
180
+ signal_type = logic_result.get('type', 'NONE')
181
+ l1_score = logic_result.get('score', 0.0)
182
+
183
+ real_titan = 0.5
184
+ if signal_type in ['BREAKOUT', 'REVERSAL']:
185
+ raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
186
+ try:
187
+ proc_res = await self.proc.process_compound_signal(raw_data_for_proc)
188
+ if proc_res: real_titan = proc_res.get('titan_score', 0.5)
189
+ except: pass
190
+
191
+ ai_results.append({
192
+ 'timestamp': int(t_idx.timestamp() * 1000),
193
+ 'symbol': sym,
194
+ 'close': current_price,
195
+ 'real_titan': real_titan,
196
+ 'signal_type': signal_type,
197
+ 'l1_score': l1_score
198
+ })
199
 
200
+ dt = time.time() - t0
201
+ if ai_results:
202
+ pd.DataFrame(ai_results).to_pickle(scores_file)
203
+ print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Compute Time: {dt:.1f}s)")
 
 
204
  else:
205
  print(f" ⚠️ [{sym}] No signals found.")
206
+
207
+ del frames, df_1m, candles
208
  gc.collect()
209
 
210
  # ==============================================================
 
221
  return
222
 
223
  for sym in self.TARGET_COINS:
224
+ # 1. Download Phase (Async Burst)
225
  candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
226
 
227
  if candles:
228
+ # 2. Processing Phase (Sequential CPU)
229
+ await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
230
  else:
231
  print(f" ❌ Failed to download data for {sym}")
232
 
233
  gc.collect()
234
 
235
  # ==============================================================
236
+ # PHASE 2: Portfolio Digital Twin Engine
237
  # ==============================================================
238
  @staticmethod
239
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
 
383
  return best['config'], best
384
 
385
  async def run_strategic_optimization_task():
386
+ print("\n🧪 [STRATEGIC BACKTEST] RAM-Burst Mode Initiated...")
387
  r2 = R2Service()
388
  dm = DataManager(None, None, r2)
389
  proc = MLProcessor(dm)
 
414
  await dm.close()
415
 
416
if __name__ == "__main__":
    # Script entry point: run the full strategic backtest pipeline.
    asyncio.run(run_strategic_optimization_task())