# Tradtesting / backtest_engine.py
# NOTE: the lines below are provenance from the Hugging Face file-viewer export
# (uploader: Riy777, revision e18c5ae verified, 34.8 kB) kept as comments so the
# file remains valid Python.
# ============================================================
# πŸ§ͺ backtest_engine.py (V129.0 - GEM-Architect: Sniper Debug Mode)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import pandas_ta as ta
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from numpy.lib.stride_tricks import sliding_window_view
from datetime import datetime, timezone
from typing import Dict, Any, List
# βœ… Ψ§Ψ³Ψͺيراد Ψ§Ω„Ω…Ψ­Ψ±ΩƒΨ§Ψͺ
try:
from ml_engine.processor import MLProcessor, SystemLimits
from ml_engine.data_manager import DataManager
from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
from r2 import R2Service
import ccxt.async_support as ccxt
import xgboost as xgb
except ImportError:
print("❌ [Import Error] Critical ML modules missing.")
pass
logging.getLogger('ml_engine').setLevel(logging.WARNING)
CACHE_DIR = "backtest_real_scores"
# ============================================================
# πŸ›‘οΈ GLOBAL SANITIZATION
# ============================================================
def sanitize_features(df):
    """Clean a DataFrame for ML consumption.

    Replaces +/-Infinity with NaN, fills every NaN with 0.0, and casts the
    whole frame to float32 (the dtype the models expect). ``None`` and empty
    frames are returned unchanged.
    """
    if df is None or df.empty:
        return df
    # Infinity -> NaN -> 0.0, then a single cast to float32.
    without_inf = df.replace([np.inf, -np.inf], np.nan)
    filled = without_inf.fillna(0.0)
    return filled.astype(np.float32)
