# ============================================================
# 🧪 backtest_engine.py (V159.0 - GEM-Architect: Hyper-Speed Jump Logic)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, List

# Attempt to import optional libraries (translated from Arabic).
# pandas_ta is optional; ML/project modules are best-effort so this file can
# still be imported in environments where they are missing.
try:
    import pandas_ta as ta
except ImportError:
    ta = None
try:
    from ml_engine.processor import MLProcessor
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import AdaptiveHub
    from r2 import R2Service
    import xgboost as xgb
except ImportError:
    pass

logging.getLogger('ml_engine').setLevel(logging.WARNING)

# Directory where per-symbol/per-period score pickles are cached.
CACHE_DIR = "backtest_real_scores"

# ============================================================
# ⚡ VECTORIZED HELPERS
# ============================================================


def _z_roll_np(arr, w=500):
    """Rolling z-score of *arr* over window *w*.

    Returns an all-zeros array when the input is shorter than the window;
    NaNs/infs in the result are mapped to 0 via np.nan_to_num.
    """
    if len(arr) < w:
        return np.zeros_like(arr)
    mean = pd.Series(arr).rolling(w).mean().fillna(0).values
    std = pd.Series(arr).rolling(w).std().fillna(1).values
    return np.nan_to_num((arr - mean) / (std + 1e-9))


def _revive_score_distribution(scores):
    """Min-max rescale model scores to [0, 1] when the distribution is squashed.

    Left unchanged when the scores are (near-)constant, or when they already
    span a reasonable range (max >= 0.8 and min <= 0.2).
    """
    scores = np.array(scores, dtype=np.float32).flatten()
    s_min, s_max = np.min(scores), np.max(scores)
    # Near-constant distribution: rescaling would just amplify noise.
    if (s_max - s_min) < 1e-6:
        return scores
    if s_max < 0.8 or s_min > 0.2:
        return (scores - s_min) / (s_max - s_min)
    return scores

# ============================================================
# 🧪 THE BACKTESTER CLASS
# ============================================================


