Riy777 committed on
Commit
ad148b0
·
verified ·
1 Parent(s): a0f2e90

Update backtest_engine.py

Browse files
Files changed (1) hide show
  1. backtest_engine.py +145 -137
backtest_engine.py CHANGED
@@ -1,9 +1,9 @@
1
  # ============================================================
2
- # 🧪 backtest_engine.py (V51.1 - GEM-Architect: Bug Fix)
3
  # ============================================================
4
  # التحديثات:
5
- # 1. إصلاح خطأ 'numpy.ndarray object has no attribute values'.
6
- # 2. تحسين التعامل مع المصفوفات لضمان استقرار الباكتست.
7
  # ============================================================
8
 
9
  import asyncio
@@ -17,85 +17,120 @@ import shutil
17
  import concurrent.futures
18
  from typing import Dict, Any, List
19
 
20
- # استيراد خفيف لتجنب تضارب التوازي
21
- from ml_engine.processor import SystemLimits
22
  from ml_engine.data_manager import DataManager
23
  from learning_hub.adaptive_hub import StrategyDNA
 
24
 
25
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
26
- CACHE_DIR = "backtest_cache_grid"
27
 
28
- class MassiveOptimizer:
29
- def __init__(self, data_manager):
30
  self.dm = data_manager
31
- # 3 = سريع (تجربة) | 5 = متوسط (~3000) | 10 = دقيق (~1000)
32
  self.GRID_DENSITY = 10
33
 
 
 
 
 
34
  self.TARGET_COINS = [
35
  'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT',
36
- 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT', 'LTC/USDT',
37
- 'NEAR/USDT', 'RUNE/USDT', 'INJ/USDT', 'PEPE/USDT', 'SHIB/USDT'
38
  ]
39
 
40
  if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
41
- print(f"🧪 [Backtest Engine V51.1] Grid Density set to: {self.GRID_DENSITY}")
42
 
43
- async def fetch_deep_history(self):
44
- """تحميل البيانات وتجهيزها للمعالجة السريعة"""
45
- print(f"\n⏳ [Data] Pre-fetching history for Grid Search...")
 
 
 
 
 
 
46
  end_time_ms = int(time.time() * 1000)
47
- start_time_ms = end_time_ms - (14 * 24 * 60 * 60 * 1000)
 
48
 
49
  for sym in self.TARGET_COINS:
50
  safe_sym = sym.replace('/', '_')
51
- file_path = f"{CACHE_DIR}/{safe_sym}.pkl"
 
52
 
53
- if os.path.exists(file_path): continue
 
 
54
 
55
- print(f" Downloading {sym}...", end="", flush=True)
56
- try:
57
- candles = await self.dm.exchange.fetch_ohlcv(sym, '15m', since=start_time_ms, limit=1000)
58
- if candles:
59
- df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
60
- df = df.drop_duplicates(subset=['timestamp']).sort_values('timestamp')
61
- for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float)
62
-
63
- # حساب المؤشرات مسبقاً (Vectorized)
64
- df['ema50'] = df['close'].ewm(span=50).mean()
65
-
66
- # Scanner Proxies
67
- # RSI Manual Calculation for speed
68
- delta = df['close'].diff()
69
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
70
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
71
- rs = gain / loss
72
- df['rsi'] = 100 - (100 / (1 + rs))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
 
74
- # BB
75
- df['ma20'] = df['close'].rolling(20).mean()
76
- df['std20'] = df['close'].rolling(20).std()
77
- df['bb_upper'] = df['ma20'] + (df['std20'] * 2)
78
 
79
- df.to_pickle(file_path)
80
- print(" ✅")
81
- else:
82
- print(" ⚠️ Empty")
83
- except: print(" ❌ Error")
84
- await asyncio.sleep(0.5)
 
 
 
 
 
 
 
85
 
86
  # ==============================================================
87
- # 🧠 The Worker Logic (Isolated for Speed & Accuracy)
88
  # ==============================================================
89
  @staticmethod
90
- def _worker_evaluate_batch(combinations_batch, market_data_files):
91
- """
92
- يقوم هذا العامل بتقييم مجموعة من التوليفات (Batch) دفعة واحدة.
93
- """
94
  results = []
95
-
96
- # تحميل البيانات للذاكرة (يتم مرة واحدة لكل Worker)
97
  dfs = []
98
- for fp in market_data_files:
99
  try: dfs.append(pd.read_pickle(fp))