def _zv(x):
with np.errstate(divide='ignore', invalid='ignore'):
x = np.asarray(x, dtype="float32")
m = np.nanmean(x, axis=0)
s = np.nanstd(x, axis=0) + 1e-9
return np.nan_to_num((x - m) / s, nan=0.0)
def _z_score_rolling(x, w=500):
r = x.rolling(w).mean()
s = x.rolling(w).std().replace(0, np.nan)
return ((x - r) / s).fillna(0)
# ============================================================
# πŸ”§ 1. FEATURE ENGINEERING
# ============================================================
def _transform_window_for_pattern(df_window):
try:
c = df_window['close'].values.astype('float32')
o = df_window['open'].values.astype('float32')
h = df_window['high'].values.astype('float32')
l = df_window['low'].values.astype('float32')
v = df_window['volume'].values.astype('float32')
base = np.stack([o, h, l, c, v], axis=1)
base_z = _zv(base)
lr = np.zeros_like(c); lr[1:] = np.diff(np.log1p(c))
rng = (h - l) / (c + 1e-9)
extra = np.stack([lr, rng], axis=1)
extra_z = _zv(extra)
def _ema(arr, n):
return pd.Series(arr).ewm(span=n, adjust=False).mean().values
ema9 = _ema(c, 9); ema21 = _ema(c, 21); ema50 = _ema(c, 50); ema200 = _ema(c, 200)
slope21 = np.gradient(ema21); slope50 = np.gradient(ema50)
delta = np.diff(c, prepend=c[0])
up, down = delta.copy(), delta.copy()
up[up < 0] = 0; down[down > 0] = 0
roll_up = pd.Series(up).ewm(alpha=1/14, adjust=False).mean().values
roll_down = pd.Series(down).abs().ewm(alpha=1/14, adjust=False).mean().values
rs = roll_up / (roll_down + 1e-9)
rsi = 100.0 - (100.0 / (1.0 + rs))
indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi], axis=1)
padding = np.zeros((200, 5), dtype='float32')
indicators_full = np.concatenate([indicators, padding], axis=1)
indicators_z = _zv(indicators_full)
X_seq = np.concatenate([base_z, extra_z, indicators_z], axis=1)
X_flat = X_seq.flatten()
X_stat = np.array([0.5, 0.0, 0.5], dtype="float32")
return np.concatenate([X_flat, X_stat])
except: return None
def calculate_sniper_features_exact(df):
    """Sniper Features Calculation - EXACT MATCH with Processor.

    Adds return, liquidity-proxy, and indicator columns to a copy of a 1m
    OHLCV frame and returns it sanitized (inf/NaN -> 0, float32).
    NOTE(review): this must stay numerically identical to the live
    processor's feature builder — do not reorder or alter computations.
    """
    df = df.copy()
    # Base Returns
    df['return_1m'] = df['close'].pct_change(1).fillna(0)
    df['return_3m'] = df['close'].pct_change(3).fillna(0)
    df['return_5m'] = df['close'].pct_change(5).fillna(0)
    df['return_15m'] = df['close'].pct_change(15).fillna(0)
    # Liquidity Proxies
    df['ret'] = df['close'].pct_change().fillna(0)
    df['dollar_vol'] = df['close'] * df['volume']
    # Amihud-style illiquidity: |return| per unit of dollar volume.
    df['amihud'] = (df['ret'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
    # Roll-style effective spread from negative serial covariance of price changes.
    dp = df['close'].diff()
    roll_cov = dp.rolling(64).cov(dp.shift(1))
    df['roll_spread'] = (2 * np.sqrt(np.maximum(0, -roll_cov))).fillna(0)
    # Signed volume / order-flow imbalance over a 30-bar window.
    sign = np.sign(df['close'].diff()).fillna(0)
    df['signed_vol'] = sign * df['volume']
    df['ofi'] = df['signed_vol'].rolling(30).sum().fillna(0)
    # VPIN-style volume imbalance: |buy - sell| volume over total, 60 bars.
    buy_vol = (sign > 0) * df['volume']; sell_vol = (sign < 0) * df['volume']
    tot = df['volume'].rolling(60).sum()
    imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
    df['vpin'] = (imb / tot.replace(0, np.nan)).fillna(0)
    # Garman-Klass-style realized variance from high/low and open/close.
    df['rv_gk'] = (np.log(df['high'] / df['low'])**2)/2 - (2*np.log(2)-1)*(np.log(df['close']/df['open'])**2)
    vwap = (df['close']*df['volume']).rolling(20).sum() / df['volume'].rolling(20).sum()
    df['vwap_dev'] = (df['close'] - vwap).fillna(0)
    # Composite liquidity score: rolling z-scores of the proxies above (w=500 default).
    df['L_score'] = (_z_score_rolling(df['volume']) + _z_score_rolling(1/(df['amihud']+1e-9)) +
                     _z_score_rolling(-df['roll_spread']) + _z_score_rolling(-df['rv_gk'].abs()) +
                     _z_score_rolling(-df['vwap_dev'].abs()) + _z_score_rolling(df['ofi']))
    # Standard Indicators
    df['rsi_14'] = ta.rsi(df['close'], 14).fillna(50)
    df['atr'] = ta.atr(df['high'], df['low'], df['close'], 100).fillna(0)
    df['vol_zscore_50'] = _z_score_rolling(df['volume'], 50)
    ema9 = ta.ema(df['close'], 9);
    if ema9 is not None: df['ema_9_slope'] = (ema9 - ema9.shift(1))/ema9.shift(1)
    else: df['ema_9_slope'] = 0.0
    ema21 = ta.ema(df['close'], 21);
    if ema21 is not None: df['ema_21_dist'] = (df['close'] - ema21)/ema21
    else: df['ema_21_dist'] = 0.0
    df['candle_range'] = df['high'] - df['low']
    # Position of close within the candle; zero-range candles become NaN here
    # and are zeroed by sanitize_features below.
    df['close_pos_in_range'] = (df['close'] - df['low']) / (df['candle_range'].replace(0, np.nan))
    return sanitize_features(df)
def calculate_titan_features_real(df):
    """Titan features with strict Infinity handling.

    Computes standard momentum/trend/volatility indicators via pandas_ta on a
    copy of an OHLCV frame. Every pandas_ta call that can return None (short
    input) is guarded with a 0.0 fallback, and the result is sanitized
    (inf/NaN -> 0, float32).
    """
    df = df.copy()
    df['RSI'] = ta.rsi(df['close'], 14)
    macd = ta.macd(df['close'])
    if macd is not None:
        # pandas_ta MACD columns by position: line, histogram, signal.
        df['MACD'] = macd.iloc[:, 0]; df['MACD_h'] = macd.iloc[:, 1]
    else: df['MACD'] = 0.0; df['MACD_h'] = 0.0
    df['CCI'] = ta.cci(df['high'], df['low'], df['close'], 20)
    adx = ta.adx(df['high'], df['low'], df['close'], 14)
    if adx is not None: df['ADX'] = adx.iloc[:, 0]
    else: df['ADX'] = 0.0
    # Relative distance of close from each EMA (guarding zero EMAs).
    for p in [9, 21, 50, 200]:
        ema = ta.ema(df['close'], p)
        if ema is not None:
            df[f'EMA_{p}_dist'] = (df['close'] / ema.replace(0, np.nan)) - 1
        else: df[f'EMA_{p}_dist'] = 0.0
    # Bollinger band width and %B (position of close within the bands).
    bb = ta.bbands(df['close'], 20, 2.0)
    if bb is not None:
        df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1].replace(0, np.nan)
        df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]).replace(0, np.nan)
    else: df['BB_w'] = 0.0; df['BB_p'] = 0.0
    df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], 14)
    vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
    if vwap is not None: df['VWAP_dist'] = (df['close'] / vwap.replace(0, np.nan)) - 1
    else: df['VWAP_dist'] = 0.0
    return sanitize_features(df)
