# ============================================================
# backtest_engine.py (V223.5 - GEM-Architect: The Data Floodgates Open)
# FIXES:
#   1) Relaxed storage thresholds (solves "Signals: 0").
#   2) Includes MFI / Slope / aliases (solves warnings).
#   3) Includes Sniper shape fix (solves crash).
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import time
import logging
import os
import glob
import gc
import traceback
import pickle
from datetime import datetime, timezone, timedelta

try:
    import pandas_ta as ta
    import xgboost as xgb
    import lightgbm as lgb
except ImportError as e:
    raise ImportError(f"๐Ÿ”ด CRITICAL: Missing mandatory dependency for Truth Mode: {e}")

try:
    from ml_engine.processor import MLProcessor
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import AdaptiveHub
    from r2 import R2Service
    from governance_engine import GovernanceEngine
except ImportError as e:
    # Still best-effort (the module stays importable), but no longer silent:
    # a later NameError on GovernanceEngine/DataManager is otherwise baffling.
    logging.getLogger(__name__).warning("Optional project import failed: %s", e)

logging.getLogger("ml_engine").setLevel(logging.WARNING)

CACHE_DIR = "backtest_v223_immutable"
GOV_CACHE_DIR = os.path.join(CACHE_DIR, "gov_cache")
PATTERN_CACHE_DIR = os.path.join(CACHE_DIR, "patterns_cache")
for d in [CACHE_DIR, GOV_CACHE_DIR, PATTERN_CACHE_DIR]:
    os.makedirs(d, exist_ok=True)


# ============================================================
# HELPERS
# ============================================================
def optimize_dataframe_memory(df: pd.DataFrame):
    """Downcast numeric columns of *df* in place and return it.

    float64 columns become float32. int64/int32 columns are narrowed to the
    smallest of int8/int16/int32 whose range strictly contains the column's
    min and max; columns that do not fit int32 (e.g. epoch-millisecond
    timestamps) are left untouched instead of being wrapped around.
    ``None`` and empty frames are returned unchanged.
    """
    if df is None or len(df) == 0:
        return df

    float_cols = df.select_dtypes(include=["float64"]).columns
    if len(float_cols) > 0:
        df[float_cols] = df[float_cols].astype("float32")

    int_cols = df.select_dtypes(include=["int64", "int32"]).columns
    for col in int_cols:
        c_min = df[col].min()
        c_max = df[col].max()
        if c_min > -128 and c_max < 127:
            df[col] = df[col].astype("int8")
        elif c_min > -32768 and c_max < 32767:
            df[col] = df[col].astype("int16")
        elif c_min > -2147483648 and c_max < 2147483647:
            df[col] = df[col].astype("int32")
        # else: keep the wider dtype; casting would silently overflow.
    return df


def tf_to_offset(tf: str):
    """Translate an exchange timeframe ("5m", "1h", "1d") to a pandas offset.

    Returns ``None`` when *tf* is not ``<digits>`` followed by m/h/d.
    Minutes use the "min" alias, which pandas accepts on old and new
    versions alike (the one-letter "T" alias is deprecated).
    """
    if tf.endswith("m") and tf[:-1].isdigit():
        return f"{int(tf[:-1])}min"
    if tf.endswith("h") and tf[:-1].isdigit():
        return f"{int(tf[:-1])}h"
    if tf.endswith("d") and tf[:-1].isdigit():
        return f"{int(tf[:-1])}D"
    return None


def calc_max_drawdown(equity_curve):
    """Return the deepest peak-to-trough drawdown in percent (<= 0)."""
    if not equity_curve:
        return 0.0
    eq = np.array(equity_curve, dtype=np.float64)
    peak = np.maximum.accumulate(eq)
    dd = (eq - peak) / (peak + 1e-9)
    return float(dd.min()) * 100


def calc_profit_factor(wins, losses):
    """Return gross wins / |gross losses|; 99.0 when there are no losses."""
    gross_win = np.sum(wins)
    gross_loss = abs(np.sum(losses))
    if gross_loss < 1e-9:
        return 99.0
    return float(gross_win / gross_loss)


def calc_ulcer_index(equity_curve):
    """Return the root-mean-square of percentage drawdowns from the running peak."""
    if not equity_curve:
        return 0.0
    eq = np.asarray(equity_curve, dtype=np.float64)
    peak = np.maximum.accumulate(eq)
    dd_pct = (eq - peak) / (peak + 1e-12) * 100.0
    return float(np.sqrt(np.mean(dd_pct**2)))


def calc_sharpe(returns, eps=1e-12):
    """Return mean/std of *returns* scaled by sqrt(n); 0.0 if n < 2 or std < eps."""
    if returns is None or len(returns) < 2:
        return 0.0
    r = np.asarray(returns, dtype=np.float64)
    mu = np.mean(r)
    sd = np.std(r)
    if sd < eps:
        return 0.0
    return float(mu / sd * np.sqrt(len(r)))


def calc_sortino(returns, eps=1e-12):
    """Like :func:`calc_sharpe` but divides by the std of negative returns only.

    Returns 99.0 when there is no negative return at all and 0.0 when the
    downside deviation is below *eps*.
    """
    if returns is None or len(returns) < 2:
        return 0.0
    r = np.asarray(returns, dtype=np.float64)
    mu = np.mean(r)
    downside = r[r < 0]
    if len(downside) < 1:
        return 99.0
    dd = np.std(downside)
    if dd < eps:
        return 0.0
    return float(mu / dd * np.sqrt(len(r)))


def calc_cagr(initial_capital, final_balance, start_ms, end_ms):
    """Return the annualised growth rate between two epoch-ms timestamps.

    Yields 0.0 for non-positive balances, an empty/negative period, or a
    numerically unusable exponent.
    """
    if initial_capital <= 0 or final_balance <= 0 or end_ms <= start_ms:
        return 0.0
    years = (end_ms - start_ms) / (1000.0 * 60 * 60 * 24 * 365.25)
    if years < 1e-6:
        return 0.0
    try:
        return float((final_balance / initial_capital) ** (1.0 / years) - 1.0)
    except (OverflowError, ZeroDivisionError, ValueError):
        return 0.0


def calc_calmar(cagr, max_drawdown_pct):
    """Return CAGR (as percent) over |max drawdown percent|; 99.0 without drawdown."""
    dd = abs(max_drawdown_pct)
    if dd < 1e-9:
        return 99.0
    return float((cagr * 100.0) / dd)


def calc_consecutive_streaks(pnls):
    """Return ``(longest win streak, longest loss streak)``; pnl <= 0 is a loss."""
    max_w = max_l = 0
    cur_w = cur_l = 0
    for p in pnls:
        if p > 0:
            cur_w += 1
            cur_l = 0
        else:
            cur_l += 1
            cur_w = 0
        max_w = max(max_w, cur_w)
        max_l = max(max_l, cur_l)
    return int(max_w), int(max_l)


