# ============================================================
# 🧪 backtest_engine.py (V118.5 - GEM-Architect: Hyper-Vectorized)
# ============================================================
"""Offline backtest / grid-search engine.

Phase 1 downloads 1-minute candles per symbol, computes indicators on several
timeframes, scores every candidate minute with the loaded models and caches
the result as a pickle in ``CACHE_DIR``.  Phase 2 replays those cached signals
through a small portfolio simulation for every threshold combination and
reports the best configuration.
"""

import asyncio
import gc
import glob
import itertools
import logging
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List

import numpy as np
import pandas as pd

# pandas_ta is only needed by the indicator stage; the optimizer stage works
# without it, so a missing package is reported here and raised on first use.
try:
    import pandas_ta as ta
except ImportError:
    ta = None
    print("❌ [Import Error] pandas_ta missing; indicator calculation is unavailable.")

# ✅ Core engine imports
try:
    from ml_engine.processor import MLProcessor, SystemLimits
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
    from r2 import R2Service
    import ccxt.async_support as ccxt
    import xgboost as xgb
except ImportError:
    print("❌ [Import Error] Critical ML modules missing.")

logging.getLogger('ml_engine').setLevel(logging.WARNING)

CACHE_DIR = "backtest_real_scores"


class HeavyDutyBacktester:
    """Generate model scores for historical candles and grid-search thresholds."""

    # Number of 1-minute candles scanned after an entry by the guardian models.
    _LOOKAHEAD = 240
    # Candidates closer than this to the start/end of the data are discarded
    # (indicator warm-up at the head, room for the look-ahead window at the tail).
    _WARMUP = 500
    _TAIL_GUARD = 245
    # Hydra windows are materialised in batches of this many candidates.
    _HYDRA_BATCH = 5000
    _HYDRA_TIME_THRESH = 0.6
    _LAGS = (1, 2, 3, 5, 10, 20)
    _BASE_MS = 60_000
    # (label, pandas resample rule, bar length in ms)
    _HTF_RULES = (
        ('5m', '5min', 5 * 60_000),
        ('15m', '15min', 15 * 60_000),
        ('1h', '1h', 60 * 60_000),
        ('4h', '4h', 4 * 60 * 60_000),
        ('1d', '1D', 24 * 60 * 60_000),
    )

    def __init__(self, data_manager, processor):
        """Store collaborators, set portfolio defaults and flush the score cache.

        Args:
            data_manager: object exposing ``exchange.fetch_ohlcv``.
            processor: object exposing the model attributes read below
                (``titan``, ``oracle``, ``sniper``, ``guardian_hydra``, ...).
        """
        self.dm = data_manager
        self.proc = processor

        # 🎛️ Search-grid density
        self.GRID_DENSITY = 3

        # Portfolio settings
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        self.TARGET_COINS = ['SOL/USDT', 'XRP/USDT', 'DOGE/USDT']

        self.force_start_date = None
        self.force_end_date = None

        # 🔥 Cache cleanup 🔥
        if os.path.exists(CACHE_DIR):
            files = glob.glob(os.path.join(CACHE_DIR, "*"))
            print(f"🧹 [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
            for f in files:
                try:
                    os.remove(f)
                except OSError:
                    # Best effort: a file we cannot delete is simply left behind.
                    pass
        else:
            os.makedirs(CACHE_DIR, exist_ok=True)

        print(f"🧪 [Backtest V118.5] Hyper-Vectorized Mode. Models: {self._check_models_status()}")

    def _check_models_status(self):
        """Return a ``+``-joined list of the models present on the processor, or ``"None"``."""
        status = []
        if self.proc.titan:
            status.append("Titan")
        if self.proc.oracle and getattr(self.proc.oracle, 'model_direction', None):
            status.append("Oracle")
        if self.proc.sniper and getattr(self.proc.sniper, 'models', None):
            status.append("Sniper")
        if self.proc.guardian_hydra:
            status.append("Hydra")
        return "+".join(status) if status else "None"

    def set_date_range(self, start_str, end_str):
        """Set the backtest period; both values are ``YYYY-MM-DD`` strings (UTC)."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    def _period_bounds_ms(self):
        """Return ``(start_ms, end_ms)`` for the forced date range, or ``None`` if unset."""
        if not (self.force_start_date and self.force_end_date):
            return None
        dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return int(dt_start.timestamp() * 1000), int(dt_end.timestamp() * 1000)

    # ==============================================================
    # ⚡ FAST DATA DOWNLOADER
    # ==============================================================
    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download all 1m candles of ``sym`` in ``[start_ms, end_ms]``.

        Requests are issued in batches of 1000 candles, at most 10 in flight,
        each retried up to three times.  Returns the de-duplicated candles
        sorted by timestamp, or ``None`` when nothing at all was downloaded.
        """
        print(f" ⚡ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        duration_per_batch = limit * 60 * 1000

        batch_starts = []
        current = start_ms
        while current < end_ms:
            batch_starts.append(current)
            current += duration_per_batch

        sem = asyncio.Semaphore(10)

        async def _fetch_batch(timestamp):
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(
                            sym, '1m', since=timestamp, limit=limit)
                    except Exception:
                        # `except Exception` (not bare) so task cancellation
                        # still propagates instead of being retried.
                        await asyncio.sleep(1)
            print(f" ⚠️ [Network] {sym}: batch starting at {timestamp} failed after 3 attempts.",
                  flush=True)
            return []

        all_candles = []
        chunk_size = 20
        for i in range(0, len(batch_starts), chunk_size):
            results = await asyncio.gather(
                *(_fetch_batch(ts) for ts in batch_starts[i:i + chunk_size]))
            for res in results:
                if res:
                    all_candles.extend(res)

        if not all_candles:
            return None

        # Keep the first candle seen for every in-range timestamp.
        by_ts = {}
        for candle in all_candles:
            ts = candle[0]
            if start_ms <= ts <= end_ms and ts not in by_ts:
                by_ts[ts] = candle
        unique_candles = [by_ts[ts] for ts in sorted(by_ts)]

        print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
        return unique_candles

    # ==============================================================
    # 🏎️ VECTORIZED INDICATORS (Robust)
    # ==============================================================
    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Add indicator columns to ``df`` in place and return it.

        All timeframes get RSI/EMA/ATR, relative volume, returns, Fibonacci
        position and trend slope.  ``'1m'``, ``'5m'`` and ``'15m'`` additionally
        get Bollinger width, volume z-score and the L1 score; ``'1m'`` also gets
        the lagged features and ``vol_zscore_50``.  Remaining NaNs become 0.

        Raises:
            ImportError: if ``pandas_ta`` is not installed.
        """
        if ta is None:
            raise ImportError("pandas_ta is required to compute indicators.")

        for col in ('close', 'high', 'low', 'volume', 'open'):
            df[col] = df[col].astype(float)

        close, high, low, volume = df['close'], df['high'], df['low'], df['volume']

        def _or_nan(result):
            # pandas_ta returns None when the series is shorter than the
            # indicator length; use NaN so later arithmetic stays numeric.
            return pd.Series(np.nan, index=df.index) if result is None else result

        df['rsi'] = _or_nan(ta.rsi(close, length=14))
        df['ema20'] = _or_nan(ta.ema(close, length=20))
        df['ema50'] = _or_nan(ta.ema(close, length=50))
        df['atr'] = _or_nan(ta.atr(high, low, close, length=14))

        # Global calc
        df['vol_ma50'] = volume.rolling(50).mean()
        df['rel_vol'] = volume / (df['vol_ma50'] + 1e-9)

        if timeframe in ('1m', '5m', '15m'):
            sma20 = close.rolling(20).mean()
            std20 = close.rolling(20).std()
            df['bb_width'] = ((sma20 + 2 * std20) - (sma20 - 2 * std20)) / sma20

            vol_mean = volume.rolling(20).mean()
            vol_std = volume.rolling(20).std()
            df['vol_z'] = (volume - vol_mean) / (vol_std + 1e-9)
            df['atr_pct'] = df['atr'] / close

            # L1 Score
            rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
            l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
            df['l1_score'] = l1_score_raw.fillna(0)

        # Return / Fibonacci / trend features.  These are computed for every
        # timeframe because the Legacy V2 matrix reads them from 5m and 15m.
        df['log_ret'] = np.log(close / close.shift(1))
        df['ret'] = close.pct_change()

        roll_max = high.rolling(50).max()
        roll_min = low.rolling(50).min()
        diff = (roll_max - roll_min).replace(0, 1e-9)
        df['fib_pos'] = (close - roll_min) / diff
        # NOTE(review): 'dist_fib618' is consumed by the Legacy V2 matrix but was
        # never produced.  Defined here as the relative distance of close from
        # the 0.618 level of the 50-bar range — confirm against the training
        # pipeline of that model.
        fib618 = roll_min + 0.618 * diff
        df['dist_fib618'] = (close - fib618) / close

        df['volatility'] = df['atr'] / close
        ema20_prev = df['ema20'].shift(5)
        df['trend_slope'] = (df['ema20'] - ema20_prev) / ema20_prev

        if timeframe == '1m':
            for lag in self._LAGS:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = df['rsi'].shift(lag).fillna(50) / 100.0
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)

            r = volume.rolling(500).mean()
            s = volume.rolling(500).std()
            df['vol_zscore_50'] = ((volume - r) / s).fillna(0)

        df.fillna(0, inplace=True)
        return df

    # ==============================================================
    # 🧠 CPU PROCESSING (HYPER-VECTORIZED)
    # ==============================================================
    @staticmethod
    def _index_to_ms(index):
        """Convert a tz-naive ``DatetimeIndex`` to int64 epoch milliseconds.

        Goes through numpy so the result does not depend on the index's
        internal resolution (ns/us/ms).
        """
        return index.values.astype('datetime64[ms]').astype(np.int64)

    @staticmethod
    def _align_to_closed_bar(htf_ts, base_ts, tf_ms, base_ms=60_000):
        """Map each base candle to the last higher-timeframe bar already closed.

        ``htf_ts`` and ``base_ts`` are sorted open-times in ms.  A decision is
        taken when the base candle closes (``base_ts + base_ms``); an HTF bar is
        usable only if it ended (``htf_ts + tf_ms``) at or before that moment.
        The previous left-sided ``searchsorted`` picked the *next* HTF bar,
        leaking future data into the features.

        Base candles that precede the first closed bar are clipped to bar 0;
        those fall inside the warm-up region that is discarded anyway.
        """
        htf_ts = np.asarray(htf_ts)
        decision_time = np.asarray(base_ts) + base_ms
        idx = np.searchsorted(htf_ts + tf_ms, decision_time, side='right') - 1
        return np.clip(idx, 0, len(htf_ts) - 1)

    def _build_frames(self, candles):
        """Return ``(fast_1m, numpy_htf)``: per-column numpy arrays for 1m and each HTF."""
        df_1m = pd.DataFrame(
            candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()

        base = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
        base['timestamp'] = self._index_to_ms(base.index.floor('1min'))
        fast_1m = {col: base[col].values for col in base.columns}

        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min',
                    'close': 'last', 'volume': 'sum'}
        numpy_htf = {}
        for tf_str, tf_rule, _ in self._HTF_RULES:
            resampled = df_1m.resample(tf_rule).agg(agg_dict).dropna()
            resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
            resampled['timestamp'] = self._index_to_ms(resampled.index)
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
        return fast_1m, numpy_htf

    def _legacy_v2_probabilities(self, legacy_v2, fast_1m, numpy_htf, htf_map):
        """Run Legacy V2 over every 1m candle; zeros when absent or on failure."""
        probs = np.zeros(len(fast_1m['close']))
        if not legacy_v2:
            return probs
        try:
            m5, m15 = htf_map['5m'], htf_map['15m']
            f5, f15 = numpy_htf['5m'], numpy_htf['15m']
            columns = [
                fast_1m['log_ret'], fast_1m['rsi'] / 100.0,
                fast_1m['fib_pos'], fast_1m['volatility'],
                f5['log_ret'][m5], f5['rsi'][m5] / 100.0,
                f5['fib_pos'][m5], f5['trend_slope'][m5],
                f15['log_ret'][m15], f15['rsi'][m15] / 100.0,
                f15['dist_fib618'][m15], f15['trend_slope'][m15],
            ]
            for lag in self._LAGS:
                columns.extend([
                    fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'],
                    fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}'],
                ])
            raw = legacy_v2.predict(xgb.DMatrix(np.column_stack(columns)))
            if len(raw.shape) > 1:
                raw = raw[:, 2]
            probs = raw
        except Exception as e:
            # Previously swallowed silently, which hid a permanently dead model.
            print(f"Legacy V2 Error: {e}")
        return probs

    def _hydra_static_matrix(self, fast_1m, numpy_htf, htf_map):
        """Stack the per-minute Hydra inputs as columns.

        Column order: rsi_1m, rsi_5m, rsi_15m, bb_width, rel_vol, atr, close.
        Returns ``None`` if any input is missing.
        """
        try:
            return np.column_stack([
                fast_1m['rsi'],
                numpy_htf['5m']['rsi'][htf_map['5m']],
                numpy_htf['15m']['rsi'][htf_map['15m']],
                fast_1m['bb_width'],
                fast_1m['rel_vol'],
                fast_1m['atr'],
                fast_1m['close'],
            ])
        except Exception as e:
            print(f"Hydra Static Error: {e}")
            return None

    def _score_oracle(self, model, fast_1m, numpy_htf, htf_map, candidates):
        """Return one Oracle prediction per candidate (0.5 when unavailable)."""
        n = len(candidates)
        preds = np.full(n, 0.5)
        if not model:
            return preds
        try:
            htf_idx = {tf: htf_map[tf][candidates] for tf in ('1h', '15m', '4h')}
            titan_scores = np.clip(fast_1m['l1_score'][candidates] / 40.0, 0.1, 0.95)
            zeros = np.zeros(n)
            neutral = np.full(n, 0.5)

            features = []
            for col in getattr(self.proc.oracle, 'feature_cols', []):
                tf = next((t for t in ('1h', '15m', '4h') if col.startswith(f"{t}_")), None)
                if tf is not None:
                    name = col[len(tf) + 1:]
                    source = numpy_htf[tf]
                    features.append(source[name][htf_idx[tf]] if name in source else zeros)
                elif col == 'sim_titan_score':
                    features.append(titan_scores)
                elif col in ('sim_mc_score', 'sim_pattern_score'):
                    features.append(neutral)
                else:
                    features.append(zeros)

            raw = np.asarray(model.predict(np.column_stack(features)))
            # 2-D output is read as class probabilities (column 1);
            # anything else is used as returned by predict().
            if raw.ndim > 1 and raw.shape[1] > 1:
                preds = raw[:, 1]
            else:
                preds = raw.flatten()
        except Exception as e:
            print(f"Oracle Error: {e}")
        return preds

    def _score_sniper(self, models, fast_1m, candidates):
        """Return the ensemble-mean Sniper prediction per candidate (0.5 when unavailable)."""
        n = len(candidates)
        preds = np.full(n, 0.5)
        if not models:
            return preds
        try:
            zeros = np.zeros(n)
            features = []
            for col in getattr(self.proc.sniper, 'feature_names', []):
                if col in fast_1m:
                    features.append(fast_1m[col][candidates])
                elif col == 'L_score':
                    fallback = np.zeros(len(fast_1m['close']))
                    features.append(fast_1m.get('vol_zscore_50', fallback)[candidates])
                else:
                    features.append(zeros)
            X = np.column_stack(features)
            # Ensemble average
            preds = np.mean([m.predict(X) for m in models], axis=0)
        except Exception as e:
            print(f"Sniper Error: {e}")
        return preds

    @staticmethod
    def _hydra_feature_windows(hydra_static, entry_idx, entry_price, window=240):
        """Build the ``(batch, window, 16)`` Hydra input for a batch of entries.

        For entry row ``i`` the window covers rows ``i+1 .. i+window`` of
        ``hydra_static`` (columns as produced by ``_hydra_static_matrix``).
        Feature order matches the original per-candidate construction.
        """
        offsets = np.arange(1, window + 1)
        rows = np.asarray(entry_idx)[:, None] + offsets[None, :]
        win = hydra_static[rows]                       # (batch, window, 7)
        entry = np.asarray(entry_price, dtype=float)[:, None]

        close = win[:, :, 6]
        atr = win[:, :, 5]
        stop_dist = np.maximum(1.5 * atr, entry * 0.015)
        norm_pnl = (close - entry) / stop_dist
        # Running best price since entry, never below the entry price itself.
        run_max = np.maximum(np.maximum.accumulate(close, axis=1), entry)
        max_pnl_r = (run_max - entry) / stop_dist
        atr_pct = atr / close

        zeros = np.zeros_like(close)
        ones = np.ones_like(close)
        # NOTE(review): `time_vec` was referenced but never defined (NameError
        # on every symbol).  Filled with minutes since entry (1..window) —
        # confirm this is the scale the Hydra model was trained on.
        time_vec = np.broadcast_to(offsets.astype(float), close.shape)

        return np.stack([
            win[:, :, 0], win[:, :, 1], win[:, :, 2], win[:, :, 3], win[:, :, 4],
            zeros, atr_pct, norm_pnl, max_pnl_r,
            zeros, zeros, time_vec, zeros,
            ones * 0.6, ones * 0.7, ones * 3.0,
        ], axis=-1)

    def _score_hydra(self, hydra_models, hydra_static, fast_1m, candidates):
        """Return ``(max_risk, first_crash_ts_ms)`` per candidate from the Hydra crash model.

        ``first_crash_ts_ms`` is 0 when the risk never exceeds
        ``_HYDRA_TIME_THRESH`` inside the look-ahead window.
        """
        n = len(candidates)
        risk = np.zeros(n)
        # int64 explicitly: millisecond timestamps overflow a 32-bit default int.
        crash_ts = np.zeros(n, dtype=np.int64)
        if not hydra_models or hydra_static is None:
            return risk, crash_ts

        close = fast_1m['close']
        timestamps = fast_1m['timestamp']
        window = self._LOOKAHEAD
        # Batched to keep the (batch, 240, 16) tensor at a sane size.
        for lo in range(0, n, self._HYDRA_BATCH):
            chunk = candidates[lo:lo + self._HYDRA_BATCH]
            try:
                big_X = self._hydra_feature_windows(hydra_static, chunk, close[chunk], window)
                # Assuming Hydra uses 2D input (stacked windows).
                flat = big_X.reshape(-1, big_X.shape[-1])
                probs = hydra_models['crash'].predict_proba(flat)[:, 1]
                probs = probs.reshape(len(chunk), window)

                risk[lo:lo + len(chunk)] = probs.max(axis=1)
                over = probs > self._HYDRA_TIME_THRESH
                hit = np.flatnonzero(over.any(axis=1))
                first = over.argmax(axis=1)  # first True per row
                crash_ts[lo + hit] = timestamps[chunk[hit] + 1 + first[hit]]
            except Exception as e:
                print(f"Hydra Error: {e}")
        return risk, crash_ts

    def _score_legacy(self, legacy_v2, global_v2_probs, candidates):
        """Return the max Legacy V2 probability over each candidate's look-ahead window."""
        risk = np.zeros(len(candidates))
        if not legacy_v2:
            return risk
        window = self._LOOKAHEAD
        total = len(global_v2_probs)
        for k, idx in enumerate(candidates):
            start = idx + 1
            if start + window < total:
                risk[k] = np.max(global_v2_probs[start:start + window])
        return risk

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score every candidate minute of ``sym`` and cache the result as a pickle.

        Does nothing if the score file for this symbol/period already exists or
        if no candle passes the L1 filter.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"

        if os.path.exists(scores_file):
            print(f" 📂 [{sym}] Data Exists -> Skipping.")
            return

        print(f" ⚙️ [CPU] Analyzing {sym} (Hyper-Vectorized Mode)...", flush=True)
        t0 = time.time()

        # 1. Data prep
        fast_1m, numpy_htf = self._build_frames(candles)

        # 2. Time alignment (vectorized, last closed HTF bar per minute)
        tf_lengths = {label: length for label, _, length in self._HTF_RULES}
        htf_map = {
            tf: self._align_to_closed_bar(
                numpy_htf[tf]['timestamp'], fast_1m['timestamp'], tf_lengths[tf], self._BASE_MS)
            for tf in ('1h', '5m', '15m', '4h')
        }

        # 3. Model access
        oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
        sniper_models = getattr(self.proc.sniper, 'models', [])
        hydra_models = (getattr(self.proc.guardian_hydra, 'models', {})
                        if self.proc.guardian_hydra else {})
        legacy_v2 = getattr(getattr(self.proc, 'guardian_legacy', None), 'model_v2', None)

        # 4. 🔥 Pre-calc Legacy V2 over the full series 🔥
        global_v2_probs = self._legacy_v2_probabilities(legacy_v2, fast_1m, numpy_htf, htf_map)

        # 5. 🔥 Pre-assemble Hydra static columns 🔥
        global_hydra_static = (self._hydra_static_matrix(fast_1m, numpy_htf, htf_map)
                               if hydra_models else None)

        # 6. Candidate filtering: L1 score gate, then drop warm-up and tail.
        total = len(fast_1m['close'])
        valid_indices = np.where(fast_1m['l1_score'] >= 5.0)[0]
        in_bounds = (valid_indices > self._WARMUP) & (valid_indices < total - self._TAIL_GUARD)
        final_valid_indices = valid_indices[in_bounds]

        print(f" 🎯 Raw Candidates (Score > 5): {len(final_valid_indices)}. Vectorized Scoring...",
              flush=True)
        if len(final_valid_indices) == 0:
            return

        # 🚀 One big matrix per model instead of a per-candidate loop.
        oracle_preds = self._score_oracle(
            oracle_dir_model, fast_1m, numpy_htf, htf_map, final_valid_indices)
        sniper_preds = self._score_sniper(sniper_models, fast_1m, final_valid_indices)
        hydra_risk_preds, hydra_time_preds = self._score_hydra(
            hydra_models, global_hydra_static, fast_1m, final_valid_indices)
        legacy_risk_preds = self._score_legacy(legacy_v2, global_v2_probs, final_valid_indices)
        # Crash-time estimation is not implemented for Legacy V2; kept as zeros.
        legacy_time_preds = np.zeros(len(final_valid_indices), dtype=np.int64)

        # 7. Final dataframe (Titan is proxied by the scaled L1 score).
        l1_scores_final = fast_1m['l1_score'][final_valid_indices]
        ai_df = pd.DataFrame({
            'timestamp': fast_1m['timestamp'][final_valid_indices],
            'symbol': sym,
            'close': fast_1m['close'][final_valid_indices],
            'real_titan': np.clip(l1_scores_final / 40.0, 0.1, 0.95),
            'oracle_conf': oracle_preds,
            'sniper_score': sniper_preds,
            'l1_score': l1_scores_final,
            'risk_hydra_crash': hydra_risk_preds,
            'time_hydra_crash': hydra_time_preds,
            'risk_legacy_v2': legacy_risk_preds,
            'time_legacy_panic': legacy_time_preds,
        })

        dt = time.time() - t0
        if not ai_df.empty:
            ai_df.to_pickle(scores_file)
        print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)

        del fast_1m, numpy_htf, global_v2_probs, global_hydra_static
        gc.collect()

    async def generate_truth_data(self):
        """Phase 1: download and score every target coin for the forced date range.

        Returns immediately when no date range has been set.  A failure on one
        symbol is reported and the remaining symbols are still processed.
        """
        bounds = self._period_bounds_ms()
        if bounds is None:
            return
        start_time_ms, end_time_ms = bounds
        print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")

        for sym in self.TARGET_COINS:
            try:
                candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
                if candles:
                    await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
            except Exception as e:
                print(f" ❌ SKIP {sym}: {e}", flush=True)
            gc.collect()

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Replay cached signals through the portfolio simulation for every config.

        Args:
            combinations_batch: list of config dicts; ``'hydra_thresh'`` is
                required, the other thresholds have defaults.  The dicts are
                not modified.
            scores_files: pickle paths written by ``_process_data_in_memory``.
                They are loaded with ``pd.read_pickle`` — only pass files this
                process produced, never untrusted pickles.
            initial_capital: starting wallet balance.
            fees_pct: fee per side, as a fraction.
            max_slots: maximum number of simultaneously open positions.

        Returns:
            One result dict per config (empty list if no data could be loaded).
            Each result's ``'config'`` is a copy of the input with ``'thresh'``
            added for AdaptiveHub compatibility.
        """
        print(f" ⏳ [System] Loading {len(scores_files)} datasets into memory...", flush=True)
        all_data = []
        for fp in scores_files:
            try:
                df = pd.read_pickle(fp)
            except Exception as e:
                print(f" ⚠️ [System] Could not load {fp}: {e}", flush=True)
                continue
            if not df.empty:
                all_data.append(df)
        if not all_data:
            return []

        global_df = pd.concat(all_data)
        # Stable sort: rows sharing a timestamp keep file order, so results are
        # reproducible for a given `scores_files` ordering.
        global_df.sort_values('timestamp', inplace=True, kind='stable')

        # 🚀 Numpy conversion (vectorized masks) + plain lists (fast scalar loop)
        arr_oracle = global_df['oracle_conf'].values.astype(np.float64)
        arr_sniper = global_df['sniper_score'].values.astype(np.float64)
        arr_l1 = global_df['l1_score'].values.astype(np.float64)

        ts_list = global_df['timestamp'].values.tolist()
        close_list = global_df['close'].values.astype(np.float64).tolist()
        hydra_risk_list = global_df['risk_hydra_crash'].values.astype(np.float64).tolist()
        hydra_time_list = global_df['time_hydra_crash'].values.astype(np.int64).tolist()
        titan_list = global_df['real_titan'].values.astype(np.float64).tolist()
        oracle_list = arr_oracle.tolist()
        sniper_list = arr_sniper.tolist()

        _, sym_codes = np.unique(global_df['symbol'].values, return_inverse=True)
        sym_list = sym_codes.tolist()
        total_len = len(ts_list)

        print(f" 🚀 [System] Starting Optimized Grid Search on {len(combinations_batch)} combos...",
              flush=True)

        results = []
        for config in combinations_batch:
            wallet_bal = initial_capital
            wallet_alloc = 0.0
            positions = {}   # sym_id -> [entry, size, hydra_risk, hydra_time, consensus]
            trades_log = []  # (pnl, consensus)

            oracle_thresh = config.get('oracle_thresh', 0.6)
            sniper_thresh = config.get('sniper_thresh', 0.4)
            hydra_thresh = config['hydra_thresh']
            l1_thresh = config.get('l1_thresh', 15.0)

            mask_buy = ((arr_l1 >= l1_thresh)
                        & (arr_oracle >= oracle_thresh)
                        & (arr_sniper >= sniper_thresh)).tolist()

            peak_bal = initial_capital
            max_dd = 0.0

            for i in range(total_len):
                ts = ts_list[i]
                sym_id = sym_list[i]
                price = close_list[i]

                # Exits: Hydra crash time reached, +4% take-profit or -2% stop.
                pos = positions.get(sym_id)
                if pos is not None:
                    entry, size, h_risk, h_time, cons = pos
                    is_crash = (h_risk > hydra_thresh) and (h_time > 0) and (ts >= h_time)
                    pnl = (price - entry) / entry
                    if is_crash or pnl > 0.04 or pnl < -0.02:
                        wallet_bal += size * (1 + pnl - (fees_pct * 2))
                        wallet_alloc -= size
                        trades_log.append((pnl, cons))
                        del positions[sym_id]

                        tot = wallet_bal + wallet_alloc
                        if tot > peak_bal:
                            peak_bal = tot
                        else:
                            dd = (peak_bal - tot) / peak_bal
                            if dd > max_dd:
                                max_dd = dd

                # Entries (a symbol closed on this row may re-enter on it).
                if (len(positions) < max_slots and mask_buy[i]
                        and sym_id not in positions and wallet_bal >= 5.0):
                    cons_score = (titan_list[i] + oracle_list[i] + sniper_list[i]) / 3.0
                    size = min(10.0, wallet_bal * 0.98)
                    positions[sym_id] = [price, size, hydra_risk_list[i],
                                         hydra_time_list[i], cons_score]
                    wallet_bal -= size
                    wallet_alloc += size

            # Stats (open positions are valued at cost).
            final_bal = wallet_bal + wallet_alloc
            net_profit = final_bal - initial_capital
            total_t = len(trades_log)
            win_count = sum(1 for p, _ in trades_log if p > 0)
            loss_count = total_t - win_count
            win_rate = (win_count / total_t * 100) if total_t > 0 else 0.0

            high_cons = [p for p, s in trades_log if s > 0.65]
            hc_count = len(high_cons)
            hc_wins = sum(1 for p in high_cons if p > 0)
            hc_win_rate = (hc_wins / hc_count * 100) if hc_count > 0 else 0.0
            hc_avg_pnl = (sum(high_cons) / hc_count * 100) if hc_count > 0 else 0.0
            agree_rate = (hc_count / total_t * 100) if total_t > 0 else 0.0

            # ✅ Ensure 'thresh' exists for AdaptiveHub compatibility, on a copy
            # so the caller's dict is left untouched.
            result_config = dict(config)
            result_config['thresh'] = l1_thresh

            results.append({
                'config': result_config,
                'final_balance': final_bal,
                'net_profit': net_profit,
                'total_trades': total_t,
                'win_count': win_count,
                'loss_count': loss_count,
                'win_rate': win_rate,
                'max_drawdown': max_dd * 100,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': hc_win_rate,
                'high_consensus_avg_pnl': hc_avg_pnl,
            })

        print("")
        return results

    async def run_optimization(self, target_regime="RANGE"):
        """Run phase 1 and phase 2 for the current date range.

        Returns:
            ``(best_config, best_result)`` ranked by final balance, or
            ``(None, None)`` when no cached scores are available.
        """
        await self.generate_truth_data()

        d = self.GRID_DENSITY
        oracle_range = np.linspace(0.45, 0.8, d).tolist()
        sniper_range = np.linspace(0.35, 0.7, d).tolist()
        hydra_range = np.linspace(0.70, 0.95, d).tolist()
        l1_range = [10.0, 15.0, 20.0, 25.0]
        titan_range = [0.4, 0.6]
        pattern_range = [0.2, 0.4]

        combinations = [
            {
                'w_titan': wt, 'w_struct': wp, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h,
                'legacy_thresh': 0.95,
            }
            for o, s, h, l1, wt, wp in itertools.product(
                oracle_range, sniper_range, hydra_range, l1_range, titan_range, pattern_range)
        ]

        # Only load score files of the *current* period: the cache accumulates
        # one file per symbol per scenario, and mixing them would optimise a
        # regime on another regime's data.  Sorted for reproducible tie order.
        bounds = self._period_bounds_ms()
        suffix = f"_{bounds[0]}_{bounds[1]}_scores.pkl" if bounds else "_scores.pkl"
        valid_files = sorted(
            os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith(suffix))

        print(f"\n🧩 [Phase 2] Optimizing {len(combinations)} Configs (Full Stack) for {target_regime}...")
        best_res = self._worker_optimize(
            combinations, valid_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not best_res:
            return None, None
        best = max(best_res, key=lambda x: x['final_balance'])

        print("\n" + "=" * 60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" 📊 Total Trades: {best['total_trades']}")
        print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
        print(f" 📉 Max Drawdown: {best['max_drawdown']:.1f}%")
        print("-" * 60)
        print(f" 🧠 CONSENSUS ANALYTICS:")
        print(f" 🤝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print(f" 💎 High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
        print("-" * 60)
        print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print(f" ⚖️ Weights: Titan={best['config']['w_titan']:.2f} | Patterns={best['config']['w_struct']:.2f} | L1={best['config']['l1_thresh']}")
        print("=" * 60)
        return best['config'], best


async def run_strategic_optimization_task():
    """Optimise each market-regime scenario and submit the winners to AdaptiveHub."""
    print("\n🧪 [STRATEGIC BACKTEST] Hyper-Vectorized Mode...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    await dm.initialize()
    await proc.initialize()
    if proc.guardian_hydra:
        proc.guardian_hydra.set_silent_mode(True)
    try:
        hub = AdaptiveHub(r2)
        await hub.initialize()
        optimizer = HeavyDutyBacktester(dm, proc)
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"},
        ]
        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])
            best_cfg, best_stats = await optimizer.run_optimization(target_regime=target)
            if best_cfg:
                hub.submit_challenger(target, best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("✅ [System] ALL Strategic DNA Updated & Saved.")
    finally:
        await dm.close()


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())