# ============================================================
# πŸ”§ LEGACY GUARD (V2 & V3)
# ============================================================
def calculate_legacy_v2_vectorized(df_1m, df_5m, df_15m):
    """Build the Legacy V2 guard model's feature matrix, vectorized.

    Computes per-timeframe basics (log return, scaled RSI, Fibonacci window
    position, plus a timeframe-specific extra), forward-fills the 5m/15m
    frames onto the 1m index, and appends 6 lagged copies of the 1m block.
    Returns a numpy array aligned to df_1m, or None on failure.
    NOTE(review): column order here is the trained model's expected layout —
    do not reorder.
    """
    try:
        def calc_basic(df, suffix):
            # Shared per-timeframe features, suffixed with the timeframe name.
            c = df['close']; h = df['high']; l = df['low']
            res = pd.DataFrame(index=df.index)
            res[f'log_ret_{suffix}'] = np.log(c / c.shift(1).replace(0, np.nan)).fillna(0)
            res[f'rsi_{suffix}'] = (ta.rsi(c, 14) / 100.0).fillna(0.5)
            # Position of close within the 50-bar high/low range (0..1).
            roll_max = h.rolling(50).max(); roll_min = l.rolling(50).min()
            diff = (roll_max - roll_min).replace(0, 1e-9)
            res[f'fib_pos_{suffix}'] = ((c - roll_min) / diff).fillna(0.5)
            if suffix == '1m':
                # 1m only: ATR-based relative volatility.
                res[f'volatility_{suffix}'] = (ta.atr(h, l, c, 14) / c.replace(0, np.nan)).fillna(0)
            else:
                # 5m/15m: slope of the 20-EMA over 5 bars.
                ema = ta.ema(c, 20)
                if ema is not None:
                    res[f'trend_slope_{suffix}'] = ((ema - ema.shift(5)) / ema.shift(5).replace(0, np.nan)).fillna(0)
                else: res[f'trend_slope_{suffix}'] = 0.0
            if suffix == '15m':
                # 61.8% retracement level measured from the 50-bar low
                # (i.e. 38.2% below the rolling high).
                fib618 = roll_max - (diff * 0.382)
                res[f'dist_fib618_{suffix}'] = ((c - fib618) / c.replace(0, np.nan)).fillna(0)
            return res
        f1 = calc_basic(df_1m, '1m')
        # Align higher timeframes onto the 1m index (last known value).
        f5 = calc_basic(df_5m, '5m').reindex(df_1m.index, method='ffill')
        f15 = calc_basic(df_15m, '15m').reindex(df_1m.index, method='ffill')
        FEATS_1M = ['log_ret_1m', 'rsi_1m', 'fib_pos_1m', 'volatility_1m']
        FEATS_5M = ['log_ret_5m', 'rsi_5m', 'fib_pos_5m', 'trend_slope_5m']
        FEATS_15M = ['log_ret_15m', 'rsi_15m', 'dist_fib618_15m', 'trend_slope_15m']
        parts = [f1[FEATS_1M], f5[FEATS_5M], f15[FEATS_15M]]
        # Six lagged copies of the 1m block give the model short history.
        lags = [1, 2, 3, 5, 10, 20]
        for lag in lags: parts.append(f1[FEATS_1M].shift(lag).fillna(0))
        X_df = pd.concat(parts, axis=1)
        return sanitize_features(X_df).values
    except Exception as e:
        print(f"Legacy V2 Vec Error: {e}")
        return None
