Spaces:
Paused
Paused
| # ============================================================ | |
| # ๐งช backtest_engine.py (V223.5 - GEM-Architect: The Data Floodgates Open) | |
| # FIXES: | |
| # 1) Relaxed Storage Thresholds (Solves "Signals: 0"). | |
| # 2) Includes MFI/Slope/Aliases (Solves Warnings). | |
| # 3) Includes Sniper Shape Fix (Solves Crash). | |
| # ============================================================ | |
| import asyncio | |
| import pandas as pd | |
| import numpy as np | |
| import time | |
| import logging | |
| import os | |
| import glob | |
| import gc | |
| import traceback | |
| import pickle | |
| from datetime import datetime, timezone, timedelta | |
| try: | |
| import pandas_ta as ta | |
| import xgboost as xgb | |
| import lightgbm as lgb | |
| except ImportError as e: | |
| raise ImportError(f"๐ด CRITICAL: Missing mandatory dependency for Truth Mode: {e}") | |
| try: | |
| from ml_engine.processor import MLProcessor | |
| from ml_engine.data_manager import DataManager | |
| from learning_hub.adaptive_hub import AdaptiveHub | |
| from r2 import R2Service | |
| from governance_engine import GovernanceEngine | |
| except ImportError: | |
| pass | |
| logging.getLogger("ml_engine").setLevel(logging.WARNING) | |
| CACHE_DIR = "backtest_v223_immutable" | |
| GOV_CACHE_DIR = os.path.join(CACHE_DIR, "gov_cache") | |
| PATTERN_CACHE_DIR = os.path.join(CACHE_DIR, "patterns_cache") | |
| for d in [CACHE_DIR, GOV_CACHE_DIR, PATTERN_CACHE_DIR]: | |
| if not os.path.exists(d): | |
| os.makedirs(d) | |
| # ============================================================ | |
| # ๐ ๏ธ HELPERS | |
| # ============================================================ | |
| def optimize_dataframe_memory(df: pd.DataFrame): | |
| if df is None or len(df) == 0: | |
| return df | |
| float_cols = df.select_dtypes(include=["float64"]).columns | |
| if len(float_cols) > 0: | |
| df[float_cols] = df[float_cols].astype("float32") | |
| int_cols = df.select_dtypes(include=["int64", "int32"]).columns | |
| for col in int_cols: | |
| c_min = df[col].min() | |
| c_max = df[col].max() | |
| if c_min > -128 and c_max < 127: | |
| df[col] = df[col].astype("int8") | |
| elif c_min > -32768 and c_max < 32767: | |
| df[col] = df[col].astype("int16") | |
| else: | |
| df[col] = df[col].astype("int32") | |
| return df | |
| def tf_to_offset(tf: str): | |
| if tf.endswith("m") and tf[:-1].isdigit(): | |
| return f"{int(tf[:-1])}T" | |
| if tf.endswith("h") and tf[:-1].isdigit(): | |
| return f"{int(tf[:-1])}h" | |
| if tf.endswith("d") and tf[:-1].isdigit(): | |
| return f"{int(tf[:-1])}D" | |
| return None | |
| def calc_max_drawdown(equity_curve): | |
| if not equity_curve: | |
| return 0.0 | |
| eq = np.array(equity_curve, dtype=np.float64) | |
| peak = np.maximum.accumulate(eq) | |
| dd = (eq - peak) / (peak + 1e-9) | |
| return float(dd.min()) * 100 | |
| def calc_profit_factor(wins, losses): | |
| gross_win = np.sum(wins) | |
| gross_loss = abs(np.sum(losses)) | |
| if gross_loss < 1e-9: | |
| return 99.0 | |
| return float(gross_win / gross_loss) | |
| def calc_ulcer_index(equity_curve): | |
| if not equity_curve: | |
| return 0.0 | |
| eq = np.asarray(equity_curve, dtype=np.float64) | |
| peak = np.maximum.accumulate(eq) | |
| dd_pct = (eq - peak) / (peak + 1e-12) * 100.0 | |
| return float(np.sqrt(np.mean(dd_pct**2))) | |
| def calc_sharpe(returns, eps=1e-12): | |
| if returns is None or len(returns) < 2: | |
| return 0.0 | |
| r = np.asarray(returns, dtype=np.float64) | |
| mu = np.mean(r) | |
| sd = np.std(r) | |
| if sd < eps: | |
| return 0.0 | |
| return float(mu / sd * np.sqrt(len(r))) | |
| def calc_sortino(returns, eps=1e-12): | |
| if returns is None or len(returns) < 2: | |
| return 0.0 | |
| r = np.asarray(returns, dtype=np.float64) | |
| mu = np.mean(r) | |
| downside = r[r < 0] | |
| if len(downside) < 1: | |
| return 99.0 | |
| dd = np.std(downside) | |
| if dd < eps: | |
| return 0.0 | |
| return float(mu / dd * np.sqrt(len(r))) | |
| def calc_cagr(initial_capital, final_balance, start_ms, end_ms): | |
| if initial_capital <= 0 or final_balance <= 0 or end_ms <= start_ms: | |
| return 0.0 | |
| years = (end_ms - start_ms) / (1000.0 * 60 * 60 * 24 * 365.25) | |
| if years < 1e-6: | |
| return 0.0 | |
| try: | |
| return float((final_balance / initial_capital) ** (1.0 / years) - 1.0) | |
| except: | |
| return 0.0 | |
| def calc_calmar(cagr, max_drawdown_pct): | |
| dd = abs(max_drawdown_pct) | |
| if dd < 1e-9: | |
| return 99.0 | |
| return float((cagr * 100.0) / dd) | |
| def calc_consecutive_streaks(pnls): | |
| max_w = max_l = 0 | |
| cur_w = cur_l = 0 | |
| for p in pnls: | |
| if p > 0: | |
| cur_w += 1 | |
| cur_l = 0 | |
| else: | |
| cur_l += 1 | |
| cur_w = 0 | |
| max_w = max(max_w, cur_w) | |
| max_l = max(max_l, cur_l) | |
| return int(max_w), int(max_l) | |
| # ============================================================ | |
| # ๐งช BACKTESTER | |
| # ============================================================ | |
| class HeavyDutyBacktester: | |
| def __init__(self, data_manager, processor): | |
| self.dm = data_manager | |
| self.proc = processor | |
| self.gov_engine = GovernanceEngine() | |
| # If True: raise on missing features. If False: fill 0 and continue. | |
| self.STRICT_FEATURES = False | |
| self._missing_feature_once = set() | |
| self._verify_system_integrity() | |
| self.GRID_DENSITY = 6 | |
| self.MAX_SAMPLES = 3000 | |
| self.INITIAL_CAPITAL = 10.0 | |
| self.MIN_CAPITAL_FOR_SPLIT = 20.0 | |
| self.TRADING_FEES = 0.001 | |
| self.SLIPPAGE_PCT = 0.0005 | |
| self.MAX_SLOTS = 4 | |
| self.GRID_RANGES = { | |
| "TITAN": np.linspace(0.40, 0.70, self.GRID_DENSITY), | |
| "ORACLE": np.linspace(0.55, 0.80, self.GRID_DENSITY), | |
| "SNIPER": np.linspace(0.30, 0.65, self.GRID_DENSITY), | |
| "PATTERN": np.linspace(0.30, 0.70, self.GRID_DENSITY), | |
| "GOV_SCORE": np.linspace(50.0, 80.0, self.GRID_DENSITY), | |
| "HYDRA_THRESH": np.linspace(0.60, 0.90, self.GRID_DENSITY), | |
| "LEGACY_THRESH": np.linspace(0.85, 0.98, self.GRID_DENSITY), | |
| } | |
| self.TARGET_COINS = [ | |
| "SOL/USDT", "XRP/USDT", "DOGE/USDT" ] | |
| self.USE_FIXED_DATES = False | |
| self.LOOKBACK_DAYS = 60 | |
| self.force_start_date = "2024-01-01" | |
| self.force_end_date = "2024-02-01" | |
| self.required_timeframes = self._determine_required_timeframes() | |
| print(f"๐งช [Backtest V223.5] IMMUTABLE TRUTH (Patched & Open Gates). TFs: {self.required_timeframes}") | |
| def _verify_system_integrity(self): | |
| errors = [] | |
| if not getattr(self.proc, "titan", None) or not getattr(self.proc.titan, "model", None): | |
| errors.append("โ Titan Engine missing") | |
| if not getattr(self.proc, "oracle", None) or not getattr(self.proc.oracle, "model_direction", None): | |
| errors.append("โ Oracle Engine missing") | |
| if not getattr(self.proc, "pattern_engine", None): | |
| errors.append("โ Pattern Engine missing") | |
| if not getattr(self.proc, "sniper", None): | |
| errors.append("โ Sniper Engine missing") | |
| if not getattr(self.proc, "guardian_hydra", None) or not getattr(self.proc.guardian_hydra, "models", None): | |
| errors.append("โ Hydra Guardian missing/models not loaded") | |
| else: | |
| m = self.proc.guardian_hydra.models | |
| if "crash" not in m or "giveback" not in m: | |
| errors.append("โ Hydra missing crash/giveback heads") | |
| try: | |
| _ = m["crash"].predict_proba | |
| _ = m["giveback"].predict_proba | |
| except: | |
| errors.append("โ Hydra heads must implement predict_proba()") | |
| if errors: | |
| raise RuntimeError(f"CRITICAL INTEGRITY FAILURE: {errors}") | |
| def _determine_required_timeframes(self): | |
| tfs = set(["5m", "15m", "1h", "4h"]) | |
| def maybe_add(prefix: str): | |
| if tf_to_offset(prefix): | |
| tfs.add(prefix) | |
| if hasattr(self.proc.titan.model, "feature_names"): | |
| for f in self.proc.titan.model.feature_names: | |
| if "_" in f: | |
| maybe_add(f.split("_", 1)[0]) | |
| if hasattr(self.proc.oracle, "feature_cols"): | |
| for f in self.proc.oracle.feature_cols: | |
| if "_" in f: | |
| maybe_add(f.split("_", 1)[0]) | |
| return list(tfs) | |
| # -------------------------- | |
| # Indicator Hardening Layer (FIXED: MFI, Slope, Aliases) | |
| # -------------------------- | |
| def _safe_bbands(close: pd.Series, length=20, std=2.0): | |
| basis = close.rolling(length).mean() | |
| dev = close.rolling(length).std(ddof=0) | |
| upper = basis + std * dev | |
| lower = basis - std * dev | |
| width = (upper - lower) / (basis.abs() + 1e-12) | |
| pct = (close - lower) / ((upper - lower) + 1e-12) | |
| return lower, upper, width, pct | |
| def _calculate_all_indicators(self, df: pd.DataFrame): | |
| cols = ["open", "high", "low", "close", "volume"] | |
| for c in cols: | |
| df[c] = pd.to_numeric(df[c], errors="coerce") | |
| df[cols] = df[cols].replace([np.inf, -np.inf], np.nan) | |
| df.dropna(subset=["close", "high", "low"], inplace=True) | |
| c = df["close"].astype(np.float64) | |
| h = df["high"].astype(np.float64) | |
| l = df["low"].astype(np.float64) | |
| v = df["volume"].astype(np.float64) if "volume" in df.columns else pd.Series(np.zeros(len(df)), index=df.index) | |
| # ---------------------------------------------------- | |
| # โ [GEM-FIX] Compatibility Bridge (Oracle & Titan) | |
| # ---------------------------------------------------- | |
| # 1. Oracle: Slope | |
| try: df['slope'] = ta.slope(c, length=7).fillna(0) | |
| except: df['slope'] = 0.0 | |
| # 2. Titan: MFI | |
| try: df['MFI'] = ta.mfi(h, l, c, v, length=14).fillna(50) | |
| except: df['MFI'] = 50.0 | |
| # 3. Oracle Mapping (LowerCase Aliases) | |
| df["RSI"] = ta.rsi(c, length=14).fillna(50) | |
| df["rsi"] = df["RSI"] # Alias | |
| df["ATR"] = ta.atr(h, l, c, length=14).fillna(0) | |
| df["ATR_pct"] = (df["ATR"] / (c + 1e-12)) * 100 | |
| df["atr_pct"] = df["ATR_pct"] # Alias | |
| # 4. Oracle: Volume Z-Score (vol_z) | |
| vol_mean = v.rolling(20).mean() | |
| vol_std = v.rolling(20).std() | |
| df["vol_z"] = ((v - vol_mean) / (vol_std + 1e-9)).fillna(0) # For Oracle | |
| # 5. Titan: Trend Strong (Approx) | |
| adx_df = ta.adx(h, l, c, length=14) | |
| if adx_df is not None and not adx_df.empty: | |
| df["ADX"] = adx_df.iloc[:, 0].fillna(0) | |
| df["Trend_Strong"] = np.where(df["ADX"] > 25, 1, 0) | |
| else: | |
| df["ADX"] = 0.0 | |
| df["Trend_Strong"] = 0 | |
| # ---------------------------------------------------- | |
| try: | |
| df["CHOP"] = ta.chop(h, l, c, length=14).fillna(50) | |
| except: | |
| df["CHOP"] = 50 | |
| try: | |
| df["vwap"] = ta.vwap(h, l, c, v).fillna(c) | |
| except: | |
| df["vwap"] = c | |
| try: | |
| df["CCI"] = ta.cci(h, l, c, length=20).fillna(0) | |
| except: | |
| df["CCI"] = 0.0 | |
| # EMAs | |
| for span in [9, 20, 21, 50, 200]: | |
| df[f"ema{span}"] = c.ewm(span=span, adjust=False).mean() | |
| # Derived | |
| df["EMA_9_dist"] = (c / (df["ema9"] + 1e-12)) - 1 | |
| df["EMA_21_dist"] = (c / (df["ema21"] + 1e-12)) - 1 | |
| df["EMA_50_dist"] = (c / (df["ema50"] + 1e-12)) - 1 | |
| df["EMA_200_dist"] = (c / (df["ema200"] + 1e-12)) - 1 | |
| df["VWAP_dist"] = (c / (df["vwap"] + 1e-12)) - 1 | |
| # BBANDS | |
| if len(df) < 30: | |
| df["lower_bb"] = c; df["upper_bb"] = c; df["bb_width"] = 0.0; df["bb_pct"] = 0.5 | |
| df["BB_w"] = 0.0; df["BB_p"] = 0.5 | |
| else: | |
| bb = ta.bbands(c, length=20, std=2.0) | |
| if bb is not None and isinstance(bb, pd.DataFrame): | |
| col_w = [x for x in bb.columns if "BBB" in x] | |
| col_p = [x for x in bb.columns if "BBP" in x] | |
| col_l = [x for x in bb.columns if "BBL" in x] | |
| col_u = [x for x in bb.columns if "BBU" in x] | |
| df["bb_width"] = bb[col_w[0]] if col_w else 0.0 | |
| df["bb_pct"] = bb[col_p[0]] if col_p else 0.5 | |
| df["lower_bb"] = bb[col_l[0]] if col_l else c | |
| df["upper_bb"] = bb[col_u[0]] if col_u else c | |
| df["BB_w"] = df["bb_width"] | |
| df["BB_p"] = df["bb_pct"] | |
| else: | |
| df["lower_bb"] = c; df["upper_bb"] = c; df["bb_width"] = 0.0; df["bb_pct"] = 0.5 | |
| df["BB_w"] = 0.0; df["BB_p"] = 0.5 | |
| # MACD | |
| macd = ta.macd(c) | |
| if macd is not None and not macd.empty: | |
| df["MACD"] = macd.iloc[:, 0] | |
| df["MACD_h"] = macd.iloc[:, 1] | |
| df["MACD_s"] = macd.iloc[:, 2] | |
| else: | |
| df["MACD"] = 0.0; df["MACD_h"] = 0.0; df["MACD_s"] = 0.0 | |
| mean_vol = v.rolling(50).mean() + 1e-9 | |
| df["rel_vol"] = v / mean_vol | |
| df["log_ret"] = np.concatenate([[0], np.diff(np.log(c + 1e-9))]) | |
| return df.fillna(0) | |
| def _warn_missing_once(self, msg: str): | |
| if msg in self._missing_feature_once: | |
| return | |
| self._missing_feature_once.add(msg) | |
| print(f"[WARN] {msg}") | |
| async def _fetch_all_data_fast(self, sym, start_ms, end_ms): | |
| print(f" โก [Network] Downloading {sym}...", flush=True) | |
| limit = 1000 | |
| tasks = [] | |
| curr = start_ms | |
| while curr < end_ms: | |
| tasks.append(curr) | |
| curr += limit * 60 * 1000 | |
| all_candles = [] | |
| sem = asyncio.Semaphore(20) | |
| async def _fetch_batch(timestamp): | |
| async with sem: | |
| for _ in range(3): | |
| try: | |
| return await self.dm.exchange.fetch_ohlcv(sym, "1m", since=timestamp, limit=limit) | |
| except: | |
| await asyncio.sleep(0.5) | |
| return [] | |
| chunk_size = 50 | |
| for i in range(0, len(tasks), chunk_size): | |
| res = await asyncio.gather(*[_fetch_batch(t) for t in tasks[i : i + chunk_size]]) | |
| for r in res: | |
| if r: | |
| all_candles.extend(r) | |
| if not all_candles: | |
| return None | |
| df = pd.DataFrame(all_candles, columns=["timestamp", "o", "h", "l", "c", "v"]) | |
| df.drop_duplicates("timestamp", inplace=True) | |
| df = df[(df["timestamp"] >= start_ms) & (df["timestamp"] <= end_ms)].sort_values("timestamp") | |
| return df.values.tolist() | |
| async def _process_data_in_memory(self, sym, candles, start_ms, end_ms): | |
| safe_sym = sym.replace("/", "_") | |
| period_suffix = f"{start_ms}_{end_ms}" | |
| scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_processed.pkl" | |
| if os.path.exists(scores_file): | |
| print(f" ๐ [{sym}] Loaded Cache.") | |
| return | |
| print(f" โ๏ธ [CPU] Processing {sym} (Truth Mode)...", flush=True) | |
| t0 = time.time() | |
| df_1m = pd.DataFrame(candles, columns=["timestamp", "open", "high", "low", "close", "volume"]) | |
| df_1m["datetime"] = pd.to_datetime(df_1m["timestamp"] + 60000, unit="ms", utc=True) | |
| df_1m.set_index("datetime", inplace=True) | |
| df_1m = df_1m.sort_index() | |
| df_1m = self._calculate_all_indicators(df_1m) | |
| if len(df_1m) < 300: | |
| raise RuntimeError(f"{sym} has too few valid candles after cleaning: {len(df_1m)}") | |
| arr_ts_1m = (df_1m.index.astype(np.int64) // 10**6).values | |
| fast_1m_close = df_1m["close"].values.astype(np.float32) | |
| numpy_htf = {} | |
| agg_dict = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"} | |
| for tf_str in self.required_timeframes: | |
| offset = tf_to_offset(tf_str) | |
| if not offset: | |
| continue | |
| resampled = df_1m.resample(offset, label="right", closed="right").agg(agg_dict).dropna() | |
| resampled = self._calculate_all_indicators(resampled) | |
| if len(resampled) == 0: | |
| continue | |
| resampled["timestamp"] = resampled.index.astype(np.int64) // 10**6 | |
| numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns} | |
| if "1h" not in numpy_htf: | |
| raise RuntimeError(f"CRITICAL: '1h' missing for {sym}.") | |
| def get_safe_map(tf): | |
| if tf not in numpy_htf or len(numpy_htf[tf]["timestamp"]) == 0: | |
| return np.full(len(arr_ts_1m), -1, dtype=np.int32) | |
| htf_ts = numpy_htf[tf]["timestamp"] | |
| idx = np.searchsorted(htf_ts, arr_ts_1m, side="right") - 1 | |
| return idx.astype(np.int32) | |
| maps = {tf: get_safe_map(tf) for tf in self.required_timeframes} | |
| validity_mask = np.ones(len(arr_ts_1m), dtype=bool) | |
| for tf in maps: | |
| validity_mask &= (maps[tf] >= 0) | |
| validity_mask[:200] = False | |
| # 1) Pattern (Cached) | |
| global_pattern_scores = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| pat_cache_file = os.path.join(PATTERN_CACHE_DIR, f"{safe_sym}_{period_suffix}_pat.pkl") | |
| pattern_results_map = {} | |
| if os.path.exists(pat_cache_file): | |
| with open(pat_cache_file, "rb") as f: | |
| pattern_results_map = pickle.load(f) | |
| elif "15m" in numpy_htf: | |
| ts_15m = numpy_htf["15m"]["timestamp"] | |
| cols = ["timestamp", "open", "high", "low", "close", "volume"] | |
| df_15m_source = pd.DataFrame({c: numpy_htf["15m"][c] for c in cols}) | |
| for i in range(200, len(df_15m_source)): | |
| window = df_15m_source.iloc[i - 200 : i + 1] | |
| ohlcv_input = {"15m": window.values.tolist()} | |
| try: | |
| res = await self.proc.pattern_engine.detect_chart_patterns(ohlcv_input) | |
| pattern_results_map[ts_15m[i]] = res.get("pattern_confidence", 0.0) | |
| except: | |
| pass | |
| with open(pat_cache_file, "wb") as f: | |
| pickle.dump(pattern_results_map, f) | |
| if "15m" in maps and "15m" in numpy_htf: | |
| map_15 = maps["15m"] | |
| ts_15_arr = numpy_htf["15m"]["timestamp"] | |
| for i in range(len(arr_ts_1m)): | |
| if not validity_mask[i]: | |
| continue | |
| idx = map_15[i] | |
| if idx >= 0: | |
| global_pattern_scores[i] = pattern_results_map.get(ts_15_arr[idx], 0.0) | |
| # 2) Governance (Cached) | |
| gov_scores_final = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| gov_cache_file = os.path.join(GOV_CACHE_DIR, f"{safe_sym}_{period_suffix}_gov.pkl") | |
| gov_results_map = {} | |
| if os.path.exists(gov_cache_file): | |
| with open(gov_cache_file, "rb") as f: | |
| gov_results_map = pickle.load(f) | |
| elif "15m" in numpy_htf: | |
| cols = ["timestamp", "open", "high", "low", "close", "volume"] | |
| df_15m_g = pd.DataFrame({c: numpy_htf["15m"][c] for c in cols}) | |
| ts_15m = numpy_htf["15m"]["timestamp"] | |
| has_1h = "1h" in numpy_htf | |
| df_1h_g = pd.DataFrame({c: numpy_htf["1h"][c] for c in cols}) if has_1h else None | |
| ts_1h = numpy_htf["1h"]["timestamp"] if has_1h else None | |
| for i in range(200, len(df_15m_g)): | |
| curr_ts = ts_15m[i] | |
| win_15 = df_15m_g.iloc[i - 120 : i + 1] | |
| ohlcv_input = {"15m": win_15.values.tolist()} | |
| if has_1h: | |
| idx_1h = np.searchsorted(ts_1h, curr_ts, side="right") - 1 | |
| if idx_1h >= 50: | |
| ohlcv_input["1h"] = df_1h_g.iloc[idx_1h - 60 : idx_1h + 1].values.tolist() | |
| try: | |
| res = await self.gov_engine.evaluate_trade(sym, ohlcv_input, {}, "NORMAL", False, has_1h) | |
| score = res.get("governance_score", 0.0) if res.get("grade") != "REJECT" else 0.0 | |
| gov_results_map[curr_ts] = score | |
| except: | |
| pass | |
| with open(gov_cache_file, "wb") as f: | |
| pickle.dump(gov_results_map, f) | |
| if "15m" in maps and "15m" in numpy_htf: | |
| map_15 = maps["15m"] | |
| ts_15_arr = numpy_htf["15m"]["timestamp"] | |
| for i in range(len(arr_ts_1m)): | |
| if not validity_mask[i]: | |
| continue | |
| idx = map_15[i] | |
| if idx >= 0: | |
| gov_scores_final[i] = gov_results_map.get(ts_15_arr[idx], 0.0) | |
| # 3) Market State | |
| map_1h = maps["1h"] | |
| valid_1h = map_1h >= 0 | |
| idx_1h = map_1h[valid_1h] | |
| h1_chop = numpy_htf["1h"]["CHOP"][idx_1h] | |
| h1_adx = numpy_htf["1h"]["ADX"][idx_1h] | |
| h1_atr_pct = numpy_htf["1h"]["ATR_pct"][idx_1h] | |
| market_ok = np.ones(len(arr_ts_1m), dtype=bool) | |
| market_ok[valid_1h] = ~((h1_chop > 61.8) | ((h1_atr_pct < 0.3) & (h1_adx < 20))) | |
| coin_state = np.zeros(len(arr_ts_1m), dtype=np.int8) | |
| h1_rsi = numpy_htf["1h"]["RSI"][idx_1h] | |
| h1_bbw = numpy_htf["1h"]["bb_width"][idx_1h] | |
| h1_upper = numpy_htf["1h"]["upper_bb"][idx_1h] | |
| h1_ema20 = numpy_htf["1h"]["ema20"][idx_1h] | |
| h1_ema50 = numpy_htf["1h"]["ema50"][idx_1h] | |
| h1_ema200 = numpy_htf["1h"]["ema200"][idx_1h] | |
| h1_close = numpy_htf["1h"]["close"][idx_1h] | |
| h1_rel_vol = numpy_htf["1h"]["rel_vol"][idx_1h] | |
| mask_acc = (h1_bbw < 0.20) & (h1_rsi >= 35) & (h1_rsi <= 65) | |
| mask_safe = (h1_adx > 25) & (h1_ema20 > h1_ema50) & (h1_ema50 > h1_ema200) & (h1_rsi > 50) & (h1_rsi < 75) | |
| mask_exp = (h1_rsi > 65) & (h1_close > h1_upper) & (h1_rel_vol > 1.5) | |
| state_buffer = np.zeros(len(idx_1h), dtype=np.int8) | |
| state_buffer[mask_acc] = 1 | |
| state_buffer[mask_safe] = 2 | |
| state_buffer[mask_exp] = 3 | |
| coin_state[valid_1h] = state_buffer | |
| coin_state[~validity_mask] = 0 | |
| coin_state[~market_ok] = 0 | |
| # ========================= | |
| # 4) Titan & Oracle (Hardened) | |
| # ========================= | |
| titan_cols = self.proc.titan.model.feature_names | |
| t_vecs = [] | |
| for col in titan_cols: | |
| parts = col.split("_", 1) | |
| if len(parts) < 2: | |
| raise ValueError(f"Titan Feature Format Error: {col}") | |
| tf = parts[0] | |
| raw_feat = parts[1] | |
| lookup_key = "bb_pct" if raw_feat in ["BB_p", "BB_pct"] else ("bb_width" if raw_feat == "BB_w" else raw_feat) | |
| if tf not in numpy_htf: | |
| if self.STRICT_FEATURES: | |
| raise ValueError(f"Titan requires TF not built: {tf} (feature: {col})") | |
| self._warn_missing_once(f"Titan TF missing -> {col}. Filled 0.") | |
| t_vecs.append(np.zeros(len(arr_ts_1m), dtype=np.float32)) | |
| continue | |
| if lookup_key not in numpy_htf[tf] and lookup_key != "timestamp": | |
| if self.STRICT_FEATURES: | |
| raise ValueError(f"Missing Titan Feature: {col}") | |
| self._warn_missing_once(f"Missing Titan Feature -> {col}. Filled 0.") | |
| t_vecs.append(np.zeros(len(arr_ts_1m), dtype=np.float32)) | |
| continue | |
| idx = maps[tf] | |
| vals = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| valid = idx >= 0 | |
| if lookup_key == "timestamp": | |
| vals[valid] = numpy_htf[tf]["timestamp"][idx[valid]] | |
| else: | |
| vals[valid] = numpy_htf[tf][lookup_key][idx[valid]] | |
| t_vecs.append(vals) | |
| X_TITAN = np.column_stack(t_vecs) | |
| global_titan_scores = self.proc.titan.model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols)) | |
| oracle_cols = self.proc.oracle.feature_cols | |
| o_vecs = [] | |
| for col in oracle_cols: | |
| if col == "sim_titan_score": | |
| o_vecs.append(global_titan_scores.astype(np.float32)) | |
| elif col in ["sim_pattern_score", "pattern_score"]: | |
| o_vecs.append(global_pattern_scores.astype(np.float32)) | |
| elif col == "sim_mc_score": | |
| o_vecs.append(np.zeros(len(arr_ts_1m), dtype=np.float32)) | |
| else: | |
| parts = col.split("_", 1) | |
| if len(parts) != 2: | |
| raise ValueError(f"Oracle Feature Error: {col}") | |
| tf, key = parts | |
| if tf not in numpy_htf: | |
| if self.STRICT_FEATURES: | |
| raise ValueError(f"Oracle requires TF not built: {tf} (feature: {col})") | |
| self._warn_missing_once(f"Oracle TF missing -> {col}. Filled 0.") | |
| o_vecs.append(np.zeros(len(arr_ts_1m), dtype=np.float32)) | |
| continue | |
| if key not in numpy_htf[tf]: | |
| if self.STRICT_FEATURES: | |
| raise ValueError(f"Missing Oracle Feature: {col}") | |
| self._warn_missing_once(f"Missing Oracle Feature -> {col}. Filled 0.") | |
| o_vecs.append(np.zeros(len(arr_ts_1m), dtype=np.float32)) | |
| continue | |
| idx = maps[tf] | |
| vals = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| valid = idx >= 0 | |
| vals[valid] = numpy_htf[tf][key][idx[valid]] | |
| o_vecs.append(vals) | |
| X_ORACLE = np.column_stack(o_vecs) | |
| preds_o = self.proc.oracle.model_direction.predict(X_ORACLE) | |
| if isinstance(preds_o, np.ndarray) and len(preds_o.shape) > 1: | |
| preds_o = preds_o[:, 0] | |
| global_oracle_scores = preds_o.astype(np.float32) | |
| # 5) Sniper (GEM-FIXED: Shape & Broadcasting Safety) | |
| df_sniper_feats = self.proc.sniper._calculate_features_live(df_1m) | |
| X_sniper = df_sniper_feats[self.proc.sniper.feature_names].fillna(0) | |
| preds_accum = np.zeros(len(X_sniper), dtype=np.float32) | |
| for model in self.proc.sniper.models: | |
| raw_preds = model.predict(X_sniper) | |
| if len(raw_preds.shape) > 1 and raw_preds.shape[1] > 1: | |
| preds_accum += raw_preds[:, 1].astype(np.float32) | |
| else: | |
| preds_accum += raw_preds.astype(np.float32) | |
| global_sniper_scores = (preds_accum / max(1, len(self.proc.sniper.models))).astype(np.float32) | |
| # 6) Hydra Static | |
| map_5 = maps["5m"] | |
| map_15 = maps["15m"] | |
| map_1 = maps.get("1h", map_15) | |
| f_rsi_1m = df_1m["RSI"].values.astype(np.float32) | |
| f_rsi_5m = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| v5 = map_5 >= 0 | |
| if "5m" in numpy_htf and "RSI" in numpy_htf["5m"]: | |
| f_rsi_5m[v5] = numpy_htf["5m"]["RSI"][map_5[v5]].astype(np.float32) | |
| f_rsi_15m = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| v15 = map_15 >= 0 | |
| if "15m" in numpy_htf and "RSI" in numpy_htf["15m"]: | |
| f_rsi_15m[v15] = numpy_htf["15m"]["RSI"][map_15[v15]].astype(np.float32) | |
| f_dist_1h = np.zeros(len(arr_ts_1m), dtype=np.float32) | |
| v1 = map_1 >= 0 | |
| ema20_1h = numpy_htf["1h"]["ema20"][map_1[v1]].astype(np.float32) | |
| close_1h = numpy_htf["1h"]["close"][map_1[v1]].astype(np.float32) | |
| f_dist_1h[v1] = (close_1h - ema20_1h) / (close_1h + 1e-12) | |
| hydra_static = np.column_stack( | |
| [ | |
| f_rsi_1m, | |
| f_rsi_5m, | |
| f_rsi_15m, | |
| df_1m["bb_width"].values.astype(np.float32), | |
| df_1m["rel_vol"].values.astype(np.float32), | |
| f_dist_1h, | |
| (df_1m["ATR_pct"].values.astype(np.float32) / 100.0), | |
| ] | |
| ).astype(np.float32) | |
| # ========================================== | |
| # ๐ข GEM-FIX: Relaxed Saving Thresholds | |
| # ========================================== | |
| min_gov = 0.01 | |
| min_oracle = 0.01 | |
| min_titan = 0.01 | |
| min_sniper = 0.01 | |
| min_pattern = 0.01 | |
| filter_mask = ( | |
| validity_mask | |
| & (coin_state > 0) | |
| & (gov_scores_final >= min_gov) | |
| & (global_oracle_scores >= min_oracle) | |
| & (global_titan_scores >= min_titan) | |
| & (global_sniper_scores >= min_sniper) | |
| & (global_pattern_scores >= min_pattern) | |
| ) | |
| valid_idxs = np.where(filter_mask)[0] | |
| signals_df = pd.DataFrame( | |
| { | |
| "timestamp": arr_ts_1m[valid_idxs], | |
| "symbol": sym, | |
| "close": fast_1m_close[valid_idxs], | |
| "coin_state": coin_state[valid_idxs], | |
| "gov_score": gov_scores_final[valid_idxs], | |
| "titan_score": global_titan_scores[valid_idxs].astype(np.float32), | |
| "oracle_conf": global_oracle_scores[valid_idxs].astype(np.float32), | |
| "sniper_score": global_sniper_scores[valid_idxs].astype(np.float32), | |
| "pattern_score": global_pattern_scores[valid_idxs].astype(np.float32), | |
| } | |
| ) | |
| sim_data = { | |
| "timestamp": arr_ts_1m.astype(np.int64), | |
| "close": fast_1m_close, | |
| "high": df_1m["high"].values.astype(np.float32), | |
| "low": df_1m["low"].values.astype(np.float32), | |
| "atr": df_1m["ATR"].values.astype(np.float32), | |
| "hydra_static": hydra_static, | |
| "oracle_conf": global_oracle_scores.astype(np.float32), | |
| "titan_score": global_titan_scores.astype(np.float32), | |
| } | |
| pd.to_pickle({"signals": signals_df, "sim_data": sim_data}, scores_file) | |
| dt = time.time() - t0 | |
| print(f" โ [{sym}] Processed in {dt:.2f}s. Signals: {len(signals_df)}") | |
| gc.collect() | |
| async def generate_truth_data(self): | |
| if self.USE_FIXED_DATES: | |
| dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) | |
| dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) | |
| else: | |
| now = datetime.now(timezone.utc) | |
| dt_s = now - timedelta(days=self.LOOKBACK_DAYS) | |
| dt_e = now | |
| ms_s = int(dt_s.timestamp() * 1000) | |
| ms_e = int(dt_e.timestamp() * 1000) | |
| for sym in self.TARGET_COINS: | |
| try: | |
| c = await self._fetch_all_data_fast(sym, ms_s, ms_e) | |
| if c: | |
| await self._process_data_in_memory(sym, c, ms_s, ms_e) | |
| except Exception as e: | |
| print(f"[WARN] {sym} skipped due to error: {e}") | |
| traceback.print_exc() | |
| # ========================= | |
| # Optimization core (unchanged from your last version) | |
| # ========================= | |
| def _flush_position_interval( | |
| self, cfg, open_sym, pos, curr_ts, sim_env, crash_model, giveback_model, fees_pct, | |
| trade_pnls, trade_returns, trade_durations, equity_curve, cash_bal, wins_losses, | |
| last_update_map, end_idx_override=None | |
| ): | |
| c_data = sim_env[open_sym] | |
| full_ts = c_data["timestamp"] | |
| start_idx = int(last_update_map.get(open_sym, 0)) | |
| if start_idx < 0: | |
| start_idx = 0 | |
| if end_idx_override is None: | |
| end_idx = int(np.searchsorted(full_ts, curr_ts, side="right")) | |
| else: | |
| end_idx = int(end_idx_override) | |
| end_idx = min(end_idx, len(full_ts)) | |
| if end_idx <= start_idx: | |
| return cash_bal, False | |
| interval_high = c_data["high"][start_idx:end_idx] | |
| interval_low = c_data["low"][start_idx:end_idx] | |
| interval_close = c_data["close"][start_idx:end_idx] | |
| interval_atr = c_data["atr"][start_idx:end_idx] | |
| h_static = c_data["hydra_static"][start_idx:end_idx] | |
| h_oracle = c_data["oracle_conf"][start_idx:end_idx] | |
| h_titan = c_data["titan_score"][start_idx:end_idx] | |
| entry_p = float(pos["entry_p"]) | |
| entry_time = int(pos["entry_ts"]) | |
| prev_high = float(pos.get("highest_price", entry_p)) | |
| current_highs = np.maximum.accumulate(np.concatenate([[prev_high], interval_high]))[1:] | |
| pos["highest_price"] = float(current_highs[-1]) | |
| durations = (full_ts[start_idx:end_idx] - entry_time) / 60000.0 | |
| sl_dist = np.maximum(1.5 * interval_atr, 1e-8) | |
| pnl = interval_close - entry_p | |
| norm_pnl = pnl / sl_dist | |
| max_pnl = (current_highs - entry_p) / sl_dist | |
| zeros = np.zeros(len(interval_close), dtype=np.float32) | |
| h_dynamic = np.column_stack([norm_pnl, max_pnl, zeros, zeros, durations]).astype(np.float32) | |
| threes = np.full(len(interval_close), 3.0, dtype=np.float32) | |
| h_context = np.column_stack([zeros, h_oracle, h_titan, threes]).astype(np.float32) | |
| X_H = np.column_stack([h_static, h_dynamic, h_context]).astype(np.float32) | |
| crash_probs = crash_model.predict_proba(X_H)[:, 1] | |
| give_probs = giveback_model.predict_proba(X_H)[:, 1] | |
| sl_hit = interval_low < pos["sl_p"] | |
| tp_hit = interval_high > pos["tp_p"] | |
| hydra_hit = (crash_probs > cfg["HYDRA_THRESH"]) | (give_probs > cfg["HYDRA_THRESH"]) | |
| legacy_hit = (crash_probs > cfg["LEGACY_THRESH"]) | (give_probs > cfg["LEGACY_THRESH"]) | |
| any_exit = sl_hit | tp_hit | legacy_hit | hydra_hit | |
| last_update_map[open_sym] = end_idx | |
| if not np.any(any_exit): | |
| return cash_bal, False | |
| idx = int(np.argmax(any_exit)) | |
| exit_ts = int(full_ts[start_idx + idx]) | |
| if sl_hit[idx]: | |
| exit_p = float(pos["sl_p"]) * (1 - self.SLIPPAGE_PCT) | |
| elif tp_hit[idx]: | |
| exit_p = float(pos["tp_p"]) * (1 - self.SLIPPAGE_PCT) | |
| elif legacy_hit[idx]: | |
| exit_p = float(interval_close[idx]) * (1 - (self.SLIPPAGE_PCT * 2.0)) | |
| else: | |
| exit_p = float(interval_close[idx]) * (1 - self.SLIPPAGE_PCT) | |
| net = (pos["qty"] * exit_p) * (1 - fees_pct) | |
| cash_bal += net | |
| pnl_real = float(net - pos["cost"]) | |
| trade_pnls.append(pnl_real) | |
| trade_returns.append(pnl_real / (float(pos["cost"]) + 1e-12)) | |
| trade_durations.append((exit_ts - entry_time) / 60000.0) | |
| equity_curve.append(float(cash_bal)) | |
| if pnl_real > 0: | |
| wins_losses["wins"] += 1 | |
| else: | |
| wins_losses["losses"] += 1 | |
| return cash_bal, True | |
| def _worker_optimize(self, combinations_batch, scores_files, initial_capital, fees_pct, max_slots, target_state): | |
| all_signals = [] | |
| sim_env = {} | |
| crash_model = self.proc.guardian_hydra.models["crash"] | |
| giveback_model = self.proc.guardian_hydra.models["giveback"] | |
| for f in scores_files: | |
| try: | |
| data = pd.read_pickle(f) | |
| sig = optimize_dataframe_memory(data.get("signals", None)) | |
| if sig is None or len(sig) == 0: | |
| continue | |
| all_signals.append(sig) | |
| sym = str(sig["symbol"].iloc[0]) | |
| sim_env[sym] = data["sim_data"] | |
| except: | |
| pass | |
| if not all_signals: | |
| return [] | |
| timeline_df = pd.concat(all_signals).sort_values("timestamp").reset_index(drop=True) | |
| t_ts = timeline_df["timestamp"].values.astype(np.int64) | |
| t_sym = timeline_df["symbol"].values | |
| t_close = timeline_df["close"].values.astype(np.float64) | |
| t_state = timeline_df["coin_state"].values | |
| t_gov = timeline_df["gov_score"].values.astype(np.float64) | |
| t_oracle = timeline_df["oracle_conf"].values.astype(np.float64) | |
| t_titan = timeline_df["titan_score"].values.astype(np.float64) | |
| t_sniper = timeline_df["sniper_score"].values.astype(np.float64) | |
| t_pattern = timeline_df["pattern_score"].values.astype(np.float64) | |
| del all_signals, timeline_df | |
| gc.collect() | |
| start_ms = int(t_ts[0]) if len(t_ts) else 0 | |
| end_ms = int(t_ts[-1]) if len(t_ts) else 0 | |
| res = [] | |
| BATCH_SIZE = 300 | |
| USE_MARK_TO_MARKET_EQUITY = True | |
| for i in range(0, len(combinations_batch), BATCH_SIZE): | |
| batch = combinations_batch[i : i + BATCH_SIZE] | |
| for cfg in batch: | |
| cash_bal = float(initial_capital) | |
| active_positions = {} | |
| last_update_map = {} | |
| last_price = {} | |
| trade_pnls = [] | |
| trade_returns = [] | |
| trade_durations = [] | |
| equity_curve = [float(initial_capital)] | |
| wins_losses = {"wins": 0, "losses": 0} | |
| exposure_steps = 0 | |
| def mark_to_market_equity(curr_ts): | |
| nonlocal exposure_steps | |
| open_val = 0.0 | |
| has_open = False | |
| for s, pos in active_positions.items(): | |
| px = last_price.get(s, None) | |
| if px is None: | |
| continue | |
| has_open = True | |
| open_val += (pos["qty"] * px * (1 - self.SLIPPAGE_PCT)) * (1 - fees_pct) | |
| if has_open: | |
| exposure_steps += 1 | |
| equity_curve.append(float(cash_bal + open_val)) | |
| for curr_ts, sym, p, c_state, gov, oracle, titan, sniper, pattern in zip( | |
| t_ts, t_sym, t_close, t_state, t_gov, t_oracle, t_titan, t_sniper, t_pattern | |
| ): | |
| sym = str(sym) | |
| last_price[sym] = float(p) | |
| to_close = [] | |
| for open_sym, pos in list(active_positions.items()): | |
| cash_bal, closed = self._flush_position_interval( | |
| cfg, open_sym, pos, curr_ts, sim_env, crash_model, giveback_model, fees_pct, | |
| trade_pnls, trade_returns, trade_durations, equity_curve, cash_bal, | |
| wins_losses, last_update_map | |
| ) | |
| if closed: | |
| to_close.append(open_sym) | |
| for s in to_close: | |
| del active_positions[s] | |
| if USE_MARK_TO_MARKET_EQUITY: | |
| mark_to_market_equity(curr_ts) | |
| is_valid = ( | |
| (int(c_state) == int(target_state)) | |
| and (float(gov) >= float(cfg["GOV_SCORE"])) | |
| and (float(oracle) >= float(cfg["ORACLE"])) | |
| and (float(titan) >= float(cfg["TITAN"])) | |
| and (float(sniper) >= float(cfg["SNIPER"])) | |
| and (float(pattern) >= float(cfg["PATTERN"])) | |
| ) | |
| if is_valid and sym not in active_positions: | |
| slots = 1 if cash_bal < self.MIN_CAPITAL_FOR_SPLIT else int(max_slots) | |
| if len(active_positions) < slots and cash_bal >= 5.0: | |
| size = (cash_bal * 0.95) if cash_bal < self.MIN_CAPITAL_FOR_SPLIT else (cash_bal / max_slots) | |
| if size >= 5.0: | |
| ep = float(p) * (1 + self.SLIPPAGE_PCT) | |
| fee = float(size) * fees_pct | |
| cost = float(size) | |
| qty = (cost - fee) / (ep + 1e-12) | |
| sym_ts = sim_env[sym]["timestamp"] | |
| idx = int(np.searchsorted(sym_ts, curr_ts, side="right") - 1) | |
| idx = max(0, min(idx, len(sym_ts) - 1)) | |
| atr_val = float(sim_env[sym]["atr"][idx]) | |
| active_positions[sym] = { | |
| "qty": float(qty), | |
| "entry_p": float(ep), | |
| "cost": float(cost), | |
| "entry_ts": int(curr_ts), | |
| "sl_p": float(ep - 1.5 * atr_val), | |
| "tp_p": float(ep + 2.5 * atr_val), | |
| "highest_price": float(ep), | |
| } | |
| cash_bal -= float(cost) | |
| last_update_map[sym] = min(idx + 1, len(sym_ts)) | |
| if not trade_pnls: | |
| continue | |
| max_dd = calc_max_drawdown(equity_curve) | |
| ulcer = calc_ulcer_index(equity_curve) | |
| wins_list = [p for p in trade_pnls if p > 0] | |
| loss_list = [p for p in trade_pnls if p <= 0] | |
| prof_fac = calc_profit_factor(wins_list, loss_list) | |
| mean_pnl = float(np.mean(trade_pnls)) | |
| std_pnl = float(np.std(trade_pnls)) | |
| sqn = float((mean_pnl / std_pnl) * np.sqrt(len(trade_pnls))) if std_pnl > 0 else 0.0 | |
| sharpe = calc_sharpe(trade_returns) | |
| sortino = calc_sortino(trade_returns) | |
| cagr = calc_cagr(initial_capital, cash_bal, start_ms, end_ms) | |
| calmar = calc_calmar(cagr, max_dd) | |
| exposure_pct = float(exposure_steps / max(1, len(t_ts)) * 100.0) | |
| max_w_streak, max_l_streak = calc_consecutive_streaks(trade_pnls) | |
| payoff = float(np.mean(wins_list) / max(abs(np.mean(loss_list)), 1e-12)) if (wins_list and loss_list) else 99.0 | |
| res.append({ | |
| "config": cfg, | |
| "net_profit": float(cash_bal - initial_capital), | |
| "total_trades": int(len(trade_pnls)), | |
| "final_balance": float(cash_bal), | |
| "win_rate": float((wins_losses["wins"] / len(trade_pnls)) * 100.0), | |
| "sqn": sqn, | |
| "max_drawdown": float(max_dd), | |
| "ulcer_index": ulcer, | |
| "profit_factor": prof_fac, | |
| "payoff_ratio": payoff, | |
| "sharpe": sharpe, | |
| "sortino": sortino, | |
| "cagr": cagr, | |
| "calmar": calmar, | |
| "expectancy": mean_pnl, | |
| "exposure_pct": exposure_pct, | |
| "max_consec_wins": max_w_streak, | |
| "max_consec_losses": max_l_streak, | |
| }) | |
| gc.collect() | |
| return res | |
| async def run_optimization(self): | |
| await self.generate_truth_data() | |
| files = glob.glob(os.path.join(CACHE_DIR, "*.pkl")) | |
| keys = list(self.GRID_RANGES.keys()) | |
| values = [list(self.GRID_RANGES[k]) for k in keys] | |
| combos = [] | |
| seen = set() | |
| while len(combos) < self.MAX_SAMPLES: | |
| c = tuple(np.random.choice(v) for v in values) | |
| if c not in seen: | |
| seen.add(c) | |
| combos.append(dict(zip(keys, c))) | |
| print(f"โ Generated {len(combos)} configs.") | |
| for state_name, state_id in [("ACCUMULATION", 1), ("SAFE_TREND", 2), ("EXPLOSIVE", 3)]: | |
| print(f"\n๐ Optimizing [{state_name}]...") | |
| results = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS, state_id) | |
| if not results: | |
| continue | |
| results.sort(key=lambda x: (x["calmar"], x["sqn"]), reverse=True) | |
| best = results[0] | |
| print(f"๐ BEST [{state_name}]:") | |
| print(f" ๐ฐ Net Profit: ${best['net_profit']:.2f} | Final: ${best['final_balance']:.2f}") | |
| print(f" ๐ Trades: {best['total_trades']} | WR: {best['win_rate']:.1f}% | Exp: {best['expectancy']:.4f}") | |
| print(f" ๐ฒ SQN: {best['sqn']:.2f} | PF: {best['profit_factor']:.2f} | Payoff: {best['payoff_ratio']:.2f}") | |
| print(f" ๐ MaxDD: {best['max_drawdown']:.2f}% | Ulcer: {best['ulcer_index']:.2f}") | |
| print(f" ๐ Sharpe/Sortino: {best['sharpe']:.2f} / {best['sortino']:.2f}") | |
| print(f" ๐งฎ CAGR/Calmar: {(best['cagr']*100):.2f}% / {best['calmar']:.2f}") | |
| print(f" โ๏ธ Config: {best['config']}") | |
| # ============================================================ | |
| # Runner (Guaranteed cleanup) | |
| # ============================================================ | |
| async def run_strategic_optimization_task(): | |
| r2 = R2Service() | |
| dm = DataManager(None, None, r2) | |
| proc = MLProcessor(dm) | |
| try: | |
| await dm.initialize() | |
| await proc.initialize() | |
| if getattr(proc, "guardian_hydra", None): | |
| proc.guardian_hydra.set_silent_mode(True) | |
| opt = HeavyDutyBacktester(dm, proc) | |
| await opt.run_optimization() | |
| except Exception as e: | |
| print(f"[ERROR] โ Backtest Failed: {e}") | |
| traceback.print_exc() | |
| finally: | |
| try: | |
| ex = getattr(dm, "exchange", None) | |
| if ex is not None: | |
| await ex.close() | |
| except Exception: | |
| pass | |
| try: | |
| await dm.close() | |
| except Exception: | |
| pass | |
| if __name__ == "__main__": | |
| asyncio.run(run_strategic_optimization_task()) |