class HeavyDutyBacktester:
    """Grid-search backtester over cached per-candle model scores."""

    def __init__(self, data_manager, processor):
        self.dm = data_manager
        self.proc = processor
        # 🎛️ Density: number of steps in each grid range (translated from Arabic).
        self.GRID_DENSITY = 3  # 3 is enough for quick checks, 5 for deep dive
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        # 🎛️ CONTROL PANEL - DYNAMIC RANGES
        self.GRID_RANGES = {
            'TITAN': np.linspace(0.10, 0.50, self.GRID_DENSITY),
            'ORACLE': np.linspace(0.40, 0.80, self.GRID_DENSITY),
            'SNIPER': np.linspace(0.30, 0.70, self.GRID_DENSITY),
            'PATTERN': np.linspace(0.10, 0.50, self.GRID_DENSITY),
            'L1_SCORE': [10.0],
            # Guardians
            'HYDRA_CRASH': np.linspace(0.60, 0.85, self.GRID_DENSITY),
            'HYDRA_GIVEBACK': np.linspace(0.60, 0.85, self.GRID_DENSITY),
            'LEGACY_V2': np.linspace(0.85, 0.98, self.GRID_DENSITY),
        }
        self.TARGET_COINS = [
            'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT',
            'LINK/USDT', 'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT',
            'ARB/USDT', 'SUI/USDT', 'SEI/USDT', 'MINA/USDT', 'MATIC/USDT',
            'NEAR/USDT', 'RUNE/USDT', 'API3/USDT', 'FLOKI/USDT', 'BABYDOGE/USDT',
            'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT', 'ONDO/USDT',
            'SNX/USDT', 'HBAR/USDT', 'XLM/USDT', 'AGIX/USDT', 'IMX/USDT',
            'LRC/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT',
            'APE/USDT', 'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT',
            'FET/USDT', 'RPL/USDT', 'MNT/USDT', 'RAY/USDT', 'CAKE/USDT',
            'SRM/USDT', 'PENDLE/USDT', 'ATOM/USDT'
        ]
        # Optional date window ("YYYY-MM-DD" strings), set via set_date_range().
        self.force_start_date = None
        self.force_end_date = None
        if not os.path.exists(CACHE_DIR):
            os.makedirs(CACHE_DIR)
        print(f"🧪 [Backtest V159.0] Hyper-Speed Jump Engine (CPU Optimized).")

    def set_date_range(self, start_str, end_str):
        """Pin the backtest era; generate_truth_data() only runs when a start is set."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1m OHLCV for *sym* in [start_ms, end_ms] (epoch ms).

        Fires up to 20 concurrent fetches (semaphore-bound), retrying each
        batch up to 3 times. Returns a list of candle rows, or None when
        nothing was fetched.
        """
        print(f" ⚡ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        tasks = []
        curr = start_ms
        # One fetch task per `limit`-minute window.
        while curr < end_ms:
            tasks.append(curr)
            curr += limit * 60 * 1000
        all_candles = []
        sem = asyncio.Semaphore(20)

        async def _fetch_batch(timestamp):
            # Best-effort fetch: 3 attempts, then give up with an empty list.
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                    except:
                        await asyncio.sleep(0.5)
                return []

        chunk_size = 50
        for i in range(0, len(tasks), chunk_size):
            res = await asyncio.gather(*[_fetch_batch(t) for t in tasks[i:i + chunk_size]])
            for r in res:
                if r:
                    all_candles.extend(r)
        if not all_candles:
            return None
        df = pd.DataFrame(all_candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v'])
        df.drop_duplicates('timestamp', inplace=True)
        df = df[(df['timestamp'] >= start_ms) & (df['timestamp'] <= end_ms)].sort_values('timestamp')
        print(f" ✅ Downloaded {len(df)} candles.", flush=True)
        return df.values.tolist()

    # ----------------------------------------------------------------------
    # 🏎️ VECTORIZED INDICATORS
    # ----------------------------------------------------------------------
    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Add all indicator columns to *df* in place (also returned).

        Expects OHLCV columns named open/high/low/close/volume.
        NOTE(review): the post-`if ta:` section reads df['ATR'], df['RSI'],
        df['vwap'] etc., which only exist when pandas_ta imported — this
        method effectively requires `ta` to be available.
        """
        if df.empty:
            return df
        cols = ['close', 'high', 'low', 'volume', 'open']
        for c in cols:
            df[c] = df[c].astype(np.float64)
        # EMAs
        df['ema9'] = df['close'].ewm(span=9, adjust=False).mean()
        df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
        df['ema21'] = df['close'].ewm(span=21, adjust=False).mean()
        df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
        df['ema200'] = df['close'].ewm(span=200, adjust=False).mean()
        if ta:
            df['RSI'] = ta.rsi(df['close'], length=14).fillna(50)
            df['ATR'] = ta.atr(df['high'], df['low'], df['close'], length=14).fillna(0)
            bb = ta.bbands(df['close'], length=20, std=2.0)
            # Column 3 of the bbands frame is the bandwidth column.
            df['bb_width'] = bb.iloc[:, 3].fillna(0) if bb is not None else 0.0
            macd = ta.macd(df['close'])
            if macd is not None:
                df['MACD'] = macd.iloc[:, 0].fillna(0)
                df['MACD_h'] = macd.iloc[:, 1].fillna(0)
            else:
                df['MACD'] = 0; df['MACD_h'] = 0
            df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0].fillna(0)
            df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20).fillna(0)
            df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14).fillna(50)
            df['slope'] = ta.slope(df['close'], length=7).fillna(0)
            vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
            df['vwap'] = vwap.fillna(df['close']) if vwap is not None else df['close']
        c = df['close'].values
        # Relative distance of close to each reference line.
        df['EMA_9_dist'] = (c / df['ema9'].values) - 1
        df['EMA_21_dist'] = (c / df['ema21'].values) - 1
        df['EMA_50_dist'] = (c / df['ema50'].values) - 1
        df['EMA_200_dist'] = (c / df['ema200'].values) - 1
        df['VWAP_dist'] = (c / df['vwap'].values) - 1
        df['ATR_pct'] = df['ATR'] / (c + 1e-9)
        if timeframe == '1d':
            df['Trend_Strong'] = np.where(df['ADX'] > 25, 1.0, 0.0)
        df['vol_z'] = _z_roll_np(df['volume'].values, 20)
        df['rel_vol'] = df['volume'] / (df['volume'].rolling(50).mean() + 1e-9)
        # Log return with a leading 0 so the column matches the frame length.
        df['log_ret'] = np.concatenate([[0], np.diff(np.log(c + 1e-9))])
        roll_min = df['low'].rolling(50).min(); roll_max = df['high'].rolling(50).max()
        df['fib_pos'] = (c - roll_min) / (roll_max - roll_min + 1e-9)
        df['volatility'] = df['ATR_pct']
        # 5-bar slope of EMA20 (np.roll wraps, so the first 5 are pinned to e20[0]).
        e20 = df['ema20'].values
        e20_s = np.roll(e20, 5); e20_s[:5] = e20[0]
        df['trend_slope'] = (e20 - e20_s) / (e20_s + 1e-9)
        # NOTE(review): named "fib618" but uses the 0.382 ratio — confirm intent.
        fib618 = roll_max - ((roll_max - roll_min) * 0.382)
        df['dist_fib618'] = (c - fib618) / (c + 1e-9)
        df['dist_ema50'] = df['EMA_50_dist']
        df['dist_ema200'] = df['EMA_200_dist']
        if timeframe == '1m':
            # 1m-only microstructure features used by the sniper/legacy models.
            df['return_1m'] = df['log_ret']
            df['rsi_14'] = df['RSI']
            e9 = df['ema9'].values; e9_s = np.roll(e9, 1); e9_s[0] = e9[0]
            df['ema_9_slope'] = (e9 - e9_s) / (e9_s + 1e-9)
            df['ema_21_dist'] = df['EMA_21_dist']
            df['atr_z'] = _z_roll_np(df['ATR'].values, 100)
            df['vol_zscore_50'] = _z_roll_np(df['volume'].values, 50)
            rng = df['high'].values - df['low'].values
            df['candle_range'] = _z_roll_np(rng, 500)
            df['close_pos_in_range'] = (c - df['low'].values) / (rng + 1e-9)
            # Amihud illiquidity proxy: |return| per dollar traded.
            dollar_vol = c * df['volume'].values
            amihud = np.abs(df['log_ret']) / (dollar_vol + 1e-9)
            df['amihud'] = _z_roll_np(amihud, 500)
            # Order-flow imbalance proxy: 30-bar sum of sign(Δclose) * volume.
            sign = np.sign(np.diff(c, prepend=c[0]))
            signed_vol = sign * df['volume'].values
            ofi = pd.Series(signed_vol).rolling(30).sum().fillna(0).values
            df['ofi'] = _z_roll_np(ofi, 500)
            df['vwap_dev'] = _z_roll_np(c - df['vwap'].values, 500)
            # Lagged copies of the legacy-model base features.
            for lag in [1, 2, 3, 5, 10, 20]:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = df['RSI'].shift(lag).fillna(50)/100.0
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
        df.fillna(0, inplace=True)
        return df

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Compute all model scores for *sym* and pickle the candidate rows.

        Skips work when the cache file for this symbol/period already exists.
        Each model section is wrapped in a bare try/except so a single broken
        model degrades to its neutral default score instead of aborting.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" 📂 [{sym}] Data Exists -> Skipping.")
            return
        print(f" ⚙️ [CPU] Analyzing {sym}...", flush=True)
        t0 = time.time()
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()
        frames = {}
        frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
        frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
        # Column-name -> ndarray view for fast indexed access.
        fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        numpy_htf = {}
        # NOTE(review): '5T'/'15T' aliases are deprecated in pandas >= 2.2
        # ('5min'/'15min' preferred) — confirm against the pinned pandas version.
        for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
            resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
            if resampled.empty:
                numpy_htf[tf_str] = {}
                continue
            resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
            resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
            frames[tf_str] = resampled
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
        arr_ts_1m = fast_1m['timestamp']

        def get_map(tf):
            # Map each 1m timestamp to an index into the higher-timeframe arrays
            # (clipped searchsorted); zeros when the timeframe is missing.
            if tf not in numpy_htf or 'timestamp' not in numpy_htf[tf]:
                return np.zeros(len(arr_ts_1m), dtype=int)
            return np.clip(np.searchsorted(numpy_htf[tf]['timestamp'], arr_ts_1m), 0, len(numpy_htf[tf]['timestamp']) - 1)

        map_5m = get_map('5m'); map_15m = get_map('15m'); map_1h = get_map('1h'); map_4h = get_map('4h')
        # Model handles (all optional; getattr defaults keep this robust).
        titan_model = getattr(self.proc.titan, 'model', None)
        oracle_dir = getattr(self.proc.oracle, 'model_direction', None)
        oracle_cols = getattr(self.proc.oracle, 'feature_cols', [])
        sniper_models = getattr(self.proc.sniper, 'models', [])
        sniper_cols = getattr(self.proc.sniper, 'feature_names', [])
        hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
        legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
        # --- BATCH PREDICTIONS ---
        # Neutral 0.5 default when the titan model is absent or prediction fails.
        global_titan_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
        if titan_model:
            titan_cols = [
                '5m_open', '5m_high', '5m_low', '5m_close', '5m_volume',
                '5m_RSI', '5m_MACD', '5m_MACD_h', '5m_CCI', '5m_ADX',
                '5m_EMA_9_dist', '5m_EMA_21_dist', '5m_EMA_50_dist', '5m_EMA_200_dist',
                '5m_BB_w', '5m_BB_p', '5m_MFI', '5m_VWAP_dist',
                '15m_timestamp', '15m_RSI', '15m_MACD', '15m_MACD_h', '15m_CCI', '15m_ADX',
                '15m_EMA_9_dist', '15m_EMA_21_dist', '15m_EMA_50_dist', '15m_EMA_200_dist',
                '15m_BB_w', '15m_BB_p', '15m_MFI', '15m_VWAP_dist',
                '1h_timestamp', '1h_RSI', '1h_MACD_h', '1h_EMA_50_dist', '1h_EMA_200_dist', '1h_ATR_pct',
                '4h_timestamp', '4h_RSI', '4h_MACD_h', '4h_EMA_50_dist', '4h_EMA_200_dist', '4h_ATR_pct',
                '1d_timestamp', '1d_RSI', '1d_EMA_200_dist', '1d_Trend_Strong'
            ]
            try:
                t_vecs = []
                for col in titan_cols:
                    # "<tf>_<feature>" -> look the feature up in that timeframe.
                    parts = col.split('_', 1); tf = parts[0]; feat = parts[1]
                    target_arr = numpy_htf.get(tf, {})
                    # NOTE(review): map_1d is never defined, so '1d_*' features
                    # always use the zero map here.
                    target_map = locals().get(f"map_{tf}", np.zeros(len(arr_ts_1m), dtype=int))
                    if feat in target_arr:
                        t_vecs.append(target_arr[feat][target_map])
                    elif feat == 'timestamp' and 'timestamp' in target_arr:
                        t_vecs.append(target_arr['timestamp'][target_map])
                    elif feat in ['open','high','low','close','volume'] and feat in target_arr:
                        t_vecs.append(target_arr[feat][target_map])
                    else:
                        # Unknown features (e.g. BB_w/BB_p not computed above) -> zeros.
                        t_vecs.append(np.zeros(len(arr_ts_1m)))
                X_TITAN = np.column_stack(t_vecs)
                global_titan_scores = _revive_score_distribution(titan_model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols)))
            except:
                pass
        global_oracle_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
        if oracle_dir:
            try:
                o_vecs = []
                for col in oracle_cols:
                    if col.startswith('1h_'):
                        o_vecs.append(numpy_htf['1h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_1h])
                    elif col.startswith('15m_'):
                        o_vecs.append(numpy_htf['15m'].get(col[4:], np.zeros(len(arr_ts_1m)))[map_15m])
                    elif col.startswith('4h_'):
                        o_vecs.append(numpy_htf['4h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_4h])
                    elif col == 'sim_titan_score':
                        o_vecs.append(global_titan_scores)
                    elif col == 'sim_mc_score':
                        o_vecs.append(np.full(len(arr_ts_1m), 0.5))
                    elif col == 'sim_pattern_score':
                        o_vecs.append(np.full(len(arr_ts_1m), 0.5))
                    else:
                        o_vecs.append(np.zeros(len(arr_ts_1m)))
                X_ORACLE = np.column_stack(o_vecs)
                preds_o = oracle_dir.predict(X_ORACLE)
                # Flatten multi-column outputs to the first column.
                preds_o = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0]
                global_oracle_scores = _revive_score_distribution(preds_o)
            except:
                pass
        global_sniper_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
        if sniper_models:
            try:
                s_vecs = []
                for col in sniper_cols:
                    if col in fast_1m:
                        s_vecs.append(fast_1m[col])
                    elif col == 'atr' and 'atr_z' in fast_1m:
                        # Legacy feature name 'atr' maps to the z-scored column.
                        s_vecs.append(fast_1m['atr_z'])
                    else:
                        s_vecs.append(np.zeros(len(arr_ts_1m)))
                X_SNIPER = np.column_stack(s_vecs)
                # Ensemble average across the sniper models.
                preds = [m.predict(X_SNIPER) for m in sniper_models]
                global_sniper_scores = _revive_score_distribution(np.mean(preds, axis=0))
            except:
                pass
        global_v2_scores = np.zeros(len(arr_ts_1m), dtype=np.float32)
        if legacy_v2:
            try:
                l_log = fast_1m['log_ret']; l_rsi = fast_1m['RSI'] / 100.0; l_fib = fast_1m['fib_pos']; l_vol = fast_1m['volatility']
                l5_log = numpy_htf['5m']['log_ret'][map_5m]; l5_rsi = numpy_htf['5m']['RSI'][map_5m] / 100.0; l5_fib = numpy_htf['5m']['fib_pos'][map_5m]; l5_trd = numpy_htf['5m']['trend_slope'][map_5m]
                l15_log = numpy_htf['15m']['log_ret'][map_15m]; l15_rsi = numpy_htf['15m']['RSI'][map_15m] / 100.0; l15_fib618 = numpy_htf['15m']['dist_fib618'][map_15m]; l15_trd = numpy_htf['15m']['trend_slope'][map_15m]
                lags = []
                for lag in [1, 2, 3, 5, 10, 20]:
                    lags.extend([fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'], fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']])
                X_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lags])
                preds = legacy_v2.predict(xgb.DMatrix(X_V2))
                # Multi-class output: column 2 is the score of interest.
                global_v2_scores = preds[:, 2] if len(preds.shape) > 1 else preds
                global_v2_scores = global_v2_scores.flatten()
            except:
                pass
        global_hydra_crash = np.zeros(len(arr_ts_1m), dtype=np.float32)
        global_hydra_give = np.zeros(len(arr_ts_1m), dtype=np.float32)
        if hydra_models:
            try:
                zeros = np.zeros(len(arr_ts_1m))
                # NOTE(review): fast_1m['atr'] is a likely KeyError — the 1m frame
                # defines 'ATR'/'atr_z', not 'atr'. The bare except would swallow it,
                # leaving both hydra score arrays at 0. Verify against live data.
                h_static = np.column_stack([
                    fast_1m['RSI'], numpy_htf['5m']['RSI'][map_5m], numpy_htf['15m']['RSI'][map_15m],
                    fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close']
                ])
                # Zero/constant padding matches the hydra models' expected layout
                # (presumably — confirm against guardian_hydra's feature order).
                X_H = np.column_stack([
                    h_static[:,0], h_static[:,1], h_static[:,2], h_static[:,3], h_static[:,4],
                    zeros, fast_1m['ATR_pct'], zeros, zeros, zeros, zeros, zeros, zeros,
                    global_oracle_scores, np.full(len(arr_ts_1m), 0.7), np.full(len(arr_ts_1m), 3.0)
                ])
                probs_c = hydra_models['crash'].predict_proba(X_H)[:, 1]
                global_hydra_crash = probs_c.astype(np.float32)
                probs_g = hydra_models['giveback'].predict_proba(X_H)[:, 1]
                global_hydra_give = probs_g.astype(np.float32)
            except:
                pass
        # Filter
        rsi_1h = numpy_htf['1h'].get('RSI', np.zeros(len(arr_ts_1m)))[map_1h]
        # Keep candles where at least minimal promise exists (reduces size)
        is_candidate_mask = (rsi_1h <= 70) & (global_titan_scores > 0.3) & (global_oracle_scores > 0.3)
        candidate_indices = np.where(is_candidate_mask)[0]
        # Trim the last 60 candles (no look-ahead room) and the warm-up head.
        end_limit = len(arr_ts_1m) - 60
        candidate_indices = candidate_indices[candidate_indices < end_limit]
        candidate_indices = candidate_indices[candidate_indices >= 500]
        print(f" 🌪️ Final List: {len(candidate_indices)} candidates ready for testing.", flush=True)
        ai_results = pd.DataFrame({
            'timestamp': arr_ts_1m[candidate_indices],
            'symbol': sym,
            'close': fast_1m['close'][candidate_indices],
            'real_titan': global_titan_scores[candidate_indices],
            'oracle_conf': global_oracle_scores[candidate_indices],
            'sniper_score': global_sniper_scores[candidate_indices],
            'pattern_score': np.full(len(candidate_indices), 0.5),
            'risk_hydra_crash': global_hydra_crash[candidate_indices],
            'risk_hydra_giveback': global_hydra_give[candidate_indices],
            'risk_legacy_v2': global_v2_scores[candidate_indices],
            'time_hydra_crash': np.zeros(len(candidate_indices), dtype=int),
            'l1_score': 50.0
        })
        dt = time.time() - t0
        if not ai_results.empty:
            ai_results.to_pickle(scores_file)
        print(f" ✅ [{sym}] Completed in {dt:.2f} seconds. ({len(ai_results)} signals)", flush=True)
        gc.collect()

    async def generate_truth_data(self):
        """Phase 1: download and score every target coin for the forced era.

        No-op unless set_date_range() was called first.
        """
        if self.force_start_date:
            dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000)
            print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
            for sym in self.TARGET_COINS:
                c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
                if c:
                    await self._process_data_in_memory(sym, c, ms_s, ms_e)

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """🚀 HYPER-SPEED JUMP LOGIC (NO LOOPING OVER IDLE CANDLES)

        Simulates every config in *combinations_batch* over the concatenated
        candidate rows loaded from *scores_files*, returning a stats dict per
        config. Exits are evaluated only on candidate candles (a documented
        approximation — see the inline note below).
        """
        print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
        data = []
        for f in scores_files:
            try:
                data.append(pd.read_pickle(f))
            except:
                pass
        if not data:
            return []
        df = pd.concat(data).sort_values('timestamp').reset_index(drop=True)
        # Pre-load arrays for max speed
        ts = df['timestamp'].values
        close = df['close'].values.astype(float)
        sym = df['symbol'].values
        # Integer symbol ids so positions can key on a scalar, not a string.
        u_syms = np.unique(sym); sym_map = {s: i for i, s in enumerate(u_syms)}; sym_id = np.array([sym_map[s] for s in sym])
        oracle = df['oracle_conf'].values
        sniper = df['sniper_score'].values
        titan = df['real_titan'].values
        pattern = df['pattern_score'].values
        l1 = df['l1_score'].values
        hydra = df['risk_hydra_crash'].values
        hydra_give = df['risk_hydra_giveback'].values
        legacy = df['risk_legacy_v2'].values
        N = len(ts)
        print(f" 🚀 [System] Testing {len(combinations_batch)} configs on {N} candidates...", flush=True)
        res = []
        for cfg in combinations_batch:
            # 1. Vectorized Entry Mask (The Speed Secret)
            # Instead of checking every candle, we calculate ALL valid entries at once
            entry_mask = (l1 >= cfg['L1_SCORE']) & \
                         (oracle >= cfg['ORACLE']) & \
                         (sniper >= cfg['SNIPER']) & \
                         (titan >= cfg['TITAN']) & \
                         (pattern >= cfg.get('PATTERN', 0.10))
            # Get only the indices where entry is possible
            # NOTE(review): valid_entry_indices is currently unused — the loop
            # below still walks all N rows and tests entry_mask[i].
            valid_entry_indices = np.where(entry_mask)[0]
            # Extract thresholds locally to avoid dictionary lookups in inner loop
            h_crash_thresh = cfg['HYDRA_CRASH']
            h_give_thresh = cfg['HYDRA_GIVEBACK']
            leg_thresh = cfg['LEGACY_V2']
            # Simulation State
            pos = {}  # sym_id -> (entry_price, size)
            bal = float(initial_capital)
            alloc = 0.0
            log = []
            # Iterate ONLY on relevant indices (Jump!)
            # But we must respect time. So we iterate valid indices,
            # and check exits for OPEN positions at that time step?
            # Problem: If we jump, we miss exits between entries.
            # Fix: We must iterate all rows for exits, but we can skip logic if no pos.
            # OR: Since df is filtered candidates only, gaps exist.
            # We assume candidates are frequent enough or we only check exits on candidate candles.
            # *Refinement*: The dataframe `df` only contains ~30k candidates out of 100k candles.
            # Exiting only on candidate candles is an approximation, but acceptable for optimization speed.
            for i in range(N):
                s = sym_id[i]; p = float(close[i])
                # A. Check Exits (If holding this symbol)
                if s in pos:
                    entry_p, size_val = pos[s]
                    pnl = (p - entry_p) / entry_p
                    # Guardian Logic (Local vars)
                    is_guard = (hydra[i] > h_crash_thresh) or \
                               (hydra_give[i] > h_give_thresh) or \
                               (legacy[i] > leg_thresh)
                    # VETO (Price Confirmation)
                    confirmed = is_guard and (pnl < -0.0015)
                    # Exit on confirmed guardian veto, +4% take-profit or -2% stop.
                    if confirmed or (pnl > 0.04) or (pnl < -0.02):
                        realized = pnl - (fees_pct * 2)
                        bal += size_val * (1.0 + realized)
                        alloc -= size_val
                        del pos[s]
                        log.append({'pnl': realized})
                        continue  # Can't buy same candle we sold
                # B. Check Entries (Only if mask is True)
                if entry_mask[i] and len(pos) < max_slots:
                    if s not in pos and bal >= 5.0:
                        size = min(10.0, bal * 0.98)
                        pos[s] = (p, size)
                        bal -= size; alloc += size
            # Calc Stats
            # Open positions are valued at cost (alloc), not marked to market.
            final_bal = bal + alloc
            profit = final_bal - initial_capital
            tot = len(log)
            winning = [x for x in log if x['pnl'] > 0]
            losing = [x for x in log if x['pnl'] <= 0]
            win_rate = (len(winning)/tot*100) if tot > 0 else 0.0
            avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0.0
            avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0.0
            gross_p = sum([x['pnl'] for x in winning])
            gross_l = abs(sum([x['pnl'] for x in losing]))
            # 99.9 sentinel when there were no losing trades.
            profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9
            # Simple streaks
            max_win_s = 0; max_loss_s = 0; curr_w = 0; curr_l = 0
            for t in log:
                if t['pnl'] > 0:
                    curr_w +=1; curr_l = 0; max_win_s = max(max_win_s, curr_w)
                else:
                    curr_l +=1; curr_w = 0; max_loss_s = max(max_loss_s, curr_l)
            res.append({
                'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
                'total_trades': tot, 'win_rate': win_rate, 'profit_factor': profit_factor,
                'win_count': len(winning), 'loss_count': len(losing),
                'avg_win': avg_win, 'avg_loss': avg_loss,
                'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s,
                'consensus_agreement_rate': 0.0, 'high_consensus_win_rate': 0.0
            })
        return res

    async def run_optimization(self, target_regime="RANGE"):
        """Run the full grid search and print a champion report.

        Returns (mapped_config, best_stats), or (None, zeroed-stats) when no
        results were produced.
        NOTE(review): glob of "*.pkl" picks up cache files from ALL eras in
        CACHE_DIR, not just the currently forced date range — confirm intended.
        """
        await self.generate_truth_data()
        keys = list(self.GRID_RANGES.keys())
        values = list(self.GRID_RANGES.values())
        # Cartesian product over every grid range.
        combos = [dict(zip(keys, c)) for c in itertools.product(*values)]
        files = glob.glob(os.path.join(CACHE_DIR, "*.pkl"))
        results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not results_list:
            return None, {'net_profit': 0.0, 'win_rate': 0.0}
        results_list.sort(key=lambda x: x['net_profit'], reverse=True)
        best = results_list[0]
        # Translate grid keys to the live-engine config names.
        mapped_config = {
            'w_titan': best['config']['TITAN'],
            'w_struct': best['config']['PATTERN'],
            'thresh': best['config']['L1_SCORE'],
            'oracle_thresh': best['config']['ORACLE'],
            'sniper_thresh': best['config']['SNIPER'],
            'hydra_thresh': best['config']['HYDRA_CRASH'],
            'legacy_thresh': best['config']['LEGACY_V2']
        }
        # Diagnosis
        diag = []
        if best['total_trades'] > 2000 and best['net_profit'] < 10:
            diag.append("⚠️ Overtrading")
        if best['win_rate'] > 55 and best['net_profit'] < 0:
            diag.append("⚠️ Fee Burn")
        if abs(best['avg_loss']) > best['avg_win'] and best['win_count'] > 0:
            diag.append("⚠️ Risk/Reward Inversion")
        if best['max_loss_streak'] > 10:
            diag.append("⚠️ Consecutive Loss Risk")
        if not diag:
            diag.append("✅ System Healthy")
        print("\n" + "="*60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" 📊 Total Trades: {best['total_trades']}")
        print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
        print(f" ✅ Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)")
        print(f" ❌ Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)")
        print(f" 🌊 Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}")
        print(f" ⚖️ Profit Factor: {best['profit_factor']:.2f}")
        print("-" * 60)
        print(f" 🧠 CONSENSUS ANALYTICS:")
        print(f" 🤝 Model Agreement Rate: {best.get('consensus_agreement_rate', 0.0):.1f}%")
        print(f" 🌟 High-Consensus Win Rate: {best.get('high_consensus_win_rate', 0.0):.1f}%")
        print("-" * 60)
        print(f" 🩺 DIAGNOSIS: {' '.join(diag)}")
        p_str = ""
        for k, v in mapped_config.items():
            if isinstance(v, float):
                p_str += f"{k}={v:.2f} | "
            else:
                p_str += f"{k}={v} | "
        print(f" ⚙️ Config: {p_str}")
        print("="*60)
        return mapped_config, best


async def run_strategic_optimization_task():
    """Entry point: optimize each market-regime scenario and push the winners
    to the AdaptiveHub, closing connections in a finally block."""
    print("\n🧪 [STRATEGIC BACKTEST] Hyper-Speed Jump Engine...")
    r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
    try:
        await dm.initialize(); await proc.initialize()
        if proc.guardian_hydra:
            proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2); await hub.initialize()
        opt = HeavyDutyBacktester(dm, proc)
        # One backtest era per market regime.
        scenarios = [
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"},
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
        ]
        for s in scenarios:
            opt.set_date_range(s["start"], s["end"])
            best_cfg, best_stats = await opt.run_optimization(s["regime"])
            if best_cfg:
                hub.submit_challenger(s["regime"], best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("✅ [System] DNA Updated.")
    finally:
        print("🔌 [System] Closing connections...")
        await dm.close()


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())