def calculate_legacy_v3_vectorized(df_1m, df_5m, df_15m):
    """Legacy V3 Safe Calc.

    Builds the Legacy V3 guard model's feature frame: a wide indicator set on
    the 1m frame, a subset of the same indicators from the 5m/15m frames
    forward-filled onto the 1m index, and 6 lagged 1m log-returns under the
    string column names "6".."11". Returns a sanitized DataFrame reindexed to
    the model's exact expected column order, or None on any failure.
    NOTE(review): the `expected` column list is the trained model's layout —
    do not reorder.
    """
    try:
        def calc_v3_base(df, prefix=""):
            d = df.copy()
            # Initialize All targets to 0.0 so partial indicator failures
            # still leave every expected column present.
            targets = ['rsi', 'rsi_slope', 'macd_h', 'macd_h_slope', 'adx', 'dmp', 'dmn',
                       'trend_net_force', 'ema_20', 'ema_50', 'ema_200', 'dist_ema20',
                       'dist_ema50', 'dist_ema200', 'slope_ema50', 'atr', 'atr_rel',
                       'obv', 'obv_slope', 'cmf', 'log_ret', 'mc_skew', 'mc_kurt',
                       'mc_prob_gain', 'mc_var_95', 'mc_shock']
            for t in targets: d[t] = 0.0
            c = d['close']; h = d['high']; l = d['low']; v = d['volume']
            try:
                d['log_ret'] = np.log(c / c.shift(1).replace(0, np.nan)).fillna(0)
                d['rsi'] = ta.rsi(c, 14).fillna(50)
                d['rsi_slope'] = (d['rsi'] - d['rsi'].shift(3).fillna(50)) / 3
                macd = ta.macd(c)
                if macd is not None:
                    d['macd_h'] = macd.iloc[:, 1].fillna(0)
                    d['macd_h_slope'] = (d['macd_h'] - d['macd_h'].shift(3).fillna(0)) / 3
                adx = ta.adx(h, l, c, 14)
                if adx is not None:
                    d['adx'] = adx.iloc[:, 0].fillna(0); d['dmp'] = adx.iloc[:, 1].fillna(0); d['dmn'] = adx.iloc[:, 2].fillna(0)
                    # Directional-movement balance scaled by trend strength.
                    d['trend_net_force'] = (d['dmp'] - d['dmn']) * (d['adx'] / 100.0)
                d['ema_20'] = ta.ema(c, 20).fillna(c); d['ema_50'] = ta.ema(c, 50).fillna(c); d['ema_200'] = ta.ema(c, 200).fillna(c)
                d['dist_ema20'] = (c - d['ema_20']) / d['ema_20'].replace(0, np.nan)
                d['dist_ema50'] = (c - d['ema_50']) / d['ema_50'].replace(0, np.nan)
                d['dist_ema200'] = (c - d['ema_200']) / d['ema_200'].replace(0, np.nan)
                d['slope_ema50'] = (d['ema_50'] - d['ema_50'].shift(5).fillna(0)) / d['ema_50'].shift(5).replace(0, np.nan)
                d['atr'] = ta.atr(h, l, c, 14).fillna(0); d['atr_rel'] = d['atr'] / c.replace(0, np.nan)
                d['obv'] = ta.obv(c, v).fillna(0); d['obv_slope'] = d['obv'] - d['obv'].shift(5).fillna(0)
                d['cmf'] = ta.cmf(h, l, c, v, 20).fillna(0)
                # 30-bar return-distribution ("mc_*") statistics.
                win = 30; roll = d['log_ret'].rolling(win)
                d['mc_skew'] = roll.skew().fillna(0); d['mc_kurt'] = roll.kurt().fillna(0)
                d['mc_prob_gain'] = (d['log_ret'] > 0).rolling(win).mean().fillna(0.5)
                d['mc_var_95'] = roll.quantile(0.05).fillna(-0.02)
                d['mc_shock'] = ((d['log_ret'] - roll.mean()) / (roll.std().replace(0, np.nan))).fillna(0)
            except Exception:
                # Was a bare `except: pass`; narrowed to Exception so
                # KeyboardInterrupt/SystemExit propagate. Best-effort intent
                # kept: pre-initialized zeros stand in for failed indicators.
                pass
            if prefix:
                # Suffix columns with the timeframe label (e.g. rsi -> rsi_5m).
                d.columns = [f"{col}_{prefix}" if col not in ['timestamp'] else col for col in d.columns]
            return sanitize_features(d)
        df1 = calc_v3_base(df_1m); df5 = calc_v3_base(df_5m, "5m").reindex(df_1m.index, method='ffill')
        df15 = calc_v3_base(df_15m, "15m").reindex(df_1m.index, method='ffill')
        final_df = pd.DataFrame(index=df_1m.index)
        # Lagged 1m log-returns under the model's historical column names "6".."11".
        for i, col_name in enumerate(["6", "7", "8", "9", "10", "11"], 1):
            final_df[col_name] = df1['log_ret'].shift(i)
        cols_1m = ['rsi', 'rsi_slope', 'macd_h', 'macd_h_slope', 'adx', 'dmp', 'dmn', 'trend_net_force',
                   'ema_20', 'ema_50', 'ema_200', 'dist_ema20', 'dist_ema50', 'dist_ema200', 'slope_ema50',
                   'atr', 'atr_rel', 'obv', 'obv_slope', 'cmf', 'log_ret', 'mc_skew', 'mc_kurt',
                   'mc_prob_gain', 'mc_var_95', 'mc_shock']
        for c in cols_1m: final_df[c] = df1[c]
        cols_5m = {'rsi_5m': 'rsi_5m', 'rsi_slope_5m': 'rsi_slope_5m', 'macd_h_5m': 'macd_h_5m',
                   'mc_prob_gain_5m': 'mc_prob_gain_5m', 'mc_shock_5m': 'mc_shock_5m'}
        for k, v in cols_5m.items(): final_df[k] = df5[v]
        cols_15m = {'rsi_15m': 'rsi_15m', 'macd_h_15m': 'macd_h_15m', 'trend_net_force_15m': 'trend_net_force_15m',
                    'mc_prob_gain_15m': 'mc_prob_gain_15m', 'dist_ema200_15m': 'dist_ema200_15m'}
        for k, v in cols_15m.items(): final_df[k] = df15[v]
        expected = ["6", "7", "8", "9", "10", "11", "rsi", "rsi_slope", "macd_h", "macd_h_slope", "adx", "dmp", "dmn",
                    "trend_net_force", "ema_20", "ema_50", "ema_200", "dist_ema20", "dist_ema50", "dist_ema200",
                    "slope_ema50", "atr", "atr_rel", "obv", "obv_slope", "cmf", "log_ret", "mc_skew", "mc_kurt",
                    "mc_prob_gain", "mc_var_95", "mc_shock", "rsi_5m", "rsi_slope_5m", "macd_h_5m", "mc_prob_gain_5m",
                    "mc_shock_5m", "rsi_15m", "macd_h_15m", "trend_net_force_15m", "mc_prob_gain_15m", "dist_ema200_15m"]
        return sanitize_features(final_df.reindex(columns=expected, fill_value=0.0))
    except Exception:
        # Was a bare `except:` — narrowed; None signals "features unavailable".
        return None
