# ============================================================
# 🧪 backtest_engine.py (V129.0 - GEM-Architect: Sniper Debug Mode)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import pandas_ta as ta
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from numpy.lib.stride_tricks import sliding_window_view
from datetime import datetime, timezone
from typing import Dict, Any, List

# Import the engines
try:
    from ml_engine.processor import MLProcessor, SystemLimits
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
    from r2 import R2Service
    import ccxt.async_support as ccxt
    import xgboost as xgb
except ImportError:
    print("❌ [Import Error] Critical ML modules missing.")

logging.getLogger('ml_engine').setLevel(logging.WARNING)

CACHE_DIR = "backtest_real_scores"

# ============================================================
# 🛡️ GLOBAL SANITIZATION
# ============================================================
def sanitize_features(df):
    """
    Cleans DataFrame from Infinity and NaNs.
    Forces float32 to be friendly with ML models.
    """
    if df is None or df.empty:
        return df
    # Replace Infinity with NaN, then fill NaN with 0.0, then convert to float32
    return df.replace([np.inf, -np.inf], np.nan).fillna(0.0).astype(np.float32)

def _zv(x):
    with np.errstate(divide='ignore', invalid='ignore'):
        x = np.asarray(x, dtype="float32")
        m = np.nanmean(x, axis=0)
        s = np.nanstd(x, axis=0) + 1e-9
        return np.nan_to_num((x - m) / s, nan=0.0)

def _z_score_rolling(x, w=500):
    r = x.rolling(w).mean()
    s = x.rolling(w).std().replace(0, np.nan)
    return ((x - r) / s).fillna(0)

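# Note on _z_score_rolling (a worked example, not from the original source): with
# window w=3 and x = [1, 2, 3, 4], the value at index 3 is
# (4 - mean(2, 3, 4)) / std(2, 3, 4) = (4 - 3) / 1.0 = 1.0 (pandas std uses ddof=1).
# The first w-1 rows have no complete window and are filled with 0 by .fillna(0).
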
# ============================================================
# 🧠 1. FEATURE ENGINEERING
# ============================================================
def _transform_window_for_pattern(df_window):
    try:
        c = df_window['close'].values.astype('float32')
        o = df_window['open'].values.astype('float32')
        h = df_window['high'].values.astype('float32')
        l = df_window['low'].values.astype('float32')
        v = df_window['volume'].values.astype('float32')
        base = np.stack([o, h, l, c, v], axis=1)
        base_z = _zv(base)
        lr = np.zeros_like(c)
        lr[1:] = np.diff(np.log1p(c))
        rng = (h - l) / (c + 1e-9)
        extra = np.stack([lr, rng], axis=1)
        extra_z = _zv(extra)

        def _ema(arr, n):
            return pd.Series(arr).ewm(span=n, adjust=False).mean().values

        ema9 = _ema(c, 9); ema21 = _ema(c, 21); ema50 = _ema(c, 50); ema200 = _ema(c, 200)
        slope21 = np.gradient(ema21); slope50 = np.gradient(ema50)
        delta = np.diff(c, prepend=c[0])
        up, down = delta.copy(), delta.copy()
        up[up < 0] = 0; down[down > 0] = 0
        roll_up = pd.Series(up).ewm(alpha=1/14, adjust=False).mean().values
        roll_down = pd.Series(down).abs().ewm(alpha=1/14, adjust=False).mean().values
        rs = roll_up / (roll_down + 1e-9)
        rsi = 100.0 - (100.0 / (1.0 + rs))
        indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi], axis=1)
        padding = np.zeros((200, 5), dtype='float32')
        indicators_full = np.concatenate([indicators, padding], axis=1)
        indicators_z = _zv(indicators_full)
        X_seq = np.concatenate([base_z, extra_z, indicators_z], axis=1)
        X_flat = X_seq.flatten()
        X_stat = np.array([0.5, 0.0, 0.5], dtype="float32")
        return np.concatenate([X_flat, X_stat])
    except:
        return None

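# Layout note (derived from the code above, assuming the 200-bar windows supplied by
# the caller): base_z is (200, 5), extra_z is (200, 2), and indicators_z is (200, 12)
# (7 indicators plus 5 zero-padding columns), so X_seq is (200, 19). Flattened and
# joined with the 3 static placeholders this yields a 200*19 + 3 = 3803-element vector,
# which the 15m pattern model is assumed to expect.
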
def calculate_sniper_features_exact(df):
    """Sniper Features Calculation - EXACT MATCH with Processor."""
    df = df.copy()
    # Base Returns
    df['return_1m'] = df['close'].pct_change(1).fillna(0)
    df['return_3m'] = df['close'].pct_change(3).fillna(0)
    df['return_5m'] = df['close'].pct_change(5).fillna(0)
    df['return_15m'] = df['close'].pct_change(15).fillna(0)
    # Liquidity Proxies
    df['ret'] = df['close'].pct_change().fillna(0)
    df['dollar_vol'] = df['close'] * df['volume']
    df['amihud'] = (df['ret'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
    dp = df['close'].diff()
    roll_cov = dp.rolling(64).cov(dp.shift(1))
    df['roll_spread'] = (2 * np.sqrt(np.maximum(0, -roll_cov))).fillna(0)
    sign = np.sign(df['close'].diff()).fillna(0)
    df['signed_vol'] = sign * df['volume']
    df['ofi'] = df['signed_vol'].rolling(30).sum().fillna(0)
    buy_vol = (sign > 0) * df['volume']
    sell_vol = (sign < 0) * df['volume']
    tot = df['volume'].rolling(60).sum()
    imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
    df['vpin'] = (imb / tot.replace(0, np.nan)).fillna(0)
    df['rv_gk'] = (np.log(df['high'] / df['low'])**2) / 2 - (2*np.log(2) - 1) * (np.log(df['close'] / df['open'])**2)
    vwap = (df['close'] * df['volume']).rolling(20).sum() / df['volume'].rolling(20).sum()
    df['vwap_dev'] = (df['close'] - vwap).fillna(0)
    df['L_score'] = (_z_score_rolling(df['volume']) + _z_score_rolling(1/(df['amihud']+1e-9)) +
                     _z_score_rolling(-df['roll_spread']) + _z_score_rolling(-df['rv_gk'].abs()) +
                     _z_score_rolling(-df['vwap_dev'].abs()) + _z_score_rolling(df['ofi']))
    # Standard Indicators
    df['rsi_14'] = ta.rsi(df['close'], 14).fillna(50)
    df['atr'] = ta.atr(df['high'], df['low'], df['close'], 100).fillna(0)
    df['vol_zscore_50'] = _z_score_rolling(df['volume'], 50)
    ema9 = ta.ema(df['close'], 9)
    if ema9 is not None:
        df['ema_9_slope'] = (ema9 - ema9.shift(1)) / ema9.shift(1)
    else:
        df['ema_9_slope'] = 0.0
    ema21 = ta.ema(df['close'], 21)
    if ema21 is not None:
        df['ema_21_dist'] = (df['close'] - ema21) / ema21
    else:
        df['ema_21_dist'] = 0.0
    df['candle_range'] = df['high'] - df['low']
    df['close_pos_in_range'] = (df['close'] - df['low']) / (df['candle_range'].replace(0, np.nan))
    return sanitize_features(df)

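# Reference for the microstructure proxies above (standard formulas, matching the code):
#   amihud      ~ |return| / dollar volume                       (Amihud illiquidity)
#   roll_spread ~ 2 * sqrt(max(0, -cov(dp_t, dp_{t-1})))         (Roll spread estimator)
#   vpin        ~ |buy volume - sell volume| / total volume over a 60-bar window
#   rv_gk       ~ 0.5 * ln(H/L)^2 - (2*ln(2) - 1) * ln(C/O)^2    (Garman-Klass variance)
# L_score sums rolling z-scores of these terms so that higher values indicate better liquidity.
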
def calculate_titan_features_real(df):
    """Titan features with strict Infinity handling."""
    df = df.copy()
    df['RSI'] = ta.rsi(df['close'], 14)
    macd = ta.macd(df['close'])
    if macd is not None:
        df['MACD'] = macd.iloc[:, 0]
        df['MACD_h'] = macd.iloc[:, 1]
    else:
        df['MACD'] = 0.0
        df['MACD_h'] = 0.0
    df['CCI'] = ta.cci(df['high'], df['low'], df['close'], 20)
    adx = ta.adx(df['high'], df['low'], df['close'], 14)
    if adx is not None:
        df['ADX'] = adx.iloc[:, 0]
    else:
        df['ADX'] = 0.0
    for p in [9, 21, 50, 200]:
        ema = ta.ema(df['close'], p)
        if ema is not None:
            df[f'EMA_{p}_dist'] = (df['close'] / ema.replace(0, np.nan)) - 1
        else:
            df[f'EMA_{p}_dist'] = 0.0
    bb = ta.bbands(df['close'], 20, 2.0)
    if bb is not None:
        df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1].replace(0, np.nan)
        df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]).replace(0, np.nan)
    else:
        df['BB_w'] = 0.0
        df['BB_p'] = 0.0
    df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], 14)
    vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
    if vwap is not None:
        df['VWAP_dist'] = (df['close'] / vwap.replace(0, np.nan)) - 1
    else:
        df['VWAP_dist'] = 0.0
    return sanitize_features(df)

# ============================================================
# 🔧 LEGACY GUARD (V2 & V3)
# ============================================================
def calculate_legacy_v2_vectorized(df_1m, df_5m, df_15m):
    try:
        def calc_basic(df, suffix):
            c = df['close']; h = df['high']; l = df['low']
            res = pd.DataFrame(index=df.index)
            res[f'log_ret_{suffix}'] = np.log(c / c.shift(1).replace(0, np.nan)).fillna(0)
            res[f'rsi_{suffix}'] = (ta.rsi(c, 14) / 100.0).fillna(0.5)
            roll_max = h.rolling(50).max(); roll_min = l.rolling(50).min()
            diff = (roll_max - roll_min).replace(0, 1e-9)
            res[f'fib_pos_{suffix}'] = ((c - roll_min) / diff).fillna(0.5)
            if suffix == '1m':
                res[f'volatility_{suffix}'] = (ta.atr(h, l, c, 14) / c.replace(0, np.nan)).fillna(0)
            else:
                ema = ta.ema(c, 20)
                if ema is not None:
                    res[f'trend_slope_{suffix}'] = ((ema - ema.shift(5)) / ema.shift(5).replace(0, np.nan)).fillna(0)
                else:
                    res[f'trend_slope_{suffix}'] = 0.0
                if suffix == '15m':
                    fib618 = roll_max - (diff * 0.382)
                    res[f'dist_fib618_{suffix}'] = ((c - fib618) / c.replace(0, np.nan)).fillna(0)
            return res

        f1 = calc_basic(df_1m, '1m')
        f5 = calc_basic(df_5m, '5m').reindex(df_1m.index, method='ffill')
        f15 = calc_basic(df_15m, '15m').reindex(df_1m.index, method='ffill')
        FEATS_1M = ['log_ret_1m', 'rsi_1m', 'fib_pos_1m', 'volatility_1m']
        FEATS_5M = ['log_ret_5m', 'rsi_5m', 'fib_pos_5m', 'trend_slope_5m']
        FEATS_15M = ['log_ret_15m', 'rsi_15m', 'dist_fib618_15m', 'trend_slope_15m']
        parts = [f1[FEATS_1M], f5[FEATS_5M], f15[FEATS_15M]]
        lags = [1, 2, 3, 5, 10, 20]
        for lag in lags:
            parts.append(f1[FEATS_1M].shift(lag).fillna(0))
        X_df = pd.concat(parts, axis=1)
        return sanitize_features(X_df).values
    except Exception as e:
        print(f"Legacy V2 Vec Error: {e}")
        return None

def calculate_legacy_v3_vectorized(df_1m, df_5m, df_15m):
    """Legacy V3 Safe Calc."""
    try:
        def calc_v3_base(df, prefix=""):
            d = df.copy()
            # Initialize all target columns so missing indicators fall back to 0.0
            targets = ['rsi', 'rsi_slope', 'macd_h', 'macd_h_slope', 'adx', 'dmp', 'dmn',
                       'trend_net_force', 'ema_20', 'ema_50', 'ema_200', 'dist_ema20',
                       'dist_ema50', 'dist_ema200', 'slope_ema50', 'atr', 'atr_rel',
                       'obv', 'obv_slope', 'cmf', 'log_ret', 'mc_skew', 'mc_kurt',
                       'mc_prob_gain', 'mc_var_95', 'mc_shock']
            for t in targets:
                d[t] = 0.0
            c = d['close']; h = d['high']; l = d['low']; v = d['volume']
            try:
                d['log_ret'] = np.log(c / c.shift(1).replace(0, np.nan)).fillna(0)
                d['rsi'] = ta.rsi(c, 14).fillna(50)
                d['rsi_slope'] = (d['rsi'] - d['rsi'].shift(3).fillna(50)) / 3
                macd = ta.macd(c)
                if macd is not None:
                    d['macd_h'] = macd.iloc[:, 1].fillna(0)
                    d['macd_h_slope'] = (d['macd_h'] - d['macd_h'].shift(3).fillna(0)) / 3
                adx = ta.adx(h, l, c, 14)
                if adx is not None:
                    d['adx'] = adx.iloc[:, 0].fillna(0); d['dmp'] = adx.iloc[:, 1].fillna(0); d['dmn'] = adx.iloc[:, 2].fillna(0)
                    d['trend_net_force'] = (d['dmp'] - d['dmn']) * (d['adx'] / 100.0)
                d['ema_20'] = ta.ema(c, 20).fillna(c); d['ema_50'] = ta.ema(c, 50).fillna(c); d['ema_200'] = ta.ema(c, 200).fillna(c)
                d['dist_ema20'] = (c - d['ema_20']) / d['ema_20'].replace(0, np.nan)
                d['dist_ema50'] = (c - d['ema_50']) / d['ema_50'].replace(0, np.nan)
                d['dist_ema200'] = (c - d['ema_200']) / d['ema_200'].replace(0, np.nan)
                d['slope_ema50'] = (d['ema_50'] - d['ema_50'].shift(5).fillna(0)) / d['ema_50'].shift(5).replace(0, np.nan)
                d['atr'] = ta.atr(h, l, c, 14).fillna(0); d['atr_rel'] = d['atr'] / c.replace(0, np.nan)
                d['obv'] = ta.obv(c, v).fillna(0); d['obv_slope'] = d['obv'] - d['obv'].shift(5).fillna(0)
                d['cmf'] = ta.cmf(h, l, c, v, 20).fillna(0)
                win = 30; roll = d['log_ret'].rolling(win)
                d['mc_skew'] = roll.skew().fillna(0); d['mc_kurt'] = roll.kurt().fillna(0)
                d['mc_prob_gain'] = (d['log_ret'] > 0).rolling(win).mean().fillna(0.5)
                d['mc_var_95'] = roll.quantile(0.05).fillna(-0.02)
                d['mc_shock'] = ((d['log_ret'] - roll.mean()) / (roll.std().replace(0, np.nan))).fillna(0)
            except:
                pass
            if prefix:
                d.columns = [f"{col}_{prefix}" if col != 'timestamp' else col for col in d.columns]
            return sanitize_features(d)

        df1 = calc_v3_base(df_1m)
        df5 = calc_v3_base(df_5m, "5m").reindex(df_1m.index, method='ffill')
        df15 = calc_v3_base(df_15m, "15m").reindex(df_1m.index, method='ffill')
        final_df = pd.DataFrame(index=df_1m.index)
        # Lagged 1m log returns occupy columns "6".."11" in the expected layout
        for i, col_name in enumerate(["6", "7", "8", "9", "10", "11"], 1):
            final_df[col_name] = df1['log_ret'].shift(i)
        cols_1m = ['rsi', 'rsi_slope', 'macd_h', 'macd_h_slope', 'adx', 'dmp', 'dmn', 'trend_net_force',
                   'ema_20', 'ema_50', 'ema_200', 'dist_ema20', 'dist_ema50', 'dist_ema200', 'slope_ema50',
                   'atr', 'atr_rel', 'obv', 'obv_slope', 'cmf', 'log_ret', 'mc_skew', 'mc_kurt',
                   'mc_prob_gain', 'mc_var_95', 'mc_shock']
        for c in cols_1m:
            final_df[c] = df1[c]
        cols_5m = ['rsi_5m', 'rsi_slope_5m', 'macd_h_5m', 'mc_prob_gain_5m', 'mc_shock_5m']
        for c in cols_5m:
            final_df[c] = df5[c]
        cols_15m = ['rsi_15m', 'macd_h_15m', 'trend_net_force_15m', 'mc_prob_gain_15m', 'dist_ema200_15m']
        for c in cols_15m:
            final_df[c] = df15[c]
        expected = ["6", "7", "8", "9", "10", "11", "rsi", "rsi_slope", "macd_h", "macd_h_slope", "adx", "dmp", "dmn",
                    "trend_net_force", "ema_20", "ema_50", "ema_200", "dist_ema20", "dist_ema50", "dist_ema200",
                    "slope_ema50", "atr", "atr_rel", "obv", "obv_slope", "cmf", "log_ret", "mc_skew", "mc_kurt",
                    "mc_prob_gain", "mc_var_95", "mc_shock", "rsi_5m", "rsi_slope_5m", "macd_h_5m", "mc_prob_gain_5m",
                    "mc_shock_5m", "rsi_15m", "macd_h_15m", "trend_net_force_15m", "mc_prob_gain_15m", "dist_ema200_15m"]
        return sanitize_features(final_df.reindex(columns=expected, fill_value=0.0))
    except:
        return None

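# Column note (derived from the code above): the Legacy V3 matrix has 42 columns -
# six lagged 1m log returns ("6".."11"), 26 1m indicator columns, five 5m columns and
# five 15m columns, reindexed into the exact order the legacy model is assumed to expect.
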
# ============================================================
# 🧪 THE BACKTESTER
# ============================================================
class HeavyDutyBacktester:
    def __init__(self, data_manager, processor):
        self.dm = data_manager
        self.proc = processor
        self.GRID_DENSITY = 3
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        self.TARGET_COINS = ['SOL/USDT', 'XRP/USDT', 'DOGE/USDT']
        self.force_start_date = None
        self.force_end_date = None
        if os.path.exists(CACHE_DIR):
            for f in glob.glob(os.path.join(CACHE_DIR, "*")):
                os.remove(f)
        else:
            os.makedirs(CACHE_DIR)
        self._check_engines()

    def _check_engines(self):
        status = []
        if self.proc.titan and self.proc.titan.model: status.append("Titan")
        if self.proc.pattern_engine and self.proc.pattern_engine.models: status.append("Patterns")
        if self.proc.oracle: status.append("Oracle")
        if self.proc.sniper: status.append("Sniper")
        if self.proc.guardian_hydra: status.append("Hydra")
        if self.proc.guardian_legacy: status.append("Legacy")
        print(f" ✅ Engines Ready: {', '.join(status)}")

    def set_date_range(self, start_str, end_str):
        self.force_start_date = start_str
        self.force_end_date = end_str

    def _smart_predict(self, model, X):
        try:
            if hasattr(model, "predict_proba"):
                raw = model.predict_proba(X)
                if raw.ndim == 2:
                    return raw[:, -1]
                return raw
            return model.predict(X)
        except Exception as e:
            # print(f"⚠️ Predict Error: {e}")  # Enable if needed
            return np.zeros(len(X) if hasattr(X, '__len__') else 0)

    def _extract_probs(self, raw_preds):
        if isinstance(raw_preds, list):
            raw_preds = np.array(raw_preds)
        if raw_preds.ndim == 1:
            return raw_preds
        elif raw_preds.ndim == 2:
            if raw_preds.shape[1] >= 2:
                return raw_preds[:, -1]
            return raw_preds.flatten()
        return raw_preds.flatten()

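    # Example of the probability convention used above (illustrative): a binary
    # sklearn-style classifier returns predict_proba with shape (N, 2), where column -1
    # is P(positive class), so _smart_predict / _extract_probs reduce it to a 1-D score
    # array; a raw xgboost Booster trained with a binary objective already returns a
    # 1-D probability array from predict().
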
    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        print(f" ⚡ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        duration = limit * 60 * 1000
        tasks = []
        curr = start_ms
        while curr < end_ms:
            tasks.append(curr)
            curr += duration
        all_c = []
        sem = asyncio.Semaphore(20)

        async def _fetch(ts):
            async with sem:
                try:
                    return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=ts, limit=limit)
                except:
                    await asyncio.sleep(0.5)
                    return []

        chunk = 50
        for i in range(0, len(tasks), chunk):
            res = await asyncio.gather(*[_fetch(t) for t in tasks[i:i+chunk]])
            for r in res:
                all_c.extend(r)
        seen = set()
        unique = []
        for c in all_c:
            if c[0] not in seen and start_ms <= c[0] <= end_ms:
                unique.append(c)
                seen.add(c[0])
        unique.sort(key=lambda x: x[0])
        print(f" ✅ Downloaded {len(unique)} candles.", flush=True)
        return unique

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        safe_sym = sym.replace('/', '_')
        scores_file = f"{CACHE_DIR}/{safe_sym}_{start_ms}_{end_ms}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" 💾 [{sym}] Data Exists -> Skipping.")
            return
        print(f" ⚙️ [CPU] Analyzing {sym} (ALL REAL MODELS)...", flush=True)
        t0 = time.time()
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        for c in ['open', 'high', 'low', 'close', 'volume']:
            df_1m[c] = df_1m[c].astype(float)
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()
        df_5m = df_1m.resample('5T').agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}).dropna()
        df_15m = df_1m.resample('15T').agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}).dropna()
        # 1. Sniper
        df_sniper = calculate_sniper_features_exact(df_1m)
        df_sniper['rel_vol'] = df_sniper['volume'] / (df_sniper['volume'].rolling(50).mean() + 1e-9)
        df_sniper['l1_score'] = (df_sniper['rel_vol'] * 10) + ((df_sniper['atr'] / df_sniper['close']) * 1000)
        valid_mask = (df_sniper['l1_score'] >= 5.0) & (np.arange(len(df_sniper)) > 500) & (np.arange(len(df_sniper)) < len(df_sniper) - 245)
        df_candidates = df_sniper[valid_mask].copy()
        if df_candidates.empty:
            return
        print(f" 🎯 Candidates: {len(df_candidates)}. Running Deep Inference...", flush=True)
        # 2. Patterns
        res_patterns = np.full(len(df_candidates), 0.5)
        pattern_models = getattr(self.proc.pattern_engine, 'models', {})
        if pattern_models and '15m' in pattern_models:
            try:
                df_15m_res = df_1m.resample('15T').agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}).dropna()
                pat_scores_15m = np.full(len(df_15m_res), 0.5)
                pat_inputs = []
                valid_15m_idxs = []
                for i in range(200, len(df_15m_res)):
                    window = df_15m_res.iloc[i-200:i]
                    vec = _transform_window_for_pattern(window)
                    if vec is not None:
                        pat_inputs.append(vec)
                        valid_15m_idxs.append(i)
                if pat_inputs:
                    X_pat = np.array(pat_inputs)
                    pat_preds = self._smart_predict(pattern_models['15m'], xgb.DMatrix(X_pat))
                    pat_scores_15m[valid_15m_idxs] = pat_preds
                    ts_15m = df_15m_res.index.astype(np.int64) // 10**6
                    map_idxs = np.searchsorted(ts_15m, df_candidates['timestamp'].values) - 1
                    res_patterns = pat_scores_15m[np.clip(map_idxs, 0, len(pat_scores_15m)-1)]
            except Exception as e:
                print(f"Patterns Error: {e}")
        # 3. Titan
        res_titan = np.full(len(df_candidates), 0.5)
        if self.proc.titan and self.proc.titan.model:
            try:
                df_5m_feat = calculate_titan_features_real(df_5m).add_prefix('5m_')
                ts_5m = df_5m.index.astype(np.int64) // 10**6
                map_idxs = np.clip(np.searchsorted(ts_5m, df_candidates['timestamp'].values) - 1, 0, len(df_5m_feat)-1)
                feats = self.proc.titan.feature_names
                X_titan_df = sanitize_features(df_5m_feat.iloc[map_idxs].reindex(columns=feats, fill_value=0))
                res_titan = self.proc.titan.model.predict(xgb.DMatrix(X_titan_df.values, feature_names=feats))
            except Exception as e:
                print(f"Titan Error: {e}")
        # 4. Sniper
        res_sniper = np.full(len(df_candidates), 0.5)
        sniper_models = getattr(self.proc.sniper, 'models', [])
        if sniper_models:
            try:
                feats = getattr(self.proc.sniper, 'feature_names', [])
                X_snip = sanitize_features(df_candidates[feats]).values
                preds = [self._extract_probs(self._smart_predict(m, X_snip)) for m in sniper_models]
                res_sniper = np.mean(preds, axis=0)
            except Exception as e:
                print(f"Sniper Error: {e}")
        # 5. Oracle
        res_oracle = np.full(len(df_candidates), 0.5)
        oracle_model = getattr(self.proc.oracle, 'model_direction', None)
        if oracle_model:
            try:
                oracle_feats = getattr(self.proc.oracle, 'feature_cols', [])
                X_orc_df = pd.DataFrame(0.0, index=range(len(df_candidates)), columns=oracle_feats)
                if 'sim_titan_score' in X_orc_df: X_orc_df['sim_titan_score'] = res_titan
                if 'sim_pattern_score' in X_orc_df: X_orc_df['sim_pattern_score'] = res_patterns
                if 'sim_mc_score' in X_orc_df: X_orc_df['sim_mc_score'] = 0.5
                res_oracle = self._extract_probs(self._smart_predict(oracle_model, X_orc_df.values))
            except Exception as e:
                print(f"Oracle Error: {e}")
        # 6. Hydra
        res_hydra_risk = np.zeros(len(df_candidates))
        hydra_models = getattr(self.proc.guardian_hydra, 'models', {})
        if hydra_models and 'crash' in hydra_models:
            try:
                global_hydra_feats = np.column_stack([
                    df_sniper['rsi_14'], df_sniper['rsi_14'], df_sniper['rsi_14'],
                    (df_sniper['close'] - df_sniper['close'].rolling(20).mean()) / df_sniper['close'],
                    df_sniper['rel_vol'], df_sniper['atr'], df_sniper['close']
                ]).astype(np.float32)
                global_hydra_feats = np.nan_to_num(global_hydra_feats, nan=0.0, posinf=0.0, neginf=0.0)
                # Each window is laid out as [feature, horizon] = (7, 240), matching the
                # static[:, feature_idx, :] indexing below.
                window_view = sliding_window_view(global_hydra_feats, 240, axis=0)
                c_idxs = np.searchsorted(df_sniper.index, df_candidates.index)
                valid_s = c_idxs + 1
                valid_mask_h = valid_s < (len(global_hydra_feats) - 240)
                final_s = valid_s[valid_mask_h]
                res_idxs = np.where(valid_mask_h)[0]
                for i in range(0, len(final_s), 5000):
                    b_idxs = final_s[i:i+5000]
                    r_idxs = res_idxs[i:i+5000]
                    static = window_view[b_idxs]
                    B = len(b_idxs)
                    entry = df_sniper['close'].values[b_idxs - 1].reshape(B, 1)
                    s_c = static[:, 6, :]    # close path over the next 240 minutes
                    s_atr = static[:, 5, :]  # atr path over the next 240 minutes
                    dist = np.maximum(1.5 * s_atr, entry * 0.015) + 1e-9
                    pnl = (s_c - entry) / dist
                    max_pnl = (np.maximum.accumulate(s_c, axis=1) - entry) / dist
                    atr_p = s_atr / (s_c + 1e-9)
                    zeros = np.zeros((B, 240)); ones = np.ones((B, 240)); t = np.tile(np.arange(1, 241), (B, 1))
                    X = np.stack([
                        static[:, 0], static[:, 1], static[:, 2], static[:, 3], static[:, 4],
                        zeros, atr_p, pnl, max_pnl, zeros, zeros, t, zeros, ones * 0.6, ones * 0.7, ones * 3
                    ], axis=2).reshape(-1, 16)
                    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
                    preds = hydra_models['crash'].predict_proba(X)[:, 1].reshape(B, 240)
                    res_hydra_risk[r_idxs] = np.max(preds, axis=1)
            except:
                pass
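        # Shape note (derived from the code above): global_hydra_feats has 7 columns
        # [rsi, rsi, rsi, dist_ma20, rel_vol, atr, close], so static is (B, 7, 240) and
        # the matrix fed to the crash model has one row per (candidate, forward-minute)
        # pair, i.e. B * 240 rows of 16 features.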
        # 7. Legacy
        res_legacy_v2 = np.zeros(len(df_candidates))
        res_legacy_v3 = np.zeros(len(df_candidates))
        if self.proc.guardian_legacy:
            try:
                X_v2_full = calculate_legacy_v2_vectorized(df_1m, df_5m, df_15m)
                v3_df_full = calculate_legacy_v3_vectorized(df_1m, df_5m, df_15m)
                all_indices = np.arange(len(df_1m))
                cand_indices = all_indices[valid_mask]
                max_len = len(df_1m)
                cand_indices = cand_indices[cand_indices < max_len]
                if len(cand_indices) > 0:
                    if self.proc.guardian_legacy.model_v2 and X_v2_full is not None:
                        subset_v2 = X_v2_full[cand_indices]
                        preds_v2 = self.proc.guardian_legacy.model_v2.predict(xgb.DMatrix(subset_v2))
                        if len(preds_v2.shape) > 1:
                            res_legacy_v2[:len(cand_indices)] = preds_v2[:, 2]
                        else:
                            res_legacy_v2[:len(cand_indices)] = preds_v2
                    if self.proc.guardian_legacy.model_v3 and v3_df_full is not None:
                        subset_v3_df = v3_df_full.iloc[cand_indices]
                        preds_v3 = self.proc.guardian_legacy.model_v3.predict(xgb.DMatrix(subset_v3_df))
                        res_legacy_v3[:len(cand_indices)] = preds_v3
            except Exception as e:
                print(f"Legacy Error: {e}")
        # 8. Assembly
        print(f" 📊 [Stats] Titan:{res_titan.mean():.2f} | Patterns:{res_patterns.mean():.2f} | Sniper:{res_sniper.mean():.2f} | Oracle:{res_oracle.mean():.2f}")
        ai_df = pd.DataFrame({
            'timestamp': df_candidates['timestamp'],
            'symbol': sym,
            'close': df_candidates['close'],
            'real_titan': res_titan,
            'oracle_conf': res_oracle,
            'sniper_score': res_sniper,
            'l1_score': df_candidates['l1_score'],
            'risk_hydra_crash': res_hydra_risk,
            'risk_legacy_v2': res_legacy_v2,
            'risk_legacy_v3': res_legacy_v3
        })
        dt = time.time() - t0
        if not ai_df.empty:
            ai_df.to_pickle(scores_file)
            print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
        gc.collect()

    async def generate_truth_data(self):
        if self.force_start_date:
            dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            ms_s = int(dt_s.timestamp() * 1000)
            ms_e = int(dt_e.timestamp() * 1000)
            print(f"\n🚀 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
            for sym in self.TARGET_COINS:
                c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
                if c:
                    await self._process_data_in_memory(sym, c, ms_s, ms_e)

    def _worker_optimize(self, combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
        data = []
        for f in scores_files:
            try:
                data.append(pd.read_pickle(f))
            except:
                pass
        if not data:
            return []
        df = pd.concat(data).sort_values('timestamp')
        ts = df['timestamp'].values
        close = df['close'].values.astype(float)
        sym = df['symbol'].values
        sym_map = {s: i for i, s in enumerate(np.unique(sym))}
        sym_id = np.array([sym_map[s] for s in sym])
        oracle = df['oracle_conf'].values; sniper = df['sniper_score'].values
        hydra = df['risk_hydra_crash'].values; titan = df['real_titan'].values
        l1 = df['l1_score'].values
        legacy_v2 = df['risk_legacy_v2'].values; legacy_v3 = df['risk_legacy_v3'].values
        N = len(ts)
        print(f" 🔍 [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True)
        res = []
        for cfg in combinations_batch:
            pos = {}
            log = []
            bal = initial_capital
            alloc = 0.0
            mask = (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh']) & (sniper >= cfg['sniper_thresh'])
            for i in range(N):
                s = sym_id[i]
                p = close[i]
                if s in pos:
                    entry = pos[s][0]; h_r = pos[s][1]; titan_entry = pos[s][3]
                    crash_hydra = (h_r > cfg['hydra_thresh'])
                    panic_legacy = (legacy_v2[i] > cfg['legacy_thresh']) or (legacy_v3[i] > cfg['legacy_thresh'])
                    pnl = (p - entry) / entry
                    if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02:
                        realized = pnl - fees_pct * 2
                        bal += pos[s][2] * (1 + realized)
                        alloc -= pos[s][2]
                        is_consensus = (titan_entry > 0.55)
                        log.append({'pnl': realized, 'consensus': is_consensus})
                        del pos[s]
                if len(pos) < max_slots and mask[i]:
                    if s not in pos and bal >= 5.0:
                        size = min(10.0, bal * 0.98)
                        pos[s] = (p, hydra[i], size, titan[i])
                        bal -= size
                        alloc += size
            final_bal = bal + alloc
            profit = final_bal - initial_capital
            tot = len(log)
            wins = sum(1 for x in log if x['pnl'] > 0)
            win_rate = (wins / tot * 100) if tot else 0
            cons_trades = [x for x in log if x['consensus']]
            n_cons = len(cons_trades)
            agree_rate = (n_cons / tot * 100) if tot else 0
            cons_win_rate = (sum(1 for x in cons_trades if x['pnl'] > 0) / n_cons * 100) if n_cons else 0
            cons_avg_pnl = (sum(x['pnl'] for x in cons_trades) / n_cons * 100) if n_cons else 0
            res.append({
                'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
                'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': cons_win_rate,
                'high_consensus_avg_pnl': cons_avg_pnl
            })
        return res

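    # Worked example of the exit accounting above (illustrative numbers): entering at
    # 100 and exiting at 104 with fees_pct = 0.001 gives pnl = 0.04 and
    # realized = 0.04 - 2 * 0.001 = 0.038, so a $10 slot returns 10 * 1.038 = $10.38.
    # Exits are triggered by a Hydra crash probability above hydra_thresh, either legacy
    # guard above legacy_thresh, a +4% take-profit, or a -2% stop-loss.
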
    async def run_optimization(self, target_regime="RANGE"):
        await self.generate_truth_data()
        oracle_r = np.linspace(0.3, 0.7, 3)
        sniper_r = np.linspace(0.2, 0.6, 3)
        hydra_r = [0.8, 0.9]
        l1_r = [5.0, 10.0]
        combos = []
        for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r):
            combos.append({
                'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
            })
        files = glob.glob(os.path.join(CACHE_DIR, "*.pkl"))
        results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not results_list:
            return None, {'net_profit': 0.0, 'win_rate': 0.0}
        results_list.sort(key=lambda x: x['net_profit'], reverse=True)
        best = results_list[0]
        print("\n" + "=" * 60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f" 📈 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" 📊 Total Trades: {best['total_trades']}")
        print(f" 🎯 Win Rate: {best['win_rate']:.1f}%")
        print("-" * 60)
        print(f" 🧠 CONSENSUS ANALYTICS:")
        print(f" 🤝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" 🏅 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print(f" 📈 High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
        print("-" * 60)
        print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print("=" * 60)
        return best['config'], best

async def run_strategic_optimization_task():
    print("\n🧪 [STRATEGIC BACKTEST] Full Spectrum Mode...")
    r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
    try:
        await dm.initialize()
        await proc.initialize()
        if proc.guardian_hydra:
            proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2)
        await hub.initialize()
        opt = HeavyDutyBacktester(dm, proc)
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        for s in scenarios:
            opt.set_date_range(s["start"], s["end"])
            best_cfg, best_stats = await opt.run_optimization(s["regime"])
            if best_cfg:
                hub.submit_challenger(s["regime"], best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("✅ [System] DNA Updated.")
    finally:
        print("🔌 [System] Closing connections...")
        await dm.close()

if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())
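
# Usage sketch (illustrative; assumes DataManager/MLProcessor initialize as in
# run_strategic_optimization_task above) for running a single regime manually:
#
#   r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
#   await dm.initialize(); await proc.initialize()
#   bt = HeavyDutyBacktester(dm, proc)
#   bt.set_date_range("2024-01-01", "2024-03-30")
#   best_cfg, best_stats = await bt.run_optimization("BULL")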