# ============================================================ # ๐Ÿงช backtest_engine.py (V139.0 - GEM-Architect: Vectorized Hydra Speed) # ============================================================ import asyncio import pandas as pd import numpy as np import pandas_ta as ta import time import logging import itertools import os import glob import gc import sys import traceback from numpy.lib.stride_tricks import sliding_window_view from datetime import datetime, timezone from typing import Dict, Any, List from scipy.special import expit try: from ml_engine.processor import MLProcessor, SystemLimits from ml_engine.data_manager import DataManager from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub from r2 import R2Service import ccxt.async_support as ccxt import xgboost as xgb import lightgbm as lgb except ImportError: pass logging.getLogger('ml_engine').setLevel(logging.WARNING) CACHE_DIR = "backtest_real_scores" # ============================================================ # ๐Ÿ›ก๏ธ GLOBAL HELPERS # ============================================================ def sanitize_features(df): if df is None or df.empty: return df return df.replace([np.inf, -np.inf], np.nan).ffill().bfill().fillna(0.0) def _z_roll(x, w=500): if not isinstance(x, pd.Series): x = pd.Series(x) r = x.rolling(w).mean() s = x.rolling(w).std().replace(0, np.nan) return ((x - r) / s).fillna(0) def _revive_score_distribution(scores): scores = np.array(scores, dtype=np.float32) if len(scores) < 10: return scores std = np.std(scores) if std < 0.05: mean = np.mean(scores) z = (scores - mean) / (std + 1e-9) return expit(z) return scores def safe_ta(ind_output, index, fill_method='smart'): if ind_output is None: return pd.Series(0.0, index=index, dtype='float64') if not isinstance(ind_output, pd.Series): s = pd.Series(ind_output, index=index) else: s = ind_output s = s.bfill().ffill() return s.fillna(0.0).astype('float64') def _zv(x): with np.errstate(divide='ignore', invalid='ignore'): x = np.asarray(x, dtype="float32") m = np.nanmean(x, axis=0) s = np.nanstd(x, axis=0) + 1e-9 return np.nan_to_num((x - m) / s, nan=0.0) def _transform_window_for_pattern(df_window): try: c = df_window['close'].values.astype('float32') o = df_window['open'].values.astype('float32') h = df_window['high'].values.astype('float32') l = df_window['low'].values.astype('float32') v = df_window['volume'].values.astype('float32') base = np.stack([o, h, l, c, v], axis=1) base_z = _zv(base) lr = np.zeros_like(c); lr[1:] = np.diff(np.log1p(c)) rng = (h - l) / (c + 1e-9) extra = np.stack([lr, rng], axis=1) extra_z = _zv(extra) def _ema(arr, n): return pd.Series(arr).ewm(span=n, adjust=False).mean().values ema9 = _ema(c, 9); ema21 = _ema(c, 21); ema50 = _ema(c, 50); ema200 = _ema(c, 200) slope21 = np.gradient(ema21); slope50 = np.gradient(ema50) delta = np.diff(c, prepend=c[0]) up, down = delta.copy(), delta.copy() up[up < 0] = 0; down[down > 0] = 0 roll_up = pd.Series(up).ewm(alpha=1/14, adjust=False).mean().values roll_down = pd.Series(down).abs().ewm(alpha=1/14, adjust=False).mean().values rs = roll_up / (roll_down + 1e-9) rsi = 100.0 - (100.0 / (1.0 + rs)) indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi], axis=1) X_seq = np.concatenate([base_z, extra_z, _zv(indicators)], axis=1) X_flat = X_seq.flatten() X_stat = np.array([0.5, 0.0, 0.5], dtype="float32") return np.concatenate([X_flat, X_stat]) except: return None # ============================================================ # ๐Ÿงช THE BACKTESTER CLASS # ============================================================ class HeavyDutyBacktester: def __init__(self, data_manager, processor): self.dm = data_manager self.proc = processor self.GRID_DENSITY = 5 self.INITIAL_CAPITAL = 10.0 self.TRADING_FEES = 0.001 self.MAX_SLOTS = 4 self.TARGET_COINS = [ 'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT', 'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT', 'SEI/USDT', 'TIA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'PYTH/USDT', 'WIF/USDT', 'PEPE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT', 'ONDO/USDT', 'ENA/USDT', 'HBAR/USDT', 'XLM/USDT', 'TAO/USDT', 'ZK/USDT', 'ZRO/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT', 'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'JTO/USDT', 'STRK/USDT', 'BLUR/USDT', 'ALT/USDT', 'JUP/USDT', 'PENDLE/USDT', 'ETHFI/USDT', 'MEME/USDT', 'ATOM/USDT' ] self.force_start_date = None self.force_end_date = None if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR) print(f"๐Ÿงช [Backtest V139.0] Vectorized Hydra Speed Optimization.") def set_date_range(self, start_str, end_str): self.force_start_date = start_str self.force_end_date = end_str async def _fetch_all_data_fast(self, sym, start_ms, end_ms): print(f" โšก [Network] Downloading {sym}...", flush=True) limit = 1000 tasks = [] current = start_ms duration_per_batch = limit * 60 * 1000 while current < end_ms: tasks.append(current) current += duration_per_batch all_candles = [] sem = asyncio.Semaphore(20) async def _fetch_batch(timestamp): async with sem: for _ in range(3): try: return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit) except: await asyncio.sleep(0.5) return [] chunk_size = 50 for i in range(0, len(tasks), chunk_size): chunk_tasks = tasks[i:i + chunk_size] futures = [_fetch_batch(ts) for ts in chunk_tasks] results = await asyncio.gather(*futures) for res in results: if res: all_candles.extend(res) if not all_candles: return None df = pd.DataFrame(all_candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v']) df.drop_duplicates('timestamp', inplace=True) df = df[(df['timestamp'] >= start_ms) & (df['timestamp'] <= end_ms)] df.sort_values('timestamp', inplace=True) print(f" โœ… Downloaded {len(df)} candles.", flush=True) return df.values.tolist() def _calculate_indicators_vectorized(self, df, timeframe='1m'): cols = ['close', 'high', 'low', 'volume', 'open'] for c in cols: df[c] = df[c].astype(np.float64) idx = df.index df['RSI'] = safe_ta(ta.rsi(df['close'], length=14), idx, 50) macd = ta.macd(df['close']) if macd is not None: df['MACD'] = safe_ta(macd.iloc[:, 0], idx, 0) df['MACD_h'] = safe_ta(macd.iloc[:, 1], idx, 0) else: df['MACD'] = 0.0; df['MACD_h'] = 0.0 df['CCI'] = safe_ta(ta.cci(df['high'], df['low'], df['close'], length=20), idx, 0) adx = ta.adx(df['high'], df['low'], df['close'], length=14) if adx is not None: df['ADX'] = safe_ta(adx.iloc[:, 0], idx, 0) else: df['ADX'] = 0.0 if timeframe == '1d': df['Trend_Strong'] = np.where(df['ADX'] > 25, 1.0, 0.0) for p in [9, 21, 50, 200]: ema = safe_ta(ta.ema(df['close'], length=p), idx, 0) df[f'EMA_{p}_dist'] = ((df['close'] / ema.replace(0, np.nan)) - 1).fillna(0) df[f'ema{p}'] = ema df['ema20'] = safe_ta(ta.ema(df['close'], length=20), idx, df['close']) bb = ta.bbands(df['close'], length=20, std=2.0) if bb is not None: w = ((bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1].replace(0, np.nan)).fillna(0) p = ((df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]).replace(0, np.nan)).fillna(0) df['BB_w'] = w; df['BB_p'] = p; df['bb_width'] = w else: df['BB_w'] = 0; df['BB_p'] = 0; df['bb_width'] = 0 df['MFI'] = safe_ta(ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14), idx, 50) vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume']) if vwap is not None: df['VWAP_dist'] = ((df['close'] / vwap.replace(0, np.nan)) - 1).fillna(0) df['vwap'] = vwap else: df['VWAP_dist'] = 0.0; df['vwap'] = df['close'] df['atr'] = safe_ta(ta.atr(df['high'], df['low'], df['close'], length=14), idx, 0) df['atr_pct'] = (df['atr'] / df['close'].replace(0, np.nan)).fillna(0) df['ATR_pct'] = df['atr_pct'] if timeframe == '1m': df['return_1m'] = df['close'].pct_change().fillna(0) df['return_3m'] = df['close'].pct_change(3).fillna(0) df['return_5m'] = df['close'].pct_change(5).fillna(0) df['return_15m'] = df['close'].pct_change(15).fillna(0) df['rsi_14'] = df['RSI'] e9 = df['ema9'].replace(0, np.nan) df['ema_9_slope'] = ((df['ema9'] - df['ema9'].shift(1)) / e9.shift(1)).fillna(0) df['ema_21_dist'] = df['EMA_21_dist'] atr_100 = safe_ta(ta.atr(df['high'], df['low'], df['close'], length=100), idx, 0) df['atr_z'] = _z_roll(atr_100) df['vol_zscore_50'] = _z_roll(df['volume'], 50) rng = (df['high'] - df['low']).replace(0, 1e-9) df['candle_range'] = _z_roll(rng, 500) df['close_pos_in_range'] = ((df['close'] - df['low']) / rng).fillna(0.5) df['dollar_vol'] = df['close'] * df['volume'] amihud_raw = (df['return_1m'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0) df['amihud'] = _z_roll(amihud_raw) dp = df['close'].diff() roll_cov = dp.rolling(64).cov(dp.shift(1)) roll_spread_raw = (2 * np.sqrt(np.maximum(0, -roll_cov))).fillna(0) df['roll_spread'] = _z_roll(roll_spread_raw) sign = np.sign(df['close'].diff()).fillna(0) signed_vol = sign * df['volume'] ofi_raw = signed_vol.rolling(30).sum().fillna(0) df['ofi'] = _z_roll(ofi_raw) buy_vol = (sign > 0) * df['volume'] sell_vol = (sign < 0) * df['volume'] imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs() tot = df['volume'].rolling(60).sum().replace(0, np.nan) df['vpin'] = (imb / tot).fillna(0) vwap_win = 20 v_short = (df['dollar_vol'].rolling(vwap_win).sum() / df['volume'].rolling(vwap_win).sum().replace(0, np.nan)).fillna(df['close']) df['vwap_dev'] = _z_roll(df['close'] - v_short) rv_gk = ((np.log(df['high'] / df['low'])**2) / 2) - ((2 * np.log(2) - 1) * (np.log(df['close'] / df['open'])**2)) df['rv_gk'] = _z_roll(rv_gk) df['L_score'] = (df['vol_zscore_50'] - df['amihud'] - df['roll_spread'] - df['rv_gk'].abs() - df['vwap_dev'].abs() + df['ofi']).fillna(0) df['slope'] = safe_ta(ta.slope(df['close'], length=7), idx, 0) vol_mean = df['volume'].rolling(20).mean() vol_std = df['volume'].rolling(20).std().replace(0, np.nan) df['vol_z'] = ((df['volume'] - vol_mean) / vol_std).fillna(0) df['rel_vol'] = df['volume'] / (df['volume'].rolling(50).mean() + 1e-9) df['log_ret'] = np.log(df['close'] / df['close'].shift(1).replace(0, np.nan)).fillna(0) roll_max = df['high'].rolling(50).max() roll_min = df['low'].rolling(50).min() diff = (roll_max - roll_min).replace(0, 1e-9) df['fib_pos'] = ((df['close'] - roll_min) / diff).fillna(0.5) e20_s = df['ema20'].shift(5).replace(0, np.nan) df['trend_slope'] = ((df['ema20'] - df['ema20'].shift(5)) / e20_s).fillna(0) df['volatility'] = (df['atr'] / df['close'].replace(0, np.nan)).fillna(0) fib618 = roll_max - (diff * 0.382) df['dist_fib618'] = ((df['close'] - fib618) / df['close'].replace(0, np.nan)).fillna(0) df['dist_ema50'] = ((df['close'] - df['ema50']) / df['ema50'].replace(0, np.nan)).fillna(0) e200 = safe_ta(ta.ema(df['close'], length=200), idx, df['close']) df['ema200'] = e200 df['dist_ema200'] = ((df['close'] - e200) / e200.replace(0, np.nan)).fillna(0) if timeframe == '1m': for lag in [1, 2, 3, 5, 10, 20]: df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0) df[f'rsi_lag_{lag}'] = (df['RSI'].shift(lag) / 100.0).fillna(0.5) df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5) df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0) df.fillna(0, inplace=True) return df async def _process_data_in_memory(self, sym, candles, start_ms, end_ms): safe_sym = sym.replace('/', '_') period_suffix = f"{start_ms}_{end_ms}" scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl" if os.path.exists(scores_file): print(f" ๐Ÿ“‚ [{sym}] Data Exists -> Skipping.") return print(f" โš™๏ธ [CPU] Analyzing {sym}...", flush=True) t0 = time.time() df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms') df_1m.set_index('datetime', inplace=True) df_1m = df_1m.sort_index() frames = {} frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m') frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6 fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns} agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'} numpy_htf = {} for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]: resampled = df_1m.resample(tf_code).agg(agg_dict).dropna() resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str) resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6 frames[tf_str] = resampled numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns} arr_ts_1m = fast_1m['timestamp'] map_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['5m']['timestamp']) - 1) map_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['15m']['timestamp']) - 1) map_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['1h']['timestamp']) - 1) map_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['4h']['timestamp']) - 1) map_1d = np.clip(np.searchsorted(numpy_htf['1d']['timestamp'], arr_ts_1m), 0, len(numpy_htf['1d']['timestamp']) - 1) if '1d' in numpy_htf else np.zeros(len(arr_ts_1m), dtype=int) hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {} hydra_cols = getattr(self.proc.guardian_hydra, 'feature_cols', []) if self.proc.guardian_hydra else [] legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None) oracle_dir = getattr(self.proc.oracle, 'model_direction', None) oracle_cols = getattr(self.proc.oracle, 'feature_cols', []) sniper_models = getattr(self.proc.sniper, 'models', []) sniper_cols = getattr(self.proc.sniper, 'feature_names', []) titan_model = getattr(self.proc.titan, 'model', None) # A. TITAN (Global) global_titan_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32) if titan_model: titan_cols = [ '5m_open', '5m_high', '5m_low', '5m_close', '5m_volume', '5m_RSI', '5m_MACD', '5m_MACD_h', '5m_CCI', '5m_ADX', '5m_EMA_9_dist', '5m_EMA_21_dist', '5m_EMA_50_dist', '5m_EMA_200_dist', '5m_BB_w', '5m_BB_p', '5m_MFI', '5m_VWAP_dist', '15m_timestamp', '15m_RSI', '15m_MACD', '15m_MACD_h', '15m_CCI', '15m_ADX', '15m_EMA_9_dist', '15m_EMA_21_dist', '15m_EMA_50_dist', '15m_EMA_200_dist', '15m_BB_w', '15m_BB_p', '15m_MFI', '15m_VWAP_dist', '1h_timestamp', '1h_RSI', '1h_MACD_h', '1h_EMA_50_dist', '1h_EMA_200_dist', '1h_ATR_pct', '4h_timestamp', '4h_RSI', '4h_MACD_h', '4h_EMA_50_dist', '4h_EMA_200_dist', '4h_ATR_pct', '1d_timestamp', '1d_RSI', '1d_EMA_200_dist', '1d_Trend_Strong' ] print(" ๐Ÿš€ Running Global Titan...", flush=True) try: t_vecs = [] for col in titan_cols: parts = col.split('_', 1); tf = parts[0]; feat = parts[1] target_arr = numpy_htf.get(tf, None) target_map = locals().get(f"map_{tf}", None) if target_arr and feat in target_arr: t_vecs.append(target_arr[feat][target_map]) elif target_arr and feat == 'timestamp': t_vecs.append(target_arr['timestamp'][target_map]) elif target_arr and feat in ['open','high','low','close','volume']: t_vecs.append(target_arr[feat][target_map]) else: t_vecs.append(np.zeros(len(arr_ts_1m))) X_TITAN = np.column_stack(t_vecs) global_titan_scores = _revive_score_distribution(titan_model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols))) except: pass # B. SNIPER (Global) global_sniper_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32) if sniper_models: print(" ๐Ÿš€ Running Global Sniper...", flush=True) try: s_vecs = [] for col in sniper_cols: if col in fast_1m: s_vecs.append(fast_1m[col]) elif col == 'atr' and 'atr_z' in fast_1m: s_vecs.append(fast_1m['atr_z']) else: s_vecs.append(np.zeros(len(arr_ts_1m))) X_SNIPER = np.column_stack(s_vecs) preds = [m.predict(X_SNIPER) for m in sniper_models] global_sniper_scores = _revive_score_distribution(np.mean(preds, axis=0)) except: pass # C. ORACLE (Global) global_oracle_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32) if oracle_dir: print(" ๐Ÿš€ Running Global Oracle...", flush=True) try: o_vecs = [] for col in oracle_cols: if col.startswith('1h_'): o_vecs.append(numpy_htf['1h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_1h]) elif col.startswith('15m_'): o_vecs.append(numpy_htf['15m'].get(col[4:], np.zeros(len(arr_ts_1m)))[map_15m]) elif col.startswith('4h_'): o_vecs.append(numpy_htf['4h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_4h]) elif col == 'sim_titan_score': o_vecs.append(global_titan_scores) elif col == 'sim_mc_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5)) elif col == 'sim_pattern_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5)) else: o_vecs.append(np.zeros(len(arr_ts_1m))) X_ORACLE = np.column_stack(o_vecs) preds_o = oracle_dir.predict(X_ORACLE) preds_o = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0] global_oracle_scores = _revive_score_distribution(preds_o) except: pass # D. LEGACY (Global) global_v2_scores = np.zeros(len(arr_ts_1m), dtype=np.float32) if legacy_v2: try: l_log = fast_1m['log_ret']; l_rsi = fast_1m['RSI'] / 100.0; l_fib = fast_1m['fib_pos']; l_vol = fast_1m['volatility'] l5_log = numpy_htf['5m']['log_ret'][map_5m]; l5_rsi = numpy_htf['5m']['RSI'][map_5m] / 100.0; l5_fib = numpy_htf['5m']['fib_pos'][map_5m]; l5_trd = numpy_htf['5m']['trend_slope'][map_5m] l15_log = numpy_htf['15m']['log_ret'][map_15m]; l15_rsi = numpy_htf['15m']['RSI'][map_15m] / 100.0; l15_fib618 = numpy_htf['15m']['dist_fib618'][map_15m]; l15_trd = numpy_htf['15m']['trend_slope'][map_15m] lags = [] for lag in [1, 2, 3, 5, 10, 20]: lags.extend([fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'], fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']]) X_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lags]) preds = legacy_v2.predict(xgb.DMatrix(X_V2)) global_v2_scores = preds[:, 2] if len(preds.shape) > 1 else preds except: pass # Filter is_candidate = (numpy_htf['1h']['RSI'][map_1h] <= 70) & (global_titan_scores > 0.4) & (global_oracle_scores > 0.4) candidate_indices = np.where(is_candidate)[0] start_ts_val = frames['1m'].index[0] + pd.Timedelta(minutes=500) start_idx_offset = np.searchsorted(arr_ts_1m, int(start_ts_val.timestamp()*1000)) candidate_indices = candidate_indices[candidate_indices >= start_idx_offset] candidate_indices = candidate_indices[candidate_indices < (len(arr_ts_1m) - 245)] print(f" ๐ŸŽฏ Candidates: {len(candidate_indices)}. Running Vectorized Hydra...", flush=True) # ๐Ÿš€ VECTORIZED HYDRA SIMULATION ๐Ÿš€ ai_results = [] if hydra_models and len(candidate_indices) > 0: # Prepare Static Features Matrix (Global) h_static = np.column_stack([ fast_1m['RSI'], numpy_htf['5m']['RSI'][map_5m], numpy_htf['15m']['RSI'][map_15m], fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close'] ]) # Shape: (N, 7) # Process candidates in chunks to avoid RAM explosion chunk_size = 5000 for i in range(0, len(candidate_indices), chunk_size): chunk_idxs = candidate_indices[i:i+chunk_size] # We need sliding windows of 240 steps for each candidate # Trick: Use broadcasting or sliding_window_view on static features # But sliding_window_view on huge array is slow. Better to just slice. # Vectorized construction for chunk # 1. Extract entry prices entries = fast_1m['close'][chunk_idxs] entries_ts = fast_1m['timestamp'][chunk_idxs] # 2. Prepare sequences (Vectorized slice is hard in numpy without creating copies) # We stick to a tight loop or specialized indexing. # Given we need to construct a [Batch, 240, Features] array for Hydra... # Fastest way: List comprehension for slicing, then stack. # Since Hydra is XGBoost, we can flatten the time dimension? No, Hydra is 1D input (snapshot). # Wait, Hydra predicts Crash Probability for a SNAPSHOT state. # In simulation, we need to check crash prob at t+1, t+2... t+240. # That is 240 checks per candidate. 42,000 * 240 = 10 Million checks. # This IS the bottleneck. # OPTIMIZATION: Only check Hydra if PnL drops below -0.5% or something? No, that misses the point. # OPTIMIZATION 2 (Implemented): Vectorize the "Check" logic. # Construct big matrix for ALL checks: (N_Candidates * 240, Features) # But that's 10M rows. XGBoost inference on 10M rows takes ~3-5 seconds on CPU. This is feasible! # Let's do it per candidate to be safe on RAM, but fast. for idx in chunk_idxs: # Slicing is fast sl_st = h_static[idx:idx+240] sl_close = sl_st[:, 6]; sl_atr = sl_st[:, 5] entry = fast_1m['close'][idx] dist = np.maximum(1.5 * sl_atr, entry * 0.015) pnl = sl_close - entry norm_pnl = pnl / dist max_pnl_r = (np.maximum.accumulate(sl_close) - entry) / dist atr_pct = sl_atr / sl_close # Stack Hydra Input (240 rows) # Cols: rsi1, rsi5, rsi15, bb, vol, dist_ema(0), atr_pct, norm, max, dists(0), time, entry(0), oracle, l2, target zeros = np.zeros(240) time_vec = np.arange(1, 241) s_oracle = global_oracle_scores[idx] X_H = np.column_stack([ sl_st[:,0], sl_st[:,1], sl_st[:,2], sl_st[:,3], sl_st[:,4], zeros, atr_pct, norm_pnl, max_pnl_r, zeros, zeros, time_vec, zeros, np.full(240, s_oracle), np.full(240, 0.7), np.full(240, 3.0) ]) # Predict 240 steps at once max_hydra = 0.0; hydra_time = 0 try: probs = hydra_models['crash'].predict_proba(X_H)[:, 1] max_hydra = np.max(probs) if max_hydra > 0.6: t = np.argmax(probs) hydra_time = int(fast_1m['timestamp'][idx + t]) except: pass # Legacy Max max_v2 = np.max(global_v2_scores[idx:idx+240]) v2_time = 0 if max_v2 > 0.8: t2 = np.argmax(global_v2_scores[idx:idx+240]) v2_time = int(fast_1m['timestamp'][idx + t2]) ai_results.append({ 'timestamp': int(fast_1m['timestamp'][idx]), 'symbol': sym, 'close': entry, 'real_titan': global_titan_scores[idx], 'oracle_conf': s_oracle, 'sniper_score': global_sniper_scores[idx], 'risk_hydra_crash': max_hydra, 'time_hydra_crash': hydra_time, 'risk_legacy_v2': max_v2, 'time_legacy_panic': v2_time, 'signal_type': 'BREAKOUT', 'l1_score': 50.0 }) dt = time.time() - t0 if ai_results: pd.DataFrame(ai_results).to_pickle(scores_file) print(f" โœ… [{sym}] Completed in {dt:.2f} seconds. ({len(ai_results)} signals)", flush=True) gc.collect() async def generate_truth_data(self): if self.force_start_date: dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000) print(f"\n๐Ÿšœ [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}") for sym in self.TARGET_COINS: c = await self._fetch_all_data_fast(sym, ms_s, ms_e) if c: await self._process_data_in_memory(sym, c, ms_s, ms_e) @staticmethod def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots): print(f" โณ [System] Loading {len(scores_files)} datasets...", flush=True) data = [] for f in scores_files: try: data.append(pd.read_pickle(f)) except: pass if not data: return [] df = pd.concat(data).sort_values('timestamp') ts = df['timestamp'].values; close = df['close'].values.astype(float) sym = df['symbol'].values; sym_map = {s:i for i,s in enumerate(np.unique(sym))} sym_id = np.array([sym_map[s] for s in sym]) oracle = df['oracle_conf'].values; sniper = df['sniper_score'].values hydra = df['risk_hydra_crash'].values; titan = df['real_titan'].values l1 = df['l1_score'].values legacy_v2 = df['risk_legacy_v2'].values N = len(ts) print(f" ๐Ÿš€ [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True) res = [] for cfg in combinations_batch: pos = {}; log = [] bal = initial_capital; alloc = 0.0 mask = (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh']) & (sniper >= cfg['sniper_thresh']) & (titan >= 0.55) for i in range(N): s = sym_id[i]; p = close[i] if s in pos: entry = pos[s][0]; h_r = pos[s][1]; titan_entry = pos[s][3] crash_hydra = (h_r > cfg['hydra_thresh']) panic_legacy = (legacy_v2[i] > cfg['legacy_thresh']) pnl = (p - entry)/entry if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02: realized = pnl - fees_pct*2 bal += pos[s][2] * (1 + realized) alloc -= pos[s][2] is_consensus = (titan_entry > 0.55) log.append({'pnl': realized, 'consensus': is_consensus}) del pos[s] if len(pos) < max_slots and mask[i]: if s not in pos and bal >= 5.0: size = min(10.0, bal * 0.98) pos[s] = (p, hydra[i], size, titan[i]) bal -= size; alloc += size final_bal = bal + alloc profit = final_bal - initial_capital tot = len(log) winning = [x for x in log if x['pnl'] > 0] losing = [x for x in log if x['pnl'] <= 0] win_count = len(winning); loss_count = len(losing) win_rate = (win_count/tot*100) if tot else 0 avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0 avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0 gross_p = sum([x['pnl'] for x in winning]) gross_l = abs(sum([x['pnl'] for x in losing])) profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9 max_win_s = 0; max_loss_s = 0; curr_w = 0; curr_l = 0 for t in log: if t['pnl'] > 0: curr_w += 1; curr_l = 0 if curr_w > max_win_s: max_win_s = curr_w else: curr_l += 1; curr_w = 0 if curr_l > max_loss_s: max_loss_s = curr_l cons_trades = [x for x in log if x['consensus']] n_cons = len(cons_trades) agree_rate = (n_cons/tot*100) if tot else 0 cons_win_rate = (sum(1 for x in cons_trades if x['pnl']>0)/n_cons*100) if n_cons else 0 cons_avg_pnl = (sum(x['pnl'] for x in cons_trades)/n_cons*100) if n_cons else 0 res.append({ 'config': cfg, 'final_balance': final_bal, 'net_profit': profit, 'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0, 'win_count': win_count, 'loss_count': loss_count, 'avg_win': avg_win, 'avg_loss': avg_loss, 'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s, 'profit_factor': profit_factor, 'consensus_agreement_rate': agree_rate, 'high_consensus_win_rate': cons_win_rate, 'high_consensus_avg_pnl': cons_avg_pnl }) return res async def run_optimization(self, target_regime="RANGE"): await self.generate_truth_data() oracle_r = np.linspace(0.4, 0.7, 3); sniper_r = np.linspace(0.4, 0.7, 3) hydra_r = [0.85, 0.95]; l1_r = [10.0] combos = [] for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r): combos.append({ 'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1, 'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95 }) files = glob.glob(os.path.join(CACHE_DIR, "*.pkl")) results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS) if not results_list: return None, {'net_profit': 0.0, 'win_rate': 0.0} results_list.sort(key=lambda x: x['net_profit'], reverse=True) best = results_list[0] # Auto-Diagnosis diag = [] if best['total_trades'] > 2000 and best['net_profit'] < 10: diag.append("โš ๏ธ Overtrading") if best['win_rate'] > 55 and best['net_profit'] < 0: diag.append("โš ๏ธ Fee Burn") if abs(best['avg_loss']) > best['avg_win']: diag.append("โš ๏ธ Risk/Reward Inversion") if best['max_loss_streak'] > 10: diag.append("โš ๏ธ Consecutive Loss Risk") if not diag: diag.append("โœ… System Healthy") print("\n" + "="*60) print(f"๐Ÿ† CHAMPION REPORT [{target_regime}]:") print(f" ๐Ÿ’ฐ Final Balance: ${best['final_balance']:,.2f}") print(f" ๐Ÿš€ Net PnL: ${best['net_profit']:,.2f}") print("-" * 60) print(f" ๐Ÿ“Š Total Trades: {best['total_trades']}") print(f" ๐Ÿ“ˆ Win Rate: {best['win_rate']:.1f}%") print(f" โœ… Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)") print(f" โŒ Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)") print(f" ๐ŸŒŠ Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}") print(f" โš–๏ธ Profit Factor: {best['profit_factor']:.2f}") print("-" * 60) print(f" ๐Ÿง  CONSENSUS ANALYTICS:") print(f" ๐Ÿค Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%") print(f" ๐ŸŒŸ High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%") print("-" * 60) print(f" ๐Ÿฉบ DIAGNOSIS: {' '.join(diag)}") print(f" โš™๏ธ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}") print("="*60) return best['config'], best async def run_strategic_optimization_task(): print("\n๐Ÿงช [STRATEGIC BACKTEST] Vectorized Hydra Speed...") r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm) try: await dm.initialize(); await proc.initialize() if proc.guardian_hydra: proc.guardian_hydra.set_silent_mode(True) hub = AdaptiveHub(r2); await hub.initialize() opt = HeavyDutyBacktester(dm, proc) scenarios = [ {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"}, {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"}, {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"}, {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"} ] for s in scenarios: opt.set_date_range(s["start"], s["end"]) best_cfg, best_stats = await opt.run_optimization(s["regime"]) if best_cfg: hub.submit_challenger(s["regime"], best_cfg, best_stats) await hub._save_state_to_r2() print("โœ… [System] DNA Updated.") finally: print("๐Ÿ”Œ [System] Closing connections...") await dm.close() if __name__ == "__main__": asyncio.run(run_strategic_optimization_task())