Riy777 committed on
Commit
355c953
·
verified ·
1 Parent(s): f5097f3

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +204 -256
backtest_engine.py CHANGED
@@ -1,5 +1,9 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V86.3 - GEM-Architect: Stable Parallel)
 
 
 
 
3
  # ============================================================
4
 
5
  import asyncio
@@ -10,7 +14,8 @@ import logging
10
  import itertools
11
  import os
12
  import gc
13
- import concurrent.futures
 
14
  from datetime import datetime, timezone
15
  from typing import Dict, Any, List
16
 
@@ -19,74 +24,13 @@ try:
19
  from ml_engine.data_manager import DataManager
20
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
21
  from r2 import R2Service
 
22
  except ImportError:
23
  pass
24
 
25
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
26
  CACHE_DIR = "backtest_real_scores"
27
 
28
- # ==============================================================================
29
- # 🚜 ISOLATED WORKER (Stable & Clean)
30
- # ==============================================================================
31
- def run_parallel_chunk(chunk_payload):
32
- """
33
- عامل مستقل بمعايير ثبات عالية.
34
- """
35
- symbol, start_ms, end_ms, chunk_id = chunk_payload
36
-
37
- # تأخير بسيط جداً عند الإقلاع لتخفيف صدمة المعالج
38
- time.sleep(chunk_id * 1.0)
39
-
40
- print(f" ⚡ [Core {chunk_id}] Initializing ML Engine...", flush=True)
41
-
42
- try:
43
- # تهيئة بيئة نظيفة
44
- local_dm = DataManager(None, None, None)
45
- local_proc = MLProcessor(local_dm)
46
-
47
- loop = asyncio.new_event_loop()
48
- asyncio.set_event_loop(loop)
49
-
50
- # تحميل النماذج (هنا يكمن الثقل)
51
- loop.run_until_complete(local_proc.initialize())
52
- loop.run_until_complete(local_dm.initialize())
53
-
54
- local_tester = HeavyDutyBacktester(local_dm, local_proc)
55
-
56
- dt_start = datetime.fromtimestamp(start_ms/1000, tz=timezone.utc).strftime('%Y-%m-%d')
57
- print(f" 📥 [Core {chunk_id}] Fetching Data from {dt_start}...", flush=True)
58
-
59
- # إضافة فترة تحمية للمؤشرات (2000 دقيقة)
60
- warmup_ms = 2000 * 60 * 1000
61
- actual_fetch_start = start_ms - warmup_ms
62
-
63
- success = loop.run_until_complete(
64
- local_tester._process_single_coin_task(
65
- symbol,
66
- actual_fetch_start,
67
- end_ms,
68
- chunk_suffix=f"_part{chunk_id}",
69
- analysis_start_ms=start_ms,
70
- worker_id=chunk_id
71
- )
72
- )
73
-
74
- # تنظيف الذاكرة فوراً
75
- loop.run_until_complete(local_dm.close())
76
- loop.close()
77
- del local_dm, local_proc, local_tester
78
- gc.collect()
79
-
80
- print(f" ✅ [Core {chunk_id}] Completed.", flush=True)
81
- return (chunk_id, success)
82
-
83
- except Exception as e:
84
- print(f" ❌ [Core {chunk_id}] CRASH: {e}", flush=True)
85
- return (chunk_id, False)
86
-
87
- # ==============================================================================
88
- # 🧠 Main Class
89
- # ==============================================================================
90
  class HeavyDutyBacktester:
91
  def __init__(self, data_manager, processor):
92
  self.dm = data_manager
@@ -98,7 +42,9 @@ class HeavyDutyBacktester:
98
  self.TARGET_COINS = ['SOL/USDT']
99
  self.force_start_date = None
100
  self.force_end_date = None
 
101
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
 
102
 
103
  def set_date_range(self, start_str, end_str):
104
  self.force_start_date = start_str
@@ -109,152 +55,187 @@ class HeavyDutyBacktester:
109
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
110
 
111
  # ==============================================================
112
- # 🧱 Core Logic: Single Coin Processor (With % Progress)
113
  # ==============================================================
114
- async def _process_single_coin_task(self, sym, start_time_ms, end_time_ms, chunk_suffix="", analysis_start_ms=None, worker_id=0):
115
- safe_sym = sym.replace('/', '_')
116
- if analysis_start_ms is None: analysis_start_ms = start_time_ms
 
 
 