100
  except: pass
101
 
@@ -104,130 +139,105 @@ class MassiveOptimizer:
104
  total_trades = 0
105
 
106
  w_titan = config['w_titan']
107
- w_scanner = config['w_scanner']
108
  entry_thresh = config['thresh']
109
 
110
  for df in dfs:
111
- # ---------------------------------------------------
112
- # ⚡ Vectorized Signal Logic
113
- # ---------------------------------------------------
114
- # 1. Titan Score (Simulated)
115
- titan_score = np.where(df['close'] > df['ema50'], 0.9, 0.3)
116
-
117
- # 2. Scanner Score (Simulated)
118
- rsi_cond = np.where(df['rsi'] < 60, 1.0, 0.4)
119
- bb_cond = np.where(df['close'] > df['bb_upper'], 1.0, 0.0)
120
-
121
- scanner_score = (rsi_cond * 0.7) + (bb_cond * 0.3)
122
-
123
- # 3. Final Weighted Score
124
- final_score = (titan_score * w_titan) + (scanner_score * w_scanner)
125
- final_score = final_score / (w_titan + w_scanner)
126
-
127
- # 4. Generate Entries (Boolean Numpy Array)
128
- signals = (final_score > entry_thresh)
129
-
130
- # 5. Fast Loop for PnL
131
- prices = df['close'].values
132
-
133
- # 🔥 FIX: signals هو أصلاً numpy array، لا نحتاج .values
134
- sigs = signals
135
-
136
  in_pos = False
137
  entry_p = 0.0
138
 
139
- for i in range(len(prices)-1):
140
- if not in_pos and sigs[i]:
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  in_pos = True
142
- entry_p = prices[i]
143
  elif in_pos:
144
- curr = prices[i]
145
- pnl = (curr - entry_p) / entry_p
146
 
147
- if pnl > 0.03 or pnl < -0.015:
 
148
  total_pnl += pnl
149
  total_trades += 1
150
  in_pos = False
151
 
152
- if total_trades > 5:
153
  results.append({
154
  'config': config,
155
  'pnl': total_pnl,
156
  'trades': total_trades,
157
- 'score': total_pnl * np.log(total_trades)
158
  })
159
-
160
  return results
161
 
162
- # ==============================================================
163
- # 🚀 The Grid Generator
164
- # ==============================================================
165
  async def run_optimization(self):
166
- market_files = [os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith('.pkl')]
167
- if not market_files:
168
- await self.fetch_deep_history()
169
- market_files = [os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith('.pkl')]
170
 
171
- print(f"🧩 [Optimizer] Generating Grid with Density={self.GRID_DENSITY}...")
 
 
 
 
 
 
172
 
173
- w_titan_range = np.linspace(0.2, 0.9, num=self.GRID_DENSITY)
174
- w_scanner_range = np.linspace(0.1, 0.8, num=self.GRID_DENSITY)
175
  thresh_range = np.linspace(0.50, 0.80, num=self.GRID_DENSITY)
176
 
177
  combinations = []
178
  for wt, ws, th in itertools.product(w_titan_range, w_scanner_range, thresh_range):
179
- combinations.append({
180
- 'w_titan': round(float(wt), 2),
181
- 'w_scanner': round(float(ws), 2),
182
- 'thresh': round(float(th), 2)
183
- })
184
 
185
- print(f" 📊 Total Unique Combinations: {len(combinations):,}")
186
- print(f" 🚀 Est. Processing Time: {len(combinations)/2000:.1f} minutes (on parallel cores)")
187
 
188
- start_time = time.time()
189
  final_results = []
 
 
190
 
