# Tradtesting / backtest_engine.py
# Uploaded by Riy777
# Rename "backtest_engine .py" to "backtest_engine.py"
# Commit 79f30b9 (verified)
# ============================================================
# 🧪 backtest_engine.py (V159.0 - GEM-Architect: Hyper-Speed Jump Logic)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, List
# Attempt to import optional libraries (translated from Arabic).
# pandas_ta is optional: indicator code checks `if ta:` before use.
try:
    import pandas_ta as ta
except ImportError:
    ta = None
# Project ML stack + xgboost; failures are swallowed here.
# NOTE(review): later code uses these names unguarded (xgb, MLProcessor, ...) —
# confirm the deployment environment actually provides them.
try:
    from ml_engine.processor import MLProcessor
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import AdaptiveHub
    from r2 import R2Service
    import xgboost as xgb
except ImportError:
    pass
logging.getLogger('ml_engine').setLevel(logging.WARNING)  # quiet noisy ML logs
CACHE_DIR = "backtest_real_scores"  # on-disk cache for per-coin score pickles
# ============================================================
# ⚡ VECTORIZED HELPERS
# ============================================================
def _z_roll_np(arr, w=500):
if len(arr) < w: return np.zeros_like(arr)
mean = pd.Series(arr).rolling(w).mean().fillna(0).values
std = pd.Series(arr).rolling(w).std().fillna(1).values
return np.nan_to_num((arr - mean) / (std + 1e-9))
def _revive_score_distribution(scores):
scores = np.array(scores, dtype=np.float32).flatten()
s_min, s_max = np.min(scores), np.max(scores)
if (s_max - s_min) < 1e-6: return scores
if s_max < 0.8 or s_min > 0.2:
return (scores - s_min) / (s_max - s_min)
return scores
# ============================================================
# 🧪 THE BACKTESTER CLASS
# ============================================================
class HeavyDutyBacktester:
def __init__(self, data_manager, processor):
self.dm = data_manager
self.proc = processor
# 🎛️ الكثافة (Density): عدد الخطوات في النطاق
self.GRID_DENSITY = 3 # 3 is enough for quick checks, 5 for deep dive
self.INITIAL_CAPITAL = 10.0
self.TRADING_FEES = 0.001
self.MAX_SLOTS = 4
# 🎛️ CONTROL PANEL - DYNAMIC RANGES
self.GRID_RANGES = {
'TITAN': np.linspace(0.10, 0.50, self.GRID_DENSITY),
'ORACLE': np.linspace(0.40, 0.80, self.GRID_DENSITY),
'SNIPER': np.linspace(0.30, 0.70, self.GRID_DENSITY),
'PATTERN': np.linspace(0.10, 0.50, self.GRID_DENSITY),
'L1_SCORE': [10.0],
# Guardians
'HYDRA_CRASH': np.linspace(0.60, 0.85, self.GRID_DENSITY),
'HYDRA_GIVEBACK': np.linspace(0.60, 0.85, self.GRID_DENSITY),
'LEGACY_V2': np.linspace(0.85, 0.98, self.GRID_DENSITY),
}
self.TARGET_COINS = [
'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT',
'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT',
'SEI/USDT', 'MINA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'API3/USDT',
'FLOKI/USDT', 'BABYDOGE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT',
'ONDO/USDT', 'SNX/USDT', 'HBAR/USDT', 'XLM/USDT', 'AGIX/USDT', 'IMX/USDT',
'LRC/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT',
'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'RPL/USDT',
'MNT/USDT', 'RAY/USDT', 'CAKE/USDT', 'SRM/USDT', 'PENDLE/USDT', 'ATOM/USDT'
]
self.force_start_date = None
self.force_end_date = None
if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
print(f"🧪 [Backtest V159.0] Hyper-Speed Jump Engine (CPU Optimized).")
def set_date_range(self, start_str, end_str):
self.force_start_date = start_str
self.force_end_date = end_str
async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
print(f" ⚡ [Network] Downloading {sym}...", flush=True)
limit = 1000
tasks = []
curr = start_ms
while curr < end_ms:
tasks.append(curr)
curr += limit * 60 * 1000
all_candles = []
sem = asyncio.Semaphore(20)
async def _fetch_batch(timestamp):
async with sem:
for _ in range(3):
try: return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
except: await asyncio.sleep(0.5)
return []
chunk_size = 50
for i in range(0, len(tasks), chunk_size):
res = await asyncio.gather(*[_fetch_batch(t) for t in tasks[i:i+chunk_size]])
for r in res:
if r: all_candles.extend(r)
if not all_candles: return None
df = pd.DataFrame(all_candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v'])
df.drop_duplicates('timestamp', inplace=True)
df = df[(df['timestamp'] >= start_ms) & (df['timestamp'] <= end_ms)].sort_values('timestamp')
print(f" ✅ Downloaded {len(df)} candles.", flush=True)
return df.values.tolist()
# ----------------------------------------------------------------------
# 🏎️ VECTORIZED INDICATORS
# ----------------------------------------------------------------------
    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Append all indicator/feature columns to *df* (vectorized, no row loops).

        df: OHLCV frame (open/high/low/close/volume) indexed by datetime.
        timeframe: '1d' additionally sets Trend_Strong; '1m' additionally builds
        the microstructure + lag features consumed by the Sniper/Legacy models.
        Returns the same frame with columns added and remaining NaNs zero-filled.

        NOTE(review): the lines after the `if ta:` block read df['ATR'], df['RSI'],
        df['vwap'] etc. unconditionally, so in practice this method requires
        pandas_ta to be installed — confirm.
        """
        if df.empty: return df
        cols = ['close', 'high', 'low', 'volume', 'open']
        for c in cols: df[c] = df[c].astype(np.float64)
        # EMAs
        df['ema9'] = df['close'].ewm(span=9, adjust=False).mean()
        df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
        df['ema21'] = df['close'].ewm(span=21, adjust=False).mean()
        df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
        df['ema200'] = df['close'].ewm(span=200, adjust=False).mean()
        # pandas_ta-backed indicators (skipped entirely when the library is absent).
        if ta:
            df['RSI'] = ta.rsi(df['close'], length=14).fillna(50)
            df['ATR'] = ta.atr(df['high'], df['low'], df['close'], length=14).fillna(0)
            bb = ta.bbands(df['close'], length=20, std=2.0)
            # Column 3 of bbands output is the bandwidth.
            df['bb_width'] = bb.iloc[:, 3].fillna(0) if bb is not None else 0.0
            macd = ta.macd(df['close'])
            if macd is not None:
                df['MACD'] = macd.iloc[:, 0].fillna(0)
                df['MACD_h'] = macd.iloc[:, 1].fillna(0)
            else: df['MACD'] = 0; df['MACD_h'] = 0
            df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0].fillna(0)
            df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20).fillna(0)
            df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14).fillna(50)
            df['slope'] = ta.slope(df['close'], length=7).fillna(0)
            vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
            df['vwap'] = vwap.fillna(df['close']) if vwap is not None else df['close']
        # Relative distances of price from its moving references.
        c = df['close'].values
        df['EMA_9_dist'] = (c / df['ema9'].values) - 1
        df['EMA_21_dist'] = (c / df['ema21'].values) - 1
        df['EMA_50_dist'] = (c / df['ema50'].values) - 1
        df['EMA_200_dist'] = (c / df['ema200'].values) - 1
        df['VWAP_dist'] = (c / df['vwap'].values) - 1
        df['ATR_pct'] = df['ATR'] / (c + 1e-9)
        if timeframe == '1d':
            # Binary trend flag for the daily frame: ADX above 25.
            df['Trend_Strong'] = np.where(df['ADX'] > 25, 1.0, 0.0)
        df['vol_z'] = _z_roll_np(df['volume'].values, 20)
        df['rel_vol'] = df['volume'] / (df['volume'].rolling(50).mean() + 1e-9)
        # Log returns; first bar is padded with 0.
        df['log_ret'] = np.concatenate([[0], np.diff(np.log(c + 1e-9))])
        roll_min = df['low'].rolling(50).min(); roll_max = df['high'].rolling(50).max()
        # Position of close within the rolling 50-bar range (0 = low, 1 = high).
        df['fib_pos'] = (c - roll_min) / (roll_max - roll_min + 1e-9)
        df['volatility'] = df['ATR_pct']
        # 5-bar slope of ema20 (edges padded with the first value).
        e20 = df['ema20'].values
        e20_s = np.roll(e20, 5); e20_s[:5] = e20[0]
        df['trend_slope'] = (e20 - e20_s) / (e20_s + 1e-9)
        # 0.382 retracement level from the rolling high (named fib618 downstream).
        fib618 = roll_max - ((roll_max - roll_min) * 0.382)
        df['dist_fib618'] = (c - fib618) / (c + 1e-9)
        df['dist_ema50'] = df['EMA_50_dist']
        df['dist_ema200'] = df['EMA_200_dist']
        if timeframe == '1m':
            # Aliases expected by the 1m model feature layouts.
            df['return_1m'] = df['log_ret']
            df['rsi_14'] = df['RSI']
            # 1-bar slope of ema9 (first element padded).
            e9 = df['ema9'].values; e9_s = np.roll(e9, 1); e9_s[0] = e9[0]
            df['ema_9_slope'] = (e9 - e9_s) / (e9_s + 1e-9)
            df['ema_21_dist'] = df['EMA_21_dist']
            df['atr_z'] = _z_roll_np(df['ATR'].values, 100)
            df['vol_zscore_50'] = _z_roll_np(df['volume'].values, 50)
            rng = df['high'].values - df['low'].values
            df['candle_range'] = _z_roll_np(rng, 500)
            df['close_pos_in_range'] = (c - df['low'].values) / (rng + 1e-9)
            # Amihud illiquidity proxy: |return| per unit of dollar volume.
            dollar_vol = c * df['volume'].values
            amihud = np.abs(df['log_ret']) / (dollar_vol + 1e-9)
            df['amihud'] = _z_roll_np(amihud, 500)
            # Order-flow imbalance proxy: 30-bar rolling sum of tick-signed volume.
            sign = np.sign(np.diff(c, prepend=c[0]))
            signed_vol = sign * df['volume'].values
            ofi = pd.Series(signed_vol).rolling(30).sum().fillna(0).values
            df['ofi'] = _z_roll_np(ofi, 500)
            df['vwap_dev'] = _z_roll_np(c - df['vwap'].values, 500)
            # Lagged feature copies for the Legacy V2 model layout.
            for lag in [1, 2, 3, 5, 10, 20]:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = df['RSI'].shift(lag).fillna(50)/100.0
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
        df.fillna(0, inplace=True)
        return df
async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
safe_sym = sym.replace('/', '_')
period_suffix = f"{start_ms}_{end_ms}"
scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
if os.path.exists(scores_file):
print(f" 📂 [{sym}] Data Exists -> Skipping.")
return
print(f" ⚙️ [CPU] Analyzing {sym}...", flush=True)
t0 = time.time()
df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
df_1m.set_index('datetime', inplace=True)
df_1m = df_1m.sort_index()
frames = {}
frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
numpy_htf = {}
for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
if resampled.empty:
numpy_htf[tf_str] = {}
continue
resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
frames[tf_str] = resampled
numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
arr_ts_1m = fast_1m['timestamp']
def get_map(tf):
if tf not in numpy_htf or 'timestamp' not in numpy_htf[tf]: return np.zeros(len(arr_ts_1m), dtype=int)
return np.clip(np.searchsorted(numpy_htf[tf]['timestamp'], arr_ts_1m), 0, len(numpy_htf[tf]['timestamp']) - 1)
map_5m = get_map('5m'); map_15m = get_map('15m'); map_1h = get_map('1h'); map_4h = get_map('4h')
titan_model = getattr(self.proc.titan, 'model', None)
oracle_dir = getattr(self.proc.oracle, 'model_direction', None)
oracle_cols = getattr(self.proc.oracle, 'feature_cols', [])
sniper_models = getattr(self.proc.sniper, 'models', [])
sniper_cols = getattr(self.proc.sniper, 'feature_names', [])
hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
# --- BATCH PREDICTIONS ---
global_titan_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if titan_model:
titan_cols = [
'5m_open', '5m_high', '5m_low', '5m_close', '5m_volume', '5m_RSI', '5m_MACD', '5m_MACD_h',
'5m_CCI', '5m_ADX', '5m_EMA_9_dist', '5m_EMA_21_dist', '5m_EMA_50_dist', '5m_EMA_200_dist',
'5m_BB_w', '5m_BB_p', '5m_MFI', '5m_VWAP_dist', '15m_timestamp', '15m_RSI', '15m_MACD',
'15m_MACD_h', '15m_CCI', '15m_ADX', '15m_EMA_9_dist', '15m_EMA_21_dist', '15m_EMA_50_dist',
'15m_EMA_200_dist', '15m_BB_w', '15m_BB_p', '15m_MFI', '15m_VWAP_dist', '1h_timestamp',
'1h_RSI', '1h_MACD_h', '1h_EMA_50_dist', '1h_EMA_200_dist', '1h_ATR_pct', '4h_timestamp',
'4h_RSI', '4h_MACD_h', '4h_EMA_50_dist', '4h_EMA_200_dist', '4h_ATR_pct', '1d_timestamp',
'1d_RSI', '1d_EMA_200_dist', '1d_Trend_Strong'
]
try:
t_vecs = []
for col in titan_cols:
parts = col.split('_', 1); tf = parts[0]; feat = parts[1]
target_arr = numpy_htf.get(tf, {})
target_map = locals().get(f"map_{tf}", np.zeros(len(arr_ts_1m), dtype=int))
if feat in target_arr: t_vecs.append(target_arr[feat][target_map])
elif feat == 'timestamp' and 'timestamp' in target_arr: t_vecs.append(target_arr['timestamp'][target_map])
elif feat in ['open','high','low','close','volume'] and feat in target_arr: t_vecs.append(target_arr[feat][target_map])
else: t_vecs.append(np.zeros(len(arr_ts_1m)))
X_TITAN = np.column_stack(t_vecs)
global_titan_scores = _revive_score_distribution(titan_model.predict(xgb.DMatrix(X_TITAN, feature_names=titan_cols)))
except: pass
global_oracle_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if oracle_dir:
try:
o_vecs = []
for col in oracle_cols:
if col.startswith('1h_'): o_vecs.append(numpy_htf['1h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_1h])
elif col.startswith('15m_'): o_vecs.append(numpy_htf['15m'].get(col[4:], np.zeros(len(arr_ts_1m)))[map_15m])
elif col.startswith('4h_'): o_vecs.append(numpy_htf['4h'].get(col[3:], np.zeros(len(arr_ts_1m)))[map_4h])
elif col == 'sim_titan_score': o_vecs.append(global_titan_scores)
elif col == 'sim_mc_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5))
elif col == 'sim_pattern_score': o_vecs.append(np.full(len(arr_ts_1m), 0.5))
else: o_vecs.append(np.zeros(len(arr_ts_1m)))
X_ORACLE = np.column_stack(o_vecs)
preds_o = oracle_dir.predict(X_ORACLE)
preds_o = preds_o if isinstance(preds_o, np.ndarray) and len(preds_o.shape)==1 else preds_o[:, 0]
global_oracle_scores = _revive_score_distribution(preds_o)
except: pass
global_sniper_scores = np.full(len(arr_ts_1m), 0.5, dtype=np.float32)
if sniper_models:
try:
s_vecs = []
for col in sniper_cols:
if col in fast_1m: s_vecs.append(fast_1m[col])
elif col == 'atr' and 'atr_z' in fast_1m: s_vecs.append(fast_1m['atr_z'])
else: s_vecs.append(np.zeros(len(arr_ts_1m)))
X_SNIPER = np.column_stack(s_vecs)
preds = [m.predict(X_SNIPER) for m in sniper_models]
global_sniper_scores = _revive_score_distribution(np.mean(preds, axis=0))
except: pass
global_v2_scores = np.zeros(len(arr_ts_1m), dtype=np.float32)
if legacy_v2:
try:
l_log = fast_1m['log_ret']; l_rsi = fast_1m['RSI'] / 100.0; l_fib = fast_1m['fib_pos']; l_vol = fast_1m['volatility']
l5_log = numpy_htf['5m']['log_ret'][map_5m]; l5_rsi = numpy_htf['5m']['RSI'][map_5m] / 100.0; l5_fib = numpy_htf['5m']['fib_pos'][map_5m]; l5_trd = numpy_htf['5m']['trend_slope'][map_5m]
l15_log = numpy_htf['15m']['log_ret'][map_15m]; l15_rsi = numpy_htf['15m']['RSI'][map_15m] / 100.0; l15_fib618 = numpy_htf['15m']['dist_fib618'][map_15m]; l15_trd = numpy_htf['15m']['trend_slope'][map_15m]
lags = []
for lag in [1, 2, 3, 5, 10, 20]:
lags.extend([fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'], fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']])
X_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lags])
preds = legacy_v2.predict(xgb.DMatrix(X_V2))
global_v2_scores = preds[:, 2] if len(preds.shape) > 1 else preds
global_v2_scores = global_v2_scores.flatten()
except: pass
global_hydra_crash = np.zeros(len(arr_ts_1m), dtype=np.float32)
global_hydra_give = np.zeros(len(arr_ts_1m), dtype=np.float32)
if hydra_models:
try:
zeros = np.zeros(len(arr_ts_1m))
h_static = np.column_stack([
fast_1m['RSI'], numpy_htf['5m']['RSI'][map_5m], numpy_htf['15m']['RSI'][map_15m],
fast_1m['bb_width'], fast_1m['rel_vol'], fast_1m['atr'], fast_1m['close']
])
X_H = np.column_stack([
h_static[:,0], h_static[:,1], h_static[:,2], h_static[:,3], h_static[:,4],
zeros, fast_1m['ATR_pct'], zeros, zeros, zeros, zeros, zeros, zeros,
global_oracle_scores, np.full(len(arr_ts_1m), 0.7), np.full(len(arr_ts_1m), 3.0)
])
probs_c = hydra_models['crash'].predict_proba(X_H)[:, 1]
global_hydra_crash = probs_c.astype(np.float32)
probs_g = hydra_models['giveback'].predict_proba(X_H)[:, 1]
global_hydra_give = probs_g.astype(np.float32)
except: pass
# Filter
rsi_1h = numpy_htf['1h'].get('RSI', np.zeros(len(arr_ts_1m)))[map_1h]
# Keep candles where at least minimal promise exists (reduces size)
is_candidate_mask = (rsi_1h <= 70) & (global_titan_scores > 0.3) & (global_oracle_scores > 0.3)
candidate_indices = np.where(is_candidate_mask)[0]
end_limit = len(arr_ts_1m) - 60
candidate_indices = candidate_indices[candidate_indices < end_limit]
candidate_indices = candidate_indices[candidate_indices >= 500]
print(f" 🌪️ Final List: {len(candidate_indices)} candidates ready for testing.", flush=True)
ai_results = pd.DataFrame({
'timestamp': arr_ts_1m[candidate_indices],
'symbol': sym,
'close': fast_1m['close'][candidate_indices],
'real_titan': global_titan_scores[candidate_indices],
'oracle_conf': global_oracle_scores[candidate_indices],
'sniper_score': global_sniper_scores[candidate_indices],
'pattern_score': np.full(len(candidate_indices), 0.5),
'risk_hydra_crash': global_hydra_crash[candidate_indices],
'risk_hydra_giveback': global_hydra_give[candidate_indices],
'risk_legacy_v2': global_v2_scores[candidate_indices],
'time_hydra_crash': np.zeros(len(candidate_indices), dtype=int),
'l1_score': 50.0
})
dt = time.time() - t0
if not ai_results.empty:
ai_results.to_pickle(scores_file)
print(f" ✅ [{sym}] Completed in {dt:.2f} seconds. ({len(ai_results)} signals)", flush=True)
gc.collect()
    async def generate_truth_data(self):
        """Phase 1: download and score every target coin over the forced date window.

        Does nothing unless set_date_range() was called first — the whole body
        is guarded by force_start_date (otherwise ms_s/ms_e would be undefined).
        """
        if self.force_start_date:
            dt_s = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            dt_e = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            ms_s = int(dt_s.timestamp()*1000); ms_e = int(dt_e.timestamp()*1000)
            print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
            # Sequential per coin: each step downloads then scores (cache-aware).
            for sym in self.TARGET_COINS:
                c = await self._fetch_all_data_fast(sym, ms_s, ms_e)
                if c: await self._process_data_in_memory(sym, c, ms_s, ms_e)
@staticmethod
def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
"""🚀 HYPER-SPEED JUMP LOGIC (NO LOOPING OVER IDLE CANDLES)"""
print(f" ⏳ [System] Loading {len(scores_files)} datasets...", flush=True)
data = []
for f in scores_files:
try: data.append(pd.read_pickle(f))
except: pass
if not data: return []
df = pd.concat(data).sort_values('timestamp').reset_index(drop=True)
# Pre-load arrays for max speed
ts = df['timestamp'].values
close = df['close'].values.astype(float)
sym = df['symbol'].values
u_syms = np.unique(sym); sym_map = {s: i for i, s in enumerate(u_syms)}; sym_id = np.array([sym_map[s] for s in sym])
oracle = df['oracle_conf'].values
sniper = df['sniper_score'].values
titan = df['real_titan'].values
pattern = df['pattern_score'].values
l1 = df['l1_score'].values
hydra = df['risk_hydra_crash'].values
hydra_give = df['risk_hydra_giveback'].values
legacy = df['risk_legacy_v2'].values
N = len(ts)
print(f" 🚀 [System] Testing {len(combinations_batch)} configs on {N} candidates...", flush=True)
res = []
for cfg in combinations_batch:
# 1. Vectorized Entry Mask (The Speed Secret)
# Instead of checking every candle, we calculate ALL valid entries at once
entry_mask = (l1 >= cfg['L1_SCORE']) & \
(oracle >= cfg['ORACLE']) & \
(sniper >= cfg['SNIPER']) & \
(titan >= cfg['TITAN']) & \
(pattern >= cfg.get('PATTERN', 0.10))
# Get only the indices where entry is possible
valid_entry_indices = np.where(entry_mask)[0]
# Extract thresholds locally to avoid dictionary lookups in inner loop
h_crash_thresh = cfg['HYDRA_CRASH']
h_give_thresh = cfg['HYDRA_GIVEBACK']
leg_thresh = cfg['LEGACY_V2']
# Simulation State
pos = {} # sym_id -> (entry_price, size)
bal = float(initial_capital)
alloc = 0.0
log = []
# Iterate ONLY on relevant indices (Jump!)
# But we must respect time. So we iterate valid indices,
# and check exits for OPEN positions at that time step?
# Problem: If we jump, we miss exits between entries.
# Fix: We must iterate all rows for exits, but we can skip logic if no pos.
# OR: Since df is filtered candidates only, gaps exist.
# We assume candidates are frequent enough or we only check exits on candidate candles.
# *Refinement*: The dataframe `df` only contains ~30k candidates out of 100k candles.
# Exiting only on candidate candles is an approximation, but acceptable for optimization speed.
for i in range(N):
s = sym_id[i]; p = float(close[i])
# A. Check Exits (If holding this symbol)
if s in pos:
entry_p, size_val = pos[s]
pnl = (p - entry_p) / entry_p
# Guardian Logic (Local vars)
is_guard = (hydra[i] > h_crash_thresh) or \
(hydra_give[i] > h_give_thresh) or \
(legacy[i] > leg_thresh)
# VETO (Price Confirmation)
confirmed = is_guard and (pnl < -0.0015)
if confirmed or (pnl > 0.04) or (pnl < -0.02):
realized = pnl - (fees_pct * 2)
bal += size_val * (1.0 + realized)
alloc -= size_val
del pos[s]
log.append({'pnl': realized})
continue # Can't buy same candle we sold
# B. Check Entries (Only if mask is True)
if entry_mask[i] and len(pos) < max_slots:
if s not in pos and bal >= 5.0:
size = min(10.0, bal * 0.98)
pos[s] = (p, size)
bal -= size; alloc += size
# Calc Stats
final_bal = bal + alloc
profit = final_bal - initial_capital
tot = len(log)
winning = [x for x in log if x['pnl'] > 0]
losing = [x for x in log if x['pnl'] <= 0]
win_rate = (len(winning)/tot*100) if tot > 0 else 0.0
avg_win = np.mean([x['pnl'] for x in winning]) if winning else 0.0
avg_loss = np.mean([x['pnl'] for x in losing]) if losing else 0.0
gross_p = sum([x['pnl'] for x in winning])
gross_l = abs(sum([x['pnl'] for x in losing]))
profit_factor = (gross_p / gross_l) if gross_l > 0 else 99.9
# Simple streaks
max_win_s = 0; max_loss_s = 0; curr_w = 0; curr_l = 0
for t in log:
if t['pnl'] > 0: curr_w +=1; curr_l = 0; max_win_s = max(max_win_s, curr_w)
else: curr_l +=1; curr_w = 0; max_loss_s = max(max_loss_s, curr_l)
res.append({
'config': cfg, 'final_balance': final_bal, 'net_profit': profit,
'total_trades': tot, 'win_rate': win_rate, 'profit_factor': profit_factor,
'win_count': len(winning), 'loss_count': len(losing),
'avg_win': avg_win, 'avg_loss': avg_loss,
'max_win_streak': max_win_s, 'max_loss_streak': max_loss_s,
'consensus_agreement_rate': 0.0, 'high_consensus_win_rate': 0.0
})
return res
    async def run_optimization(self, target_regime="RANGE"):
        """Grid-search every threshold combo over the cached scores and print a report.

        Ensures truth data exists (network phase), then runs the in-process
        optimizer over all GRID_RANGES combinations. Returns
        (mapped_config, best_stats) for the most profitable combo, or
        (None, zeroed-stats) when no cached score file produced results.
        """
        await self.generate_truth_data()
        keys = list(self.GRID_RANGES.keys())
        values = list(self.GRID_RANGES.values())
        # Cartesian product of every threshold range.
        combos = [dict(zip(keys, c)) for c in itertools.product(*values)]
        files = glob.glob(os.path.join(CACHE_DIR, "*.pkl"))
        results_list = self._worker_optimize(combos, files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not results_list: return None, {'net_profit': 0.0, 'win_rate': 0.0}
        # Champion = highest net profit.
        results_list.sort(key=lambda x: x['net_profit'], reverse=True)
        best = results_list[0]
        # Translate grid keys into the live-engine config names.
        mapped_config = {
            'w_titan': best['config']['TITAN'],
            'w_struct': best['config']['PATTERN'],
            'thresh': best['config']['L1_SCORE'],
            'oracle_thresh': best['config']['ORACLE'],
            'sniper_thresh': best['config']['SNIPER'],
            'hydra_thresh': best['config']['HYDRA_CRASH'],
            'legacy_thresh': best['config']['LEGACY_V2']
        }
        # Diagnosis: heuristic health flags on the champion's stats.
        diag = []
        if best['total_trades'] > 2000 and best['net_profit'] < 10: diag.append("⚠️ Overtrading")
        if best['win_rate'] > 55 and best['net_profit'] < 0: diag.append("⚠️ Fee Burn")
        if abs(best['avg_loss']) > best['avg_win'] and best['win_count'] > 0: diag.append("⚠️ Risk/Reward Inversion")
        if best['max_loss_streak'] > 10: diag.append("⚠️ Consecutive Loss Risk")
        if not diag: diag.append("✅ System Healthy")
        print("\n" + "="*60)
        print(f"🏆 CHAMPION REPORT [{target_regime}]:")
        print(f"   💰 Final Balance: ${best['final_balance']:,.2f}")
        print(f"   🚀 Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f"   📊 Total Trades: {best['total_trades']}")
        print(f"   📈 Win Rate: {best['win_rate']:.1f}%")
        print(f"   ✅ Winning Trades: {best['win_count']} (Avg: {best['avg_win']*100:.2f}%)")
        print(f"   ❌ Losing Trades: {best['loss_count']} (Avg: {best['avg_loss']*100:.2f}%)")
        print(f"   🌊 Max Streaks: Win {best['max_win_streak']} | Loss {best['max_loss_streak']}")
        print(f"   ⚖️ Profit Factor: {best['profit_factor']:.2f}")
        print("-" * 60)
        print(f"   🧠 CONSENSUS ANALYTICS:")
        print(f"   🤝 Model Agreement Rate: {best.get('consensus_agreement_rate', 0.0):.1f}%")
        print(f"   🌟 High-Consensus Win Rate: {best.get('high_consensus_win_rate', 0.0):.1f}%")
        print("-" * 60)
        print(f"   🩺 DIAGNOSIS: {' '.join(diag)}")
        p_str = ""
        for k, v in mapped_config.items():
            if isinstance(v, float): p_str += f"{k}={v:.2f} | "
            else: p_str += f"{k}={v} | "
        print(f"   ⚙️ Config: {p_str}")
        print("="*60)
        return mapped_config, best
async def run_strategic_optimization_task():
    """Entry point: optimize thresholds per market regime and push champions to the hub.

    Wires up R2 storage, the data manager, the ML processor and the adaptive
    hub, then runs one optimization pass per hand-picked regime window.
    Connections are closed in the finally block even on failure.
    """
    print("\n🧪 [STRATEGIC BACKTEST] Hyper-Speed Jump Engine...")
    r2 = R2Service(); dm = DataManager(None, None, r2); proc = MLProcessor(dm)
    try:
        await dm.initialize(); await proc.initialize()
        if proc.guardian_hydra: proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2); await hub.initialize()
        opt = HeavyDutyBacktester(dm, proc)
        # Hand-picked historical windows, one per market regime.
        scenarios = [
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"},
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
        ]
        for s in scenarios:
            opt.set_date_range(s["start"], s["end"])
            best_cfg, best_stats = await opt.run_optimization(s["regime"])
            # Champion configs become challengers in the adaptive hub.
            if best_cfg: hub.submit_challenger(s["regime"], best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("✅ [System] DNA Updated.")
    finally:
        print("🔌 [System] Closing connections...")
        await dm.close()


if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())