Spaces:
Paused
Paused
# ============================================================
# 🧪 backtest_engine.py (V139.0 - GEM-Architect: Vectorized Hydra Speed)
# ============================================================
| import asyncio | |
| import pandas as pd | |
| import numpy as np | |
| import pandas_ta as ta | |
| import time | |
| import logging | |
| import itertools | |
| import os | |
| import glob | |
| import gc | |
| import sys | |
| import traceback | |
| from numpy.lib.stride_tricks import sliding_window_view | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List | |
| from scipy.special import expit | |
| try: | |
| from ml_engine.processor import MLProcessor, SystemLimits | |
| from ml_engine.data_manager import DataManager | |
| from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub | |
| from r2 import R2Service | |
| import ccxt.async_support as ccxt | |
| import xgboost as xgb | |
| import lightgbm as lgb | |
| except ImportError: | |
| pass | |
# Directory (relative to the working directory) where per-symbol score
# pickles are cached between runs.
CACHE_DIR = "backtest_real_scores"
# Only WARNING and above are emitted by the 'ml_engine' logger.
logging.getLogger('ml_engine').setLevel(logging.WARNING)
# ============================================================
# 🛡️ GLOBAL HELPERS
# ============================================================
def sanitize_features(df):
    """Return *df* with infinities and NaNs removed.

    ``None`` and empty frames are returned unchanged. Otherwise +/-inf are
    turned into NaN, gaps are forward-filled, then backward-filled, and any
    value still missing becomes 0.0.
    """
    if df is None or df.empty:
        return df
    finite_only = df.replace([np.inf, -np.inf], np.nan)
    propagated = finite_only.ffill().bfill()
    return propagated.fillna(0.0)
def _z_roll(x, w=500):
    """Rolling z-score of *x* over a window of *w* observations.

    Non-Series input is wrapped in a ``pd.Series``. Positions where the
    rolling statistics are undefined (warm-up) or the rolling standard
    deviation is zero yield 0.
    """
    series = x if isinstance(x, pd.Series) else pd.Series(x)
    center = series.rolling(w).mean()
    # A zero std would divide by zero; mask it so the result becomes NaN -> 0.
    spread = series.rolling(w).std().replace(0, np.nan)
    zscore = (series - center) / spread
    return zscore.fillna(0)
def _revive_score_distribution(scores):
    """Spread out a nearly-constant score vector.

    The input is converted to a float32 array. Arrays with fewer than 10
    elements, or whose standard deviation is not below 0.05, are returned
    as-is. Otherwise the scores are standardised and passed through the
    logistic function (``scipy.special.expit``).
    """
    arr = np.array(scores, dtype=np.float32)
    if len(arr) < 10:
        return arr
    spread = np.std(arr)
    # Written as "not (spread < 0.05)" so a NaN std falls through unchanged.
    if not (spread < 0.05):
        return arr
    centered = arr - np.mean(arr)
    return expit(centered / (spread + 1e-9))
def safe_ta(ind_output, index, fill_method='smart'):
    """Coerce an indicator result into a gap-free float64 Series.

    Args:
        ind_output: Indicator output; a ``pd.Series``, an array-like, or
            ``None`` when the indicator could not be computed.
        index: Index used when a Series has to be built from scratch.
        fill_method: Fallback for values that cannot be recovered by
            back/forward filling. A number is used as a constant, a
            ``pd.Series`` is used element-wise (aligned on ``index``), and
            anything else (including the default ``'smart'``) means 0.0.

    Returns:
        A float64 ``pd.Series`` without NaNs.

    Callers in this file pass values such as ``50`` or ``df['close']`` in
    the third position; previously that argument was ignored and 0.0 was
    always used.
    """
    is_number = (
        isinstance(fill_method, (int, float, np.integer, np.floating))
        and not isinstance(fill_method, bool)
    )
    if isinstance(fill_method, pd.Series):
        fallback = fill_method.reindex(index).astype('float64')
    elif is_number:
        fallback = float(fill_method)
    else:
        fallback = 0.0

    if ind_output is None:
        if isinstance(fallback, pd.Series):
            return fallback.fillna(0.0)
        if np.isnan(fallback):
            fallback = 0.0
        return pd.Series(fallback, index=index, dtype='float64')

    if isinstance(ind_output, pd.Series):
        s = ind_output
    else:
        s = pd.Series(ind_output, index=index)
    # Back-fill first so the indicator warm-up takes the first valid value.
    s = s.bfill().ffill()
    # Only an all-NaN series still has gaps here; the trailing fillna(0.0)
    # covers a fallback that is itself NaN.
    return s.fillna(fallback).fillna(0.0).astype('float64')