191
- batch_size = max(100, len(combinations) // (os.cpu_count() * 4))
192
- batches = [combinations[i:i + batch_size] for i in range(0, len(combinations), batch_size)]
193
-
194
- print(f" 🔥 Firing up {os.cpu_count()} CPU Cores for {len(batches)} batches...")
195
-
196
- loop = asyncio.get_running_loop()
197
  with concurrent.futures.ProcessPoolExecutor() as executor:
198
- futures = [executor.submit(self._worker_evaluate_batch, batch, market_files) for batch in batches]
199
-
200
  for future in concurrent.futures.as_completed(futures):
201
- try:
202
- res = future.result()
203
- final_results.extend(res)
204
- except Exception as e: print(f"Batch Error: {e}")
205
 
206
- elapsed = time.time() - start_time
207
- print(f"✅ Optimization Finished in {elapsed:.2f}s")
208
-
209
  if not final_results:
210
- print("⚠️ No profitable strategies found (Check Data or lowered thresholds).")
211
  return None
212
 
213
- best_result = sorted(final_results, key=lambda x: x['score'], reverse=True)[0]
214
 
215
  print("\n" + "="*60)
216
- print(f"🏆 GRAND CHAMPION (From {len(combinations):,} options):")
217
- print(f" 💰 Total Score (PnL): {best_result['pnl']:.2f}")
218
- print(f" 📊 Trades: {best_result['trades']}")
219
- print(f" 🧬 DNA: {best_result['config']}")
220
  print("="*60)
221
 
222
- return best_result['config']
223
 
224
  async def run_strategic_optimization_task():
225
- print("\n🧪 [STRATEGIC BACKTEST V51.1] Starting Massive Grid Search...")
226
- from r2 import R2Service
227
  r2 = R2Service()
228
  dm = DataManager(None, None, r2)
 
 
 
229
 
230
- optimizer = MassiveOptimizer(dm)
231
  best_config = await optimizer.run_optimization()
232
 
233
  if best_config:
@@ -235,19 +245,17 @@ async def run_strategic_optimization_task():
235
  hub = AdaptiveHub(r2)
236
  await hub.initialize()
237
 
238
- regime = "RANGE"
239
  if regime in hub.strategies:
240
- print(f"💉 Injecting new DNA into {regime} Strategy...")
241
  st = hub.strategies[regime]
242
-
243
  st.model_weights['titan'] = best_config['w_titan']
244
- # نستخدم 'patterns' لحفظ وزن الـ Scanner مؤقتاً أو كما اتفقنا سابقاً
245
  st.model_weights['patterns'] = best_config['w_scanner']
246
  st.filters['l1_min_score'] = best_config['thresh'] * 100
247
 
248
  await hub._save_state_to_r2()
249
  hub._inject_current_parameters()
250
- print("✅ [System] DNA Updated & Active.")
251
 
252
  await dm.close()
253
 
 
1
  # ============================================================
2
+ # 🧪 backtest_engine.py (V60.1 - GEM-Architect: Configurable Real-Deal)
3
  # ============================================================
4
  # التحديثات:
5
+ # 1. إضافة متغير `BACKTEST_DAYS` للتحكم السهل في مدة البيانات.
6
+ # 2. الحفاظ على المحرك الحقيقي (Real Models) بدون محاكاة.
7
  # ============================================================
8
 
9
  import asyncio
 
17
  import concurrent.futures
18
  from typing import Dict, Any, List
19
 
20
+ # استيراد المحركات الحقيقية
21
+ from ml_engine.processor import MLProcessor, SystemLimits
22
  from ml_engine.data_manager import DataManager
23
  from learning_hub.adaptive_hub import StrategyDNA
24
+ from r2 import R2Service
25
 
26
  logging.getLogger('ml_engine').setLevel(logging.WARNING)
27
+ CACHE_DIR = "backtest_real_scores"
28
 
29
class HeavyDutyBacktester:
    """Two-phase backtester driven by the REAL ML models.

    Phase 1 (`generate_truth_data`) runs the live inference pipeline
    candle-by-candle over history and caches the resulting scores per
    symbol.  Phase 2 (`run_optimization`) grid-searches signal weights
    over those cached scores in parallel worker processes.
    """

    def __init__(self, data_manager, processor):
        self.dm = data_manager    # DataManager: exchange access + scanner strategies
        self.proc = processor     # MLProcessor: real Titan/Pattern inference
        self.GRID_DENSITY = 10    # grid points per axis (10 -> 1000 combos)

        # 🔥🔥🔥 Time-window control (change this number as you like) 🔥🔥🔥
        self.BACKTEST_DAYS = 7    # number of days of history to evaluate
        # ============================================================

        self.TARGET_COINS = [
            'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT',
            'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT'
        ]

        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(CACHE_DIR, exist_ok=True)
        print(f"🧪 [Backtest V60.1] Heavy Duty Mode (Real Models). Period: {self.BACKTEST_DAYS} Days.")

    # ==============================================================
    # PHASE 1: The Heavy Lift (Running Real AI Models)
    # ==============================================================
    async def generate_truth_data(self):
        """Run the real models over history and pickle the scores per symbol.

        Symbols whose score file already exists for the current
        ``BACKTEST_DAYS`` window are skipped, so repeated runs are cheap.
        """
        print(f"\n🚜 [Phase 1] Running REAL Models on History ({self.BACKTEST_DAYS} Days)...")

        end_time_ms = int(time.time() * 1000)
        # Window length is driven by the single BACKTEST_DAYS knob.
        start_time_ms = end_time_ms - (self.BACKTEST_DAYS * 24 * 60 * 60 * 1000)

        for sym in self.TARGET_COINS:
            safe_sym = sym.replace('/', '_')
            # The day count is embedded in the filename so runs with a
            # different window never collide with stale cached data.
            scores_file = f"{CACHE_DIR}/{safe_sym}_scores_{self.BACKTEST_DAYS}d.pkl"

            if os.path.exists(scores_file):
                print(f" 📂 {sym} scores already computed. Skipping.")
                continue

            print(f" Processing {sym} with ML Engine...", end="", flush=True)

            # Ask for extra candles so the 100-candle warm-up window below
            # is always fully covered by data.
            candles = await self.dm.exchange.fetch_ohlcv(sym, '15m', since=start_time_ms, limit=2000)
            if not candles:
                print(" No Data")
                continue

            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('datetime', inplace=True)

            ai_results = []

            # Candle-by-candle replay (real inference loop).  Start at
            # candle 100 so each window has enough history for indicators.
            start_idx = 100 if len(df) > 100 else 0

            for i in range(start_idx, len(df)):
                # The time window exactly as the live processor would see it.
                window = df.iloc[i-100:i+1]
                current_price = window['close'].iloc[-1]

                # Convert the window to the OHLCV list format the processor expects.
                ohlcv_15m = window.reset_index()[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()

                raw_data = {
                    'symbol': sym,
                    'current_price': current_price,
                    # 15m candles double as the 1h feed for speed —
                    # NOTE(review): confirm the models tolerate this substitution.
                    'ohlcv': {'15m': ohlcv_15m, '1h': ohlcv_15m}
                }

                # 🔥 Call the real processor (Titan + Patterns) 🔥
                result = await self.proc.process_compound_signal(raw_data)

                if result:
                    titan_real = result.get('titan_score', 0.5)
                    pattern_real = result.get('patterns_score', 0.5)

                    # Real scanner strategies applied to the same window.
                    scanner_res = self.dm._apply_scanner_strategies(window, sym)

                    ai_results.append({
                        'timestamp': window.index[-1],
                        'close': current_price,
                        'real_titan': titan_real,
                        'real_pattern': pattern_real,
                        'real_scanner_data': scanner_res
                    })

            if ai_results:
                pd.DataFrame(ai_results).to_pickle(scores_file)
                print(f" ✅ Done ({len(ai_results)} candles)")
            else:
                print(" ⚠️ Empty Results")

    # ==============================================================
    # PHASE 2: The Grid Optimizer (Fast Math on Real Scores)
    # ==============================================================
    @staticmethod
    def _worker_optimize(combinations_batch, scores_files):
        """Evaluate a batch of weight configs against cached real scores.

        Runs in a worker process.  Returns a list of
        ``{'config', 'pnl', 'trades', 'score'}`` dicts, one per config
        that closed more than 3 trades.
        """
        results = []

        dfs = []
        for fp in scores_files:
            try:
                dfs.append(pd.read_pickle(fp))
            except Exception:
                # A corrupt or vanished cache file only drops that symbol;
                # never a bare except (it would swallow KeyboardInterrupt).
                pass

        # Titan and scanner scores do not depend on the config, so compute
        # them ONCE per symbol instead of once per (config, row) pair.
        prepared = []
        for df in dfs:
            rows = []
            for _, row in df.iterrows():
                s_data = row['real_scanner_data']
                active_cnt = sum(1 for v in s_data.values() if v['active'])
                # 4 scanner strategies -> normalize the active count to [0, 1].
                scanner_score = (active_cnt * 100) / 4 / 100.0
                rows.append((row['real_titan'], scanner_score, row['close']))
            prepared.append(rows)

        for config in combinations_batch:
            total_pnl = 0.0
            total_trades = 0

            w_titan = config['w_titan']
            w_scanner = config['w_scanner']
            entry_thresh = config['thresh']

            for rows in prepared:
                in_pos = False
                entry_p = 0.0

                for real_titan, scanner_score, close in rows:
                    # Weighted blend of the two real signals, normalized by
                    # the total weight so the score stays in [0, 1].
                    final_score = (real_titan * w_titan) + (scanner_score * w_scanner)
                    final_score /= (w_titan + w_scanner)

                    # Entry simulation.
                    if not in_pos and final_score >= entry_thresh:
                        in_pos = True
                        entry_p = close
                    elif in_pos:
                        # Exit simulation: +3% take-profit / -2% stop-loss.
                        pnl = (close - entry_p) / entry_p
                        if pnl > 0.03 or pnl < -0.02:
                            total_pnl += pnl
                            total_trades += 1
                            in_pos = False

            if total_trades > 3:  # filter out configs with too few trades
                results.append({
                    'config': config,
                    'pnl': total_pnl,
                    'trades': total_trades,
                    'score': total_pnl
                })
        return results

    async def run_optimization(self):
        """Run both phases and return the best config dict, or None."""
        # 1. Run the real models (results are cached per symbol/window).
        await self.generate_truth_data()

        # 2. Collect the score files for the current window only.
        score_files = [os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith(f'_scores_{self.BACKTEST_DAYS}d.pkl')]
        if not score_files:
            print("❌ No AI scores found. Phase 1 failed?")
            return None

        print(f"\n🧩 [Phase 2] Running Grid Search on REAL AI SCORES...")

        w_titan_range = np.linspace(0.1, 0.9, num=self.GRID_DENSITY)
        w_scanner_range = np.linspace(0.1, 0.9, num=self.GRID_DENSITY)
        thresh_range = np.linspace(0.50, 0.80, num=self.GRID_DENSITY)

        combinations = []
        for wt, ws, th in itertools.product(w_titan_range, w_scanner_range, thresh_range):
            # float() keeps np.float64 from leaking into the saved DNA.
            combinations.append({'w_titan': round(float(wt), 2), 'w_scanner': round(float(ws), 2), 'thresh': round(float(th), 2)})

        print(f" 📊 Combinations: {len(combinations):,}")

        final_results = []
        # os.cpu_count() may return None on exotic platforms; fall back to 1.
        batch_size = max(50, len(combinations) // ((os.cpu_count() or 1) * 2))
        batches = [combinations[i:i+batch_size] for i in range(0, len(combinations), batch_size)]

        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(self._worker_optimize, batch, score_files) for batch in batches]
            for future in concurrent.futures.as_completed(futures):
                try:
                    final_results.extend(future.result())
                except Exception as e:
                    print(f"Grid Error: {e}")

        if not final_results:
            print("⚠️ No profitable config found.")
            return None

        # max() is O(n) and picks the same (first) maximum a stable
        # descending sort would.
        best = max(final_results, key=lambda x: x['pnl'])

        print("\n" + "="*60)
        print(f"🏆 REAL-MODEL CHAMPION ({self.BACKTEST_DAYS} Days):")
        print(f" 💰 PnL: {best['pnl']:.2f}")
        print(f" 📊 Trades: {best['trades']}")
        print(f" 🧬 Config: {best['config']}")
        print("="*60)

        return best['config']
231
 
232
  async def run_strategic_optimization_task():
233
+ print("\n🧪 [STRATEGIC BACKTEST V60.1] Starting Heavy Duty Optimization...")
 
234
  r2 = R2Service()
235
  dm = DataManager(None, None, r2)
236
+ proc = MLProcessor(dm)
237
+ await dm.initialize()
238
+ await proc.initialize()
239
 
240
+ optimizer = HeavyDutyBacktester(dm, proc)
241
  best_config = await optimizer.run_optimization()
242
 
243
  if best_config:
 
245
  hub = AdaptiveHub(r2)
246
  await hub.initialize()
247
 
248
+ regime = "RANGE"
249
  if regime in hub.strategies:
250
+ print(f"💉 Injecting REAL DNA into {regime}...")
251
  st = hub.strategies[regime]
 
252
  st.model_weights['titan'] = best_config['w_titan']
 
253
  st.model_weights['patterns'] = best_config['w_scanner']
254
  st.filters['l1_min_score'] = best_config['thresh'] * 100
255
 
256
  await hub._save_state_to_r2()
257
  hub._inject_current_parameters()
258
+ print("✅ [System] DNA Updated.")
259
 
260
  await dm.close()
261