# ============================================================
# 🧪 backtest_engine.py (V118.5 - GEM-Architect: Hyper-Vectorized)
# ============================================================
import asyncio
import gc
import glob
import itertools
import logging
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, List

import numpy as np
import pandas as pd
import pandas_ta as ta
# ✅ Import the core engines (project modules plus the exchange / ML backends).
try:
    from ml_engine.processor import MLProcessor, SystemLimits
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
    from r2 import R2Service
    import ccxt.async_support as ccxt
    import xgboost as xgb
except ImportError as exc:
    # Best-effort: the module still loads so the failure is visible at startup,
    # but any code path that touches the missing names will raise NameError.
    # Include the exception so the missing module is identifiable.
    print(f"❌ [Import Error] Critical ML modules missing: {exc}")

# Keep the ML engine's own logger quiet during long backtests.
logging.getLogger('ml_engine').setLevel(logging.WARNING)

# Directory holding the per-symbol, per-period score pickles produced in phase 1.
CACHE_DIR = "backtest_real_scores"
class HeavyDutyBacktester:
    """Two-phase backtester.

    Phase 1 (`generate_truth_data`) downloads 1-minute candles per symbol,
    computes indicators, scores every candidate entry with the models exposed
    by ``processor`` and pickles the result into ``CACHE_DIR``.

    Phase 2 (`run_optimization`) replays those scored candidates through a
    simple portfolio simulation for every threshold combination of a grid and
    returns the configuration with the highest final balance.
    """

    def __init__(self, data_manager, processor):
        """Store collaborators, set portfolio defaults and flush the score cache.

        Args:
            data_manager: object exposing ``exchange.fetch_ohlcv`` (async).
            processor: object exposing ``titan``, ``oracle``, ``sniper``,
                ``guardian_hydra`` and ``guardian_legacy`` model holders.
        """
        self.dm = data_manager
        self.proc = processor

        # 🎛️ Search-grid density (points per continuous threshold axis).
        self.GRID_DENSITY = 3

        # Portfolio settings.
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        self.TARGET_COINS = [
            'SOL/USDT', 'XRP/USDT', 'DOGE/USDT'
        ]
        self.force_start_date = None
        self.force_end_date = None

        # 🔥 Flush the cache so stale scores never leak into a new run 🔥
        if os.path.exists(CACHE_DIR):
            files = glob.glob(os.path.join(CACHE_DIR, "*"))
            print(f"🧹 [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
            for f in files:
                try:
                    os.remove(f)
                except OSError:
                    # Best-effort cleanup: a sub-directory or locked file is not fatal.
                    pass
        else:
            os.makedirs(CACHE_DIR, exist_ok=True)

        print(f"🧪 [Backtest V118.5] Hyper-Vectorized Mode. Models: {self._check_models_status()}")

    def _check_models_status(self):
        """Return a ``+``-joined list of the models that look loaded, or ``"None"``."""
        status = []
        if self.proc.titan:
            status.append("Titan")
        if self.proc.oracle and getattr(self.proc.oracle, 'model_direction', None):
            status.append("Oracle")
        if self.proc.sniper and getattr(self.proc.sniper, 'models', None):
            status.append("Sniper")
        if self.proc.guardian_hydra:
            status.append("Hydra")
        return "+".join(status) if status else "None"

    def set_date_range(self, start_str, end_str):
        """Set the backtest period; both dates are ``YYYY-MM-DD`` strings (UTC)."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    def _period_bounds_ms(self):
        """Return ``(start_ms, end_ms)`` for the forced period, or ``None`` if unset."""
        if not (self.force_start_date and self.force_end_date):
            return None
        dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return int(dt_start.timestamp() * 1000), int(dt_end.timestamp() * 1000)

    @staticmethod
    def _align_to_htf(htf_timestamps, base_timestamps):
        """Map each base timestamp to the index of the higher-timeframe bar containing it.

        Bars are labelled by their opening time, so the containing bar is the
        last one whose timestamp is <= the base timestamp. ``side='left'``
        (the numpy default) would instead select the *next* bar for every
        timestamp that is not exactly on a bar boundary, i.e. future data.

        NOTE(review): the containing bar still carries that bar's final
        close/high/low; use ``- 2`` instead of ``- 1`` if only fully closed
        bars must be visible.
        """
        idx = np.searchsorted(htf_timestamps, base_timestamps, side='right') - 1
        return np.clip(idx, 0, len(htf_timestamps) - 1)

    # ==============================================================
    # ⚡ FAST DATA DOWNLOADER
    # ==============================================================
    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1-minute candles for ``sym`` in ``[start_ms, end_ms]``.

        Requests are issued in batches of 1000 candles, at most 10 in flight,
        20 scheduled per gather. Each batch is retried up to 3 times.

        Returns:
            A list of unique candles sorted by timestamp, or ``None`` when
            nothing at all could be downloaded.
        """
        print(f" ⚡ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        duration_per_batch = limit * 60 * 1000

        batch_starts = []
        current = start_ms
        while current < end_ms:
            batch_starts.append(current)
            current += duration_per_batch

        sem = asyncio.Semaphore(10)

        async def _fetch_batch(timestamp):
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                    except Exception:
                        # Not a bare ``except``: task cancellation must propagate.
                        await asyncio.sleep(1)
                return []

        all_candles = []
        chunk_size = 20
        for i in range(0, len(batch_starts), chunk_size):
            chunk = batch_starts[i:i + chunk_size]
            results = await asyncio.gather(*(_fetch_batch(ts) for ts in chunk))
            for res in results:
                if res:
                    all_candles.extend(res)

        if not all_candles:
            return None

        # Keep the first occurrence of each in-range timestamp, then order by time.
        by_ts = {}
        for candle in all_candles:
            if start_ms <= candle[0] <= end_ms:
                by_ts.setdefault(candle[0], candle)
        unique_candles = [by_ts[ts] for ts in sorted(by_ts)]

        print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
        return unique_candles

    # ==============================================================
    # 🏎️ VECTORIZED INDICATORS (Robust)
    # ==============================================================
    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Add indicator columns to ``df`` in place and return it.

        All timeframes get RSI/EMA/ATR and relative volume. ``1m``/``5m``/``15m``
        additionally get Bollinger width, volume z-score, ATR% and ``l1_score``.
        Only ``1m`` gets the return/fib/volatility features, their lags and
        ``vol_zscore_50``. Remaining NaNs are replaced by 0.
        """
        for col in ['close', 'high', 'low', 'volume', 'open']:
            df[col] = df[col].astype(float)

        df['rsi'] = ta.rsi(df['close'], length=14)
        df['ema20'] = ta.ema(df['close'], length=20)
        df['ema50'] = ta.ema(df['close'], length=50)
        df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)

        # Global calc
        df['vol_ma50'] = df['volume'].rolling(50).mean()
        df['rel_vol'] = df['volume'] / (df['vol_ma50'] + 1e-9)

        if timeframe in ['1m', '5m', '15m']:
            sma20 = df['close'].rolling(20).mean()
            std20 = df['close'].rolling(20).std()
            df['bb_width'] = ((sma20 + 2 * std20) - (sma20 - 2 * std20)) / sma20
            vol_mean = df['volume'].rolling(20).mean()
            vol_std = df['volume'].rolling(20).std()
            df['vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)
            df['atr_pct'] = df['atr'] / df['close']

            # L1 Score: volume and volatility reward, overbought-RSI penalty.
            rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
            l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
            df['l1_score'] = l1_score_raw.fillna(0)

        if timeframe == '1m':
            df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
            df['ret'] = df['close'].pct_change()
            roll_max = df['high'].rolling(50).max()
            roll_min = df['low'].rolling(50).min()
            diff = (roll_max - roll_min).replace(0, 1e-9)
            df['fib_pos'] = (df['close'] - roll_min) / diff
            df['volatility'] = df['atr'] / df['close']
            df['trend_slope'] = (df['ema20'] - df['ema20'].shift(5)) / df['ema20'].shift(5)

            for lag in [1, 2, 3, 5, 10, 20]:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = (df['rsi'].shift(lag).fillna(50) / 100.0)
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)

            # Despite the column name, the window here is 500 bars.
            r = df['volume'].rolling(500).mean()
            s = df['volume'].rolling(500).std()
            df['vol_zscore_50'] = ((df['volume'] - r) / s).fillna(0)

        df.fillna(0, inplace=True)
        return df

    # ==============================================================
    # 🧠 CPU PROCESSING (HYPER-VECTORIZED)
    # ==============================================================
    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score every candidate entry of ``sym`` and pickle the result.

        Candidates are 1-minute bars with ``l1_score >= 5`` that have at least
        500 bars of warm-up before and 245 bars after them. For each candidate
        the Oracle, Sniper, Hydra and legacy-V2 model outputs are computed in
        bulk and written to ``CACHE_DIR/<sym>_<start_ms>_<end_ms>_scores.pkl``.
        Nothing is written when the file already exists or no candidate remains.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" 📂 [{sym}] Data Exists -> Skipping.")
            return

        print(f" ⚙️ [CPU] Analyzing {sym} (Hyper-Vectorized Mode)...", flush=True)
        t0 = time.time()

        # 1. Data Prep
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()

        frames = {}
        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}

        frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
        frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
        fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
        n_rows = len(fast_1m['close'])

        numpy_htf = {}
        # 'min' replaces the deprecated 'T' alias; the resulting bars are identical.
        for tf_str, tf_code in [('5m', '5min'), ('15m', '15min'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
            resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
            resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
            resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
            frames[tf_str] = resampled
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}

        # 2. Time Alignment (Vectorized) - index of the HTF bar containing each 1m bar.
        ts_1m = fast_1m['timestamp']
        map_1m_to_1h = self._align_to_htf(numpy_htf['1h']['timestamp'], ts_1m)
        map_1m_to_5m = self._align_to_htf(numpy_htf['5m']['timestamp'], ts_1m)
        map_1m_to_15m = self._align_to_htf(numpy_htf['15m']['timestamp'], ts_1m)
        map_1m_to_4h = self._align_to_htf(numpy_htf['4h']['timestamp'], ts_1m)

        # 3. Model Access
        oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
        sniper_models = getattr(self.proc.sniper, 'models', [])
        hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
        legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)

        # 4. 🔥 Pre-Calc Legacy V2 (Vectorized) 🔥
        global_v2_probs = np.zeros(n_rows)
        if legacy_v2:
            try:
                # NOTE(review): the 5m/15m frames built above do not carry
                # 'log_ret', 'fib_pos', 'trend_slope' or 'dist_fib618', so this
                # lookup raises KeyError and the probabilities stay at zero
                # until those features are added to the indicator step.
                htf_5m = numpy_htf['5m']
                htf_15m = numpy_htf['15m']
                base_cols = [
                    fast_1m['log_ret'],
                    fast_1m['rsi'] / 100.0,
                    fast_1m['fib_pos'],
                    fast_1m['volatility'],
                    htf_5m['log_ret'][map_1m_to_5m],
                    htf_5m['rsi'][map_1m_to_5m] / 100.0,
                    htf_5m['fib_pos'][map_1m_to_5m],
                    htf_5m['trend_slope'][map_1m_to_5m],
                    htf_15m['log_ret'][map_1m_to_15m],
                    htf_15m['rsi'][map_1m_to_15m] / 100.0,
                    htf_15m['dist_fib618'][map_1m_to_15m],
                    htf_15m['trend_slope'][map_1m_to_15m],
                ]
                lag_cols = []
                for lag in [1, 2, 3, 5, 10, 20]:
                    lag_cols.extend([
                        fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'],
                        fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']
                    ])
                X_GLOBAL_V2 = np.column_stack([*base_cols, *lag_cols])
                probs = legacy_v2.predict(xgb.DMatrix(X_GLOBAL_V2))
                if len(probs.shape) > 1:
                    probs = probs[:, 2]
                global_v2_probs = probs
            except Exception as e:
                print(f"Legacy V2 Error: {e}")

        # 5. 🔥 Pre-Assemble Hydra Static 🔥
        global_hydra_static = None
        if hydra_models:
            try:
                global_hydra_static = np.column_stack([
                    fast_1m['rsi'],
                    numpy_htf['5m']['rsi'][map_1m_to_5m],
                    numpy_htf['15m']['rsi'][map_1m_to_15m],
                    fast_1m['bb_width'],
                    fast_1m['rel_vol'],
                    fast_1m['atr'],
                    fast_1m['close'],
                ])
            except Exception as e:
                print(f"Hydra Static Error: {e}")

        # 6. Candidate Filtering
        valid_indices = np.where(fast_1m['l1_score'] >= 5.0)[0]
        # Skip warmup and tail (the tail must hold a full 240-bar look-ahead window).
        mask_bounds = (valid_indices > 500) & (valid_indices < n_rows - 245)
        final_valid_indices = valid_indices[mask_bounds]
        print(f" 🎯 Raw Candidates (Score > 5): {len(final_valid_indices)}. Vectorized Scoring...", flush=True)

        # 🚀 HYPER-VECTORIZATION START 🚀
        # One big feature matrix per model instead of a per-candidate loop.
        num_candidates = len(final_valid_indices)
        if num_candidates == 0:
            return

        # --- A. ORACLE MATRIX CONSTRUCTION ---
        oracle_preds = np.full(num_candidates, 0.5)
        if oracle_dir_model:
            try:
                idx_1h = map_1m_to_1h[final_valid_indices]
                idx_15m = map_1m_to_15m[final_valid_indices]
                idx_4h = map_1m_to_4h[final_valid_indices]
                titan_scores = np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95)

                oracle_features = []
                for col in getattr(self.proc.oracle, 'feature_cols', []):
                    if col.startswith('1h_'):
                        c = col[3:]
                        oracle_features.append(numpy_htf['1h'][c][idx_1h] if c in numpy_htf['1h'] else np.zeros(num_candidates))
                    elif col.startswith('15m_'):
                        c = col[4:]
                        oracle_features.append(numpy_htf['15m'][c][idx_15m] if c in numpy_htf['15m'] else np.zeros(num_candidates))
                    elif col.startswith('4h_'):
                        c = col[3:]
                        oracle_features.append(numpy_htf['4h'][c][idx_4h] if c in numpy_htf['4h'] else np.zeros(num_candidates))
                    elif col == 'sim_titan_score':
                        oracle_features.append(titan_scores)
                    elif col == 'sim_mc_score':
                        oracle_features.append(np.full(num_candidates, 0.5))
                    elif col == 'sim_pattern_score':
                        oracle_features.append(np.full(num_candidates, 0.5))
                    else:
                        oracle_features.append(np.zeros(num_candidates))

                X_oracle_big = np.column_stack(oracle_features)
                preds = oracle_dir_model.predict(X_oracle_big)
                # A 2-D output is read as class probabilities (column 1);
                # a 1-D output is taken as-is.
                if len(preds.shape) > 1 and preds.shape[1] > 1:
                    oracle_preds = preds[:, 1]
                else:
                    oracle_preds = preds.flatten()
            except Exception as e:
                print(f"Oracle Error: {e}")

        # --- B. SNIPER MATRIX CONSTRUCTION ---
        sniper_preds = np.full(num_candidates, 0.5)
        if sniper_models:
            try:
                sniper_features = []
                for col in getattr(self.proc.sniper, 'feature_names', []):
                    if col in fast_1m:
                        sniper_features.append(fast_1m[col][final_valid_indices])
                    elif col == 'L_score':
                        sniper_features.append(fast_1m.get('vol_zscore_50', np.zeros(n_rows))[final_valid_indices])
                    else:
                        sniper_features.append(np.zeros(num_candidates))
                X_sniper_big = np.column_stack(sniper_features)
                # Ensemble average
                preds_list = [m.predict(X_sniper_big) for m in sniper_models]
                sniper_preds = np.mean(preds_list, axis=0)
            except Exception as e:
                print(f"Sniper Error: {e}")

        # --- C. HYDRA MATRIX CONSTRUCTION (The Heavy One) ---
        hydra_risk_preds = np.zeros(num_candidates)
        # int64 explicitly: millisecond timestamps do not fit a 32-bit platform int.
        hydra_time_preds = np.zeros(num_candidates, dtype=np.int64)
        if hydra_models and global_hydra_static is not None:
            window = 240
            # Elapsed minutes since entry for each row of the look-ahead window.
            # NOTE(review): assumed to be 1..240 - confirm against the Hydra
            # training features; the original referenced an undefined `time_vec`.
            time_vec = np.arange(1, window + 1)
            zeros = np.zeros(window)
            ones = np.ones(window)
            # Chunks of 5000 candidates keep the (batch, 240, features) array bounded.
            chunk_size = 5000
            for i in range(0, num_candidates, chunk_size):
                chunk_indices = final_valid_indices[i:i + chunk_size]
                batch_X = []
                for idx in chunk_indices:
                    start = idx + 1
                    sl_static = global_hydra_static[start:start + window]
                    entry_p = fast_1m['close'][idx]
                    sl_close = sl_static[:, 6]
                    sl_atr = sl_static[:, 5]
                    sl_dist = np.maximum(1.5 * sl_atr, entry_p * 0.015)
                    sl_norm_pnl = (sl_close - entry_p) / sl_dist
                    # Running best price since entry, never below the entry itself.
                    sl_cum_max = np.maximum(np.maximum.accumulate(sl_close), entry_p)
                    sl_max_pnl_r = (sl_cum_max - entry_p) / sl_dist
                    sl_atr_pct = sl_atr / sl_close
                    batch_X.append(np.column_stack([
                        sl_static[:, 0], sl_static[:, 1], sl_static[:, 2],
                        sl_static[:, 3], sl_static[:, 4],
                        zeros, sl_atr_pct, sl_norm_pnl, sl_max_pnl_r,
                        zeros, zeros, time_vec,
                        zeros, ones * 0.6, ones * 0.7, ones * 3.0
                    ]))

                if not batch_X:
                    continue
                try:
                    big_X = np.array(batch_X)  # (batch, 240, features)
                    big_X_flat = big_X.reshape(-1, big_X.shape[-1])
                    preds_flat = hydra_models['crash'].predict_proba(big_X_flat)[:, 1]
                    preds_batch = preds_flat.reshape(len(batch_X), window)

                    over_thresh = preds_batch > 0.6
                    has_crash = over_thresh.any(axis=1)
                    # argmax on a boolean row yields the first True position.
                    crash_times_rel = np.argmax(over_thresh, axis=1)

                    hydra_risk_preds[i:i + len(batch_X)] = np.max(preds_batch, axis=1)
                    crash_rows = np.flatnonzero(has_crash)
                    abs_rows = chunk_indices[crash_rows] + 1 + crash_times_rel[crash_rows]
                    hydra_time_preds[i + crash_rows] = fast_1m['timestamp'][abs_rows]
                except Exception as e:
                    print(f"Hydra Error: {e}")

        # --- D. LEGACY V2 MAPPING ---
        legacy_risk_preds = np.zeros(num_candidates)
        legacy_time_preds = np.zeros(num_candidates, dtype=np.int64)
        if legacy_v2:
            # Max predicted risk over the 240 bars following each candidate.
            n_probs = len(global_v2_probs)
            for k, idx in enumerate(final_valid_indices):
                start = idx + 1
                if start + 240 < n_probs:
                    legacy_risk_preds[k] = np.max(global_v2_probs[start:start + 240])

        # --- E. CONSTRUCT FINAL DATAFRAME ---
        l1_scores_final = fast_1m['l1_score'][final_valid_indices]
        ai_df = pd.DataFrame({
            'timestamp': fast_1m['timestamp'][final_valid_indices],
            'symbol': sym,
            'close': fast_1m['close'][final_valid_indices],
            # Titan proxy derived from the L1 score.
            'real_titan': np.clip(l1_scores_final / 40.0, 0.1, 0.95),
            'oracle_conf': oracle_preds,
            'sniper_score': sniper_preds,
            'l1_score': l1_scores_final,
            'risk_hydra_crash': hydra_risk_preds,
            'time_hydra_crash': hydra_time_preds,
            'risk_legacy_v2': legacy_risk_preds,
            'time_legacy_panic': legacy_time_preds
        })

        dt = time.time() - t0
        if not ai_df.empty:
            ai_df.to_pickle(scores_file)
            print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)

        del frames, fast_1m, numpy_htf, global_v2_probs, global_hydra_static
        gc.collect()

    async def generate_truth_data(self):
        """Phase 1: download and score every target coin for the forced period.

        Does nothing when no date range has been set. A failure on one symbol
        is reported and the remaining symbols are still processed.
        """
        bounds = self._period_bounds_ms()
        if bounds is None:
            return
        start_time_ms, end_time_ms = bounds
        print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")

        for sym in self.TARGET_COINS:
            try:
                candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
                if candles:
                    await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
            except Exception as e:
                print(f" ❌ SKIP {sym}: {e}", flush=True)
            gc.collect()

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Simulate every config in ``combinations_batch`` over the scored candidates.

        Declared static because it takes no ``self`` yet is invoked as
        ``self._worker_optimize(...)``.

        Args:
            combinations_batch: list of config dicts; ``hydra_thresh`` is
                required, ``oracle_thresh``/``sniper_thresh``/``l1_thresh``
                default to 0.6/0.4/15.0. Each dict gets a ``thresh`` key added.
            scores_files: paths of score pickles written by phase 1.
            initial_capital: starting wallet balance.
            fees_pct: fee fraction charged on each side of a trade.
            max_slots: maximum number of simultaneously open positions.

        Returns:
            One result dict per config (empty list when no data could be loaded).
        """
        print(f" ⏳ [System] Loading {len(scores_files)} datasets into memory...", flush=True)
        all_data = []
        for fp in scores_files:
            try:
                # Pickles are produced locally by phase 1; never point this at untrusted files.
                df = pd.read_pickle(fp)
                if not df.empty:
                    all_data.append(df)
            except Exception as e:
                print(f" ⚠️ [System] Could not load {fp}: {e}", flush=True)
        if not all_data:
            return []

        global_df = pd.concat(all_data)
        global_df.sort_values('timestamp', inplace=True)

        # 🚀 Numpy Conversion 🚀
        arr_ts = global_df['timestamp'].values
        arr_close = global_df['close'].values.astype(np.float64)
        arr_symbol = global_df['symbol'].values
        arr_oracle = global_df['oracle_conf'].values.astype(np.float64)
        arr_sniper = global_df['sniper_score'].values.astype(np.float64)
        arr_hydra_risk = global_df['risk_hydra_crash'].values.astype(np.float64)
        arr_hydra_time = global_df['time_hydra_crash'].values.astype(np.int64)
        arr_titan = global_df['real_titan'].values.astype(np.float64)
        arr_l1 = global_df['l1_score'].values.astype(np.float64)

        unique_syms = np.unique(arr_symbol)
        sym_map = {s: i for i, s in enumerate(unique_syms)}
        arr_sym_int = np.array([sym_map[s] for s in arr_symbol], dtype=np.int32)
        total_len = len(arr_ts)

        print(f" 🚀 [System] Starting Optimized Grid Search on {len(combinations_batch)} combos...", flush=True)
        results = []
        for config in combinations_batch:
            wallet_bal = initial_capital
            wallet_alloc = 0.0
            positions = {}  # sym_id -> [entry, size, hydra_risk, hydra_time, consensus]
            trades_log = []  # (pnl, consensus score at entry)

            oracle_thresh = config.get('oracle_thresh', 0.6)
            sniper_thresh = config.get('sniper_thresh', 0.4)
            hydra_thresh = config['hydra_thresh']
            l1_thresh = config.get('l1_thresh', 15.0)
            mask_buy = (arr_l1 >= l1_thresh) & (arr_oracle >= oracle_thresh) & (arr_sniper >= sniper_thresh)

            peak_bal = initial_capital
            max_dd = 0.0
            for i in range(total_len):
                ts = arr_ts[i]
                sym_id = arr_sym_int[i]
                price = arr_close[i]

                # Exits: Hydra crash time reached, +4% take-profit or -2% stop-loss.
                if sym_id in positions:
                    pos = positions[sym_id]
                    entry = pos[0]
                    h_risk = pos[2]
                    h_time = pos[3]
                    is_crash = (h_risk > hydra_thresh) and (h_time > 0) and (ts >= h_time)
                    pnl = (price - entry) / entry
                    if is_crash or pnl > 0.04 or pnl < -0.02:
                        wallet_bal += pos[1] * (1 + pnl - (fees_pct * 2))
                        wallet_alloc -= pos[1]
                        trades_log.append((pnl, pos[4]))
                        del positions[sym_id]
                        # Equity only changes on exits (open positions are held at cost).
                        tot = wallet_bal + wallet_alloc
                        if tot > peak_bal:
                            peak_bal = tot
                        else:
                            dd = (peak_bal - tot) / peak_bal
                            if dd > max_dd:
                                max_dd = dd

                # Entries
                if (len(positions) < max_slots and mask_buy[i]
                        and sym_id not in positions and wallet_bal >= 5.0):
                    cons_score = (arr_titan[i] + arr_oracle[i] + arr_sniper[i]) / 3.0
                    size = min(10.0, wallet_bal * 0.98)
                    positions[sym_id] = [price, size, arr_hydra_risk[i], arr_hydra_time[i], cons_score]
                    wallet_bal -= size
                    wallet_alloc += size

            # Stats
            final_bal = wallet_bal + wallet_alloc
            net_profit = final_bal - initial_capital
            total_t = len(trades_log)
            win_count = sum(1 for p, _ in trades_log if p > 0)
            loss_count = total_t - win_count
            win_rate = (win_count / total_t * 100) if total_t > 0 else 0.0

            hc_pnls = [p for p, s in trades_log if s > 0.65]
            hc_count = len(hc_pnls)
            hc_wins = sum(1 for p in hc_pnls if p > 0)
            hc_win_rate = (hc_wins / hc_count * 100) if hc_count > 0 else 0.0
            hc_avg_pnl = (sum(hc_pnls) / hc_count * 100) if hc_count > 0 else 0.0
            agree_rate = (hc_count / total_t * 100) if total_t > 0 else 0.0

            # ✅ Ensure 'thresh' key exists for AdaptiveHub compatibility
            config['thresh'] = l1_thresh
            results.append({
                'config': config, 'final_balance': final_bal, 'net_profit': net_profit,
                'total_trades': total_t, 'win_count': win_count, 'loss_count': loss_count,
                'win_rate': win_rate, 'max_drawdown': max_dd * 100,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': hc_win_rate,
                'high_consensus_avg_pnl': hc_avg_pnl
            })

        print("")
        return results

    async def run_optimization(self, target_regime="RANGE"):
        """Run both phases for the current date range and report the best config.

        Returns:
            ``(best_config, best_stats)``, or ``(None, None)`` when no scored
            data is available.
        """
        await self.generate_truth_data()

        d = self.GRID_DENSITY
        oracle_range = np.linspace(0.45, 0.8, d).tolist()
        sniper_range = np.linspace(0.35, 0.7, d).tolist()
        hydra_range = np.linspace(0.70, 0.95, d).tolist()
        l1_range = [10.0, 15.0, 20.0, 25.0]
        titan_range = [0.4, 0.6]
        pattern_range = [0.2, 0.4]

        # NOTE: w_titan / w_struct are recorded in the config but are not read
        # by the simulation, so configs differing only in them score identically.
        combinations = [
            {
                'w_titan': wt, 'w_struct': wp, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h,
                'legacy_thresh': 0.95
            }
            for o, s, h, l1, wt, wp in itertools.product(
                oracle_range, sniper_range, hydra_range, l1_range, titan_range, pattern_range)
        ]

        # Only use score files of the current period: the cache is flushed once
        # per instance, so files of previously processed periods are still present.
        bounds = self._period_bounds_ms()
        suffix = f"_{bounds[0]}_{bounds[1]}_scores.pkl" if bounds else "_scores.pkl"
        valid_files = [os.path.join(CACHE_DIR, f) for f in sorted(os.listdir(CACHE_DIR)) if f.endswith(suffix)]

        print(f"\n🧩 [Phase 2] Optimizing {len(combinations)} Configs (Full Stack) for {target_regime}...")
        best_res = self._worker_optimize(combinations, valid_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not best_res:
            return None, None

        # First config with the highest final balance.
        best = max(best_res, key=lambda x: x['final_balance'])

        print("\n" + "="*60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" 📊 Total Trades: {best['total_trades']}")
        print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
        print(f" 📉 Max Drawdown: {best['max_drawdown']:.1f}%")
        print("-" * 60)
        print(f" 🧠 CONSENSUS ANALYTICS:")
        print(f" 🤝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print(f" 💎 High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
        print("-" * 60)
        print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print(f" ⚖️ Weights: Titan={best['config']['w_titan']:.2f} | Patterns={best['config']['w_struct']:.2f} | L1={best['config']['l1_thresh']}")
        print("="*60)
        return best['config'], best
async def run_strategic_optimization_task():
    """Optimise one strategy config per market regime and persist the results.

    Builds the storage, data and ML services, runs the backtester over a fixed
    date range for each regime, submits every winning config to the adaptive
    hub, then saves the hub state. The data manager is closed even on failure.
    """
    print("\n🧪 [STRATEGIC BACKTEST] Hyper-Vectorized Mode...")
    storage = R2Service()
    dm = DataManager(None, None, storage)
    proc = MLProcessor(dm)
    await dm.initialize()
    await proc.initialize()
    if proc.guardian_hydra:
        proc.guardian_hydra.set_silent_mode(True)

    # (regime, start date, end date) - historical windows representing each regime.
    regime_windows = (
        ("BULL", "2024-01-01", "2024-03-30"),
        ("BEAR", "2023-08-01", "2023-09-15"),
        ("DEAD", "2023-06-01", "2023-08-01"),
        ("RANGE", "2024-07-01", "2024-09-30"),
    )

    try:
        hub = AdaptiveHub(storage)
        await hub.initialize()
        optimizer = HeavyDutyBacktester(dm, proc)

        for regime, start_date, end_date in regime_windows:
            optimizer.set_date_range(start_date, end_date)
            best_cfg, best_stats = await optimizer.run_optimization(target_regime=regime)
            if not best_cfg:
                continue
            hub.submit_challenger(regime, best_cfg, best_stats)

        # Persist once, after every regime has been processed.
        await hub._save_state_to_r2()
        print("✅ [System] ALL Strategic DNA Updated & Saved.")
    finally:
        await dm.close()
# Script entry point: run the full multi-regime optimisation when executed directly.
if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())