Spaces:
Running
Running
| # ============================================================ | |
| # 🧪 backtest_engine.py (V91.0 - GEM-Architect: Sync & Math Fix) | |
| # ============================================================ | |
| import asyncio | |
| import pandas as pd | |
| import numpy as np | |
| import time | |
| import logging | |
| import itertools | |
| import os | |
| import gc | |
| import sys | |
| import traceback | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List | |
| try: | |
| from ml_engine.processor import MLProcessor, SystemLimits | |
| from ml_engine.data_manager import DataManager | |
| from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub | |
| from r2 import R2Service | |
| import ccxt.async_support as ccxt | |
| except ImportError: | |
| pass | |
# Keep the ML engine quiet during mass backtests: only WARNING and above are emitted.
logging.getLogger("ml_engine").setLevel(logging.WARNING)

# Directory that holds the per-symbol / per-period score caches written in Phase 1.
CACHE_DIR = "backtest_real_scores"
class HeavyDutyBacktester:
    """Two-phase backtester: cache per-bar scores, then grid-search portfolio weights.

    Phase 1 (``generate_truth_data``) downloads 1m candles for every coin in
    ``TARGET_COINS`` through ``dm.exchange.fetch_ohlcv`` and replays them
    bar-by-bar through ``dm._apply_logic_tree`` and (for BREAKOUT/REVERSAL
    signals) ``proc.process_compound_signal``. It pickles one row per 5m bar
    into ``CACHE_DIR``.

    Phase 2 (``run_optimization``) replays those rows through a multi-slot
    portfolio simulator for every (w_titan, w_struct, thresh) combination and
    reports the configuration with the highest final balance.
    """

    # Bump whenever the content/semantics of the cached score files change so
    # stale caches written by an older engine are never mixed into a new run.
    SCORES_CACHE_VERSION = "v2"

    # (timeframe key, pandas resample alias, bar duration, bars handed to the models)
    _TF_SPECS = (
        ('1m', '1min', pd.Timedelta(minutes=1), 500),
        ('5m', '5min', pd.Timedelta(minutes=5), 200),
        ('15m', '15min', pd.Timedelta(minutes=15), 200),
        ('1h', '1h', pd.Timedelta(hours=1), 200),
        ('4h', '4h', pd.Timedelta(hours=4), 100),
        ('1d', '1D', pd.Timedelta(days=1), 50),
    )

    def __init__(self, data_manager, processor):
        self.dm = data_manager
        self.proc = processor
        self.GRID_DENSITY = 10        # points per axis of the (w_titan, w_struct, thresh) grid
        self.INITIAL_CAPITAL = 10.0   # starting balance of every simulated portfolio
        self.TRADING_FEES = 0.001     # fee fraction; charged twice (entry + exit) when a trade closes
        self.MAX_SLOTS = 4            # maximum number of simultaneously open positions
        # Full list of coins replayed in Phase 1.
        self.TARGET_COINS = [
            'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'NEAR/USDT', 'SHIB/USDT'
        ]
        self.force_start_date = None
        self.force_end_date = None
        os.makedirs(CACHE_DIR, exist_ok=True)
        print(f"🧪 [Backtest V91.0] Synchronized Engine (Math Bug Fixed).")

    def set_date_range(self, start_str, end_str):
        """Set the backtest period; both dates are ``YYYY-MM-DD`` strings interpreted as UTC midnight."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    def df_to_list(self, df):
        """Return ``df``'s OHLCV columns as a list of ``[timestamp, o, h, l, c, v]`` rows."""
        if df.empty:
            return []
        return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()

    @staticmethod
    def _period_ms(date_str):
        """Convert a ``YYYY-MM-DD`` string to epoch milliseconds at 00:00 UTC."""
        dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return int(dt.timestamp() * 1000)

    # ==============================================================
    # ⚡ FAST DATA DOWNLOADER
    # ==============================================================
    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1m OHLCV candles for ``sym`` covering ``[start_ms, end_ms]``.

        Pages of 1000 candles are requested in chunks of 20, with at most 10
        requests in flight and up to 3 attempts per page. A page that still
        fails is reported (the data then has a gap) instead of being dropped
        silently.

        Returns:
            The candles inside the window, de-duplicated by timestamp and
            sorted ascending, or ``None`` if nothing was downloaded.
        """
        print(f"   ⚡ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        duration_per_batch = limit * 60 * 1000
        page_starts = list(range(start_ms, end_ms, duration_per_batch))

        sem = asyncio.Semaphore(10)
        failed_pages = 0

        async def _fetch_batch(timestamp):
            nonlocal failed_pages
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                    except Exception:
                        await asyncio.sleep(1)
                failed_pages += 1
                return []

        all_candles = []
        chunk_size = 20
        for i in range(0, len(page_starts), chunk_size):
            results = await asyncio.gather(*(_fetch_batch(ts) for ts in page_starts[i:i + chunk_size]))
            for res in results:
                if res:
                    all_candles.extend(res)

        if failed_pages:
            print(f"   ⚠️ [{sym}] {failed_pages} page(s) failed after 3 attempts; data has gaps.", flush=True)
        if not all_candles:
            return None

        # Keep the first candle seen per timestamp, restricted to the requested window.
        by_ts = {}
        for c in all_candles:
            if start_ms <= c[0] <= end_ms and c[0] not in by_ts:
                by_ts[c[0]] = c
        unique_candles = sorted(by_ts.values(), key=lambda c: c[0])
        print(f"   ✅ Downloaded {len(unique_candles)} candles for {sym}.", flush=True)
        return unique_candles

    # ==============================================================
    # 🧠 CPU PROCESSING
    # ==============================================================
    @classmethod
    def _build_frames(cls, candles):
        """Turn raw 1m candles into OHLCV frames for every timeframe in ``_TF_SPECS``.

        Resampled frames use pandas' default left labelling (a bar is indexed
        by its open time); each frame's ``timestamp`` column is that label in
        epoch milliseconds.
        """
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        cols = ['open', 'high', 'low', 'close', 'volume']
        df_1m[cols] = df_1m[cols].astype('float32')
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m = df_1m.set_index('datetime').sort_index()

        frames = {'1m': df_1m.copy()}
        # Round 1m timestamps down to the minute so they line up with the resampled bars.
        frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6

        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        for tf, alias, _duration, _lookback in cls._TF_SPECS[1:]:
            frame = df_1m.resample(alias).agg(agg_dict).dropna()
            frame['timestamp'] = frame.index.astype(np.int64) // 10**6
            frames[tf] = frame
        return frames

    def _closed_bars_snapshot(self, frames, decision_time):
        """Return per-timeframe OHLCV lists holding only bars already closed at ``decision_time``.

        A bar labelled ``t`` covers ``[t, t + duration)``, so it is only known
        once ``t + duration <= decision_time``. Slicing with
        ``.loc[:decision_time]`` instead would leak the still-forming (future)
        bar of every higher timeframe into the snapshot.
        """
        snapshot = {}
        for tf, _alias, duration, lookback in self._TF_SPECS:
            frame = frames[tf]
            end = frame.index.searchsorted(decision_time - duration, side='right')
            snapshot[tf] = self.df_to_list(frame.iloc[max(0, end - lookback):end])
        return snapshot

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Replay ``candles`` bar-by-bar and cache one score row per closed 5m bar.

        For every 5m bar after a 500-minute warm-up, the method builds the
        multi-timeframe view visible when that bar closes. It runs
        ``dm._apply_logic_tree`` on that view and, for BREAKOUT/REVERSAL
        signals, ``proc.process_compound_signal`` to obtain ``titan_score``
        (0.5 otherwise or on failure). Rows are pickled into ``CACHE_DIR``. An
        existing cache file for the same symbol, period and cache version skips
        the work entirely.

        Each row's ``timestamp`` and ``close`` refer to the moment the 5m bar
        closes, i.e. the earliest time its signal could have been acted on.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = os.path.join(
            CACHE_DIR, f"{safe_sym}_{period_suffix}_{self.SCORES_CACHE_VERSION}_scores.pkl"
        )
        if os.path.exists(scores_file):
            print(f"   📂 [{sym}] Data Exists -> Skipping.")
            return
        print(f"   ⚙️ [CPU] Processing {sym}...", flush=True)
        t0 = time.time()

        frames = self._build_frames(candles)
        five_min = frames['5m']
        bar_len = pd.Timedelta(minutes=5)
        start_analysis_dt = frames['1m'].index[0] + pd.Timedelta(minutes=500)

        ai_results = []
        proc_failures = 0
        for t_idx in five_min.loc[start_analysis_dt:].index:
            # The 5m bar opening at t_idx is only complete at t_idx + 5m.
            decision_time = t_idx + bar_len
            ohlcv_data = self._closed_bars_snapshot(frames, decision_time)
            hourly = ohlcv_data['1h']
            if len(hourly) < 60:
                continue
            current_price = float(five_min.at[t_idx, 'close'])

            change_24h = 0.0
            # hourly[-25] is the close exactly 24 hourly bars before hourly[-1].
            if len(hourly) >= 25 and hourly[-25][4] > 0:
                change_24h = ((hourly[-1][4] - hourly[-25][4]) / hourly[-25][4]) * 100
            logic_packet = {
                'symbol': sym,
                'ohlcv_1h': hourly[-60:],
                'ohlcv_15m': ohlcv_data['15m'][-60:],
                'change_24h': change_24h,
            }

            logic_result = self.dm._apply_logic_tree(logic_packet) or {}
            signal_type = logic_result.get('type', 'NONE')
            l1_score = logic_result.get('score', 0.0)

            real_titan = 0.5
            if signal_type in ('BREAKOUT', 'REVERSAL'):
                raw_data_for_proc = {'symbol': sym, 'ohlcv': ohlcv_data, 'current_price': current_price}
                try:
                    proc_res = await self.proc.process_compound_signal(raw_data_for_proc)
                    if proc_res:
                        real_titan = proc_res.get('titan_score', 0.5)
                except Exception:
                    # Best-effort: keep the neutral 0.5 score, but count the failure.
                    proc_failures += 1

            ai_results.append({
                'timestamp': int(decision_time.timestamp() // 60) * 60 * 1000,
                'symbol': sym,
                'close': current_price,
                'real_titan': real_titan,
                'signal_type': signal_type,
                'l1_score': l1_score,
            })

        dt = time.time() - t0
        if proc_failures:
            print(f"   ⚠️ [{sym}] process_compound_signal failed {proc_failures} time(s); titan=0.5 used.", flush=True)
        if ai_results:
            pd.DataFrame(ai_results).to_pickle(scores_file)
            print(f"   💾 [{sym}] Saved {len(ai_results)} signals. (Processed in {dt:.1f}s)", flush=True)
        else:
            print(f"   ⚠️ [{sym}] No signals found.", flush=True)
        del frames, five_min, candles
        gc.collect()

    # ==============================================================
    # PHASE 1: Main Loop
    # ==============================================================
    async def generate_truth_data(self):
        """Download and score every coin in ``TARGET_COINS`` for the configured date range.

        Does nothing when no date range is set. A failure on one symbol is
        printed and the loop continues with the next.
        """
        if not (self.force_start_date and self.force_end_date):
            return
        start_time_ms = self._period_ms(self.force_start_date)
        end_time_ms = self._period_ms(self.force_end_date)
        print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")

        for sym in self.TARGET_COINS:
            try:
                candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
                if candles:
                    await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
                else:
                    print(f"   ❌ Failed/Empty data for {sym}.", flush=True)
            except Exception as e:
                print(f"   ❌ SKIP {sym}: {e}", flush=True)
                continue
        gc.collect()

    # ==============================================================
    # PHASE 2: Portfolio Digital Twin Engine
    # ==============================================================
    @staticmethod
    def _load_timeline(scores_files):
        """Load cached score files into a time-ordered list of per-timestamp snapshots.

        Each snapshot is a list of ``(symbol, close, real_titan, norm_struct)``
        tuples. ``norm_struct`` is the logic-tree score rescaled per signal
        type: BREAKOUT scores are divided by 3 and capped at 1, REVERSAL scores
        are divided by 100, and any other type contributes 0. Unreadable files
        are reported and skipped.
        """
        loaded = []
        for fp in scores_files:
            try:
                # NOTE: pickle is only safe on trusted files; these are expected
                # to be the caches written by _process_data_in_memory.
                df = pd.read_pickle(fp)
            except Exception as e:
                print(f"   ⚠️ Could not load {fp}: {e}", flush=True)
                continue
            if not df.empty:
                loaded.append(df)
        if not loaded:
            return []

        # Stable sort keeps rows of one timestamp in file order, so ties replay deterministically.
        global_df = pd.concat(loaded, ignore_index=True).sort_values('timestamp', kind='stable')
        timeline = []
        for _ts, group in global_df.groupby('timestamp', sort=True):
            snapshot = []
            for sym, close, sig_type, l1_raw, titan in zip(
                group['symbol'], group['close'], group['signal_type'],
                group['l1_score'], group['real_titan'],
            ):
                if sig_type == 'BREAKOUT':
                    norm_struct = min(1.0, l1_raw / 3.0)
                elif sig_type == 'REVERSAL':
                    norm_struct = l1_raw / 100.0
                else:
                    norm_struct = 0.0
                snapshot.append((sym, float(close), float(titan), float(norm_struct)))
            timeline.append(snapshot)
        return timeline

    @staticmethod
    def _simulate_portfolio(timeline, config, initial_capital, fees_pct, max_slots):
        """Simulate one weight/threshold configuration over ``timeline`` and return its stats.

        Exit rules: +3% take-profit or -2% stop-loss, checked only for symbols
        that have a row at the current timestamp. On exit the stake plus its
        net PnL is returned to the balance; fees are ``size * fees_pct * 2``.
        Entries go to the highest-scoring candidates while slots and cash allow.
        Drawdown and the final balance mark open positions to their last seen
        close.
        """
        w_titan = config['w_titan']
        w_struct = config['w_struct']
        entry_thresh = config['thresh']
        weight_sum = w_titan + w_struct

        balance = initial_capital   # free cash
        positions = {}              # symbol -> {'entry_price': float, 'size_usd': float}
        last_price = {}             # most recent close seen per symbol
        pnls = []                   # net PnL of every closed trade, in order
        peak_equity = initial_capital
        max_drawdown = 0.0

        def mark_to_market():
            return balance + sum(
                pos['size_usd'] * last_price[s] / pos['entry_price'] for s, pos in positions.items()
            )

        for snapshot in timeline:
            prices = {sym: close for sym, close, _titan, _struct in snapshot}
            last_price.update(prices)

            # --- 1. Exit logic (symbols without a row at this timestamp wait for the next one) ---
            for sym in [s for s in positions if s in prices]:
                pos = positions[sym]
                pct_change = (prices[sym] - pos['entry_price']) / pos['entry_price']
                if pct_change >= 0.03 or pct_change <= -0.02:
                    size = pos['size_usd']
                    net_pnl = size * pct_change - size * fees_pct * 2
                    balance += size + net_pnl  # return the stake plus its net PnL
                    del positions[sym]
                    pnls.append(net_pnl)

            # --- 2. Drawdown on mark-to-market equity ---
            equity = mark_to_market()
            peak_equity = max(peak_equity, equity)
            if peak_equity > 0:
                max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity)

            # --- 3. Entry logic (highest score first) ---
            slots_left = max_slots - len(positions)
            if slots_left > 0 and balance > 2.0:
                candidates = []
                for sym, close, titan, norm_struct in snapshot:
                    if sym in positions:
                        continue
                    score = 0.0
                    if weight_sum > 0:
                        score = ((titan * w_titan) + (norm_struct * w_struct)) / weight_sum
                    if score >= entry_thresh:
                        candidates.append((score, sym, close))
                candidates.sort(key=lambda c: c[0], reverse=True)

                for _score, sym, close in candidates[:slots_left]:
                    if sym in positions:  # duplicate rows for one symbol must not double-book
                        continue
                    position_size = balance / max_slots
                    free_slots = max_slots - len(positions)
                    # With a small balance, split what is left evenly over the free slots.
                    if balance < 20.0 and free_slots > 0:
                        position_size = balance / free_slots
                    position_size = min(position_size, balance)
                    if position_size > 2.0:
                        positions[sym] = {'entry_price': close, 'size_usd': position_size}
                        balance -= position_size

            if balance < 1.0 and not positions:
                break  # bankrupt: nothing left to trade

        if not pnls:
            return {
                'config': config, 'final_balance': initial_capital, 'net_profit': 0.0,
                'total_trades': 0, 'win_count': 0, 'loss_count': 0, 'win_rate': 0.0,
                'max_single_win': 0.0, 'max_single_loss': 0.0, 'max_win_streak': 0,
                'max_loss_streak': 0, 'max_drawdown': 0.0
            }

        final_equity = mark_to_market()
        win_count = sum(1 for p in pnls if p > 0)
        loss_count = len(pnls) - win_count

        current_win_streak = max_win_streak = 0
        current_loss_streak = max_loss_streak = 0
        for p in pnls:
            if p > 0:
                current_win_streak += 1
                current_loss_streak = 0
                max_win_streak = max(max_win_streak, current_win_streak)
            else:
                current_loss_streak += 1
                current_win_streak = 0
                max_loss_streak = max(max_loss_streak, current_loss_streak)

        return {
            'config': config, 'final_balance': final_equity,
            'net_profit': final_equity - initial_capital, 'total_trades': len(pnls),
            'win_count': win_count, 'loss_count': loss_count,
            'win_rate': (win_count / len(pnls)) * 100,
            'max_single_win': max(pnls), 'max_single_loss': min(pnls),
            'max_win_streak': max_win_streak, 'max_loss_streak': max_loss_streak,
            'max_drawdown': max_drawdown * 100
        }

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Simulate every config in ``combinations_batch`` over the cached ``scores_files``.

        Declared as a staticmethod so the existing call site
        ``self._worker_optimize(batch, files, capital, fees, slots)`` binds
        correctly (without it, ``self`` was passed as an extra sixth argument).

        Returns:
            One stats dict per config (see ``_simulate_portfolio``), or ``[]``
            when no score data could be loaded.
        """
        # Decode the cached rows once; every config replays the same timeline.
        timeline = HeavyDutyBacktester._load_timeline(scores_files)
        if not timeline:
            return []
        return [
            HeavyDutyBacktester._simulate_portfolio(timeline, config, initial_capital, fees_pct, max_slots)
            for config in combinations_batch
        ]

    async def run_optimization(self, target_regime="RANGE"):
        """Run Phase 1 then grid-search weights for ``target_regime``.

        Returns:
            ``(best_config, best_stats)`` for the configuration with the
            highest final balance, or ``(None, None)`` when no date range is
            set or no score data exists for the period.
        """
        if not (self.force_start_date and self.force_end_date):
            print("❌ No date range set; call set_date_range() first.")
            return None, None

        await self.generate_truth_data()
        start_ts = self._period_ms(self.force_start_date)
        end_ts = self._period_ms(self.force_end_date)
        period_id = f"{start_ts}_{end_ts}"
        suffix = f"_{self.SCORES_CACHE_VERSION}_scores.pkl"

        current_period_files = [
            os.path.join(CACHE_DIR, f)
            for f in sorted(os.listdir(CACHE_DIR))
            if f.endswith(suffix) and period_id in f
        ]
        if not current_period_files:
            print(f"❌ No combined signal data found for {target_regime}.")
            return None, None

        print(f"\n🧩 [Phase 2] Optimizing for {target_regime}...")
        w_titan_range = np.linspace(0.4, 0.9, num=self.GRID_DENSITY)
        w_struct_range = np.linspace(0.1, 0.6, num=self.GRID_DENSITY)
        thresh_range = np.linspace(0.20, 0.60, num=self.GRID_DENSITY)
        combinations = [
            {'w_titan': round(wt, 2), 'w_struct': round(ws, 2), 'thresh': round(th, 2)}
            for wt, ws, th in itertools.product(w_titan_range, w_struct_range, thresh_range)
            if 0.9 <= (wt + ws) <= 1.1
        ]

        final_results = []
        batch_size = 100
        for i in range(0, len(combinations), batch_size):
            batch = combinations[i:i + batch_size]
            res = self._worker_optimize(batch, current_period_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
            final_results.extend(res)
            if i % 1000 == 0:
                print(f"   ...Analyzed {i}/{len(combinations)} configs", flush=True)

        if not final_results:
            return None, None
        # max() returns the first maximal element, matching the previous stable descending sort.
        best = max(final_results, key=lambda x: x['final_balance'])

        print("\n" + "="*60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f"   📅 Period: {self.force_start_date} -> {self.force_end_date}")
        print(f"   💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f"   🚀 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f"   📊 Total Trades: {best['total_trades']}")
        print(f"   ✅ Winning Trades: {best['win_count']}")
        print(f"   ❌ Losing Trades: {best['loss_count']}")
        print(f"   📈 Win Rate: {best['win_rate']:.1f}%")
        print("-" * 60)
        print(f"   🟢 Max Single Win: ${best['max_single_win']:.2f}")
        print(f"   🔴 Max Single Loss: ${best['max_single_loss']:.2f}")
        print(f"   🔥 Max Win Streak: {best['max_win_streak']} trades")
        print(f"   🧊 Max Loss Streak: {best['max_loss_streak']} trades")
        print(f"   📉 Max Drawdown: {best['max_drawdown']:.1f}%")
        print("-" * 60)
        print(f"   ⚙️ Config: Titan={best['config']['w_titan']} | Struct={best['config']['w_struct']} | Thresh={best['config']['thresh']}")
        print("="*60)
        return best['config'], best
async def run_strategic_optimization_task():
    """Optimize every market-regime scenario and persist the winners to the AdaptiveHub.

    For each scenario, the best configuration returned by
    ``HeavyDutyBacktester.run_optimization`` is passed to
    ``hub.submit_challenger`` for that regime. Afterwards the hub state is
    saved with ``hub._save_state_to_r2()`` and ``hub._inject_current_parameters()``
    is called. The data manager is closed even if initialization or any
    scenario fails.
    """
    print("\n🧪 [STRATEGIC BACKTEST] Mass-Scale Edition Initiated...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    try:
        # Initialize inside the try so a failure here still reaches dm.close().
        await dm.initialize()
        await proc.initialize()

        hub = AdaptiveHub(r2)
        await hub.initialize()
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        optimizer = HeavyDutyBacktester(dm, proc)
        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])
            best_config, best_stats = await optimizer.run_optimization(target_regime=target)
            if best_config and best_stats:
                hub.submit_challenger(target, best_config, best_stats)
        await hub._save_state_to_r2()
        hub._inject_current_parameters()
        print(f"✅ [System] ALL DNA Updated & Saved Successfully.")
    finally:
        await dm.close()


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())