def _zv(x):
    """Column-wise z-score of *x* as float32, with NaNs mapped to 0.

    Mean and standard deviation are taken along axis 0 ignoring NaNs; a
    small epsilon (1e-9) is added to the standard deviation.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        values = np.asarray(x, dtype="float32")
        col_mean = np.nanmean(values, axis=0)
        col_std = np.nanstd(values, axis=0) + 1e-9
        standardized = (values - col_mean) / col_std
        return np.nan_to_num(standardized, nan=0.0)
def _transform_window_for_pattern(df_window):
    """Flatten an OHLCV window into a 1-D feature vector.

    Args:
        df_window: Frame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        A 1-D array made of the row-major flattening of 14 per-row features
        (5 z-scored OHLCV columns, 2 z-scored derived columns, 7 z-scored
        indicator columns) followed by the constants ``[0.5, 0.0, 0.5]``;
        ``None`` if anything goes wrong while building it.
    """
    try:
        c = df_window['close'].values.astype('float32')
        o = df_window['open'].values.astype('float32')
        h = df_window['high'].values.astype('float32')
        l = df_window['low'].values.astype('float32')
        v = df_window['volume'].values.astype('float32')

        base_z = _zv(np.stack([o, h, l, c, v], axis=1))

        # Derived columns: first difference of log1p(close), and range/close.
        lr = np.zeros_like(c)
        lr[1:] = np.diff(np.log1p(c))
        rng = (h - l) / (c + 1e-9)
        extra_z = _zv(np.stack([lr, rng], axis=1))

        def _ema(arr, n):
            return pd.Series(arr).ewm(span=n, adjust=False).mean().values

        ema9 = _ema(c, 9)
        ema21 = _ema(c, 21)
        ema50 = _ema(c, 50)
        ema200 = _ema(c, 200)
        slope21 = np.gradient(ema21)
        slope50 = np.gradient(ema50)

        # RSI with alpha = 1/14 exponential smoothing of gains and losses.
        delta = np.diff(c, prepend=c[0])
        up, down = delta.copy(), delta.copy()
        up[up < 0] = 0
        down[down > 0] = 0
        roll_up = pd.Series(up).ewm(alpha=1/14, adjust=False).mean().values
        roll_down = pd.Series(down).abs().ewm(alpha=1/14, adjust=False).mean().values
        rs = roll_up / (roll_down + 1e-9)
        rsi = 100.0 - (100.0 / (1.0 + rs))

        indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi], axis=1)
        X_seq = np.concatenate([base_z, extra_z, _zv(indicators)], axis=1)
        X_flat = X_seq.flatten()
        X_stat = np.array([0.5, 0.0, 0.5], dtype="float32")
        return np.concatenate([X_flat, X_stat])
    except Exception:
        # Best effort: callers get None for any malformed window.
        # KeyboardInterrupt/SystemExit now propagate instead of being swallowed.
        return None
# ============================================================
# 🧪 THE BACKTESTER CLASS
# ============================================================
class HeavyDutyBacktester:
    """Download 1m candles, score them with the ML stack, grid-search thresholds.

    Workflow (see ``run_optimization``):
      1. ``generate_truth_data`` downloads 1-minute candles for every symbol
         in ``TARGET_COINS`` and caches per-symbol model scores as pickles
         in ``CACHE_DIR``.
      2. ``_worker_optimize`` replays the cached signals for each threshold
         combination and reports trading statistics.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, data_manager, processor):
        """Store collaborators, set defaults and make sure the cache dir exists.

        Args:
            data_manager: Object exposing ``exchange.fetch_ohlcv`` (awaitable).
            processor: Object exposing the model holders read in
                ``_process_data_in_memory`` (``guardian_hydra``,
                ``guardian_legacy``, ``oracle``, ``sniper``, ``titan``).
        """
        self.dm = data_manager
        self.proc = processor
        self.GRID_DENSITY = 5
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        self.TARGET_COINS = [
            'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT',
            'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT',
            'SEI/USDT', 'TIA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'PYTH/USDT',
            'WIF/USDT', 'PEPE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT',
            'ONDO/USDT', 'ENA/USDT', 'HBAR/USDT', 'XLM/USDT', 'TAO/USDT', 'ZK/USDT',
            'ZRO/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT',
            'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'JTO/USDT',
            'STRK/USDT', 'BLUR/USDT', 'ALT/USDT', 'JUP/USDT', 'PENDLE/USDT', 'ETHFI/USDT',
            'MEME/USDT', 'ATOM/USDT'
        ]
        self.force_start_date = None
        self.force_end_date = None
        # exist_ok avoids the check-then-create race of the original.
        os.makedirs(CACHE_DIR, exist_ok=True)
        print("π§ͺ [Backtest V139.0] Vectorized Hydra Speed Optimization.")

    def set_date_range(self, start_str, end_str):
        """Set the backtest period; both arguments are ``YYYY-MM-DD`` strings."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    def _period_bounds_ms(self):
        """Return ``(start_ms, end_ms)`` for the forced period (UTC), or ``None``.

        ``None`` is returned when no start date has been set.
        """
        if not self.force_start_date:
            return None
        fmt = "%Y-%m-%d"
        dt_s = datetime.strptime(self.force_start_date, fmt).replace(tzinfo=timezone.utc)
        dt_e = datetime.strptime(self.force_end_date, fmt).replace(tzinfo=timezone.utc)
        return int(dt_s.timestamp() * 1000), int(dt_e.timestamp() * 1000)

    @staticmethod
    def _align_to_htf(htf_ts, ts_1m):
        """Map each 1m timestamp to the index of the HTF bar that contains it.

        ``htf_ts`` holds sorted bar-open timestamps. The result is the index
        of the latest bar whose open time is <= the 1m timestamp, clipped to
        the valid range. (``side='left'`` without the ``- 1`` would select
        the *next* bar for every minute that is not on a bar boundary.)
        """
        pos = np.searchsorted(htf_ts, ts_1m, side='right') - 1
        return np.clip(pos, 0, len(htf_ts) - 1)

    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1-minute OHLCV candles for *sym* in ``[start_ms, end_ms]``.

        Requests are issued in 1000-candle batches, at most 20 in flight,
        each retried up to 3 times. Returns a list of
        ``[timestamp, o, h, l, c, v]`` rows sorted by timestamp, or ``None``
        when nothing was downloaded.
        """
        print(f" β‘ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        batch_span_ms = limit * 60 * 1000
        batch_starts = list(range(start_ms, end_ms, batch_span_ms))
        all_candles = []
        sem = asyncio.Semaphore(20)

        async def _fetch_batch(timestamp):
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                    except Exception:
                        # "except Exception" lets asyncio.CancelledError
                        # propagate; a bare except would make the task
                        # impossible to cancel.
                        await asyncio.sleep(0.5)
                return []

        chunk_size = 50
        for i in range(0, len(batch_starts), chunk_size):
            results = await asyncio.gather(*(_fetch_batch(ts) for ts in batch_starts[i:i + chunk_size]))
            for res in results:
                if res:
                    all_candles.extend(res)
        if not all_candles:
            return None
        df = pd.DataFrame(all_candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v'])
        df.drop_duplicates('timestamp', inplace=True)
        df = df[(df['timestamp'] >= start_ms) & (df['timestamp'] <= end_ms)]
        df = df.sort_values('timestamp')
        print(f" β Downloaded {len(df)} candles.", flush=True)
        return df.values.tolist()

    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Add indicator columns to *df* in place and return it.

        Args:
            df: OHLCV frame (``open/high/low/close/volume``).
            timeframe: ``'1m'`` additionally computes the microstructure and
                lag features; ``'1d'`` additionally computes ``Trend_Strong``.
        """
        for col in ('close', 'high', 'low', 'volume', 'open'):
            df[col] = df[col].astype(np.float64)
        idx = df.index
        close, high, low, volume = df['close'], df['high'], df['low'], df['volume']

        df['RSI'] = safe_ta(ta.rsi(close, length=14), idx, 50)
        macd = ta.macd(close)
        if macd is not None:
            df['MACD'] = safe_ta(macd.iloc[:, 0], idx, 0)
            df['MACD_h'] = safe_ta(macd.iloc[:, 1], idx, 0)
        else:
            df['MACD'] = 0.0
            df['MACD_h'] = 0.0
        df['CCI'] = safe_ta(ta.cci(high, low, close, length=20), idx, 0)
        adx = ta.adx(high, low, close, length=14)
        if adx is not None:
            df['ADX'] = safe_ta(adx.iloc[:, 0], idx, 0)
        else:
            df['ADX'] = 0.0
        if timeframe == '1d':
            df['Trend_Strong'] = np.where(df['ADX'] > 25, 1.0, 0.0)

        for length in [9, 21, 50, 200]:
            ema = safe_ta(ta.ema(close, length=length), idx, 0)
            df[f'EMA_{length}_dist'] = ((close / ema.replace(0, np.nan)) - 1).fillna(0)
            df[f'ema{length}'] = ema
        df['ema20'] = safe_ta(ta.ema(close, length=20), idx, close)

        bb = ta.bbands(close, length=20, std=2.0)
        if bb is not None:
            # Columns 0/1/2 are used as lower/middle/upper band.
            band_lo, band_mid, band_hi = bb.iloc[:, 0], bb.iloc[:, 1], bb.iloc[:, 2]
            bb_w = ((band_hi - band_lo) / band_mid.replace(0, np.nan)).fillna(0)
            bb_p = ((close - band_lo) / (band_hi - band_lo).replace(0, np.nan)).fillna(0)
            df['BB_w'] = bb_w
            df['BB_p'] = bb_p
            df['bb_width'] = bb_w
        else:
            df['BB_w'] = 0
            df['BB_p'] = 0
            df['bb_width'] = 0

        df['MFI'] = safe_ta(ta.mfi(high, low, close, volume, length=14), idx, 50)
        vwap = ta.vwap(high, low, close, volume)
        if vwap is not None:
            df['VWAP_dist'] = ((close / vwap.replace(0, np.nan)) - 1).fillna(0)
            df['vwap'] = vwap
        else:
            df['VWAP_dist'] = 0.0
            df['vwap'] = close
        df['atr'] = safe_ta(ta.atr(high, low, close, length=14), idx, 0)
        df['atr_pct'] = (df['atr'] / close.replace(0, np.nan)).fillna(0)
        df['ATR_pct'] = df['atr_pct']

        # NOTE(review): SOURCE indentation was lost. This block is assumed to
        # run from 'return_1m' through 'L_score'; confirm against the
        # original file.
        if timeframe == '1m':
            df['return_1m'] = close.pct_change().fillna(0)
            df['return_3m'] = close.pct_change(3).fillna(0)
            df['return_5m'] = close.pct_change(5).fillna(0)
            df['return_15m'] = close.pct_change(15).fillna(0)
            df['rsi_14'] = df['RSI']
            e9 = df['ema9'].replace(0, np.nan)
            df['ema_9_slope'] = ((df['ema9'] - df['ema9'].shift(1)) / e9.shift(1)).fillna(0)
            df['ema_21_dist'] = df['EMA_21_dist']
            atr_100 = safe_ta(ta.atr(high, low, close, length=100), idx, 0)
            df['atr_z'] = _z_roll(atr_100)
            df['vol_zscore_50'] = _z_roll(volume, 50)
            rng = (high - low).replace(0, 1e-9)
            df['candle_range'] = _z_roll(rng, 500)
            df['close_pos_in_range'] = ((close - low) / rng).fillna(0.5)
            df['dollar_vol'] = close * volume
            amihud_raw = (df['return_1m'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
            df['amihud'] = _z_roll(amihud_raw)
            dp = close.diff()
            roll_cov = dp.rolling(64).cov(dp.shift(1))
            roll_spread_raw = (2 * np.sqrt(np.maximum(0, -roll_cov))).fillna(0)
            df['roll_spread'] = _z_roll(roll_spread_raw)
            sign = np.sign(close.diff()).fillna(0)
            signed_vol = sign * volume
            ofi_raw = signed_vol.rolling(30).sum().fillna(0)
            df['ofi'] = _z_roll(ofi_raw)
            buy_vol = (sign > 0) * volume
            sell_vol = (sign < 0) * volume
            imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
            tot = volume.rolling(60).sum().replace(0, np.nan)
            df['vpin'] = (imb / tot).fillna(0)
            vwap_win = 20
            v_short = (
                df['dollar_vol'].rolling(vwap_win).sum()
                / volume.rolling(vwap_win).sum().replace(0, np.nan)
            ).fillna(close)
            df['vwap_dev'] = _z_roll(close - v_short)
            rv_gk = ((np.log(high / low)**2) / 2) - ((2 * np.log(2) - 1) * (np.log(close / df['open'])**2))
            df['rv_gk'] = _z_roll(rv_gk)
            df['L_score'] = (
                df['vol_zscore_50'] - df['amihud'] - df['roll_spread']
                - df['rv_gk'].abs() - df['vwap_dev'].abs() + df['ofi']
            ).fillna(0)

        df['slope'] = safe_ta(ta.slope(close, length=7), idx, 0)
        vol_mean = volume.rolling(20).mean()
        vol_std = volume.rolling(20).std().replace(0, np.nan)
        df['vol_z'] = ((volume - vol_mean) / vol_std).fillna(0)
        df['rel_vol'] = volume / (volume.rolling(50).mean() + 1e-9)
        df['log_ret'] = np.log(close / close.shift(1).replace(0, np.nan)).fillna(0)
        roll_max = high.rolling(50).max()
        roll_min = low.rolling(50).min()
        diff = (roll_max - roll_min).replace(0, 1e-9)
        df['fib_pos'] = ((close - roll_min) / diff).fillna(0.5)
        e20_s = df['ema20'].shift(5).replace(0, np.nan)
        df['trend_slope'] = ((df['ema20'] - df['ema20'].shift(5)) / e20_s).fillna(0)
        df['volatility'] = (df['atr'] / close.replace(0, np.nan)).fillna(0)
        fib618 = roll_max - (diff * 0.382)
        df['dist_fib618'] = ((close - fib618) / close.replace(0, np.nan)).fillna(0)
        df['dist_ema50'] = ((close - df['ema50']) / df['ema50'].replace(0, np.nan)).fillna(0)
        e200 = safe_ta(ta.ema(close, length=200), idx, close)
        df['ema200'] = e200
        df['dist_ema200'] = ((close - e200) / e200.replace(0, np.nan)).fillna(0)

        if timeframe == '1m':
            for lag in [1, 2, 3, 5, 10, 20]:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = (df['RSI'].shift(lag) / 100.0).fillna(0.5)
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
        df.fillna(0, inplace=True)
        return df

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score one symbol's candles with every model and cache the signals.

        Skips the work when the score pickle for ``(sym, start_ms, end_ms)``
        already exists. Otherwise it builds 1m and higher-timeframe features,
        runs the Titan / Sniper / Oracle / Legacy models over all rows,
        filters candidate rows, evaluates the Hydra crash model over the 240
        rows following each candidate, and pickles one record per candidate.
        A failing model is logged and its scores stay at their defaults.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" π [{sym}] Data Exists -> Skipping.")
            return
        print(f" βοΈ [CPU] Analyzing {sym}...", flush=True)
        t0 = time.time()

        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()

        frame_1m = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
        frame_1m['timestamp'] = frame_1m.index.floor('1min').astype(np.int64) // 10**6
        fast_1m = {col: frame_1m[col].values for col in frame_1m.columns}
        arr_ts_1m = fast_1m['timestamp']
        n_rows = len(arr_ts_1m)

        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        numpy_htf = {}
        # '5min'/'15min' replace the deprecated '5T'/'15T' aliases.
        for tf_str, tf_code in [('5m', '5min'), ('15m', '15min'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
            resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
            resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
            resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}

        # Row index into each HTF frame for every 1m row (the bar containing
        # it). NOTE(review): that bar is complete in the backtest, so its
        # close can still lie a few minutes ahead of the 1m row; subtract one
        # more bar if strictly closed bars are required.
        htf_map = {tf: self._align_to_htf(cols['timestamp'], arr_ts_1m) for tf, cols in numpy_htf.items()}
        map_5m, map_15m, map_1h, map_4h = htf_map['5m'], htf_map['15m'], htf_map['1h'], htf_map['4h']

        hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
        legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
        oracle_dir = getattr(self.proc.oracle, 'model_direction', None)
        oracle_cols = getattr(self.proc.oracle, 'feature_cols', [])
        sniper_models = getattr(self.proc.sniper, 'models', [])
        sniper_cols = getattr(self.proc.sniper, 'feature_names', [])
        titan_model = getattr(self.proc.titan, 'model', None)

        # A. TITAN (Global)
        global_titan_scores = np.full(n_rows, 0.5, dtype=np.float32)
        if titan_model:
            titan_cols = [
                '5m_open', '5m_high', '5m_low', '5m_close', '5m_volume', '5m_RSI', '5m_MACD', '5m_MACD_h',
                '5m_CCI', '5m_ADX', '5m_EMA_9_dist', '5m_EMA_21_dist', '5m_EMA_50_dist', '5m_EMA_200_dist',
                '5m_BB_w', '5m_BB_p', '5m_MFI', '5m_VWAP_dist', '15m_timestamp', '15m_RSI', '15m_MACD',
                '15m_MACD_h', '15m_CCI', '15m_ADX', '15m_EMA_9_dist', '15m_EMA_21_dist', '15m_EMA_50_dist',
                '15m_EMA_200_dist', '15m_BB_w', '15m_BB_p', '15m_MFI', '15m_VWAP_dist', '1h_timestamp',
                '1h_RSI', '1h_MACD_h', '1h_EMA_50_dist', '1h_EMA_200_dist', '1h_ATR_pct', '4h_timestamp',
                '4h_RSI', '4h_MACD_h', '4h_EMA_50_dist', '4h_EMA_200_dist', '4h_ATR_pct', '1d_timestamp',
                '1d_RSI', '1d_EMA_200_dist', '1d_Trend_Strong'
            ]
            print(" π Running Global Titan...", flush=True)
            try:
                t_vecs = []
                for col in titan_cols:
                    tf, feat = col.split('_', 1)
                    tf_cols = numpy_htf.get(tf)
                    if tf_cols is not None and feat in tf_cols:
                        t_vecs.append(tf_cols[feat][htf_map[tf]])
                    else:
                        # Unknown features are fed as zeros.
                        t_vecs.append(np.zeros(n_rows))
                X_TITAN = np.column_stack(t_vecs)
                global_titan_scores = _revive_score_distribution(
                    titan_model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols))
                )
            except Exception:
                self._log.warning("[%s] Titan inference failed; using neutral scores", sym, exc_info=True)

        # B. SNIPER (Global)
        global_sniper_scores = np.full(n_rows, 0.5, dtype=np.float32)
        if sniper_models:
            print(" π Running Global Sniper...", flush=True)
            try:
                s_vecs = []
                for col in sniper_cols:
                    if col in fast_1m:
                        s_vecs.append(fast_1m[col])
                    elif col == 'atr' and 'atr_z' in fast_1m:
                        s_vecs.append(fast_1m['atr_z'])
                    else:
                        s_vecs.append(np.zeros(n_rows))
                X_SNIPER = np.column_stack(s_vecs)
                preds = [m.predict(X_SNIPER) for m in sniper_models]
                global_sniper_scores = _revive_score_distribution(np.mean(preds, axis=0))
            except Exception:
                self._log.warning("[%s] Sniper inference failed; using neutral scores", sym, exc_info=True)

        # C. ORACLE (Global)
        global_oracle_scores = np.full(n_rows, 0.5, dtype=np.float32)
        if oracle_dir:
            print(" π Running Global Oracle...", flush=True)
            try:
                o_vecs = []
                for col in oracle_cols:
                    if col.startswith('1h_'):
                        o_vecs.append(numpy_htf['1h'].get(col[3:], np.zeros(n_rows))[map_1h])
                    elif col.startswith('15m_'):
                        o_vecs.append(numpy_htf['15m'].get(col[4:], np.zeros(n_rows))[map_15m])
                    elif col.startswith('4h_'):
                        o_vecs.append(numpy_htf['4h'].get(col[3:], np.zeros(n_rows))[map_4h])
                    elif col == 'sim_titan_score':
                        o_vecs.append(global_titan_scores)
                    elif col == 'sim_mc_score':
                        o_vecs.append(np.full(n_rows, 0.5))
                    elif col == 'sim_pattern_score':
                        o_vecs.append(np.full(n_rows, 0.5))
                    else:
                        o_vecs.append(np.zeros(n_rows))
                X_ORACLE = np.column_stack(o_vecs)
                preds_o = oracle_dir.predict(X_ORACLE)
                if not (isinstance(preds_o, np.ndarray) and len(preds_o.shape) == 1):
                    preds_o = preds_o[:, 0]
                global_oracle_scores = _revive_score_distribution(preds_o)
            except Exception:
                self._log.warning("[%s] Oracle inference failed; using neutral scores", sym, exc_info=True)

        # D. LEGACY (Global)
        global_v2_scores = np.zeros(n_rows, dtype=np.float32)
        if legacy_v2:
            try:
                htf5, htf15 = numpy_htf['5m'], numpy_htf['15m']
                base_cols = [
                    fast_1m['log_ret'], fast_1m['RSI'] / 100.0, fast_1m['fib_pos'], fast_1m['volatility'],
                    htf5['log_ret'][map_5m], htf5['RSI'][map_5m] / 100.0,
                    htf5['fib_pos'][map_5m], htf5['trend_slope'][map_5m],
                    htf15['log_ret'][map_15m], htf15['RSI'][map_15m] / 100.0,
                    htf15['dist_fib618'][map_15m], htf15['trend_slope'][map_15m],
                ]
                lags = []
                for lag in [1, 2, 3, 5, 10, 20]:
                    lags.extend([
                        fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'],
                        fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}'],
                    ])
                X_V2 = np.column_stack([*base_cols, *lags])
                preds = legacy_v2.predict(xgb.DMatrix(X_V2))
                global_v2_scores = preds[:, 2] if len(preds.shape) > 1 else preds
            except Exception:
                self._log.warning("[%s] Legacy inference failed; using zero scores", sym, exc_info=True)

        # Filter: candidate rows, minus a 500-minute warm-up at the start and
        # the last 245 rows (the Hydra window below looks 240 rows ahead).
        is_candidate = (
            (numpy_htf['1h']['RSI'][map_1h] <= 70)
            & (global_titan_scores > 0.4)
            & (global_oracle_scores > 0.4)
        )
        candidate_indices = np.where(is_candidate)[0]
        start_ts_val = frame_1m.index[0] + pd.Timedelta(minutes=500)
        start_idx_offset = np.searchsorted(arr_ts_1m, int(start_ts_val.timestamp() * 1000))
        candidate_indices = candidate_indices[candidate_indices >= start_idx_offset]
        candidate_indices = candidate_indices[candidate_indices < (n_rows - 245)]
        print(f" π― Candidates: {len(candidate_indices)}. Running Vectorized Hydra...", flush=True)

        # VECTORIZED HYDRA SIMULATION: one predict_proba call per candidate
        # covering all 240 forward steps.
        ai_results = []
        if hydra_models and len(candidate_indices) > 0:
            # Static feature matrix, shape (N, 7).
            h_static = np.column_stack([
                fast_1m['RSI'], numpy_htf['5m']['RSI'][map_5m], numpy_htf['15m']['RSI'][map_15m],
                fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close']
            ])
            try:
                crash_model = hydra_models['crash']
            except (KeyError, IndexError, TypeError):
                crash_model = None

            horizon = 240
            zeros = np.zeros(horizon)
            time_vec = np.arange(1, horizon + 1)
            l2_const = np.full(horizon, 0.7)
            target_const = np.full(horizon, 3.0)
            ts_1m = fast_1m['timestamp']
            close_1m = fast_1m['close']
            hydra_failure_logged = False

            for idx in candidate_indices:
                sl_st = h_static[idx:idx + horizon]
                sl_close = sl_st[:, 6]
                sl_atr = sl_st[:, 5]
                entry = close_1m[idx]
                dist = np.maximum(1.5 * sl_atr, entry * 0.015)
                norm_pnl = (sl_close - entry) / dist
                max_pnl_r = (np.maximum.accumulate(sl_close) - entry) / dist
                atr_pct = sl_atr / sl_close
                s_oracle = global_oracle_scores[idx]
                # Cols: rsi1, rsi5, rsi15, bb, vol, dist_ema(0), atr_pct, norm, max,
                # dists(0), time, entry(0), oracle, l2, target
                X_H = np.column_stack([
                    sl_st[:, 0], sl_st[:, 1], sl_st[:, 2], sl_st[:, 3], sl_st[:, 4],
                    zeros, atr_pct, norm_pnl, max_pnl_r, zeros, zeros, time_vec, zeros,
                    np.full(horizon, s_oracle), l2_const, target_const
                ])

                max_hydra = 0.0
                hydra_time = 0
                if crash_model is not None:
                    try:
                        probs = crash_model.predict_proba(X_H)[:, 1]
                        max_hydra = np.max(probs)
                        if max_hydra > 0.6:
                            hydra_time = int(ts_1m[idx + np.argmax(probs)])
                    except Exception:
                        # Log only once; this loop can run thousands of times.
                        if not hydra_failure_logged:
                            self._log.warning("[%s] Hydra inference failed", sym, exc_info=True)
                            hydra_failure_logged = True

                # Legacy max over the same forward window.
                v2_window = global_v2_scores[idx:idx + horizon]
                max_v2 = np.max(v2_window)
                v2_time = 0
                if max_v2 > 0.8:
                    v2_time = int(ts_1m[idx + np.argmax(v2_window)])

                ai_results.append({
                    'timestamp': int(ts_1m[idx]),
                    'symbol': sym, 'close': entry,
                    'real_titan': global_titan_scores[idx],
                    'oracle_conf': s_oracle,
                    'sniper_score': global_sniper_scores[idx],
                    'risk_hydra_crash': max_hydra, 'time_hydra_crash': hydra_time,
                    'risk_legacy_v2': max_v2, 'time_legacy_panic': v2_time,
                    'signal_type': 'BREAKOUT', 'l1_score': 50.0
                })

        dt = time.time() - t0
        if ai_results:
            pd.DataFrame(ai_results).to_pickle(scores_file)
        print(f" β [{sym}] Completed in {dt:.2f} seconds. ({len(ai_results)} signals)", flush=True)
        gc.collect()

    async def generate_truth_data(self):
        """Download and score every target coin for the forced date range.

        Does nothing when no start date has been set.
        """
        bounds = self._period_bounds_ms()
        if bounds is None:
            return
        ms_s, ms_e = bounds
        print(f"\nπ [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
        for sym in self.TARGET_COINS:
            c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
            if c:
                await self._process_data_in_memory(sym, c, ms_s, ms_e)

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Replay cached signals for every config and return per-config stats.

        Declared ``@staticmethod`` because the signature has no ``self`` while
        the caller invokes it as ``self._worker_optimize(...)``.

        Args:
            combinations_batch: Iterable of config dicts with the keys
                ``l1_thresh``, ``oracle_thresh``, ``sniper_thresh``,
                ``hydra_thresh`` and ``legacy_thresh``.
            scores_files: Paths of score pickles written by
                ``_process_data_in_memory``.
            initial_capital: Starting balance.
            fees_pct: Fee per side, as a fraction.
            max_slots: Maximum number of simultaneously open positions.

        Returns:
            One result dict per config (empty list when no file could be read).
        """
        print(f" β³ [System] Loading {len(scores_files)} datasets...", flush=True)
        data = []
        for f in scores_files:
            # NOTE: read_pickle executes arbitrary code from the file; only
            # feed it files from the local cache written by this program.
            try:
                data.append(pd.read_pickle(f))
            except Exception as exc:
                logging.getLogger(__name__).warning("Skipping unreadable score file %s: %s", f, exc)
        if not data:
            return []
        df = pd.concat(data).sort_values('timestamp', kind='stable')
        ts = df['timestamp'].values
        close = df['close'].values.astype(float)
        sym = df['symbol'].values
        sym_map = {s: i for i, s in enumerate(np.unique(sym))}
        sym_id = np.array([sym_map[s] for s in sym])
        oracle = df['oracle_conf'].values
        sniper = df['sniper_score'].values
        hydra = df['risk_hydra_crash'].values
        titan = df['real_titan'].values
        l1 = df['l1_score'].values
        legacy_v2 = df['risk_legacy_v2'].values
        N = len(ts)
        print(f" π [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True)

        res = []
        for cfg in combinations_batch:
            pos = {}  # symbol id -> (entry price, hydra risk at entry, size, titan at entry)
            log = []
            bal = initial_capital
            alloc = 0.0
            mask = (
                (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh'])
                & (sniper >= cfg['sniper_thresh']) & (titan >= 0.55)
            )
            for i in range(N):
                s = sym_id[i]
                p = close[i]
                if s in pos:
                    entry, h_r, size_open, titan_entry = pos[s]
                    crash_hydra = (h_r > cfg['hydra_thresh'])
                    panic_legacy = (legacy_v2[i] > cfg['legacy_thresh'])
                    pnl = (p - entry) / entry
                    if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02:
                        realized = pnl - fees_pct * 2
                        bal += size_open * (1 + realized)
                        alloc -= size_open
                        log.append({'pnl': realized, 'consensus': (titan_entry > 0.55)})
                        del pos[s]
                if len(pos) < max_slots and mask[i]:
                    if s not in pos and bal >= 5.0:
                        size = min(10.0, bal * 0.98)
                        pos[s] = (p, hydra[i], size, titan[i])
                        bal -= size
                        alloc += size

            # Positions still open at the end are valued at cost.
            final_bal = bal + alloc
            profit = final_bal - initial_capital
            tot = len(log)
            winning = [x for x in log if x['pnl'] > 0]
            losing = [x for x in log if x['pnl'] <= 0]
            win_count = len(winning)
            loss_count = len(losing)
            win_rate = (win_count / tot * 100) if tot else 0
            avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0
            avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0
            gross_p = sum(x['pnl'] for x in winning)
            gross_l = abs(sum(x['pnl'] for x in losing))
            profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9

            max_win_s = 0
            max_loss_s = 0
            curr_w = 0
            curr_l = 0
            for t in log:
                if t['pnl'] > 0:
                    curr_w += 1
                    curr_l = 0
                    max_win_s = max(max_win_s, curr_w)
                else:
                    curr_l += 1
                    curr_w = 0
                    max_loss_s = max(max_loss_s, curr_l)

            cons_trades = [x for x in log if x['consensus']]
            n_cons = len(cons_trades)
            agree_rate = (n_cons / tot * 100) if tot else 0
            cons_win_rate = (sum(1 for x in cons_trades if x['pnl'] > 0) / n_cons * 100) if n_cons else 0
            cons_avg_pnl = (sum(x['pnl'] for x in cons_trades) / n_cons * 100) if n_cons else 0
            res.append({
                'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
                'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0,
                'win_count': win_count, 'loss_count': loss_count,
                'avg_win': avg_win, 'avg_loss': avg_loss,
                'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s,
                'profit_factor': profit_factor,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': cons_win_rate,
                'high_consensus_avg_pnl': cons_avg_pnl
            })
        return res

    async def run_optimization(self, target_regime="RANGE"):
        """Generate scores for the current period and grid-search thresholds.

        Args:
            target_regime: Label used only in the printed report.

        Returns:
            ``(best_config, best_stats)``; ``(None, {...})`` with zeroed
            ``net_profit``/``win_rate`` when no cached scores are available.
        """
        await self.generate_truth_data()
        oracle_r = np.linspace(0.4, 0.7, 3)
        sniper_r = np.linspace(0.4, 0.7, 3)
        hydra_r = [0.85, 0.95]
        l1_r = [10.0]
        combos = [
            {
                'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
            }
            for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r)
        ]

        # Only load the score files of the current period. The cache also
        # holds other periods/regimes, which "*.pkl" would mix in.
        bounds = self._period_bounds_ms()
        pattern = f"*_{bounds[0]}_{bounds[1]}_scores.pkl" if bounds else "*.pkl"
        files = sorted(glob.glob(os.path.join(CACHE_DIR, pattern)))

        results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not results_list:
            return None, {'net_profit': 0.0, 'win_rate': 0.0}
        results_list.sort(key=lambda x: x['net_profit'], reverse=True)
        best = results_list[0]

        # Auto-Diagnosis
        diag = []
        if best['total_trades'] > 2000 and best['net_profit'] < 10:
            diag.append("β οΈ Overtrading")
        if best['win_rate'] > 55 and best['net_profit'] < 0:
            diag.append("β οΈ Fee Burn")
        if abs(best['avg_loss']) > best['avg_win']:
            diag.append("β οΈ Risk/Reward Inversion")
        if best['max_loss_streak'] > 10:
            diag.append("β οΈ Consecutive Loss Risk")
        if not diag:
            diag.append("β System Healthy")

        print("\n" + "="*60)
        print(f"π CHAMPION REPORT [{target_regime}]:")
        print(f" π° Final Balance: ${best['final_balance']:,.2f}")
        print(f" π Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" π Total Trades: {best['total_trades']}")
        print(f" π Win Rate: {best['win_rate']:.1f}%")
        print(f" β Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)")
        print(f" β Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)")
        print(f" π Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}")
        print(f" βοΈ Profit Factor: {best['profit_factor']:.2f}")
        print("-" * 60)
        print(f" π§ CONSENSUS ANALYTICS:")
        print(f" π€ Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" π High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print("-" * 60)
        print(f" π©Ί DIAGNOSIS: {' '.join(diag)}")
        print(f" βοΈ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print("="*60)
        return best['config'], best
async def run_strategic_optimization_task():
    """Run the backtest optimisation for each market regime and persist results.

    For every (regime, start, end) scenario the backtester is pointed at the
    date range and optimised; a non-empty best config is submitted to the
    adaptive hub. The hub state is saved afterwards, and the data manager is
    closed whether or not an error occurred.
    """
    print("\nπ§ͺ [STRATEGIC BACKTEST] Vectorized Hydra Speed...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    try:
        await dm.initialize()
        await proc.initialize()
        if proc.guardian_hydra:
            proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2)
        await hub.initialize()
        opt = HeavyDutyBacktester(dm, proc)
        # (regime, start date, end date)
        scenarios = (
            ("BULL", "2024-01-01", "2024-03-30"),
            ("BEAR", "2023-08-01", "2023-09-15"),
            ("DEAD", "2023-06-01", "2023-08-01"),
            ("RANGE", "2024-07-01", "2024-09-30"),
        )
        for regime, start, end in scenarios:
            opt.set_date_range(start, end)
            best_cfg, best_stats = await opt.run_optimization(regime)
            if best_cfg:
                hub.submit_challenger(regime, best_cfg, best_stats)
        # NOTE(review): SOURCE indentation was lost; the save is assumed to
        # happen once, after all scenarios.
        await hub._save_state_to_r2()
        print("β [System] DNA Updated.")
    finally:
        print("π [System] Closing connections...")
        await dm.close()
if __name__ == "__main__":
    # Script entry point: run the full strategic optimisation once.
    asyncio.run(run_strategic_optimization_task())