Tradtesting / backtest_engine.py
Riy777's picture
Update backtest_engine.py
a661075 verified
raw
history blame
36.6 kB
# ============================================================
# πŸ§ͺ backtest_engine.py (V139.0 - GEM-Architect: Vectorized Hydra Speed)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import pandas_ta as ta
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from numpy.lib.stride_tricks import sliding_window_view
from datetime import datetime, timezone
from typing import Dict, Any, List
from scipy.special import expit
try:
from ml_engine.processor import MLProcessor, SystemLimits
from ml_engine.data_manager import DataManager
from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
from r2 import R2Service
import ccxt.async_support as ccxt
import xgboost as xgb
import lightgbm as lgb
except ImportError:
pass
logging.getLogger('ml_engine').setLevel(logging.WARNING)
CACHE_DIR = "backtest_real_scores"
# ============================================================
# πŸ›‘οΈ GLOBAL HELPERS
# ============================================================
def sanitize_features(df):
if df is None or df.empty: return df
return df.replace([np.inf, -np.inf], np.nan).ffill().bfill().fillna(0.0)
def _z_roll(x, w=500):
if not isinstance(x, pd.Series): x = pd.Series(x)
r = x.rolling(w).mean()
s = x.rolling(w).std().replace(0, np.nan)
return ((x - r) / s).fillna(0)
def _revive_score_distribution(scores):
scores = np.array(scores, dtype=np.float32)
if len(scores) < 10: return scores
std = np.std(scores)
if std < 0.05:
mean = np.mean(scores)
z = (scores - mean) / (std + 1e-9)
return expit(z)
return scores
def safe_ta(ind_output, index, fill_method='smart'):
if ind_output is None:
return pd.Series(0.0, index=index, dtype='float64')
if not isinstance(ind_output, pd.Series):
s = pd.Series(ind_output, index=index)
else:
s = ind_output
s = s.bfill().ffill()
return s.fillna(0.0).astype('float64')
def _zv(x):
with np.errstate(divide='ignore', invalid='ignore'):
x = np.asarray(x, dtype="float32")
m = np.nanmean(x, axis=0)
s = np.nanstd(x, axis=0) + 1e-9
return np.nan_to_num((x - m) / s, nan=0.0)
def _transform_window_for_pattern(df_window):
try:
c = df_window['close'].values.astype('float32')
o = df_window['open'].values.astype('float32')
h = df_window['high'].values.astype('float32')
l = df_window['low'].values.astype('float32')
v = df_window['volume'].values.astype('float32')
base = np.stack([o, h, l, c, v], axis=1)
base_z = _zv(base)
lr = np.zeros_like(c); lr[1:] = np.diff(np.log1p(c))
rng = (h - l) / (c + 1e-9)
extra = np.stack([lr, rng], axis=1)
extra_z = _zv(extra)
def _ema(arr, n): return pd.Series(arr).ewm(span=n, adjust=False).mean().values
ema9 = _ema(c, 9); ema21 = _ema(c, 21); ema50 = _ema(c, 50); ema200 = _ema(c, 200)
slope21 = np.gradient(ema21); slope50 = np.gradient(ema50)
delta = np.diff(c, prepend=c[0])
up, down = delta.copy(), delta.copy()
up[up < 0] = 0; down[down > 0] = 0
roll_up = pd.Series(up).ewm(alpha=1/14, adjust=False).mean().values
roll_down = pd.Series(down).abs().ewm(alpha=1/14, adjust=False).mean().values
rs = roll_up / (roll_down + 1e-9)
rsi = 100.0 - (100.0 / (1.0 + rs))
indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi], axis=1)
X_seq = np.concatenate([base_z, extra_z, _zv(indicators)], axis=1)
X_flat = X_seq.flatten()
X_stat = np.array([0.5, 0.0, 0.5], dtype="float32")
return np.concatenate([X_flat, X_stat])
except: return None
# ============================================================
# πŸ§ͺ THE BACKTESTER CLASS
# ============================================================
class HeavyDutyBacktester:
def __init__(self, data_manager, processor):
self.dm = data_manager
self.proc = processor
self.GRID_DENSITY = 5
self.INITIAL_CAPITAL = 10.0
self.TRADING_FEES = 0.001
self.MAX_SLOTS = 4
self.TARGET_COINS = [
'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT',
'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT',
'SEI/USDT', 'TIA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'PYTH/USDT',
'WIF/USDT', 'PEPE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT',
'ONDO/USDT', 'ENA/USDT', 'HBAR/USDT', 'XLM/USDT', 'TAO/USDT', 'ZK/USDT',
'ZRO/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT',
'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'JTO/USDT',
'STRK/USDT', 'BLUR/USDT', 'ALT/USDT', 'JUP/USDT', 'PENDLE/USDT', 'ETHFI/USDT',
'MEME/USDT', 'ATOM/USDT'
]
self.force_start_date = None
self.force_end_date = None
if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
print(f"πŸ§ͺ [Backtest V139.0] Vectorized Hydra Speed Optimization.")
def set_date_range(self, start_str, end_str):
self.force_start_date = start_str
self.force_end_date = end_str
async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
print(f" ⚑ [Network] Downloading {sym}...", flush=True)
limit = 1000
tasks = []
current = start_ms
duration_per_batch = limit * 60 * 1000
while current < end_ms:
tasks.append(current)
current += duration_per_batch
all_candles = []
sem = asyncio.Semaphore(20)
async def _fetch_batch(timestamp):
async with sem:
for _ in range(3):
try:
return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
except: await asyncio.sleep(0.5)
return []
chunk_size = 50
for i in range(0, len(tasks), chunk_size):
chunk_tasks = tasks[i:i + chunk_size]
futures = [_fetch_batch(ts) for ts in chunk_tasks]
results = await asyncio.gather(*futures)
for res in results:
if res: all_candles.extend(res)
if not all_candles: return None
df = pd.DataFrame(all_candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v'])
df.drop_duplicates('timestamp', inplace=True)
df = df[(df['timestamp'] >= start_ms) & (df['timestamp'] <= end_ms)]
df.sort_values('timestamp', inplace=True)
print(f" βœ… Downloaded {len(df)} candles.", flush=True)
return df.values.tolist()
def _calculate_indicators_vectorized(self, df, timeframe='1m'):
cols = ['close', 'high', 'low', 'volume', 'open']
for c in cols: df[c] = df[c].astype(np.float64)
idx = df.index
df['RSI'] = safe_ta(ta.rsi(df['close'], length=14), idx, 50)
macd = ta.macd(df['close'])
if macd is not None:
df['MACD'] = safe_ta(macd.iloc[:, 0], idx, 0)
df['MACD_h'] = safe_ta(macd.iloc[:, 1], idx, 0)
else: df['MACD'] = 0.0; df['MACD_h'] = 0.0
df['CCI'] = safe_ta(ta.cci(df['high'], df['low'], df['close'], length=20), idx, 0)
adx = ta.adx(df['high'], df['low'], df['close'], length=14)
if adx is not None: df['ADX'] = safe_ta(adx.iloc[:, 0], idx, 0)
else: df['ADX'] = 0.0
if timeframe == '1d': df['Trend_Strong'] = np.where(df['ADX'] > 25, 1.0, 0.0)
for p in [9, 21, 50, 200]:
ema = safe_ta(ta.ema(df['close'], length=p), idx, 0)
df[f'EMA_{p}_dist'] = ((df['close'] / ema.replace(0, np.nan)) - 1).fillna(0)
df[f'ema{p}'] = ema
df['ema20'] = safe_ta(ta.ema(df['close'], length=20), idx, df['close'])
bb = ta.bbands(df['close'], length=20, std=2.0)
if bb is not None:
w = ((bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1].replace(0, np.nan)).fillna(0)
p = ((df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]).replace(0, np.nan)).fillna(0)
df['BB_w'] = w; df['BB_p'] = p; df['bb_width'] = w
else: df['BB_w'] = 0; df['BB_p'] = 0; df['bb_width'] = 0
df['MFI'] = safe_ta(ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14), idx, 50)
vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
if vwap is not None:
df['VWAP_dist'] = ((df['close'] / vwap.replace(0, np.nan)) - 1).fillna(0)
df['vwap'] = vwap
else: df['VWAP_dist'] = 0.0; df['vwap'] = df['close']
df['atr'] = safe_ta(ta.atr(df['high'], df['low'], df['close'], length=14), idx, 0)
df['atr_pct'] = (df['atr'] / df['close'].replace(0, np.nan)).fillna(0)
df['ATR_pct'] = df['atr_pct']
if timeframe == '1m':
df['return_1m'] = df['close'].pct_change().fillna(0)
df['return_3m'] = df['close'].pct_change(3).fillna(0)
df['return_5m'] = df['close'].pct_change(5).fillna(0)
df['return_15m'] = df['close'].pct_change(15).fillna(0)
df['rsi_14'] = df['RSI']
e9 = df['ema9'].replace(0, np.nan)
df['ema_9_slope'] = ((df['ema9'] - df['ema9'].shift(1)) / e9.shift(1)).fillna(0)
df['ema_21_dist'] = df['EMA_21_dist']
atr_100 = safe_ta(ta.atr(df['high'], df['low'], df['close'], length=100), idx, 0)
df['atr_z'] = _z_roll(atr_100)
df['vol_zscore_50'] = _z_roll(df['volume'], 50)
rng = (df['high'] - df['low']).replace(0, 1e-9)
df['candle_range'] = _z_roll(rng, 500)
df['close_pos_in_range'] = ((df['close'] - df['low']) / rng).fillna(0.5)
df['dollar_vol'] = df['close'] * df['volume']
amihud_raw = (df['return_1m'].abs() / df['dollar_vol'].replace(0, np.nan)).fillna(0)
df['amihud'] = _z_roll(amihud_raw)
dp = df['close'].diff()
roll_cov = dp.rolling(64).cov(dp.shift(1))
roll_spread_raw = (2 * np.sqrt(np.maximum(0, -roll_cov))).fillna(0)
df['roll_spread'] = _z_roll(roll_spread_raw)
sign = np.sign(df['close'].diff()).fillna(0)
signed_vol = sign * df['volume']
ofi_raw = signed_vol.rolling(30).sum().fillna(0)
df['ofi'] = _z_roll(ofi_raw)
buy_vol = (sign > 0) * df['volume']
sell_vol = (sign < 0) * df['volume']
imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
tot = df['volume'].rolling(60).sum().replace(0, np.nan)
df['vpin'] = (imb / tot).fillna(0)
vwap_win = 20
v_short = (df['dollar_vol'].rolling(vwap_win).sum() / df['volume'].rolling(vwap_win).sum().replace(0, np.nan)).fillna(df['close'])
df['vwap_dev'] = _z_roll(df['close'] - v_short)
rv_gk = ((np.log(df['high'] / df['low'])**2) / 2) - ((2 * np.log(2) - 1) * (np.log(df['close'] / df['open'])**2))
df['rv_gk'] = _z_roll(rv_gk)
df['L_score'] = (df['vol_zscore_50'] - df['amihud'] - df['roll_spread'] - df['rv_gk'].abs() - df['vwap_dev'].abs() + df['ofi']).fillna(0)
df['slope'] = safe_ta(ta.slope(df['close'], length=7), idx, 0)
vol_mean = df['volume'].rolling(20).mean()
vol_std = df['volume'].rolling(20).std().replace(0, np.nan)
df['vol_z'] = ((df['volume'] - vol_mean) / vol_std).fillna(0)
df['rel_vol'] = df['volume'] / (df['volume'].rolling(50).mean() + 1e-9)
df['log_ret'] = np.log(df['close'] / df['close'].shift(1).replace(0, np.nan)).fillna(0)
roll_max = df['high'].rolling(50).max()
roll_min = df['low'].rolling(50).min()
diff = (roll_max - roll_min).replace(0, 1e-9)
df['fib_pos'] = ((df['close'] - roll_min) / diff).fillna(0.5)
e20_s = df['ema20'].shift(5).replace(0, np.nan)
df['trend_slope'] = ((df['ema20'] - df['ema20'].shift(5)) / e20_s).fillna(0)
df['volatility'] = (df['atr'] / df['close'].replace(0, np.nan)).fillna(0)
fib618 = roll_max - (diff * 0.382)
df['dist_fib618'] = ((df['close'] - fib618) / df['close'].replace(0, np.nan)).fillna(0)
df['dist_ema50'] = ((df['close'] - df['ema50']) / df['ema50'].replace(0, np.nan)).fillna(0)
e200 = safe_ta(ta.ema(df['close'], length=200), idx, df['close'])
df['ema200'] = e200
df['dist_ema200'] = ((df['close'] - e200) / e200.replace(0, np.nan)).fillna(0)
if timeframe == '1m':
for lag in [1, 2, 3, 5, 10, 20]:
df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
df[f'rsi_lag_{lag}'] = (df['RSI'].shift(lag) / 100.0).fillna(0.5)
df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
df.fillna(0, inplace=True)
return df
async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
safe_sym = sym.replace('/', '_')
period_suffix = f"{start_ms}_{end_ms}"
scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
if os.path.exists(scores_file):
print(f" πŸ“‚ [{sym}] Data Exists -> Skipping.")
return
print(f" βš™οΈ [CPU] Analyzing {sym}...", flush=True)
t0 = time.time()
df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
df_1m.set_index('datetime', inplace=True)
df_1m = df_1m.sort_index()
frames = {}
frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
numpy_htf = {}
for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
frames[tf_str] = resampled
numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
arr_ts_1m = fast_1m['timestamp']
map_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['5m']['timestamp']) - 1)
map_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], arr_ts_1m), 0, len(numpy_htf['15m']['timestamp']) - 1)
map_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['1h']['timestamp']) - 1)
map_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], arr_ts_1m), 0, len(numpy_htf['4h']['timestamp']) - 1)
map_1d = np.clip(np.searchsorted(numpy_htf['1d']['timestamp'], arr_ts_1m), 0, len(numpy_htf['1d']['timestamp']) - 1) if '1d' in numpy_htf else np.zeros(len(arr_ts_1m), dtype=int)
hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
hydra_cols = getattr(self.proc.guardian_hydra, 'feature_cols', []) if self.proc.guardian_hydra else []
legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
oracle_dir = getattr(self.proc.oracle, 'model_direction', None)
oracle_cols = getattr(self.proc.oracle, 'feature_cols', [])
sniper_models = getattr(self.proc.sniper, 'models', [])
sniper_cols = getattr(self.proc.sniper, 'feature_names', [])
titan_model = getattr(self.proc.titan, 'model', None)
# A. TITAN (Global)
global_titan_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if titan_model:
titan_cols = [
'5m_open', '5m_high', '5m_low', '5m_close', '5m_volume', '5m_RSI', '5m_MACD', '5m_MACD_h',
'5m_CCI', '5m_ADX', '5m_EMA_9_dist', '5m_EMA_21_dist', '5m_EMA_50_dist', '5m_EMA_200_dist',
'5m_BB_w', '5m_BB_p', '5m_MFI', '5m_VWAP_dist', '15m_timestamp', '15m_RSI', '15m_MACD',
'15m_MACD_h', '15m_CCI', '15m_ADX', '15m_EMA_9_dist', '15m_EMA_21_dist', '15m_EMA_50_dist',
'15m_EMA_200_dist', '15m_BB_w', '15m_BB_p', '15m_MFI', '15m_VWAP_dist', '1h_timestamp',
'1h_RSI', '1h_MACD_h', '1h_EMA_50_dist', '1h_EMA_200_dist', '1h_ATR_pct', '4h_timestamp',
'4h_RSI', '4h_MACD_h', '4h_EMA_50_dist', '4h_EMA_200_dist', '4h_ATR_pct', '1d_timestamp',
'1d_RSI', '1d_EMA_200_dist', '1d_Trend_Strong'
]
print(" πŸš€ Running Global Titan...", flush=True)
try:
t_vecs = []
for col in titan_cols:
parts = col.split('_', 1); tf = parts[0]; feat = parts[1]
target_arr = numpy_htf.get(tf, None)
target_map = locals().get(f"map_{tf}", None)
if target_arr and feat in target_arr: t_vecs.append(target_arr[feat][target_map])
elif target_arr and feat == 'timestamp': t_vecs.append(target_arr['timestamp'][target_map])
elif target_arr and feat in ['open','high','low','close','volume']: t_vecs.append(target_arr[feat][target_map])
else: t_vecs.append(np.zeros(len(arr_ts_1m)))
X_TITAN = np.column_stack(t_vecs)
global_titan_scores = _revive_score_distribution(titan_model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols)))
except: pass
# B. SNIPER (Global)
global_sniper_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if sniper_models:
print(" πŸš€ Running Global Sniper...", flush=True)
try:
s_vecs = []
for col in sniper_cols:
if col in fast_1m: s_vecs.append(fast_1m[col])
elif col == 'atr' and 'atr_z' in fast_1m: s_vecs.append(fast_1m['atr_z'])
else: s_vecs.append(np.zeros(len(arr_ts_1m)))
X_SNIPER = np.column_stack(s_vecs)
preds = [m.predict(X_SNIPER) for m in sniper_models]
global_sniper_scores = _revive_score_distribution(np.mean(preds, axis=0))
except: pass
# C. ORACLE (Global)
global_oracle_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if oracle_dir:
print(" πŸš€ Running Global Oracle...", flush=True)
try:
o_vecs = []
for col in oracle_cols:
if col.startswith('1h_'): o_vecs.append(numpy_htf['1h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_1h])
elif col.startswith('15m_'): o_vecs.append(numpy_htf['15m'].get(col[4:], np.zeros(len(arr_ts_1m)))[map_15m])
elif col.startswith('4h_'): o_vecs.append(numpy_htf['4h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_4h])
elif col == 'sim_titan_score': o_vecs.append(global_titan_scores)
elif col == 'sim_mc_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5))
elif col == 'sim_pattern_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5))
else: o_vecs.append(np.zeros(len(arr_ts_1m)))
X_ORACLE = np.column_stack(o_vecs)
preds_o = oracle_dir.predict(X_ORACLE)
preds_o = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0]
global_oracle_scores = _revive_score_distribution(preds_o)
except: pass
# D. LEGACY (Global)
global_v2_scores = np.zeros(len(arr_ts_1m), dtype=np.float32)
if legacy_v2:
try:
l_log = fast_1m['log_ret']; l_rsi = fast_1m['RSI'] / 100.0; l_fib = fast_1m['fib_pos']; l_vol = fast_1m['volatility']
l5_log = numpy_htf['5m']['log_ret'][map_5m]; l5_rsi = numpy_htf['5m']['RSI'][map_5m] / 100.0; l5_fib = numpy_htf['5m']['fib_pos'][map_5m]; l5_trd = numpy_htf['5m']['trend_slope'][map_5m]
l15_log = numpy_htf['15m']['log_ret'][map_15m]; l15_rsi = numpy_htf['15m']['RSI'][map_15m] / 100.0; l15_fib618 = numpy_htf['15m']['dist_fib618'][map_15m]; l15_trd = numpy_htf['15m']['trend_slope'][map_15m]
lags = []
for lag in [1, 2, 3, 5, 10, 20]:
lags.extend([fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'], fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']])
X_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lags])
preds = legacy_v2.predict(xgb.DMatrix(X_V2))
global_v2_scores = preds[:, 2] if len(preds.shape) > 1 else preds
except: pass
# Filter
is_candidate = (numpy_htf['1h']['RSI'][map_1h] <= 70) & (global_titan_scores > 0.4) & (global_oracle_scores > 0.4)
candidate_indices = np.where(is_candidate)[0]
start_ts_val = frames['1m'].index[0] + pd.Timedelta(minutes=500)
start_idx_offset = np.searchsorted(arr_ts_1m, int(start_ts_val.timestamp()*1000))
candidate_indices = candidate_indices[candidate_indices >= start_idx_offset]
candidate_indices = candidate_indices[candidate_indices < (len(arr_ts_1m) - 245)]
print(f" 🎯 Candidates: {len(candidate_indices)}. Running Vectorized Hydra...", flush=True)
# πŸš€ VECTORIZED HYDRA SIMULATION πŸš€
ai_results = []
if hydra_models and len(candidate_indices) > 0:
# Prepare Static Features Matrix (Global)
h_static = np.column_stack([
fast_1m['RSI'], numpy_htf['5m']['RSI'][map_5m], numpy_htf['15m']['RSI'][map_15m],
fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close']
]) # Shape: (N, 7)
# Process candidates in chunks to avoid RAM explosion
chunk_size = 5000
for i in range(0, len(candidate_indices), chunk_size):
chunk_idxs = candidate_indices[i:i+chunk_size]
# We need sliding windows of 240 steps for each candidate
# Trick: Use broadcasting or sliding_window_view on static features
# But sliding_window_view on huge array is slow. Better to just slice.
# Vectorized construction for chunk
# 1. Extract entry prices
entries = fast_1m['close'][chunk_idxs]
entries_ts = fast_1m['timestamp'][chunk_idxs]
# 2. Prepare sequences (Vectorized slice is hard in numpy without creating copies)
# We stick to a tight loop or specialized indexing.
# Given we need to construct a [Batch, 240, Features] array for Hydra...
# Fastest way: List comprehension for slicing, then stack.
# Since Hydra is XGBoost, we can flatten the time dimension? No, Hydra is 1D input (snapshot).
# Wait, Hydra predicts Crash Probability for a SNAPSHOT state.
# In simulation, we need to check crash prob at t+1, t+2... t+240.
# That is 240 checks per candidate. 42,000 * 240 = 10 Million checks.
# This IS the bottleneck.
# OPTIMIZATION: Only check Hydra if PnL drops below -0.5% or something? No, that misses the point.
# OPTIMIZATION 2 (Implemented): Vectorize the "Check" logic.
# Construct big matrix for ALL checks: (N_Candidates * 240, Features)
# But that's 10M rows. XGBoost inference on 10M rows takes ~3-5 seconds on CPU. This is feasible!
# Let's do it per candidate to be safe on RAM, but fast.
for idx in chunk_idxs:
# Slicing is fast
sl_st = h_static[idx:idx+240]
sl_close = sl_st[:, 6]; sl_atr = sl_st[:, 5]
entry = fast_1m['close'][idx]
dist = np.maximum(1.5 * sl_atr, entry * 0.015)
pnl = sl_close - entry
norm_pnl = pnl / dist
max_pnl_r = (np.maximum.accumulate(sl_close) - entry) / dist
atr_pct = sl_atr / sl_close
# Stack Hydra Input (240 rows)
# Cols: rsi1, rsi5, rsi15, bb, vol, dist_ema(0), atr_pct, norm, max, dists(0), time, entry(0), oracle, l2, target
zeros = np.zeros(240)
time_vec = np.arange(1, 241)
s_oracle = global_oracle_scores[idx]
X_H = np.column_stack([
sl_st[:,0], sl_st[:,1], sl_st[:,2], sl_st[:,3], sl_st[:,4],
zeros, atr_pct, norm_pnl, max_pnl_r, zeros, zeros, time_vec, zeros,
np.full(240, s_oracle), np.full(240, 0.7), np.full(240, 3.0)
])
# Predict 240 steps at once
max_hydra = 0.0; hydra_time = 0
try:
probs = hydra_models['crash'].predict_proba(X_H)[:, 1]
max_hydra = np.max(probs)
if max_hydra > 0.6:
t = np.argmax(probs)
hydra_time = int(fast_1m['timestamp'][idx + t])
except: pass
# Legacy Max
max_v2 = np.max(global_v2_scores[idx:idx+240])
v2_time = 0
if max_v2 > 0.8:
t2 = np.argmax(global_v2_scores[idx:idx+240])
v2_time = int(fast_1m['timestamp'][idx + t2])
ai_results.append({
'timestamp': int(fast_1m['timestamp'][idx]),
'symbol': sym, 'close': entry,
'real_titan': global_titan_scores[idx],
'oracle_conf': s_oracle,
'sniper_score': global_sniper_scores[idx],
'risk_hydra_crash': max_hydra, 'time_hydra_crash': hydra_time,
'risk_legacy_v2': max_v2, 'time_legacy_panic': v2_time,
'signal_type': 'BREAKOUT', 'l1_score': 50.0
})
dt = time.time() - t0
if ai_results:
pd.DataFrame(ai_results).to_pickle(scores_file)
print(f" βœ… [{sym}] Completed in {dt:.2f} seconds. ({len(ai_results)} signals)", flush=True)
gc.collect()
async def generate_truth_data(self):
if self.force_start_date:
dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000)
print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
for sym in self.TARGET_COINS:
c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
if c: await self._process_data_in_memory(sym, c, ms_s, ms_e)
@staticmethod
def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
data = []
for f in scores_files:
try: data.append(pd.read_pickle(f))
except: pass
if not data: return []
df = pd.concat(data).sort_values('timestamp')
ts = df['timestamp'].values; close = df['close'].values.astype(float)
sym = df['symbol'].values; sym_map = {s:i for i,s in enumerate(np.unique(sym))}
sym_id = np.array([sym_map[s] for s in sym])
oracle = df['oracle_conf'].values; sniper = df['sniper_score'].values
hydra = df['risk_hydra_crash'].values; titan = df['real_titan'].values
l1 = df['l1_score'].values
legacy_v2 = df['risk_legacy_v2'].values
N = len(ts)
print(f" πŸš€ [System] Testing {len(combinations_batch)} configs on {N} candles...", flush=True)
res = []
for cfg in combinations_batch:
pos = {}; log = []
bal = initial_capital; alloc = 0.0
mask = (l1 >= cfg['l1_thresh']) & (oracle >= cfg['oracle_thresh']) & (sniper >= cfg['sniper_thresh']) & (titan >= 0.55)
for i in range(N):
s = sym_id[i]; p = close[i]
if s in pos:
entry = pos[s][0]; h_r = pos[s][1]; titan_entry = pos[s][3]
crash_hydra = (h_r > cfg['hydra_thresh'])
panic_legacy = (legacy_v2[i] > cfg['legacy_thresh'])
pnl = (p - entry)/entry
if crash_hydra or panic_legacy or pnl > 0.04 or pnl < -0.02:
realized = pnl - fees_pct*2
bal += pos[s][2] * (1 + realized)
alloc -= pos[s][2]
is_consensus = (titan_entry > 0.55)
log.append({'pnl': realized, 'consensus': is_consensus})
del pos[s]
if len(pos) < max_slots and mask[i]:
if s not in pos and bal >= 5.0:
size = min(10.0, bal * 0.98)
pos[s] = (p, hydra[i], size, titan[i])
bal -= size; alloc += size
final_bal = bal + alloc
profit = final_bal - initial_capital
tot = len(log)
winning = [x for x in log if x['pnl'] > 0]
losing = [x for x in log if x['pnl'] <= 0]
win_count = len(winning); loss_count = len(losing)
win_rate = (win_count/tot*100) if tot else 0
avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0
avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0
gross_p = sum([x['pnl'] for x in winning])
gross_l = abs(sum([x['pnl'] for x in losing]))
profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9
max_win_s = 0; max_loss_s = 0; curr_w = 0; curr_l = 0
for t in log:
if t['pnl'] > 0:
curr_w += 1; curr_l = 0
if curr_w > max_win_s: max_win_s = curr_w
else:
curr_l += 1; curr_w = 0
if curr_l > max_loss_s: max_loss_s = curr_l
cons_trades = [x for x in log if x['consensus']]
n_cons = len(cons_trades)
agree_rate = (n_cons/tot*100) if tot else 0
cons_win_rate = (sum(1 for x in cons_trades if x['pnl']>0)/n_cons*100) if n_cons else 0
cons_avg_pnl = (sum(x['pnl'] for x in cons_trades)/n_cons*100) if n_cons else 0
res.append({
'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
'total_trades': tot, 'win_rate': win_rate, 'max_drawdown': 0,
'win_count': win_count, 'loss_count': loss_count,
'avg_win': avg_win, 'avg_loss': avg_loss,
'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s,
'profit_factor': profit_factor,
'consensus_agreement_rate': agree_rate,
'high_consensus_win_rate': cons_win_rate,
'high_consensus_avg_pnl': cons_avg_pnl
})
return res
async def run_optimization(self, target_regime="RANGE"):
await self.generate_truth_data()
oracle_r = np.linspace(0.4, 0.7, 3); sniper_r = np.linspace(0.4, 0.7, 3)
hydra_r = [0.85, 0.95]; l1_r = [10.0]
combos = []
for o, s, h, l1 in itertools.product(oracle_r, sniper_r, hydra_r, l1_r):
combos.append({
'w_titan': 0.4, 'w_struct': 0.3, 'thresh': l1, 'l1_thresh': l1,
'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h, 'legacy_thresh': 0.95
})
files = glob.glob(os.path.join(CACHE_DIR, "*.pkl"))
results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
if not results_list: return None, {'net_profit': 0.0, 'win_rate': 0.0}
results_list.sort(key=lambda x: x['net_profit'], reverse=True)
best = results_list[0]
# Auto-Diagnosis
diag = []
if best['total_trades'] > 2000 and best['net_profit'] < 10: diag.append("⚠️ Overtrading")
if best['win_rate'] > 55 and best['net_profit'] < 0: diag.append("⚠️ Fee Burn")
if abs(best['avg_loss']) > best['avg_win']: diag.append("⚠️ Risk/Reward Inversion")
if best['max_loss_streak'] > 10: diag.append("⚠️ Consecutive Loss Risk")
if not diag: diag.append("βœ… System Healthy")
print("\n" + "="*60)
print(f"πŸ† CHAMPION REPORT [{target_regime}]:")
print(f" πŸ’° Final Balance: ${best['final_balance']:,.2f}")
print(f" πŸš€ Net PnL: ${best['net_profit']:,.2f}")
print("-" * 60)
print(f" πŸ“Š Total Trades: {best['total_trades']}")
print(f" πŸ“ˆ Win Rate: {best['win_rate']:.1f}%")
print(f" βœ… Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)")
print(f" ❌ Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)")
print(f" 🌊 Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}")
print(f" βš–οΈ Profit Factor: {best['profit_factor']:.2f}")
print("-" * 60)
print(f" 🧠 CONSENSUS ANALYTICS:")
print(f" 🀝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
print("-" * 60)
print(f" 🩺 DIAGNOSIS: {' '.join(diag)}")
print(f" βš™οΈ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
print("="*60)
return best['config'], best
async def run_strategic_optimization_task():
print("\nπŸ§ͺ [STRATEGIC BACKTEST] Vectorized Hydra Speed...")
r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
try:
await dm.initialize(); await proc.initialize()
if proc.guardian_hydra: proc.guardian_hydra.set_silent_mode(True)
hub = AdaptiveHub(r2); await hub.initialize()
opt = HeavyDutyBacktester(dm, proc)
scenarios = [
{"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
{"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
{"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
{"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
]
for s in scenarios:
opt.set_date_range(s["start"], s["end"])
best_cfg, best_stats = await opt.run_optimization(s["regime"])
if best_cfg: hub.submit_challenger(s["regime"], best_cfg, best_stats)
await hub._save_state_to_r2()
print("βœ… [System] DNA Updated.")
finally:
print("πŸ”Œ [System] Closing connections...")
await dm.close()
if __name__ == "__main__":
asyncio.run(run_strategic_optimization_task())