# ============================================================
# BACKTESTER
# ============================================================
class HeavyDutyBacktester:
    """Builds per-symbol score caches and grid-searches entry/exit thresholds."""

    def __init__(self, data_manager, processor):
        self.dm = data_manager
        self.proc = processor
        self.gov_engine = GovernanceEngine()

        # If True: raise on missing features. If False: fill 0 and continue.
        self.STRICT_FEATURES = False
        self._missing_feature_once = set()

        self._verify_system_integrity()

        self.GRID_DENSITY = 6
        self.MAX_SAMPLES = 3000
        self.INITIAL_CAPITAL = 10.0
        self.MIN_CAPITAL_FOR_SPLIT = 20.0
        self.TRADING_FEES = 0.001
        self.SLIPPAGE_PCT = 0.0005
        self.MAX_SLOTS = 4

        self.GRID_RANGES = {
            "TITAN": np.linspace(0.40, 0.70, self.GRID_DENSITY),
            "ORACLE": np.linspace(0.55, 0.80, self.GRID_DENSITY),
            "SNIPER": np.linspace(0.30, 0.65, self.GRID_DENSITY),
            "PATTERN": np.linspace(0.30, 0.70, self.GRID_DENSITY),
            "GOV_SCORE": np.linspace(50.0, 80.0, self.GRID_DENSITY),
            "HYDRA_THRESH": np.linspace(0.60, 0.90, self.GRID_DENSITY),
            "LEGACY_THRESH": np.linspace(0.85, 0.98, self.GRID_DENSITY),
        }

        self.TARGET_COINS = [
            "SOL/USDT", "XRP/USDT", "DOGE/USDT"
        ]

        self.USE_FIXED_DATES = False
        self.LOOKBACK_DAYS = 60
        self.force_start_date = "2024-01-01"
        self.force_end_date = "2024-02-01"

        self.required_timeframes = self._determine_required_timeframes()
        print(f"๐Ÿงช [Backtest V223.5] IMMUTABLE TRUTH (Patched & Open Gates). TFs: {self.required_timeframes}")

    def _verify_system_integrity(self):
        """Raise RuntimeError listing every engine/model the processor lacks."""
        errors = []
        if not getattr(self.proc, "titan", None) or not getattr(self.proc.titan, "model", None):
            errors.append("โŒ Titan Engine missing")
        if not getattr(self.proc, "oracle", None) or not getattr(self.proc.oracle, "model_direction", None):
            errors.append("โŒ Oracle Engine missing")
        if not getattr(self.proc, "pattern_engine", None):
            errors.append("โŒ Pattern Engine missing")
        if not getattr(self.proc, "sniper", None):
            errors.append("โŒ Sniper Engine missing")

        if not getattr(self.proc, "guardian_hydra", None) or not getattr(self.proc.guardian_hydra, "models", None):
            errors.append("โŒ Hydra Guardian missing/models not loaded")
        else:
            m = self.proc.guardian_hydra.models
            if "crash" not in m or "giveback" not in m:
                errors.append("โŒ Hydra missing crash/giveback heads")
            try:
                _ = m["crash"].predict_proba
                _ = m["giveback"].predict_proba
            except Exception:
                errors.append("โŒ Hydra heads must implement predict_proba()")

        if errors:
            raise RuntimeError(f"CRITICAL INTEGRITY FAILURE: {errors}")

    def _determine_required_timeframes(self):
        """Return the base timeframes plus any "<tf>_" prefix used by model features."""
        tfs = {"5m", "15m", "1h", "4h"}

        # feature_names / feature_cols may be absent or None; treat both as empty.
        titan_feats = getattr(self.proc.titan.model, "feature_names", None) or ()
        oracle_feats = getattr(self.proc.oracle, "feature_cols", None) or ()
        for feat in list(titan_feats) + list(oracle_feats):
            if "_" in feat:
                prefix = feat.split("_", 1)[0]
                if tf_to_offset(prefix):
                    tfs.add(prefix)
        # Sorted only so the reported/iterated order is deterministic.
        return sorted(tfs)

    # --------------------------
    # Indicator Hardening Layer (MFI, Slope, Aliases)
    # --------------------------
    @staticmethod
    def _safe_bbands(close: pd.Series, length=20, std=2.0):
        """Return ``(lower, upper, width, pct)`` Bollinger series from rolling stats."""
        basis = close.rolling(length).mean()
        dev = close.rolling(length).std(ddof=0)
        upper = basis + std * dev
        lower = basis - std * dev
        width = (upper - lower) / (basis.abs() + 1e-12)
        pct = (close - lower) / ((upper - lower) + 1e-12)
        return lower, upper, width, pct

    def _calculate_all_indicators(self, df: pd.DataFrame):
        """Add every indicator column the models read and return the filled frame.

        *df* must carry open/high/low/close/volume columns. Rows without a
        usable close/high/low are dropped in place; remaining NaNs are
        replaced by 0 in the returned frame.
        """
        cols = ["open", "high", "low", "close", "volume"]
        for name in cols:
            df[name] = pd.to_numeric(df[name], errors="coerce")
        df[cols] = df[cols].replace([np.inf, -np.inf], np.nan)
        df.dropna(subset=["close", "high", "low"], inplace=True)

        c = df["close"].astype(np.float64)
        h = df["high"].astype(np.float64)
        l = df["low"].astype(np.float64)
        v = df["volume"].astype(np.float64)

        # ----------------------------------------------------
        # Compatibility bridge (Oracle & Titan)
        # ----------------------------------------------------
        # 1. Oracle: Slope
        try:
            df["slope"] = ta.slope(c, length=7).fillna(0)
        except Exception:
            df["slope"] = 0.0

        # 2. Titan: MFI
        try:
            df["MFI"] = ta.mfi(h, l, c, v, length=14).fillna(50)
        except Exception:
            df["MFI"] = 50.0

        # 3. Oracle mapping (lower-case aliases). The indicator call may hand
        #    back None (e.g. too little data), so guard before .fillna.
        rsi = ta.rsi(c, length=14)
        df["RSI"] = rsi.fillna(50) if rsi is not None else 50.0
        df["rsi"] = df["RSI"]  # Alias

        atr = ta.atr(h, l, c, length=14)
        df["ATR"] = atr.fillna(0) if atr is not None else 0.0
        df["ATR_pct"] = (df["ATR"] / (c + 1e-12)) * 100
        df["atr_pct"] = df["ATR_pct"]  # Alias

        # 4. Oracle: Volume Z-Score (vol_z)
        vol_mean = v.rolling(20).mean()
        vol_std = v.rolling(20).std()
        df["vol_z"] = ((v - vol_mean) / (vol_std + 1e-9)).fillna(0)

        # 5. Titan: Trend Strong (approx.)
        adx_df = ta.adx(h, l, c, length=14)
        if adx_df is not None and not adx_df.empty:
            df["ADX"] = adx_df.iloc[:, 0].fillna(0)
            df["Trend_Strong"] = np.where(df["ADX"] > 25, 1, 0)
        else:
            df["ADX"] = 0.0
            df["Trend_Strong"] = 0
        # ----------------------------------------------------

        try:
            df["CHOP"] = ta.chop(h, l, c, length=14).fillna(50)
        except Exception:
            df["CHOP"] = 50
        try:
            df["vwap"] = ta.vwap(h, l, c, v).fillna(c)
        except Exception:
            df["vwap"] = c
        try:
            df["CCI"] = ta.cci(h, l, c, length=20).fillna(0)
        except Exception:
            df["CCI"] = 0.0

        # EMAs
        for span in [9, 20, 21, 50, 200]:
            df[f"ema{span}"] = c.ewm(span=span, adjust=False).mean()

        # Derived
        df["EMA_9_dist"] = (c / (df["ema9"] + 1e-12)) - 1
        df["EMA_21_dist"] = (c / (df["ema21"] + 1e-12)) - 1
        df["EMA_50_dist"] = (c / (df["ema50"] + 1e-12)) - 1
        df["EMA_200_dist"] = (c / (df["ema200"] + 1e-12)) - 1
        df["VWAP_dist"] = (c / (df["vwap"] + 1e-12)) - 1

        # BBANDS
        bb = ta.bbands(c, length=20, std=2.0) if len(df) >= 30 else None
        if bb is not None and isinstance(bb, pd.DataFrame):
            col_w = [x for x in bb.columns if "BBB" in x]
            col_p = [x for x in bb.columns if "BBP" in x]
            col_l = [x for x in bb.columns if "BBL" in x]
            col_u = [x for x in bb.columns if "BBU" in x]
            df["bb_width"] = bb[col_w[0]] if col_w else 0.0
            df["bb_pct"] = bb[col_p[0]] if col_p else 0.5
            df["lower_bb"] = bb[col_l[0]] if col_l else c
            df["upper_bb"] = bb[col_u[0]] if col_u else c
            df["BB_w"] = df["bb_width"]
            df["BB_p"] = df["bb_pct"]
        else:
            # Too few rows or no usable result: flat, neutral bands.
            df["lower_bb"] = c
            df["upper_bb"] = c
            df["bb_width"] = 0.0
            df["bb_pct"] = 0.5
            df["BB_w"] = 0.0
            df["BB_p"] = 0.5

        # MACD
        macd = ta.macd(c)
        if macd is not None and not macd.empty:
            df["MACD"] = macd.iloc[:, 0]
            df["MACD_h"] = macd.iloc[:, 1]
            df["MACD_s"] = macd.iloc[:, 2]
        else:
            df["MACD"] = 0.0
            df["MACD_h"] = 0.0
            df["MACD_s"] = 0.0

        mean_vol = v.rolling(50).mean() + 1e-9
        df["rel_vol"] = v / mean_vol
        # First row has no predecessor -> 0. Series.diff keeps the length equal
        # to the frame even when it is empty (np.concatenate([[0], ...]) did not).
        df["log_ret"] = np.log(c + 1e-9).diff().fillna(0).to_numpy()

        return df.fillna(0)

    def _warn_missing_once(self, msg: str):
        """Print ``[WARN] msg`` the first time *msg* is seen, then stay quiet."""
        if msg in self._missing_feature_once:
            return
        self._missing_feature_once.add(msg)
        print(f"[WARN] {msg}")

    @staticmethod
    def _scores_path(sym, start_ms, end_ms):
        """Return the processed-scores cache path for *sym* over the period."""
        safe_sym = sym.replace("/", "_")
        return f"{CACHE_DIR}/{safe_sym}_{start_ms}_{end_ms}_processed.pkl"

    @staticmethod
    def _project_15m_scores(results_map, ts_15m, map_15, validity_mask, out):
        """Copy per-15m-bar scores onto the valid 1m rows of *out* (in place)."""
        for i in np.flatnonzero(validity_mask & (map_15 >= 0)):
            out[i] = results_map.get(ts_15m[map_15[i]], 0.0)
        return out

    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1m candles for *sym* in ``[start_ms, end_ms]``.

        Requests are issued in 1000-candle windows, at most 20 in flight and
        50 per gather; each window is retried up to three times. Returns a
        list of ``[timestamp, o, h, l, c, v]`` rows sorted by timestamp, or
        ``None`` when nothing was received.
        """
        print(f" โšก [Network] Downloading {sym}...", flush=True)
        limit = 1000
        step_ms = limit * 60 * 1000
        tasks = []
        curr = start_ms
        while curr < end_ms:
            tasks.append(curr)
            curr += step_ms

        all_candles = []
        sem = asyncio.Semaphore(20)

        async def _fetch_batch(timestamp):
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, "1m", since=timestamp, limit=limit)
                    except Exception:
                        # `except Exception` (not bare) so task cancellation
                        # still propagates instead of being retried.
                        await asyncio.sleep(0.5)
                return []

        chunk_size = 50
        for i in range(0, len(tasks), chunk_size):
            res = await asyncio.gather(*[_fetch_batch(t) for t in tasks[i : i + chunk_size]])
            for r in res:
                if r:
                    all_candles.extend(r)

        if not all_candles:
            return None

        df = pd.DataFrame(all_candles, columns=["timestamp", "o", "h", "l", "c", "v"])
        df.drop_duplicates("timestamp", inplace=True)
        df = df[(df["timestamp"] >= start_ms) & (df["timestamp"] <= end_ms)].sort_values("timestamp")
        return df.values.tolist()

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score *candles* with every engine and pickle signals + simulation data.

        Returns the path of the processed-scores file. If that file already
        exists for this symbol and period the call returns immediately and
        *candles* is not read.

        Raises:
            RuntimeError: fewer than 300 clean 1m candles, no 1h frame, or
                sniper features that do not line up with the 1m frame.
            ValueError: malformed feature names, or missing features when
                ``STRICT_FEATURES`` is set.
        """
        safe_sym = sym.replace("/", "_")
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = self._scores_path(sym, start_ms, end_ms)
        if os.path.exists(scores_file):
            print(f" ๐Ÿ“‚ [{sym}] Loaded Cache.")
            return scores_file

        print(f" โš™๏ธ [CPU] Processing {sym} (Truth Mode)...", flush=True)
        t0 = time.time()

        df_1m = pd.DataFrame(candles, columns=["timestamp", "open", "high", "low", "close", "volume"])
        # Index every 1m candle by its close time (open time + 60 s).
        df_1m["datetime"] = pd.to_datetime(df_1m["timestamp"] + 60000, unit="ms", utc=True)
        df_1m.set_index("datetime", inplace=True)
        df_1m = df_1m.sort_index()

        df_1m = self._calculate_all_indicators(df_1m)
        if len(df_1m) < 300:
            raise RuntimeError(f"{sym} has too few valid candles after cleaning: {len(df_1m)}")

        arr_ts_1m = (df_1m.index.astype(np.int64) // 10**6).values
        n_rows = len(arr_ts_1m)
        fast_1m_close = df_1m["close"].values.astype(np.float32)

        # ---- Higher-timeframe frames, stored as plain numpy columns ----
        numpy_htf = {}
        agg_dict = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
        for tf_str in self.required_timeframes:
            offset = tf_to_offset(tf_str)
            if not offset:
                continue
            resampled = df_1m.resample(offset, label="right", closed="right").agg(agg_dict).dropna()
            if len(resampled) == 0:
                continue
            resampled = self._calculate_all_indicators(resampled)
            if len(resampled) == 0:
                continue
            resampled["timestamp"] = resampled.index.astype(np.int64) // 10**6
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}

        if "1h" not in numpy_htf:
            raise RuntimeError(f"CRITICAL: '1h' missing for {sym}.")

        def get_safe_map(tf):
            """Index of the latest *tf* bar closed at or before each 1m row (-1: none)."""
            if tf not in numpy_htf or len(numpy_htf[tf]["timestamp"]) == 0:
                return np.full(n_rows, -1, dtype=np.int32)
            htf_ts = numpy_htf[tf]["timestamp"]
            idx = np.searchsorted(htf_ts, arr_ts_1m, side="right") - 1
            return idx.astype(np.int32)

        maps = {tf: get_safe_map(tf) for tf in self.required_timeframes}

        validity_mask = np.ones(n_rows, dtype=bool)
        for tf in maps:
            validity_mask &= (maps[tf] >= 0)
        validity_mask[:200] = False  # warm-up rows are never tradable

        def project(tf, key):
            """Broadcast column *key* of timeframe *tf* onto the 1m rows."""
            idx = maps[tf]
            vals = np.zeros(n_rows, dtype=np.float32)
            valid = idx >= 0
            vals[valid] = numpy_htf[tf][key][idx[valid]]
            return vals

        ohlcv_cols = ["timestamp", "open", "high", "low", "close", "volume"]
        has_15m = "15m" in maps and "15m" in numpy_htf

        # 1) Pattern (cached)
        global_pattern_scores = np.zeros(n_rows, dtype=np.float32)
        pat_cache_file = os.path.join(PATTERN_CACHE_DIR, f"{safe_sym}_{period_suffix}_pat.pkl")
        pattern_results_map = {}

        if os.path.exists(pat_cache_file):
            # NOTE: pickle is only safe because this cache is written by this module.
            with open(pat_cache_file, "rb") as f:
                pattern_results_map = pickle.load(f)
        elif "15m" in numpy_htf:
            ts_15m = numpy_htf["15m"]["timestamp"]
            df_15m_source = pd.DataFrame({c: numpy_htf["15m"][c] for c in ohlcv_cols})
            failed = 0
            for i in range(200, len(df_15m_source)):
                window = df_15m_source.iloc[i - 200 : i + 1]
                ohlcv_input = {"15m": window.values.tolist()}
                try:
                    res = await self.proc.pattern_engine.detect_chart_patterns(ohlcv_input)
                    pattern_results_map[ts_15m[i]] = res.get("pattern_confidence", 0.0)
                except Exception:
                    failed += 1
            if failed:
                self._warn_missing_once(f"Pattern engine failed on {failed} windows for {sym}. Scored 0.")
            with open(pat_cache_file, "wb") as f:
                pickle.dump(pattern_results_map, f)

        if has_15m:
            self._project_15m_scores(
                pattern_results_map, numpy_htf["15m"]["timestamp"], maps["15m"],
                validity_mask, global_pattern_scores,
            )

        # 2) Governance (cached)
        gov_scores_final = np.zeros(n_rows, dtype=np.float32)
        gov_cache_file = os.path.join(GOV_CACHE_DIR, f"{safe_sym}_{period_suffix}_gov.pkl")
        gov_results_map = {}

        if os.path.exists(gov_cache_file):
            with open(gov_cache_file, "rb") as f:
                gov_results_map = pickle.load(f)
        elif "15m" in numpy_htf:
            df_15m_g = pd.DataFrame({c: numpy_htf["15m"][c] for c in ohlcv_cols})
            ts_15m = numpy_htf["15m"]["timestamp"]
            has_1h = "1h" in numpy_htf
            df_1h_g = pd.DataFrame({c: numpy_htf["1h"][c] for c in ohlcv_cols}) if has_1h else None
            ts_1h = numpy_htf["1h"]["timestamp"] if has_1h else None

            failed = 0
            for i in range(200, len(df_15m_g)):
                curr_ts = ts_15m[i]
                win_15 = df_15m_g.iloc[i - 120 : i + 1]
                ohlcv_input = {"15m": win_15.values.tolist()}
                if has_1h:
                    last_1h = np.searchsorted(ts_1h, curr_ts, side="right") - 1
                    if last_1h >= 50:
                        # Clamp the start: a negative start would slice from
                        # the END of the frame and hand over a wrong/empty window.
                        first_1h = max(0, last_1h - 60)
                        ohlcv_input["1h"] = df_1h_g.iloc[first_1h : last_1h + 1].values.tolist()
                try:
                    res = await self.gov_engine.evaluate_trade(sym, ohlcv_input, {}, "NORMAL", False, has_1h)
                    score = res.get("governance_score", 0.0) if res.get("grade") != "REJECT" else 0.0
                    gov_results_map[curr_ts] = score
                except Exception:
                    failed += 1
            if failed:
                self._warn_missing_once(f"Governance engine failed on {failed} windows for {sym}. Scored 0.")
            with open(gov_cache_file, "wb") as f:
                pickle.dump(gov_results_map, f)

        if has_15m:
            self._project_15m_scores(
                gov_results_map, numpy_htf["15m"]["timestamp"], maps["15m"],
                validity_mask, gov_scores_final,
            )

        # 3) Market state (derived from the latest closed 1h bar)
        map_1h = maps["1h"]
        valid_1h = map_1h >= 0
        rows_1h = map_1h[valid_1h]
        h1 = numpy_htf["1h"]

        h1_chop = h1["CHOP"][rows_1h]
        h1_adx = h1["ADX"][rows_1h]
        h1_atr_pct = h1["ATR_pct"][rows_1h]
        market_ok = np.ones(n_rows, dtype=bool)
        market_ok[valid_1h] = ~((h1_chop > 61.8) | ((h1_atr_pct < 0.3) & (h1_adx < 20)))

        coin_state = np.zeros(n_rows, dtype=np.int8)
        h1_rsi = h1["RSI"][rows_1h]
        h1_bbw = h1["bb_width"][rows_1h]
        h1_upper = h1["upper_bb"][rows_1h]
        h1_ema20 = h1["ema20"][rows_1h]
        h1_ema50 = h1["ema50"][rows_1h]
        h1_ema200 = h1["ema200"][rows_1h]
        h1_close = h1["close"][rows_1h]
        h1_rel_vol = h1["rel_vol"][rows_1h]

        mask_acc = (h1_bbw < 0.20) & (h1_rsi >= 35) & (h1_rsi <= 65)
        mask_safe = (h1_adx > 25) & (h1_ema20 > h1_ema50) & (h1_ema50 > h1_ema200) & (h1_rsi > 50) & (h1_rsi < 75)
        mask_exp = (h1_rsi > 65) & (h1_close > h1_upper) & (h1_rel_vol > 1.5)

        # Later assignments win: 3 (explosive) > 2 (safe trend) > 1 (accumulation).
        state_buffer = np.zeros(len(rows_1h), dtype=np.int8)
        state_buffer[mask_acc] = 1
        state_buffer[mask_safe] = 2
        state_buffer[mask_exp] = 3

        coin_state[valid_1h] = state_buffer
        coin_state[~validity_mask] = 0
        coin_state[~market_ok] = 0

        # =========================
        # 4) Titan & Oracle (hardened)
        # =========================
        titan_cols = self.proc.titan.model.feature_names
        t_vecs = []
        for col in titan_cols:
            parts = col.split("_", 1)
            if len(parts) < 2:
                raise ValueError(f"Titan Feature Format Error: {col}")
            tf, raw_feat = parts
            lookup_key = "bb_pct" if raw_feat in ["BB_p", "BB_pct"] else ("bb_width" if raw_feat == "BB_w" else raw_feat)

            if tf not in numpy_htf:
                if self.STRICT_FEATURES:
                    raise ValueError(f"Titan requires TF not built: {tf} (feature: {col})")
                self._warn_missing_once(f"Titan TF missing -> {col}. Filled 0.")
                t_vecs.append(np.zeros(n_rows, dtype=np.float32))
                continue

            # "timestamp" is always present in a built frame, so one check suffices.
            if lookup_key not in numpy_htf[tf]:
                if self.STRICT_FEATURES:
                    raise ValueError(f"Missing Titan Feature: {col}")
                self._warn_missing_once(f"Missing Titan Feature -> {col}. Filled 0.")
                t_vecs.append(np.zeros(n_rows, dtype=np.float32))
                continue

            t_vecs.append(project(tf, lookup_key))

        X_TITAN = np.column_stack(t_vecs)
        global_titan_scores = self.proc.titan.model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols))

        oracle_cols = self.proc.oracle.feature_cols
        o_vecs = []
        for col in oracle_cols:
            if col == "sim_titan_score":
                o_vecs.append(global_titan_scores.astype(np.float32))
            elif col in ["sim_pattern_score", "pattern_score"]:
                o_vecs.append(global_pattern_scores.astype(np.float32))
            elif col == "sim_mc_score":
                o_vecs.append(np.zeros(n_rows, dtype=np.float32))
            else:
                parts = col.split("_", 1)
                if len(parts) != 2:
                    raise ValueError(f"Oracle Feature Error: {col}")
                tf, key = parts

                if tf not in numpy_htf:
                    if self.STRICT_FEATURES:
                        raise ValueError(f"Oracle requires TF not built: {tf} (feature: {col})")
                    self._warn_missing_once(f"Oracle TF missing -> {col}. Filled 0.")
                    o_vecs.append(np.zeros(n_rows, dtype=np.float32))
                    continue

                if key not in numpy_htf[tf]:
                    if self.STRICT_FEATURES:
                        raise ValueError(f"Missing Oracle Feature: {col}")
                    self._warn_missing_once(f"Missing Oracle Feature -> {col}. Filled 0.")
                    o_vecs.append(np.zeros(n_rows, dtype=np.float32))
                    continue

                o_vecs.append(project(tf, key))

        X_ORACLE = np.column_stack(o_vecs)
        preds_o = self.proc.oracle.model_direction.predict(X_ORACLE)
        if isinstance(preds_o, np.ndarray) and len(preds_o.shape) > 1:
            preds_o = preds_o[:, 0]
        global_oracle_scores = preds_o.astype(np.float32)

        # 5) Sniper (shape & broadcasting safety)
        df_sniper_feats = self.proc.sniper._calculate_features_live(df_1m)
        X_sniper = df_sniper_feats[self.proc.sniper.feature_names].fillna(0)
        preds_accum = np.zeros(len(X_sniper), dtype=np.float32)
        for model in self.proc.sniper.models:
            raw_preds = np.asarray(model.predict(X_sniper))
            if raw_preds.ndim > 1 and raw_preds.shape[1] > 1:
                preds_accum += raw_preds[:, 1].astype(np.float32)
            else:
                # ravel() so an (n, 1) column is accepted by the in-place add.
                preds_accum += raw_preds.ravel().astype(np.float32)
        global_sniper_scores = (preds_accum / max(1, len(self.proc.sniper.models))).astype(np.float32)
        if len(global_sniper_scores) != n_rows:
            raise RuntimeError(
                f"{sym}: sniper features have {len(global_sniper_scores)} rows, expected {n_rows}"
            )

        # 6) Hydra static features
        map_5 = maps["5m"]
        map_15 = maps["15m"]
        map_1 = maps.get("1h", map_15)

        f_rsi_1m = df_1m["RSI"].values.astype(np.float32)

        f_rsi_5m = np.zeros(n_rows, dtype=np.float32)
        v5 = map_5 >= 0
        if "5m" in numpy_htf and "RSI" in numpy_htf["5m"]:
            f_rsi_5m[v5] = numpy_htf["5m"]["RSI"][map_5[v5]].astype(np.float32)

        f_rsi_15m = np.zeros(n_rows, dtype=np.float32)
        v15 = map_15 >= 0
        if "15m" in numpy_htf and "RSI" in numpy_htf["15m"]:
            f_rsi_15m[v15] = numpy_htf["15m"]["RSI"][map_15[v15]].astype(np.float32)

        f_dist_1h = np.zeros(n_rows, dtype=np.float32)
        v1 = map_1 >= 0
        ema20_1h = numpy_htf["1h"]["ema20"][map_1[v1]].astype(np.float32)
        close_1h = numpy_htf["1h"]["close"][map_1[v1]].astype(np.float32)
        f_dist_1h[v1] = (close_1h - ema20_1h) / (close_1h + 1e-12)

        hydra_static = np.column_stack(
            [
                f_rsi_1m,
                f_rsi_5m,
                f_rsi_15m,
                df_1m["bb_width"].values.astype(np.float32),
                df_1m["rel_vol"].values.astype(np.float32),
                f_dist_1h,
                (df_1m["ATR_pct"].values.astype(np.float32) / 100.0),
            ]
        ).astype(np.float32)

        # ==========================================
        # Relaxed saving thresholds: keep nearly every scored row so the
        # optimizer, not this stage, decides what counts as a signal.
        # ==========================================
        min_gov = 0.01
        min_oracle = 0.01
        min_titan = 0.01
        min_sniper = 0.01
        min_pattern = 0.01

        filter_mask = (
            validity_mask
            & (coin_state > 0)
            & (gov_scores_final >= min_gov)
            & (global_oracle_scores >= min_oracle)
            & (global_titan_scores >= min_titan)
            & (global_sniper_scores >= min_sniper)
            & (global_pattern_scores >= min_pattern)
        )
        valid_idxs = np.where(filter_mask)[0]

        signals_df = pd.DataFrame(
            {
                "timestamp": arr_ts_1m[valid_idxs],
                "symbol": sym,
                "close": fast_1m_close[valid_idxs],
                "coin_state": coin_state[valid_idxs],
                "gov_score": gov_scores_final[valid_idxs],
                "titan_score": global_titan_scores[valid_idxs].astype(np.float32),
                "oracle_conf": global_oracle_scores[valid_idxs].astype(np.float32),
                "sniper_score": global_sniper_scores[valid_idxs].astype(np.float32),
                "pattern_score": global_pattern_scores[valid_idxs].astype(np.float32),
            }
        )

        sim_data = {
            "timestamp": arr_ts_1m.astype(np.int64),
            "close": fast_1m_close,
            "high": df_1m["high"].values.astype(np.float32),
            "low": df_1m["low"].values.astype(np.float32),
            "atr": df_1m["ATR"].values.astype(np.float32),
            "hydra_static": hydra_static,
            "oracle_conf": global_oracle_scores.astype(np.float32),
            "titan_score": global_titan_scores.astype(np.float32),
        }

        pd.to_pickle({"signals": signals_df, "sim_data": sim_data}, scores_file)
        dt = time.time() - t0
        print(f" โœ… [{sym}] Processed in {dt:.2f}s. Signals: {len(signals_df)}")
        gc.collect()
        return scores_file

    async def generate_truth_data(self):
        """Build (or reuse) the score cache of every target coin.

        Returns the list of processed-score files that belong to the current
        period, in ``TARGET_COINS`` order. Symbols that fail are reported
        and skipped.
        """
        if self.USE_FIXED_DATES:
            dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        else:
            now = datetime.now(timezone.utc)
            dt_s = now - timedelta(days=self.LOOKBACK_DAYS)
            dt_e = now

        ms_s = int(dt_s.timestamp() * 1000)
        ms_e = int(dt_e.timestamp() * 1000)

        produced = []
        for sym in self.TARGET_COINS:
            try:
                candles = None
                # Skip the download entirely when the result is already cached.
                if not os.path.exists(self._scores_path(sym, ms_s, ms_e)):
                    candles = await self._fetch_all_data_fast(sym, ms_s, ms_e)
                    if not candles:
                        continue
                path = await self._process_data_in_memory(sym, candles, ms_s, ms_e)
                if path:
                    produced.append(path)
            except Exception as e:
                print(f"[WARN] {sym} skipped due to error: {e}")
                traceback.print_exc()
        return produced

    # =========================
    # Optimization core
    # =========================
    def _flush_position_interval(
        self, cfg, open_sym, pos, curr_ts, sim_env, crash_model, giveback_model, fees_pct,
        trade_pnls, trade_returns, trade_durations, equity_curve, cash_bal, wins_losses,
        last_update_map, end_idx_override=None
    ):
        """Replay the 1m bars of an open position and close it on the first exit.

        Bars from ``last_update_map[open_sym]`` up to *curr_ts* (or up to
        *end_idx_override* when given) are checked for stop-loss,
        take-profit and Hydra crash/giveback exits, in that priority when
        several fire on the same bar. ``last_update_map`` and
        ``pos["highest_price"]`` are updated in place; on an exit the trade
        lists, ``equity_curve`` and ``wins_losses`` are appended to.

        Returns:
            ``(cash_bal, closed)`` - the balance after a possible exit and
            whether the position was closed.
        """
        c_data = sim_env[open_sym]
        full_ts = c_data["timestamp"]

        start_idx = max(0, int(last_update_map.get(open_sym, 0)))
        if end_idx_override is None:
            end_idx = int(np.searchsorted(full_ts, curr_ts, side="right"))
        else:
            end_idx = int(end_idx_override)
        end_idx = min(end_idx, len(full_ts))

        if end_idx <= start_idx:
            return cash_bal, False

        span = slice(start_idx, end_idx)
        interval_high = c_data["high"][span]
        interval_low = c_data["low"][span]
        interval_close = c_data["close"][span]
        interval_atr = c_data["atr"][span]
        h_static = c_data["hydra_static"][span]
        h_oracle = c_data["oracle_conf"][span]
        h_titan = c_data["titan_score"][span]

        entry_p = float(pos["entry_p"])
        entry_time = int(pos["entry_ts"])

        # Running maximum that carries the previous high-water mark forward.
        prev_high = float(pos.get("highest_price", entry_p))
        current_highs = np.maximum.accumulate(np.concatenate([[prev_high], interval_high]))[1:]
        pos["highest_price"] = float(current_highs[-1])

        durations = (full_ts[span] - entry_time) / 60000.0
        sl_dist = np.maximum(1.5 * interval_atr, 1e-8)
        pnl = interval_close - entry_p
        norm_pnl = pnl / sl_dist
        max_pnl = (current_highs - entry_p) / sl_dist

        zeros = np.zeros(len(interval_close), dtype=np.float32)
        h_dynamic = np.column_stack([norm_pnl, max_pnl, zeros, zeros, durations]).astype(np.float32)
        threes = np.full(len(interval_close), 3.0, dtype=np.float32)
        h_context = np.column_stack([zeros, h_oracle, h_titan, threes]).astype(np.float32)

        X_H = np.column_stack([h_static, h_dynamic, h_context]).astype(np.float32)
        crash_probs = crash_model.predict_proba(X_H)[:, 1]
        give_probs = giveback_model.predict_proba(X_H)[:, 1]

        sl_hit = interval_low < pos["sl_p"]
        tp_hit = interval_high > pos["tp_p"]
        hydra_hit = (crash_probs > cfg["HYDRA_THRESH"]) | (give_probs > cfg["HYDRA_THRESH"])
        legacy_hit = (crash_probs > cfg["LEGACY_THRESH"]) | (give_probs > cfg["LEGACY_THRESH"])
        any_exit = sl_hit | tp_hit | legacy_hit | hydra_hit

        last_update_map[open_sym] = end_idx
        if not np.any(any_exit):
            return cash_bal, False

        idx = int(np.argmax(any_exit))  # first bar with any exit condition
        exit_ts = int(full_ts[start_idx + idx])

        if sl_hit[idx]:
            exit_p = float(pos["sl_p"]) * (1 - self.SLIPPAGE_PCT)
        elif tp_hit[idx]:
            exit_p = float(pos["tp_p"]) * (1 - self.SLIPPAGE_PCT)
        elif legacy_hit[idx]:
            # Legacy exits pay double slippage.
            exit_p = float(interval_close[idx]) * (1 - (self.SLIPPAGE_PCT * 2.0))
        else:
            exit_p = float(interval_close[idx]) * (1 - self.SLIPPAGE_PCT)

        net = (pos["qty"] * exit_p) * (1 - fees_pct)
        cash_bal += net
        pnl_real = float(net - pos["cost"])

        trade_pnls.append(pnl_real)
        trade_returns.append(pnl_real / (float(pos["cost"]) + 1e-12))
        trade_durations.append((exit_ts - entry_time) / 60000.0)
        # NOTE(review): this point is cash only; while other positions are
        # open it sits below true equity until the next mark-to-market point.
        equity_curve.append(float(cash_bal))

        if pnl_real > 0:
            wins_losses["wins"] += 1
        else:
            wins_losses["losses"] += 1

        return cash_bal, True

    def _simulate_config(
        self, cfg, timeline, sim_env, crash_model, giveback_model,
        initial_capital, fees_pct, max_slots, target_state, start_ms, end_ms
    ):
        """Run one threshold configuration over the signal timeline.

        Returns the metrics dict for *cfg*, or ``None`` when the
        configuration produced no closed trade.
        """
        t_ts, t_sym, t_close, t_state, t_gov, t_oracle, t_titan, t_sniper, t_pattern = timeline
        USE_MARK_TO_MARKET_EQUITY = True

        cash_bal = float(initial_capital)
        active_positions = {}
        last_update_map = {}
        last_price = {}
        trade_pnls = []
        trade_returns = []
        trade_durations = []
        equity_curve = [float(initial_capital)]
        wins_losses = {"wins": 0, "losses": 0}
        exposure_steps = 0

        def flush(open_sym, pos, ts, end_idx_override=None):
            return self._flush_position_interval(
                cfg, open_sym, pos, ts, sim_env, crash_model, giveback_model, fees_pct,
                trade_pnls, trade_returns, trade_durations, equity_curve, cash_bal,
                wins_losses, last_update_map, end_idx_override,
            )

        for curr_ts, sym, p, c_state, gov, oracle, titan, sniper, pattern in zip(
            t_ts, t_sym, t_close, t_state, t_gov, t_oracle, t_titan, t_sniper, t_pattern
        ):
            sym = str(sym)
            last_price[sym] = float(p)

            # Advance every open position up to the current signal time.
            for open_sym, pos in list(active_positions.items()):
                cash_bal, closed = flush(open_sym, pos, curr_ts)
                if closed:
                    del active_positions[open_sym]

            if USE_MARK_TO_MARKET_EQUITY:
                open_val = 0.0
                has_open = False
                for s, pos in active_positions.items():
                    px = last_price.get(s)
                    if px is None:
                        continue
                    has_open = True
                    open_val += (pos["qty"] * px * (1 - self.SLIPPAGE_PCT)) * (1 - fees_pct)
                if has_open:
                    exposure_steps += 1
                equity_curve.append(float(cash_bal + open_val))

            is_valid = (
                (int(c_state) == int(target_state))
                and (float(gov) >= float(cfg["GOV_SCORE"]))
                and (float(oracle) >= float(cfg["ORACLE"]))
                and (float(titan) >= float(cfg["TITAN"]))
                and (float(sniper) >= float(cfg["SNIPER"]))
                and (float(pattern) >= float(cfg["PATTERN"]))
            )
            if not is_valid or sym in active_positions:
                continue

            small_account = cash_bal < self.MIN_CAPITAL_FOR_SPLIT
            slots = 1 if small_account else int(max_slots)
            if len(active_positions) >= slots or cash_bal < 5.0:
                continue
            size = (cash_bal * 0.95) if small_account else (cash_bal / max_slots)
            if size < 5.0:
                continue

            ep = float(p) * (1 + self.SLIPPAGE_PCT)
            fee = float(size) * fees_pct
            cost = float(size)
            qty = (cost - fee) / (ep + 1e-12)

            sym_ts = sim_env[sym]["timestamp"]
            idx = int(np.searchsorted(sym_ts, curr_ts, side="right") - 1)
            idx = max(0, min(idx, len(sym_ts) - 1))
            atr_val = float(sim_env[sym]["atr"][idx])

            active_positions[sym] = {
                "qty": float(qty),
                "entry_p": float(ep),
                "cost": float(cost),
                "entry_ts": int(curr_ts),
                "sl_p": float(ep - 1.5 * atr_val),
                "tp_p": float(ep + 2.5 * atr_val),
                "highest_price": float(ep),
            }
            cash_bal -= float(cost)
            # Exit checks start on the bar after the entry bar.
            last_update_map[sym] = min(idx + 1, len(sym_ts))

        # Positions still open after the last signal: replay the remaining
        # bars so stops/targets can still trigger, then value any survivor
        # at its last close. Without this, capital locked in open positions
        # was reported as a loss.
        for open_sym, pos in list(active_positions.items()):
            cash_bal, closed = flush(
                open_sym, pos, end_ms, end_idx_override=len(sim_env[open_sym]["timestamp"])
            )
            if closed:
                del active_positions[open_sym]

        open_value = 0.0
        for s, pos in active_positions.items():
            last_close = float(sim_env[s]["close"][-1])
            open_value += (pos["qty"] * last_close * (1 - self.SLIPPAGE_PCT)) * (1 - fees_pct)
        final_balance = float(cash_bal + open_value)
        if active_positions:
            equity_curve.append(final_balance)

        if not trade_pnls:
            return None

        max_dd = calc_max_drawdown(equity_curve)
        ulcer = calc_ulcer_index(equity_curve)
        wins_list = [p for p in trade_pnls if p > 0]
        loss_list = [p for p in trade_pnls if p <= 0]
        prof_fac = calc_profit_factor(wins_list, loss_list)

        mean_pnl = float(np.mean(trade_pnls))
        std_pnl = float(np.std(trade_pnls))
        sqn = float((mean_pnl / std_pnl) * np.sqrt(len(trade_pnls))) if std_pnl > 0 else 0.0

        sharpe = calc_sharpe(trade_returns)
        sortino = calc_sortino(trade_returns)
        cagr = calc_cagr(initial_capital, final_balance, start_ms, end_ms)
        calmar = calc_calmar(cagr, max_dd)

        exposure_pct = float(exposure_steps / max(1, len(t_ts)) * 100.0)
        max_w_streak, max_l_streak = calc_consecutive_streaks(trade_pnls)
        payoff = float(np.mean(wins_list) / max(abs(np.mean(loss_list)), 1e-12)) if (wins_list and loss_list) else 99.0

        return {
            "config": cfg,
            "net_profit": float(final_balance - initial_capital),
            "total_trades": int(len(trade_pnls)),
            "final_balance": final_balance,
            "win_rate": float((wins_losses["wins"] / len(trade_pnls)) * 100.0),
            "sqn": sqn,
            "max_drawdown": float(max_dd),
            "ulcer_index": ulcer,
            "profit_factor": prof_fac,
            "payoff_ratio": payoff,
            "sharpe": sharpe,
            "sortino": sortino,
            "cagr": cagr,
            "calmar": calmar,
            "expectancy": mean_pnl,
            "exposure_pct": exposure_pct,
            "max_consec_wins": max_w_streak,
            "max_consec_losses": max_l_streak,
        }

    def _worker_optimize(self, combinations_batch, scores_files, initial_capital, fees_pct, max_slots, target_state):
        """Evaluate every configuration against the cached signals.

        Args:
            combinations_batch: list of threshold dicts (keys of ``GRID_RANGES``).
            scores_files: processed-score pickles written by
                ``_process_data_in_memory``.
            initial_capital, fees_pct, max_slots: account settings.
            target_state: only signals with this ``coin_state`` may open trades.

        Returns:
            One metrics dict per configuration that closed at least one
            trade; an empty list when no signal file could be used.
        """
        all_signals = []
        sim_env = {}
        crash_model = self.proc.guardian_hydra.models["crash"]
        giveback_model = self.proc.guardian_hydra.models["giveback"]

        for f in scores_files:
            try:
                data = pd.read_pickle(f)
                sig = optimize_dataframe_memory(data.get("signals", None))
                if sig is None or len(sig) == 0:
                    continue
                # Read sim_data first so signals are never registered
                # without the simulation arrays they depend on.
                sim_data = data["sim_data"]
                sym = str(sig["symbol"].iloc[0])
            except Exception as exc:
                print(f"[WARN] Skipping unreadable scores file {f}: {exc}")
                continue
            sim_env[sym] = sim_data
            all_signals.append(sig)

        if not all_signals:
            return []

        timeline_df = pd.concat(all_signals).sort_values("timestamp").reset_index(drop=True)
        timeline = (
            timeline_df["timestamp"].values.astype(np.int64),
            timeline_df["symbol"].values,
            timeline_df["close"].values.astype(np.float64),
            timeline_df["coin_state"].values,
            timeline_df["gov_score"].values.astype(np.float64),
            timeline_df["oracle_conf"].values.astype(np.float64),
            timeline_df["titan_score"].values.astype(np.float64),
            timeline_df["sniper_score"].values.astype(np.float64),
            timeline_df["pattern_score"].values.astype(np.float64),
        )
        del all_signals, timeline_df
        gc.collect()

        t_ts = timeline[0]
        start_ms = int(t_ts[0]) if len(t_ts) else 0
        end_ms = int(t_ts[-1]) if len(t_ts) else 0

        res = []
        BATCH_SIZE = 300
        for i in range(0, len(combinations_batch), BATCH_SIZE):
            for cfg in combinations_batch[i : i + BATCH_SIZE]:
                outcome = self._simulate_config(
                    cfg, timeline, sim_env, crash_model, giveback_model,
                    initial_capital, fees_pct, max_slots, target_state, start_ms, end_ms,
                )
                if outcome is not None:
                    res.append(outcome)
            gc.collect()

        return res

    async def run_optimization(self):
        """Generate the truth data, sample the grid and report the best config per state."""
        # Only the files of the current period: globbing the cache directory
        # would mix in stale results from earlier (rolling) windows.
        files = await self.generate_truth_data()

        keys = list(self.GRID_RANGES.keys())
        values = [list(self.GRID_RANGES[k]) for k in keys]

        # Never ask for more unique combinations than the grid contains.
        grid_size = int(np.prod([len(set(v)) for v in values])) if values else 0
        wanted = min(int(self.MAX_SAMPLES), grid_size)

        combos = []
        seen = set()
        while len(combos) < wanted:
            c = tuple(np.random.choice(v) for v in values)
            if c not in seen:
                seen.add(c)
                combos.append(dict(zip(keys, c)))

        print(f"โœ… Generated {len(combos)} configs.")

        for state_name, state_id in [("ACCUMULATION", 1), ("SAFE_TREND", 2), ("EXPLOSIVE", 3)]:
            print(f"\n๐ŸŒ€ Optimizing [{state_name}]...")
            results = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS, state_id)
            if not results:
                continue

            results.sort(key=lambda x: (x["calmar"], x["sqn"]), reverse=True)
            best = results[0]

            print(f"๐Ÿ† BEST [{state_name}]:")
            print(f" ๐Ÿ’ฐ Net Profit: ${best['net_profit']:.2f} | Final: ${best['final_balance']:.2f}")
            print(f" ๐Ÿ“Š Trades: {best['total_trades']} | WR: {best['win_rate']:.1f}% | Exp: {best['expectancy']:.4f}")
            print(f" ๐ŸŽฒ SQN: {best['sqn']:.2f} | PF: {best['profit_factor']:.2f} | Payoff: {best['payoff_ratio']:.2f}")
            print(f" ๐Ÿ“‰ MaxDD: {best['max_drawdown']:.2f}% | Ulcer: {best['ulcer_index']:.2f}")
            print(f" ๐Ÿ“ˆ Sharpe/Sortino: {best['sharpe']:.2f} / {best['sortino']:.2f}")
            print(f" ๐Ÿงฎ CAGR/Calmar: {(best['cagr']*100):.2f}% / {best['calmar']:.2f}")
            print(f" โš™๏ธ Config: {best['config']}")


# ============================================================
# Runner (guaranteed cleanup)
# ============================================================
async def run_strategic_optimization_task():
    """Wire up the services, run the optimization and always release connections."""
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    try:
        await dm.initialize()
        await proc.initialize()
        if getattr(proc, "guardian_hydra", None):
            proc.guardian_hydra.set_silent_mode(True)

        opt = HeavyDutyBacktester(dm, proc)
        await opt.run_optimization()

    except Exception as e:
        print(f"[ERROR] โŒ Backtest Failed: {e}")
        traceback.print_exc()

    finally:
        # Cleanup is best-effort: a failing close must not mask the real error.
        try:
            ex = getattr(dm, "exchange", None)
            if ex is not None:
                await ex.close()
        except Exception:
            pass

        try:
            await dm.close()
        except Exception:
            pass


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())