# ============================================================
# πŸ§ͺ THE BACKTESTER
# ============================================================
class HeavyDutyBacktester:
    """Grid-search backtester replaying cached per-candle model scores.

    Phase 1 (generate_truth_data): download 1m OHLCV for each target coin,
    run every loaded ML engine (Sniper, Patterns, Titan, Oracle, Hydra,
    Legacy) over gated candidate candles, and pickle the score table per
    symbol into CACHE_DIR.
    Phase 2 (run_optimization): sweep threshold combinations over those
    cached scores with a simple portfolio simulator and report the champion.
    """

    def __init__(self, data_manager, processor):
        # data_manager supplies the ccxt exchange handle; processor holds the
        # loaded model engines (titan, sniper, oracle, hydra, legacy, ...).
        self.dm = data_manager
        self.proc = processor
        self.GRID_DENSITY = 3
        self.INITIAL_CAPITAL = 10.0   # starting balance for the simulator
        self.TRADING_FEES = 0.001     # per-side fee fraction
        self.MAX_SLOTS = 4            # max simultaneous open positions
        self.TARGET_COINS = ['SOL/USDT', 'XRP/USDT', 'DOGE/USDT']
        self.force_start_date = None; self.force_end_date = None
        # Start from an empty score cache on every instantiation.
        if os.path.exists(CACHE_DIR):
            for f in glob.glob(os.path.join(CACHE_DIR, "*")): os.remove(f)
        else: os.makedirs(CACHE_DIR)
        self._check_engines()

    def _check_engines(self):
        """Print which ML engines loaded successfully on the processor."""
        status = []
        if self.proc.titan and self.proc.titan.model: status.append("Titan")
        if self.proc.pattern_engine and self.proc.pattern_engine.models: status.append("Patterns")
        if self.proc.oracle: status.append("Oracle")
        if self.proc.sniper: status.append("Sniper")
        if self.proc.guardian_hydra: status.append("Hydra")
        if self.proc.guardian_legacy: status.append("Legacy")
        print(f" βœ… Engines Ready: {', '.join(status)}")

    def set_date_range(self, start_str, end_str):
        """Force the backtest window to [start_str, end_str] ("YYYY-MM-DD")."""
        self.force_start_date = start_str; self.force_end_date = end_str

    def _smart_predict(self, model, X):
        """Predict with predict_proba when available; zeros on any failure."""
        try:
            if hasattr(model, "predict_proba"):
                raw = model.predict_proba(X)
                # For a 2-D probability matrix keep the last (positive) column.
                if raw.ndim == 2: return raw[:, -1]
                return raw
            return model.predict(X)
        except Exception as e:
            # print(f"⚠️ Predict Error: {e}") # Enable if needed
            return np.zeros(len(X) if hasattr(X, '__len__') else 0)

    def _extract_probs(self, raw_preds):
        """Normalize model output to a 1-D probability-like vector."""
        if isinstance(raw_preds, list): raw_preds = np.array(raw_preds)
        if raw_preds.ndim == 1: return raw_preds
        elif raw_preds.ndim == 2:
            if raw_preds.shape[1] >= 2: return raw_preds[:, -1]
            return raw_preds.flatten()
        return raw_preds.flatten()

    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1m candles for `sym` in parallel 1000-candle pages.

        Returns a deduplicated, time-sorted list of OHLCV rows whose
        timestamps fall inside [start_ms, end_ms] (epoch milliseconds).
        """
        print(f" ⚑ [Network] Downloading {sym}...", flush=True)
        limit = 1000; duration = limit * 60 * 1000  # ms span covered per page
        tasks = []; curr = start_ms
        while curr < end_ms: tasks.append(curr); curr += duration
        all_c = []; sem = asyncio.Semaphore(20)  # cap concurrent exchange calls
        async def _fetch(ts):
            async with sem:
                try: return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=ts, limit=limit)
                except: await asyncio.sleep(0.5); return []
        chunk = 50
        for i in range(0, len(tasks), chunk):
            res = await asyncio.gather(*[_fetch(t) for t in tasks[i:i+chunk]])
            for r in res: all_c.extend(r)
        # De-duplicate by timestamp and clip to the requested range.
        seen = set(); unique = []
        for c in all_c:
            if c[0] not in seen and c[0] >= start_ms and c[0] <= end_ms:
                unique.append(c); seen.add(c[0])
        unique.sort(key=lambda x: x[0])
        print(f" βœ… Downloaded {len(unique)} candles.", flush=True)
        return unique

    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score candidate candles with every loaded model and cache results.

        Builds 1m/5m/15m frames from raw candles, selects candidates via a
        cheap volume/volatility gate (l1_score), runs each engine over them,
        and pickles the assembled score table. Idempotent: skips when the
        cache file for this symbol/range already exists.
        """
        safe_sym = sym.replace('/', '_')
        scores_file = f"{CACHE_DIR}/{safe_sym}_{start_ms}_{end_ms}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" πŸ“‚ [{sym}] Data Exists -> Skipping.")
            return
        print(f" βš™οΈ [CPU] Analyzing {sym} (ALL REAL MODELS)...", flush=True)
        t0 = time.time()
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        for c in ['open', 'high', 'low', 'close', 'volume']: df_1m[c] = df_1m[c].astype(float)
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()
        df_5m = df_1m.resample('5T').agg({'open':'first', 'high':'max', 'low':'min', 'close':'last', 'volume':'sum'}).dropna()
        df_15m = df_1m.resample('15T').agg({'open':'first', 'high':'max', 'low':'min', 'close':'last', 'volume':'sum'}).dropna()
        # 1. Sniper
        df_sniper = calculate_sniper_features_exact(df_1m)
        df_sniper['rel_vol'] = df_sniper['volume'] / (df_sniper['volume'].rolling(50).mean() + 1e-9)
        # Cheap level-1 gate combining relative volume and relative ATR.
        df_sniper['l1_score'] = (df_sniper['rel_vol'] * 10) + ((df_sniper['atr']/df_sniper['close']) * 1000)
        # Candidates: gate passed, past the 500-bar warm-up, and with at
        # least 245 bars of future data remaining (for lookahead windows).
        valid_mask = (df_sniper['l1_score'] >= 5.0) & (np.arange(len(df_sniper)) > 500) & (np.arange(len(df_sniper)) < len(df_sniper) - 245)
        df_candidates = df_sniper[valid_mask].copy()
        if df_candidates.empty: return
        print(f" 🎯 Candidates: {len(df_candidates)}. Running Deep Inference...", flush=True)
        # 2. Patterns
        res_patterns = np.full(len(df_candidates), 0.5)  # 0.5 = neutral default
        pattern_models = getattr(self.proc.pattern_engine, 'models', {})
        if pattern_models and '15m' in pattern_models:
            try:
                df_15m_res = df_1m.resample('15T').agg({'open':'first', 'high':'max', 'low':'min', 'close':'last', 'volume':'sum'}).dropna()
                pat_scores_15m = np.full(len(df_15m_res), 0.5)
                pat_inputs = []; valid_15m_idxs = []
                # Score each 200-bar 15m window ending at bar i.
                for i in range(200, len(df_15m_res)):
                    window = df_15m_res.iloc[i-200:i]
                    vec = _transform_window_for_pattern(window)
                    if vec is not None:
                        pat_inputs.append(vec); valid_15m_idxs.append(i)
                if pat_inputs:
                    X_pat = np.array(pat_inputs)
                    pat_preds = self._smart_predict(pattern_models['15m'], xgb.DMatrix(X_pat))
                    pat_scores_15m[valid_15m_idxs] = pat_preds
                # Map each candidate's 1m timestamp to the latest completed
                # 15m bar (searchsorted - 1).
                ts_15m = df_15m_res.index.astype(np.int64) // 10**6
                map_idxs = np.searchsorted(ts_15m, df_candidates['timestamp'].values) - 1
                res_patterns = pat_scores_15m[np.clip(map_idxs, 0, len(pat_scores_15m)-1)]
            except Exception as e: print(f"Patterns Error: {e}")
        # 3. Titan
        res_titan = np.full(len(df_candidates), 0.5)
        if self.proc.titan and self.proc.titan.model:
            try:
                df_5m_feat = calculate_titan_features_real(df_5m).add_prefix('5m_')
                ts_5m = df_5m.index.astype(np.int64) // 10**6
                map_idxs = np.clip(np.searchsorted(ts_5m, df_candidates['timestamp'].values) - 1, 0, len(df_5m_feat)-1)
                feats = self.proc.titan.feature_names
                X_titan_df = sanitize_features(df_5m_feat.iloc[map_idxs].reindex(columns=feats, fill_value=0))
                res_titan = self.proc.titan.model.predict(xgb.DMatrix(X_titan_df.values, feature_names=feats))
            except Exception as e: print(f"Titan Error: {e}")
        # 4. Sniper
        res_sniper = np.full(len(df_candidates), 0.5)
        sniper_models = getattr(self.proc.sniper, 'models', [])
        if sniper_models:
            try:
                feats = getattr(self.proc.sniper, 'feature_names', [])
                X_snip = sanitize_features(df_candidates[feats]).values
                # Ensemble: average the probability from each sniper model.
                preds = [self._extract_probs(self._smart_predict(m, X_snip)) for m in sniper_models]
                res_sniper = np.mean(preds, axis=0)
            except Exception as e: print(f"Sniper Error: {e}")
        # 5. Oracle
        res_oracle = np.full(len(df_candidates), 0.5)
        oracle_model = getattr(self.proc.oracle, 'model_direction', None)
        if oracle_model:
            try:
                # Oracle input is mostly zero-filled; only the simulated
                # upstream scores are populated when the columns exist.
                oracle_feats = getattr(self.proc.oracle, 'feature_cols', [])
                X_orc_df = pd.DataFrame(0.0, index=range(len(df_candidates)), columns=oracle_feats)
                if 'sim_titan_score' in X_orc_df: X_orc_df['sim_titan_score'] = res_titan
                if 'sim_pattern_score' in X_orc_df: X_orc_df['sim_pattern_score'] = res_patterns
                if 'sim_mc_score' in X_orc_df: X_orc_df['sim_mc_score'] = 0.5
                res_oracle = self._extract_probs(self._smart_predict(oracle_model, X_orc_df.values))
            except Exception as e: print(f"Oracle Error: {e}")
        # 6. Hydra
        res_hydra_risk = np.zeros(len(df_candidates))
        hydra_models = getattr(self.proc.guardian_hydra, 'models', {})
        if hydra_models and 'crash' in hydra_models:
            try:
                # 7 channels per bar; rsi_14 is repeated into the first three
                # slots — NOTE(review): looks like a placeholder for three
                # distinct inputs, confirm against the live Hydra features.
                global_hydra_feats = np.column_stack([
                    df_sniper['rsi_14'], df_sniper['rsi_14'], df_sniper['rsi_14'],
                    (df_sniper['close']-df_sniper['close'].rolling(20).mean())/df_sniper['close'],
                    df_sniper['rel_vol'], df_sniper['atr'], df_sniper['close']
                ]).astype(np.float32)
                global_hydra_feats = np.nan_to_num(global_hydra_feats, nan=0.0, posinf=0.0, neginf=0.0)
                # Zero-copy view of all 240-bar forward windows: (n, 240, 7).
                window_view = sliding_window_view(global_hydra_feats, 240, axis=0).transpose(0, 2, 1)
                c_idxs = np.searchsorted(df_sniper.index, df_candidates.index)
                valid_s = c_idxs + 1
                valid_mask_h = valid_s < (len(global_hydra_feats) - 240)
                final_s = valid_s[valid_mask_h]; res_idxs = np.where(valid_mask_h)[0]
                # Batch to bound memory (5000 windows -> 5000*240 rows per predict).
                for i in range(0, len(final_s), 5000):
                    b_idxs = final_s[i:i+5000]; r_idxs = res_idxs[i:i+5000]
                    static = window_view[b_idxs]
                    B = len(b_idxs)
                    entry = df_sniper['close'].values[b_idxs-1].reshape(B, 1)
                    s_c = static[:, 6, :]; s_atr = static[:, 5, :]  # close / atr channels
                    # PnL normalized by max(1.5*ATR, 1.5% of entry).
                    dist = np.maximum(1.5*s_atr, entry*0.015) + 1e-9
                    pnl = (s_c - entry)/dist
                    max_pnl = (np.maximum.accumulate(s_c, axis=1) - entry)/dist
                    atr_p = s_atr/(s_c+1e-9)
                    # Per-step feature rows: zeros/ones/constants fill the
                    # slots the simulator cannot reproduce offline.
                    zeros = np.zeros((B, 240)); ones = np.ones((B, 240)); t = np.tile(np.arange(1, 241), (B, 1))
                    X = np.stack([
                        static[:,0], static[:,1], static[:,2], static[:,3], static[:,4],
                        zeros, atr_p, pnl, max_pnl, zeros, zeros, t, zeros, ones*0.6, ones*0.7, ones*3
                    ], axis=2).reshape(-1, 16)
                    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
                    preds = hydra_models['crash'].predict_proba(X)[:, 1].reshape(B, 240)
                    # Risk = worst crash probability anywhere in the 240-bar horizon.
                    res_hydra_risk[r_idxs] = np.max(preds, axis=1)
            except: pass
        # 7. Legacy
        res_legacy_v2 = np.zeros(len(df_candidates))
        res_legacy_v3 = np.zeros(len(df_candidates))
        if self.proc.guardian_legacy:
            try:
                X_v2_full = calculate_legacy_v2_vectorized(df_1m, df_5m, df_15m)
                v3_df_full = calculate_legacy_v3_vectorized(df_1m, df_5m, df_15m)
                all_indices = np.arange(len(df_1m))
                cand_indices = all_indices[valid_mask]
                max_len = len(df_1m)
                cand_indices = cand_indices[cand_indices < max_len]
                if len(cand_indices) > 0:
                    if self.proc.guardian_legacy.model_v2 and X_v2_full is not None:
                        subset_v2 = X_v2_full[cand_indices]
                        preds_v2 = self.proc.guardian_legacy.model_v2.predict(xgb.DMatrix(subset_v2))
                        # Multi-class output: column 2 — presumably the
                        # "danger" class; verify against the trainer.
                        if len(preds_v2.shape) > 1: res_legacy_v2[:len(cand_indices)] = preds_v2[:, 2]
                        else: res_legacy_v2[:len(cand_indices)] = preds_v2
                    if self.proc.guardian_legacy.model_v3 and v3_df_full is not None:
                        subset_v3_df = v3_df_full.iloc[cand_indices]
                        preds_v3 = self.proc.guardian_legacy.model_v3.predict(xgb.DMatrix(subset_v3_df))
                        res_legacy_v3[:len(cand_indices)] = preds_v3
            except Exception as e: print(f"Legacy Error: {e}")
        # 8. Assembly
        print(f" πŸ“Š [Stats] Titan:{res_titan.mean():.2f} | Patterns:{res_patterns.mean():.2f} | Sniper:{res_sniper.mean():.2f} | Oracle:{res_oracle.mean():.2f}")
        ai_df = pd.DataFrame({
            'timestamp': df_candidates['timestamp'],
            'symbol': sym,
            'close': df_candidates['close'],
            'real_titan': res_titan,
            'oracle_conf': res_oracle,
            'sniper_score': res_sniper,
            'l1_score': df_candidates['l1_score'],
            'risk_hydra_crash': res_hydra_risk,
            'risk_legacy_v2': res_legacy_v2,
            'risk_legacy_v3': res_legacy_v3
        })
        dt = time.time() - t0
        if not ai_df.empty:
            ai_df.to_pickle(scores_file)
            print(f" βœ… [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
        gc.collect()

    async def generate_truth_data(self):
        """Phase 1: fetch and score every target coin over the forced range.

        No-op unless set_date_range() was called first.
        """
        if self.force_start_date:
            dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000)
            print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
            for sym in self.TARGET_COINS:
                c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
                if c: await self._process_data_in_memory(sym, c, ms_s, ms_e)

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Simulate every threshold config over the cached score tables.

        Loads all pickled score frames, merges and time-sorts them, then for
        each config runs a single-pass portfolio simulation (entries gated by
        l1/oracle/sniper thresholds, exits on hydra/legacy risk, +4% take
        profit or -2% stop). Returns one stats dict per config.
        """
        print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
        data = []
        for f in scores_files:
            try: data.append(pd.read_pickle(f))
            except: pass
        if not data: return []
        df = pd.concat(data).sort_values('timestamp')
        # Extract numpy columns once; the per-config loop only touches arrays.
        ts = df['timestamp'].values; close = df['close'].values.astype(float)
        sym = df['symbol'].values; sym_map = {s:i for i,s in enumerate(np.unique(sym))}
        sym_id = np.array([sym_map[s] for s in sym])
        oracle = df['oracle_conf'].values; sniper = df['sniper_score'].values
        hydra = df['risk_hydra_crash'].values; titan = df['real_titan'].values
        l1 = df['l1_score'].values
        legacy_v2 = df['risk_legacy_v2'].values; legacy_v3 = df['risk_legacy_v3'].values
        N = len(ts)
        print(f" πŸš€ [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True)
        res = []
        for cfg in combinations_batch:
            pos = {}; log = []
            bal = initial_capital; alloc = 0.0
            # Precomputed entry gate for this config.
            mask = (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh']) & (sniper >= cfg['sniper_thresh'])
            for i in range(N):
                s = sym_id[i]; p = close[i]
                if s in pos:
                    # pos[s] = (entry_price, hydra_risk_at_entry, size, titan_at_entry)
                    entry = pos[s][0]; h_r = pos[s][1]; titan_entry = pos[s][3]
                    # NOTE(review): exit uses the hydra risk captured at
                    # entry, not the current bar's — confirm intended.
                    crash_hydra = (h_r > cfg['hydra_thresh'])
                    panic_legacy = (legacy_v2[i] > cfg['legacy_thresh']) or (legacy_v3[i] > cfg['legacy_thresh'])
                    pnl = (p - entry)/entry
                    if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02:
                        realized = pnl - fees_pct*2  # round-trip fees
                        bal += pos[s][2] * (1 + realized)
                        alloc -= pos[s][2]
                        is_consensus = (titan_entry > 0.55)
                        log.append({'pnl': realized, 'consensus': is_consensus})
                        del pos[s]
                # Entry: free slot, gate passed, not already in this symbol.
                if len(pos) < max_slots and mask[i]:
                    if s not in pos and bal >= 5.0:
                        size = min(10.0, bal * 0.98)
                        pos[s] = (p, hydra[i], size, titan[i])
                        bal -= size; alloc += size
            final_bal = bal + alloc  # open positions valued at cost
            profit = final_bal - initial_capital
            tot = len(log)
            wins = sum(1 for x in log if x['pnl'] > 0)
            win_rate = (wins/tot*100) if tot else 0
            # "Consensus" = trades where Titan also agreed (> 0.55) at entry.
            cons_trades = [x for x in log if x['consensus']]
            n_cons = len(cons_trades)
            agree_rate = (n_cons/tot*100) if tot else 0
            cons_win_rate = (sum(1 for x in cons_trades if x['pnl']>0)/n_cons*100) if n_cons else 0
            cons_avg_pnl = (sum(x['pnl'] for x in cons_trades)/n_cons*100) if n_cons else 0
            res.append({
                'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
                'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': cons_win_rate,
                'high_consensus_avg_pnl': cons_avg_pnl
            })
        return res

    async def run_optimization(self, target_regime="RANGE"):
        """Phase 2: grid-search thresholds over the cached scores.

        Returns (best_config, best_stats), or (None, zeroed stats) when no
        results were produced. Champions are ranked by net profit.
        """
        await self.generate_truth_data()
        # Threshold grid: 3 oracle x 3 sniper x 2 hydra x 2 l1 = 36 configs.
        oracle_r = np.linspace(0.3, 0.7, 3); sniper_r = np.linspace(0.2, 0.6, 3)
        hydra_r = [0.8, 0.9]; l1_r = [5.0, 10.0]
        combos = []
        for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r):
            combos.append({
                'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
            })
        files = glob.glob(os.path.join(CACHE_DIR, "*.pkl"))
        results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not results_list: return None, {'net_profit': 0.0, 'win_rate': 0.0}
        results_list.sort(key=lambda x: x['net_profit'], reverse=True)
        best = results_list[0]
        print("\n" + "="*60)
        print(f"πŸ† CHAMPION REPORT [{target_regime}]:")
        print(f" πŸ’° Final Balance: ${best['final_balance']:,.2f}")
        print(f" πŸš€ Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" πŸ“Š Total Trades: {best['total_trades']}")
        print(f" πŸ“ˆ Win Rate: {best['win_rate']:.1f}%")
        print("-" * 60)
        print(f" 🧠 CONSENSUS ANALYTICS:")
        print(f" 🀝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print(f" πŸ’Ž High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
        print("-" * 60)
        print(f" βš™οΈ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print("="*60)
        return best['config'], best
async def run_strategic_optimization_task():
    """Entry point: optimize thresholds per market regime and persist results.

    Initializes the data/model services, runs the backtester over four
    hand-labelled historical windows (BULL/BEAR/DEAD/RANGE), submits each
    window's champion config to the AdaptiveHub, and saves hub state to R2.
    The DataManager connection is always closed, even on failure.
    """
    print("\nπŸ§ͺ [STRATEGIC BACKTEST] Full Spectrum Mode...")
    r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
    try:
        await dm.initialize(); await proc.initialize()
        if proc.guardian_hydra: proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2); await hub.initialize()
        opt = HeavyDutyBacktester(dm, proc)
        # Hand-labelled market-regime windows used as optimization scenarios.
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        for s in scenarios:
            opt.set_date_range(s["start"], s["end"])
            best_cfg, best_stats = await opt.run_optimization(s["regime"])
            if best_cfg: hub.submit_challenger(s["regime"], best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("βœ… [System] DNA Updated.")
    finally:
        print("πŸ”Œ [System] Closing connections...")
        await dm.close()
# Run the full strategic optimization when executed as a script.
if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())