117
 
118
- period_suffix = f"{analysis_start_ms}_{end_time_ms}{chunk_suffix}"
119
- scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
 
120
 
121
- if os.path.exists(scores_file):
122
- print(f" 📂 [Core {worker_id}] File Exists -> Skipping.", flush=True)
123
- return True
124
-
125
- t0 = time.time()
126
- all_candles_1m = []
127
- df_1m = None
128
- frames = {}
129
 
130
- # 1. تنزيل البيانات
131
- try:
132
- current_since = start_time_ms
 
133
 
134
- while current_since < end_time_ms:
 
 
 
 
 
 
 
135
  try:
136
- batch = await self.dm.exchange.fetch_ohlcv(sym, '1m', since=current_since, limit=1000)
137
- except Exception:
138
- await asyncio.sleep(2)
139
- continue
140
-
141
- if not batch: break
142
-
143
- last_ts = batch[-1][0]
144
- if last_ts <= current_since: break
145
-
146
- all_candles_1m.extend(batch)
147
- current_since = last_ts + 1
148
- await asyncio.sleep(0.05)
149
- if current_since >= end_time_ms: break
 
 
150
 
151
- all_candles_1m = [c for c in all_candles_1m if c[0] <= end_time_ms]
 
152
 
153
- if not all_candles_1m:
154
- print(f" ⚠️ [Core {worker_id}] No data found.", flush=True)
155
- return False
156
-
157
- # print(f" ⚙️ [Core {worker_id}] Parsing {len(all_candles_1m)} candles...", flush=True)
158
-
159
- df_1m = pd.DataFrame(all_candles_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
160
- cols = ['open', 'high', 'low', 'close', 'volume']
161
- df_1m[cols] = df_1m[cols].astype('float32')
162
- df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
163
- df_1m.set_index('datetime', inplace=True)
164
- df_1m = df_1m.sort_index()
165
-
166
- agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
167
- frames['1m'] = df_1m.copy()
168
- frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6
169
-
170
- for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
171
- resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
172
- resampled[cols] = resampled[cols].astype('float32')
173
- resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
174
- frames[tf_str] = resampled
175
-
176
- ai_results = []
177
- analysis_start_dt = pd.to_datetime(analysis_start_ms, unit='ms')
178
- valid_indices = frames['5m'].loc[analysis_start_dt:].index
179
-
180
- total_steps = len(valid_indices)
181
- step_count = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
 
183
- # حلقة المعالجة
184
- for t_idx in valid_indices:
185
- if t_idx.timestamp() * 1000 > end_time_ms: break
186
- step_count += 1
187
-
188
- # طباعة نسبة التقدم كل 10%
189
- if total_steps > 0 and step_count % max(1, int(total_steps * 0.1)) == 0:
190
- pct = int((step_count / total_steps) * 100)
191
- print(f" 🧠 [Core {worker_id}] Progress: {pct}%", flush=True)
192
-
193
- current_timestamp = int(t_idx.timestamp() * 1000)
194
- ohlcv_data = {}
195
  try:
196
- # استخراج البيانات باستخدام loc (أسرع وأدق)
197
- cutoff = t_idx
198
- ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
199
- ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
200
- ohlcv_data['15m'] = self.df_to_list(frames['15m'].loc[:cutoff].tail(200))
201
- ohlcv_data['1h'] = self.df_to_list(frames['1h'].loc[:cutoff].tail(200))
202
- ohlcv_data['4h'] = self.df_to_list(frames['4h'].loc[:cutoff].tail(100))
203
- ohlcv_data['1d'] = self.df_to_list(frames['1d'].loc[:cutoff].tail(50))
204
- except: continue
205
-
206
- if len(ohlcv_data['1h']) < 60: continue
207
- current_price = frames['5m'].loc[t_idx]['close']
208
-
209
- logic_packet = {
210
- 'symbol': sym,
211
- 'ohlcv_1h': ohlcv_data['1h'][-60:],
212
- 'ohlcv_15m': ohlcv_data['15m'][-60:],
213
- 'change_24h': 0.0
214
- }
215
-
216
- logic_result = self.dm._apply_logic_tree(logic_packet)
217
- signal_type = logic_result.get('type', 'NONE')
218
- l1_score = logic_result.get('score', 0.0)
219
-
220
- real_titan = 0.5
221
- if signal_type in ['BREAKOUT', 'REVERSAL']:
222
- raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
223
- try:
224
- proc_res = await self.proc.process_compound_signal(raw_data_for_proc)
225
- if proc_res: real_titan = proc_res.get('titan_score', 0.5)
226
- except: pass
227
-
228
- ai_results.append({
229
- 'timestamp': current_timestamp,
230
- 'symbol': sym,
231
- 'close': current_price,
232
- 'real_titan': real_titan,
233
- 'signal_type': signal_type,
234
- 'l1_score': l1_score
235
- })
236
-
237
- dt = time.time() - t0
238
- if ai_results:
239
- pd.DataFrame(ai_results).to_pickle(scores_file)
240
- print(f" 💾 [Core {worker_id}] Saved {len(ai_results)} signals. ({dt:.1f}s)", flush=True)
241
- else:
242
- print(f" ⚠️ [Core {worker_id}] No signals found.", flush=True)
243
-
244
- return True
245
 
246
- except Exception as e:
247
- print(f" ❌ [Core {worker_id}] ERR: {e}", flush=True)
248
- return False
 
 
 
 
 
249
 
250
- finally:
251
- del all_candles_1m
252
- if df_1m is not None: del df_1m
253
- del frames
254
- gc.collect()
 
 
 
 
255
 
256
  # ==============================================================
257
- # PHASE 1: Main Loop (Restricted Concurrency)
258
  # ==============================================================
259
  async def generate_truth_data(self):
260
  if self.force_start_date and self.force_end_date:
@@ -262,65 +243,32 @@ class HeavyDutyBacktester:
262
  dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
263
  start_time_ms = int(dt_start.timestamp() * 1000)
264
  end_time_ms = int(dt_end.timestamp() * 1000)
265
- print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
266
- print(f" 🚀 Turbo Mode: Safe Parallel Execution (Max 4 Cores)...")
267
  else:
268
  return
269
 
270
- # ⚠️ تقييد عدد العمال لتجنب تجميد الجهاز بسبب نماذج الذكاء الاصطناعي
271
- # 4 عمال هو حد آمن لمعظم الأجهزة
272
- workers_count = 4
273
-
274
- total_duration = end_time_ms - start_time_ms
275
- chunk_size = total_duration // workers_count
276
-
277
  for sym in self.TARGET_COINS:
278
  safe_sym = sym.replace('/', '_')
279
- final_full_file = f"{CACHE_DIR}/{safe_sym}_{start_time_ms}_{end_time_ms}_scores.pkl"
 
280
 
281
- if os.path.exists(final_full_file):
282
- print(f" 📂 [{sym}] Full Data Exists -> Skipping.")
283
  continue
284
 
285
- tasks_payload = []
286
- for i in range(workers_count):
287
- c_start = start_time_ms + (i * chunk_size)
288
- c_end = start_time_ms + ((i + 1) * chunk_size)
289
- if i == workers_count - 1: c_end = end_time_ms
290
- tasks_payload.append((sym, c_start, c_end, i))
291
-
292
- print(f" ⚡ Splitting {sym} into {workers_count} chunks...")
293
 
294
- loop = asyncio.get_running_loop()
295
- with concurrent.futures.ProcessPoolExecutor(max_workers=workers_count) as executor:
296
- futures = [loop.run_in_executor(executor, run_parallel_chunk, task) for task in tasks_payload]
297
- results = await asyncio.gather(*futures)
298
-
299
- print(f" 🧩 Merging results for {sym}...")
300
- all_dfs = []
301
- for chunk_id, success in results:
302
- if not success: continue
303
- task = tasks_payload[chunk_id]
304
- part_start = task[1]; part_end = task[2]
305
- part_file = f"{CACHE_DIR}/{safe_sym}_{part_start}_{part_end}_part{chunk_id}_scores.pkl"
306
-
307
- if os.path.exists(part_file):
308
- try:
309
- df_part = pd.read_pickle(part_file)
310
- if not df_part.empty: all_dfs.append(df_part)
311
- os.remove(part_file)
312
- except: pass
313
-
314
- if all_dfs:
315
- final_df = pd.concat(all_dfs).drop_duplicates(subset=['timestamp']).sort_values('timestamp')
316
- final_df.to_pickle(final_full_file)
317
- print(f" 💾 [{sym}] FINAL SAVE: {len(final_df)} signals.")
318
  else:
319
- print(f" ⚠️ [{sym}] No signals generated.")
 
320
  gc.collect()
321
 
322
  # ==============================================================
323
- # PHASE 2: Portfolio Digital Twin Engine (Unchanged)
324
  # ==============================================================
325
  @staticmethod
326
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
@@ -343,6 +291,7 @@ class HeavyDutyBacktester:
343
  for ts, group in grouped_by_time:
344
  active_symbols = list(wallet["positions"].keys())
345
  current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
 
346
  for sym in active_symbols:
347
  if sym in current_prices:
348
  curr_p = current_prices[sym]
@@ -357,7 +306,7 @@ class HeavyDutyBacktester:
357
  wallet["balance"] += net_pnl
358
  del wallet["positions"][sym]
359
  wallet["trades_history"].append({'pnl': net_pnl})
360
-
361
  current_total_equity = wallet["balance"] + wallet["allocated"]
362
  if current_total_equity > peak_balance: peak_balance = current_total_equity
363
  dd = (peak_balance - current_total_equity) / peak_balance
@@ -431,7 +380,7 @@ class HeavyDutyBacktester:
431
 
432
  current_period_files = []
433
  for f in os.listdir(CACHE_DIR):
434
- if f.endswith('_scores.pkl') and period_id in f and "_part" not in f:
435
  current_period_files.append(os.path.join(CACHE_DIR, f))
436
 
437
  if not current_period_files:
@@ -449,16 +398,13 @@ class HeavyDutyBacktester:
449
  combinations.append({'w_titan': round(wt, 2), 'w_struct': round(ws, 2), 'thresh': round(th, 2)})
450
 
451
  final_results = []
452
- batch_size = max(20, len(combinations) // (os.cpu_count() * 2))
453
- batches = [combinations[i:i+batch_size] for i in range(0, len(combinations), batch_size)]
454
 
455
- with concurrent.futures.ProcessPoolExecutor() as executor:
456
- futures = [executor.submit(self._worker_optimize, batch, current_period_files,
457
- self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
458
- for batch in batches]
459
- for future in concurrent.futures.as_completed(futures):
460
- try: final_results.extend(future.result())
461
- except Exception as e: print(f"Grid Error: {e}")
462
 
463
  if not final_results: return None, None
464
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
@@ -472,10 +418,14 @@ class HeavyDutyBacktester:
472
  return best['config'], best
473
 
474
  async def run_strategic_optimization_task():
475
- print("\n🧪 [STRATEGIC BACKTEST] Time Lord Initiated (Stable Parallel)...")
476
  r2 = R2Service()
477
  dm = DataManager(None, None, r2)
478
  proc = MLProcessor(dm)
 
 
 
 
479
  try:
480
  hub = AdaptiveHub(r2)
481
  await hub.initialize()
@@ -499,6 +449,4 @@ async def run_strategic_optimization_task():
499
  await dm.close()
500
 
501
  if __name__ == "__main__":
502
- import multiprocessing
503
- multiprocessing.freeze_support()
504
  asyncio.run(run_strategic_optimization_task())
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V88.0 - GEM-Architect: RAM-Burst Edition)
3
+ # ============================================================
4
+ # استراتيجية المعماري للمواصفات المحدودة (2 vCPU / 16GB RAM):
5
+ # 1. Async I/O Burst: سحب البيانات بالتوازي لأن الشبكة لا تضغط المعالج.
6
+ # 2. In-Memory Analysis: المعالجة تتم بعد اكتمال البيانات بالكامل.
7
  # ============================================================
8
 
9
  import asyncio
 
14
  import itertools
15
  import os
16
  import gc
17
+ import sys
18
+ import traceback
19
  from datetime import datetime, timezone
20
  from typing import Dict, Any, List
21
 
 
24
  from ml_engine.data_manager import DataManager
25
  from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
26
  from r2 import R2Service
27
+ import ccxt.async_support as ccxt # نستخدم النسخة الـ Async حصراً
28
  except ImportError:
29
  pass
30
 
31
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
32
  CACHE_DIR = "backtest_real_scores"
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  class HeavyDutyBacktester:
35
  def __init__(self, data_manager, processor):
36
  self.dm = data_manager
 
42
  self.TARGET_COINS = ['SOL/USDT']
43
  self.force_start_date = None
44
  self.force_end_date = None
45
+
46
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
47
+ print(f"🧪 [Backtest V88.0] RAM-Burst Edition (High Speed I/O).")
48
 
49
  def set_date_range(self, start_str, end_str):
50
  self.force_start_date = start_str
 
55
  return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
56
 
57
  # ==============================================================
58
+ # FAST DATA DOWNLOADER (Async Burst)
59
  # ==============================================================
60
+ async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
61
+ """
62
+ يقوم بتحميل كل البيانات دفعة واحدة باستخدام اتصالات متزامنة.
63
+ يستغل الرام (16GB) لتخزين كل شيء قبل المعالجة.
64
+ """
65
+ print(f" ⚡ [Network] Burst-Downloading {sym} ({start_ms} -> {end_ms})...", flush=True)
66
 
67
+ # تقسيم الفترة إلى دفعات (كل دفعة 1000 شمعة = 60000000 ميلي ثانية)
68
+ limit = 1000
69
+ duration_per_batch = limit * 60 * 1000
70
 
71
+ tasks = []
72
+ current = start_ms
 
 
 
 
 
 
73
 
74
+ # إنشاء قائمة بالمهمات الزمنية
75
+ while current < end_ms:
76
+ tasks.append(current)
77
+ current += duration_per_batch
78
 
79
+ all_candles = []
80
+ total_batches = len(tasks)
81
+
82
+ # نستخدم Semaphore لمنع حظر الـ IP (مثلاً 10 اتصالات في نفس اللحظة)
83
+ sem = asyncio.Semaphore(10)
84
+
85
+ async def _fetch_batch(timestamp):
86
+ async with sem:
87
  try:
88
+ # محاولة 3 مرات في حال الفشل
89
+ for _ in range(3):
90
+ try:
91
+ return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
92
+ except Exception:
93
+ await asyncio.sleep(1)
94
+ return []
95
+ except: return []
96
+
97
+ # تشغيل التنزيل المتوازي
98
+ # نقسم المهام إلى مجموعات (Chunks) لنظهر التقدم
99
+ chunk_size = 20
100
+ for i in range(0, len(tasks), chunk_size):
101
+ chunk_tasks = tasks[i:i + chunk_size]
102
+ futures = [_fetch_batch(ts) for ts in chunk_tasks]
103
+ results = await asyncio.gather(*futures)
104
 
105
+ for res in results:
106
+ if res: all_candles.extend(res)
107
 
108
+ # طباعة التقدم
109
+ progress = min(100, int((i + chunk_size) / total_batches * 100))
110
+ print(f" 📥 Downloaded {progress}%... (Total: {len(all_candles)} candles)", flush=True)
111
+
112
+ # ترتيب وإزالة التكرار
113
+ if not all_candles: return None
114
+
115
+ # تصفية ما هو خارج النطاق بدقة
116
+ filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
117
+ # إزالة التكرارات بناءً على الوقت (المفتاح 0)
118
+ seen = set()
119
+ unique_candles = []
120
+ for c in filtered:
121
+ if c[0] not in seen:
122
+ unique_candles.append(c)
123
+ seen.add(c[0])
124
+
125
+ # ترتيب نهائي
126
+ unique_candles.sort(key=lambda x: x[0])
127
+ return unique_candles
128
+
129
+ # ==============================================================
130
+ # 🧠 CPU PROCESSING (In-Memory)
131
+ # ==============================================================
132
+ async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
133
+ safe_sym = sym.replace('/', '_')
134
+ period_suffix = f"{start_time_ms}_{end_time_ms}" # سيتم تعريفه لاحقاً
135
+ # لكن هنا سنستخدم معرف الفترة الممرر
136
+ period_suffix = f"{start_ms}_{end_ms}"
137
+ scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
138
+
139
+ print(f" ⚙️ [CPU] Processing {len(candles)} candles from RAM...", flush=True)
140
+ t0 = time.time()
141
+
142
+ # تحويل سريع لـ Pandas
143
+ df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
144
+ cols = ['open', 'high', 'low', 'close', 'volume']
145
+ df_1m[cols] = df_1m[cols].astype('float32')
146
+ df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
147
+ df_1m.set_index('datetime', inplace=True)
148
+ df_1m = df_1m.sort_index()
149
+
150
+ # Resampling
151
+ frames = {}
152
+ agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
153
+ frames['1m'] = df_1m.copy()
154
+ frames['1m']['timestamp'] = frames['1m'].index.astype(np.int64) // 10**6
155
+
156
+ for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
157
+ frames[tf_str] = df_1m.resample(tf_code).agg(agg_dict).dropna()
158
+ frames[tf_str]['timestamp'] = frames[tf_str].index.astype(np.int64) // 10**6
159
+
160
+ ai_results = []
161
+
162
+ # نبدأ التحليل بعد فترة كافية للمؤشرات
163
+ start_analysis_dt = df_1m.index[0] + pd.Timedelta(minutes=500)
164
+ valid_indices = frames['5m'].loc[start_analysis_dt:].index
165
+
166
+ total_steps = len(valid_indices)
167
+ step_count = 0
168
+
169
+ # حلقة المعالجة السريعة (بدون انتظار شبكة)
170
+ for t_idx in valid_indices:
171
+ step_count += 1
172
+ if step_count % 2000 == 0:
173
+ pct = int((step_count / total_steps) * 100)
174
+ print(f" 🧠 AI Analysis: {pct}%...", flush=True)
175
+
176
+ ohlcv_data = {}
177
+ try:
178
+ # Slicing from RAM is fast
179
+ cutoff = t_idx
180
+ ohlcv_data['1m'] = self.df_to_list(frames['1m'].loc[:cutoff].tail(500))
181
+ ohlcv_data['5m'] = self.df_to_list(frames['5m'].loc[:cutoff].tail(200))
182
+ ohlcv_data['15m'] = self.df_to_list(frames['15m'].loc[:cutoff].tail(200))
183
+ ohlcv_data['1h'] = self.df_to_list(frames['1h'].loc[:cutoff].tail(200))
184
+ ohlcv_data['4h'] = self.df_to_list(frames['4h'].loc[:cutoff].tail(100))
185
+ ohlcv_data['1d'] = self.df_to_list(frames['1d'].loc[:cutoff].tail(50))
186
+ except: continue
187
+
188
+ if len(ohlcv_data['1h']) < 60: continue
189
+ current_price = frames['5m'].loc[t_idx]['close']
190
+
191
+ # L1 Logic
192
+ logic_packet = {
193
+ 'symbol': sym,
194
+ 'ohlcv_1h': ohlcv_data['1h'][-60:],
195
+ 'ohlcv_15m': ohlcv_data['15m'][-60:],
196
+ 'change_24h': 0.0
197
+ }
198
+ try:
199
+ if len(ohlcv_data['1h']) >= 24:
200
+ p_now = ohlcv_data['1h'][-1][4]
201
+ p_old = ohlcv_data['1h'][-24][4]
202
+ logic_packet['change_24h'] = ((p_now - p_old) / p_old) * 100
203
+ except: pass
204
+
205
+ logic_result = self.dm._apply_logic_tree(logic_packet)
206
+ signal_type = logic_result.get('type', 'NONE')
207
+ l1_score = logic_result.get('score', 0.0)
208
 
209
+ # L2 AI Execution (Only on L1 Signals)
210
+ real_titan = 0.5
211
+ if signal_type in ['BREAKOUT', 'REVERSAL']:
212
+ raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
 
 
 
 
 
 
 
 
213
  try:
214
+ proc_res = await self.proc.process_compound_signal(raw_data_for_proc)
215
+ if proc_res: real_titan = proc_res.get('titan_score', 0.5)
216
+ except: pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
217
 
218
+ ai_results.append({
219
+ 'timestamp': int(t_idx.timestamp() * 1000),
220
+ 'symbol': sym,
221
+ 'close': current_price,
222
+ 'real_titan': real_titan,
223
+ 'signal_type': signal_type,
224
+ 'l1_score': l1_score
225
+ })
226
 
227
+ dt = time.time() - t0
228
+ if ai_results:
229
+ pd.DataFrame(ai_results).to_pickle(scores_file)
230
+ print(f" 💾 [{sym}] Saved {len(ai_results)} signals. (Compute Time: {dt:.1f}s)")
231
+ else:
232
+ print(f" ⚠️ [{sym}] No signals found.")
233
+
234
+ del frames, df_1m, candles
235
+ gc.collect()
236
 
237
  # ==============================================================
238
+ # PHASE 1: Main Loop
239
  # ==============================================================
240
  async def generate_truth_data(self):
241
  if self.force_start_date and self.force_end_date:
 
243
  dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
244
  start_time_ms = int(dt_start.timestamp() * 1000)
245
  end_time_ms = int(dt_end.timestamp() * 1000)
246
+ print(f"\n🚜 [Phase 1] Era: {self.force_start_date} -> {self.force_end_date}")
 
247
  else:
248
  return
249
 
 
 
 
 
 
 
 
250
  for sym in self.TARGET_COINS:
251
  safe_sym = sym.replace('/', '_')
252
+ period_suffix = f"{start_time_ms}_{end_time_ms}"
253
+ scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
254
 
255
+ if os.path.exists(scores_file):
256
+ print(f" 📂 [{sym}] Data Exists -> Skipping.")
257
  continue
258
 
259
+ # 1. Download Phase (Async Burst)
260
+ candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
 
 
 
 
 
 
261
 
262
+ if candles:
263
+ # 2. Processing Phase (Sequential CPU)
264
+ await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
265
  else:
266
+ print(f" Failed to download data for {sym}")
267
+
268
  gc.collect()
269
 
270
  # ==============================================================
271
+ # PHASE 2: Portfolio Digital Twin Engine (Standard)
272
  # ==============================================================
273
  @staticmethod
274
  def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
 
291
  for ts, group in grouped_by_time:
292
  active_symbols = list(wallet["positions"].keys())
293
  current_prices = {row['symbol']: row['close'] for _, row in group.iterrows()}
294
+ # Exits
295
  for sym in active_symbols:
296
  if sym in current_prices:
297
  curr_p = current_prices[sym]
 
306
  wallet["balance"] += net_pnl
307
  del wallet["positions"][sym]
308
  wallet["trades_history"].append({'pnl': net_pnl})
309
+ # Entries
310
  current_total_equity = wallet["balance"] + wallet["allocated"]
311
  if current_total_equity > peak_balance: peak_balance = current_total_equity
312
  dd = (peak_balance - current_total_equity) / peak_balance
 
380
 
381
  current_period_files = []
382
  for f in os.listdir(CACHE_DIR):
383
+ if f.endswith('_scores.pkl') and period_id in f:
384
  current_period_files.append(os.path.join(CACHE_DIR, f))
385
 
386
  if not current_period_files:
 
398
  combinations.append({'w_titan': round(wt, 2), 'w_struct': round(ws, 2), 'thresh': round(th, 2)})
399
 
400
  final_results = []
401
+ batch_size = 100
 
402
 
403
+ for i in range(0, len(combinations), batch_size):
404
+ batch = combinations[i:i+batch_size]
405
+ res = self._worker_optimize(batch, current_period_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
406
+ final_results.extend(res)
407
+ if i % 1000 == 0: print(f" ...Analyzed {i}/{len(combinations)} configs", flush=True)
 
 
408
 
409
  if not final_results: return None, None
410
  best = sorted(final_results, key=lambda x: x['final_balance'], reverse=True)[0]
 
418
  return best['config'], best
419
 
420
  async def run_strategic_optimization_task():
421
+ print("\n🧪 [STRATEGIC BACKTEST] RAM-Burst Mode Initiated...")
422
  r2 = R2Service()
423
  dm = DataManager(None, None, r2)
424
  proc = MLProcessor(dm)
425
+
426
+ await dm.initialize()
427
+ await proc.initialize()
428
+
429
  try:
430
  hub = AdaptiveHub(r2)
431
  await hub.initialize()
 
449
  await dm.close()
450
 
451
  if __name__ == "__main__":
 
 
452
  asyncio.run(run_strategic_optimization_task())