# Tradtesting / backtest_engine.py
# (Hugging Face page residue preserved as comments: uploader Riy777,
#  "Update backtest_engine.py", commit bbf82d6 verified, raw/history/blame, 32.4 kB)
# ============================================================
# 🧪 backtest_engine.py (V118.5 - GEM-Architect: Hyper-Vectorized)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import pandas_ta as ta
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, List
# ✅ استيراد المحركات الأساسية
try:
from ml_engine.processor import MLProcessor, SystemLimits
from ml_engine.data_manager import DataManager
from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
from r2 import R2Service
import ccxt.async_support as ccxt
import xgboost as xgb
except ImportError:
print("❌ [Import Error] Critical ML modules missing.")
pass
# Quiet the ML engine's own logger during bulk backtesting runs.
logging.getLogger('ml_engine').setLevel(logging.WARNING)
# Directory holding the per-symbol/period score pickles produced in Phase 1.
CACHE_DIR = "backtest_real_scores"
class HeavyDutyBacktester:
def __init__(self, data_manager, processor):
    """Set up portfolio parameters, target symbols and a fresh score cache.

    Args:
        data_manager: DataManager exposing the exchange client (self.dm.exchange).
        processor: MLProcessor exposing the model stack (titan/oracle/sniper/hydra).
    """
    self.dm = data_manager
    self.proc = processor
    # 🎛️ Grid-search density (points per threshold axis in run_optimization).
    self.GRID_DENSITY = 3
    # Portfolio settings consumed by the simulation worker.
    self.INITIAL_CAPITAL = 10.0
    self.TRADING_FEES = 0.001  # per-side fee fraction
    self.MAX_SLOTS = 4         # max simultaneous positions
    self.TARGET_COINS = [
        'SOL/USDT', 'XRP/USDT', 'DOGE/USDT'
    ]
    self.force_start_date = None
    self.force_end_date = None
    # 🔥 Flush any previously cached score files so each run starts clean. 🔥
    if os.path.exists(CACHE_DIR):
        files = glob.glob(os.path.join(CACHE_DIR, "*"))
        print(f"🧹 [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
        for f in files:
            try:
                os.remove(f)
            except OSError:
                # Best-effort cleanup: a locked or already-gone file must not
                # abort startup (was a bare `except`, narrowed to OSError).
                pass
    else:
        os.makedirs(CACHE_DIR)
    print(f"🧪 [Backtest V118.5] Hyper-Vectorized Mode. Models: {self._check_models_status()}")
def _check_models_status(self):
status = []
if self.proc.titan: status.append("Titan")
if self.proc.oracle and getattr(self.proc.oracle, 'model_direction', None): status.append("Oracle")
if self.proc.sniper and getattr(self.proc.sniper, 'models', None): status.append("Sniper")
if self.proc.guardian_hydra: status.append("Hydra")
return "+".join(status) if status else "None"
def set_date_range(self, start_str, end_str):
self.force_start_date = start_str
self.force_end_date = end_str
# ==============================================================
# ⚡ FAST DATA DOWNLOADER
# ==============================================================
async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
    """Concurrently download 1m OHLCV candles for `sym` over [start_ms, end_ms].

    Fans out 1000-candle batch requests under a 10-wide semaphore, in waves
    of 20, with 3 retries per batch. Candles are clipped to the window
    (inclusive), de-duplicated by timestamp and sorted ascending.

    Returns:
        The sorted candle list, or None if nothing was downloaded.
    """
    print(f" ⚡ [Network] Downloading {sym}...", flush=True)
    limit = 1000
    duration_per_batch = limit * 60 * 1000  # 1000 one-minute candles, in ms
    tasks = []
    current = start_ms
    while current < end_ms:
        tasks.append(current)
        current += duration_per_batch
    all_candles = []
    sem = asyncio.Semaphore(10)  # at most 10 requests in flight
    async def _fetch_batch(timestamp):
        async with sem:
            for _ in range(3):  # up to 3 attempts per batch
                try:
                    return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                except Exception:
                    # Transient network/exchange error: back off and retry.
                    # Narrowed from a bare `except` so asyncio.CancelledError
                    # propagates instead of being swallowed.
                    await asyncio.sleep(1)
            return []
    chunk_size = 20
    for i in range(0, len(tasks), chunk_size):
        chunk_tasks = tasks[i:i + chunk_size]
        futures = [_fetch_batch(ts) for ts in chunk_tasks]
        results = await asyncio.gather(*futures)
        for res in results:
            if res:
                all_candles.extend(res)
    if not all_candles:
        return None
    # Clip to the requested window (inclusive on both ends).
    filtered = [c for c in all_candles if c[0] >= start_ms and c[0] <= end_ms]
    # Drop duplicate timestamps (overlapping batches), preserving first-seen.
    seen = set()
    unique_candles = []
    for c in filtered:
        if c[0] not in seen:
            unique_candles.append(c)
            seen.add(c[0])
    unique_candles.sort(key=lambda x: x[0])
    print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
    return unique_candles
# ==============================================================
# 🏎️ VECTORIZED INDICATORS (Robust)
# ==============================================================
def _calculate_indicators_vectorized(self, df, timeframe='1m'):
    # Compute all indicator/feature columns for one timeframe in a single
    # vectorized pass. Mutates `df` in place and also returns it.
    # Feature tiers by timeframe:
    #   - all timeframes: rsi/ema20/ema50/atr + relative volume
    #   - '1m'/'5m'/'15m': Bollinger width, volume z-score, ATR%, L1 score
    #   - '1m' only: returns, fib position, trend slope, lag features
    for col in ['close', 'high', 'low', 'volume', 'open']:
        df[col] = df[col].astype(float)
    df['rsi'] = ta.rsi(df['close'], length=14)
    df['ema20'] = ta.ema(df['close'], length=20)
    df['ema50'] = ta.ema(df['close'], length=50)
    df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
    # Global calc — volume relative to its 50-bar mean (+eps avoids /0).
    df['vol_ma50'] = df['volume'].rolling(50).mean()
    df['rel_vol'] = df['volume'] / (df['vol_ma50'] + 1e-9)
    if timeframe in ['1m', '5m', '15m']:
        # Bollinger band width (20-bar, 2σ) normalized by the mid band.
        sma20 = df['close'].rolling(20).mean()
        std20 = df['close'].rolling(20).std()
        df['bb_width'] = ((sma20 + 2*std20) - (sma20 - 2*std20)) / sma20
        # 20-bar volume z-score.
        vol_mean = df['volume'].rolling(20).mean()
        vol_std = df['volume'].rolling(20).std()
        df['vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)
        df['atr_pct'] = df['atr'] / df['close']
        # L1 Score: rewards relative volume and volatility, penalizes
        # overbought RSI (> 70) at 2 points per RSI unit.
        rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
        l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
        df['l1_score'] = l1_score_raw.fillna(0)
    if timeframe == '1m':
        df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
        df['ret'] = df['close'].pct_change()
        # Position of close within the 50-bar high/low range (0..1).
        roll_max = df['high'].rolling(50).max()
        roll_min = df['low'].rolling(50).min()
        diff = (roll_max - roll_min).replace(0, 1e-9)
        df['fib_pos'] = (df['close'] - roll_min) / diff
        df['volatility'] = df['atr'] / df['close']
        # 5-bar relative EMA20 slope.
        df['trend_slope'] = (df['ema20'] - df['ema20'].shift(5)) / df['ema20'].shift(5)
        # Lagged copies of the core features (Legacy V2 model inputs),
        # with per-feature neutral fill values for the warmup rows.
        for lag in [1, 2, 3, 5, 10, 20]:
            df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
            df[f'rsi_lag_{lag}'] = (df['rsi'].shift(lag).fillna(50) / 100.0)
            df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
            df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
        # NOTE(review): despite the "_50" suffix this z-score uses a 500-bar
        # window — confirm whether 50 or 500 was intended.
        r = df['volume'].rolling(500).mean()
        s = df['volume'].rolling(500).std()
        df['vol_zscore_50'] = ((df['volume'] - r) / s).fillna(0)
    # Final sweep: any remaining warmup NaNs become 0.
    df.fillna(0, inplace=True)
    return df
# ==============================================================
# 🧠 CPU PROCESSING (HYPER-VECTORIZED)
# ==============================================================
async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
    # Phase-1 scorer: converts raw 1m candles into a per-candidate score
    # DataFrame (Titan proxy, Oracle, Sniper, Hydra crash risk, Legacy V2
    # risk) and pickles it to CACHE_DIR keyed by symbol + period.
    safe_sym = sym.replace('/', '_')
    period_suffix = f"{start_ms}_{end_ms}"
    scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
    if os.path.exists(scores_file):
        print(f" 📂 [{sym}] Data Exists -> Skipping.")
        return
    print(f" ⚙️ [CPU] Analyzing {sym} (Hyper-Vectorized Mode)...", flush=True)
    t0 = time.time()
    # 1. Data Prep: 1m frame indexed by datetime, then resampled HTFs.
    df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
    df_1m.set_index('datetime', inplace=True)
    df_1m = df_1m.sort_index()
    frames = {}
    agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
    frames['1m'] = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
    frames['1m']['timestamp'] = frames['1m'].index.floor('1min').astype(np.int64) // 10**6
    # Column -> ndarray dict for cheap scalar/fancy indexing below.
    fast_1m = {col: frames['1m'][col].values for col in frames['1m'].columns}
    numpy_htf = {}
    for tf_str, tf_code in [('5m', '5T'), ('15m', '15T'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
        resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
        resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
        resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
        frames[tf_str] = resampled
        numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}
    # 2. Time Alignment (Vectorized): for each 1m bar, index of its HTF bar.
    # NOTE(review): searchsorted with the default side='left' can map a 1m bar
    # to the HTF bar opening at/after it — confirm this is the intended
    # (non-look-ahead) alignment rather than side='right' - 1.
    map_1m_to_1h = np.clip(np.searchsorted(numpy_htf['1h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['1h']['timestamp'])-1)
    map_1m_to_5m = np.clip(np.searchsorted(numpy_htf['5m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['5m']['timestamp'])-1)
    map_1m_to_15m = np.clip(np.searchsorted(numpy_htf['15m']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['15m']['timestamp'])-1)
    map_1m_to_4h = np.clip(np.searchsorted(numpy_htf['4h']['timestamp'], fast_1m['timestamp']), 0, len(numpy_htf['4h']['timestamp'])-1)
    # 3. Model Access — every model is optional; absence degrades to defaults.
    oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
    sniper_models = getattr(self.proc.sniper, 'models', [])
    hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
    legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)
    # 4. 🔥 Pre-Calc Legacy V2 (Vectorized) 🔥
    global_v2_probs = np.zeros(len(fast_1m['close']))
    if legacy_v2:
        try:
            # Direct array construction
            l_log = fast_1m['log_ret']
            l_rsi = fast_1m['rsi'] / 100.0
            l_fib = fast_1m['fib_pos']
            l_vol = fast_1m['volatility']
            # NOTE(review): the resampled 5m/15m frames only receive the
            # shared indicator set — 'log_ret', 'fib_pos', 'trend_slope' and
            # 'dist_fib618' are 1m-only (or never computed at all), so the
            # lookups below raise KeyError and the bare `except` at the end of
            # this block silently leaves global_v2_probs all-zero. Confirm and
            # either compute those columns for HTFs or fix the feature list.
            l5_log = numpy_htf['5m']['log_ret'][map_1m_to_5m]
            l5_rsi = numpy_htf['5m']['rsi'][map_1m_to_5m] / 100.0
            l5_fib = numpy_htf['5m']['fib_pos'][map_1m_to_5m]
            l5_trd = numpy_htf['5m']['trend_slope'][map_1m_to_5m]
            l15_log = numpy_htf['15m']['log_ret'][map_1m_to_15m]
            l15_rsi = numpy_htf['15m']['rsi'][map_1m_to_15m] / 100.0
            l15_fib618 = numpy_htf['15m']['dist_fib618'][map_1m_to_15m]
            l15_trd = numpy_htf['15m']['trend_slope'][map_1m_to_15m]
            lag_cols = []
            for lag in [1, 2, 3, 5, 10, 20]:
                lag_cols.extend([
                    fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'],
                    fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']
                ])
            X_GLOBAL_V2 = np.column_stack([l_log, l_rsi, l_fib, l_vol, l5_log, l5_rsi, l5_fib, l5_trd, l15_log, l15_rsi, l15_fib618, l15_trd, *lag_cols])
            global_v2_probs = legacy_v2.predict(xgb.DMatrix(X_GLOBAL_V2))
            # Multi-class output: keep probability of class index 2.
            if len(global_v2_probs.shape) > 1: global_v2_probs = global_v2_probs[:, 2]
        except: pass
    # 5. 🔥 Pre-Assemble Hydra Static 🔥 — column layout referenced by index
    # below: 0=rsi_1m, 1=rsi_5m, 2=rsi_15m, 3=bb_width, 4=rel_vol, 5=atr, 6=close.
    global_hydra_static = None
    if hydra_models:
        try:
            h_rsi_1m = fast_1m['rsi']
            h_rsi_5m = numpy_htf['5m']['rsi'][map_1m_to_5m]
            h_rsi_15m = numpy_htf['15m']['rsi'][map_1m_to_15m]
            h_bb = fast_1m['bb_width']
            h_vol = fast_1m['rel_vol']
            h_atr = fast_1m['atr']
            h_close = fast_1m['close']
            global_hydra_static = np.column_stack([h_rsi_1m, h_rsi_5m, h_rsi_15m, h_bb, h_vol, h_atr, h_close])
        except: pass
    # 6. Candidate Filtering: L1 score >= 5, outside the first 500 warmup bars
    # and the final 245 bars (room for the 240-bar lookahead windows).
    valid_indices_mask = fast_1m['l1_score'] >= 5.0
    valid_indices = np.where(valid_indices_mask)[0]
    # Skip warmup and tail
    mask_bounds = (valid_indices > 500) & (valid_indices < len(fast_1m['close']) - 245)
    final_valid_indices = valid_indices[mask_bounds]
    print(f" 🎯 Raw Candidates (Score > 5): {len(final_valid_indices)}. Vectorized Scoring...", flush=True)
    # 🚀 HYPER-VECTORIZATION START 🚀
    # Instead of looping, we construct the BIG matrices for all candidates at once.
    # This brings speed back to ~60s
    num_candidates = len(final_valid_indices)
    if num_candidates == 0: return
    # --- A. ORACLE MATRIX CONSTRUCTION ---
    oracle_preds = np.full(num_candidates, 0.5)  # neutral default
    if oracle_dir_model:
        try:
            # Mapped Indices for all candidates
            idx_1h = map_1m_to_1h[final_valid_indices]
            idx_15m = map_1m_to_15m[final_valid_indices]
            idx_4h = map_1m_to_4h[final_valid_indices]
            # Titan proxy: L1 score squashed into [0.1, 0.95].
            titan_scores = np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95)
            oracle_features = []
            # Assemble columns in the exact order the Oracle model was trained
            # on; missing columns become zero vectors.
            for col in getattr(self.proc.oracle, 'feature_cols', []):
                if col.startswith('1h_'):
                    c = col[3:]
                    oracle_features.append(numpy_htf['1h'][c][idx_1h] if c in numpy_htf['1h'] else np.zeros(num_candidates))
                elif col.startswith('15m_'):
                    c = col[4:]
                    oracle_features.append(numpy_htf['15m'][c][idx_15m] if c in numpy_htf['15m'] else np.zeros(num_candidates))
                elif col.startswith('4h_'):
                    c = col[3:]
                    oracle_features.append(numpy_htf['4h'][c][idx_4h] if c in numpy_htf['4h'] else np.zeros(num_candidates))
                elif col == 'sim_titan_score': oracle_features.append(titan_scores)
                elif col == 'sim_mc_score': oracle_features.append(np.full(num_candidates, 0.5))
                elif col == 'sim_pattern_score': oracle_features.append(np.full(num_candidates, 0.5))
                else: oracle_features.append(np.zeros(num_candidates))
            X_oracle_big = np.column_stack(oracle_features)
            preds = oracle_dir_model.predict(X_oracle_big)
            # Handle output shape
            if len(preds.shape) > 1 and preds.shape[1] > 1:
                oracle_preds = preds[:, 1] # Prob of Class 1
            else:
                oracle_preds = preds.flatten()
            # If model outputs 0/1 class, we might need proba. Assuming predict gives prob or class.
            # Adjust if simple XGB classifier gives 0/1. For backtest, assume regression or proba.
        except Exception as e: print(f"Oracle Error: {e}")
    # --- B. SNIPER MATRIX CONSTRUCTION ---
    sniper_preds = np.full(num_candidates, 0.5)  # neutral default
    if sniper_models:
        try:
            sniper_features = []
            for col in getattr(self.proc.sniper, 'feature_names', []):
                if col in fast_1m: sniper_features.append(fast_1m[col][final_valid_indices])
                elif col == 'L_score': sniper_features.append(fast_1m.get('vol_zscore_50', np.zeros(len(fast_1m['close'])))[final_valid_indices])
                else: sniper_features.append(np.zeros(num_candidates))
            X_sniper_big = np.column_stack(sniper_features)
            # Ensemble Average
            preds_list = [m.predict(X_sniper_big) for m in sniper_models]
            sniper_preds = np.mean(preds_list, axis=0)
        except Exception as e: print(f"Sniper Error: {e}")
    # --- C. HYDRA MATRIX CONSTRUCTION (The Heavy One) ---
    hydra_risk_preds = np.zeros(num_candidates)
    hydra_time_preds = np.zeros(num_candidates, dtype=int)
    # Hydra is sequence-based (window of 240). Vectorizing this is tricky without exploding memory.
    # We will iterate but ONLY for prediction input construction, which is lighter than full logic.
    # Actually, for 95k candidates, a (95000, 240, features) array is huge.
    # We MUST batch Hydra. But efficiently.
    if hydra_models and global_hydra_static is not None:
        # We process in chunks of 5000 to keep memory sane
        chunk_size = 5000
        for i in range(0, num_candidates, chunk_size):
            chunk_indices = final_valid_indices[i : i + chunk_size]
            # Build batch X
            batch_X = []
            valid_batch_indices = [] # Map back to chunk index
            for k, idx in enumerate(chunk_indices):
                start = idx + 1
                end = start + 240
                # Quick slice
                sl_static = global_hydra_static[start:end]
                entry_p = fast_1m['close'][idx]
                sl_close = sl_static[:, 6]
                sl_atr = sl_static[:, 5]
                # Stop distance: max(1.5 * ATR, 1.5% of entry price).
                sl_dist = np.maximum(1.5 * sl_atr, entry_p * 0.015)
                sl_pnl = sl_close - entry_p
                sl_norm_pnl = sl_pnl / sl_dist
                # Accumulate max - vectorized for the window
                sl_cum_max = np.maximum.accumulate(sl_close)
                sl_cum_max = np.maximum(sl_cum_max, entry_p)
                sl_max_pnl_r = (sl_cum_max - entry_p) / sl_dist
                sl_atr_pct = sl_atr / sl_close
                # Static cols
                zeros = np.zeros(240); ones = np.ones(240)
                # NOTE(review): `time_vec` is never defined in this method —
                # when Hydra models are loaded this raises NameError, which
                # propagates to generate_truth_data() and skips the symbol.
                # Presumably a length-240 elapsed-time feature was intended;
                # confirm against Hydra's training pipeline.
                row = np.column_stack([
                    sl_static[:, 0], sl_static[:, 1], sl_static[:, 2],
                    sl_static[:, 3], sl_static[:, 4],
                    zeros, sl_atr_pct, sl_norm_pnl, sl_max_pnl_r,
                    zeros, zeros, time_vec,
                    zeros, ones*0.6, ones*0.7, ones*3.0
                ])
                batch_X.append(row)
                valid_batch_indices.append(i + k) # Global index in final_valid_indices
            if batch_X:
                try:
                    big_X = np.array(batch_X) # Shape: (Batch, 240, Feats)
                    # Flatten for 2D model if needed, or keeping 3D depending on Hydra.
                    # Assuming Hydra uses 2D input (stacking windows):
                    big_X_flat = big_X.reshape(-1, big_X.shape[-1])
                    preds_flat = hydra_models['crash'].predict_proba(big_X_flat)[:, 1]
                    # Reshape back to (Batch, 240)
                    preds_batch = preds_flat.reshape(len(batch_X), 240)
                    # Extract Max Risk & Time
                    batch_max_risk = np.max(preds_batch, axis=1)
                    # Find first index > thresh (0.6) for time
                    over_thresh = preds_batch > 0.6
                    # argmax gives first True index
                    has_crash = over_thresh.any(axis=1)
                    crash_times_rel = np.argmax(over_thresh, axis=1)
                    # Map back to global results
                    for j, glob_idx in enumerate(valid_batch_indices):
                        hydra_risk_preds[glob_idx] = batch_max_risk[j]
                        if has_crash[j]:
                            # Calc absolute timestamp
                            start_t_idx = final_valid_indices[glob_idx] + 1
                            abs_time = fast_1m['timestamp'][start_t_idx + crash_times_rel[j]]
                            hydra_time_preds[glob_idx] = abs_time
                except Exception: pass
    # --- D. LEGACY V2 MAPPING ---
    legacy_risk_preds = np.zeros(num_candidates)
    legacy_time_preds = np.zeros(num_candidates, dtype=int)
    if legacy_v2:
        # Vectorized mapping logic
        # For each candidate at idx, scan global_v2_probs[idx+1 : idx+241]
        # This is a sliding window max. Can be slow if looped.
        # Fast approx: Check max just for the entry? No, need lookahead.
        # We loop simply because it's fast scalar lookups.
        for k, idx in enumerate(final_valid_indices):
            start = idx + 1
            if start + 240 < len(global_v2_probs):
                window = global_v2_probs[start : start + 240]
                legacy_risk_preds[k] = np.max(window)
            # Time logic can be added if needed, sticking to max risk for now
    # --- E. CONSTRUCT FINAL DATAFRAME ---
    # Titan Proxy
    titan_scores_final = np.clip(fast_1m['l1_score'][final_valid_indices] / 40.0, 0.1, 0.95)
    l1_scores_final = fast_1m['l1_score'][final_valid_indices]
    timestamps_final = fast_1m['timestamp'][final_valid_indices]
    closes_final = fast_1m['close'][final_valid_indices]
    ai_df = pd.DataFrame({
        'timestamp': timestamps_final,
        'symbol': sym,
        'close': closes_final,
        'real_titan': titan_scores_final,
        'oracle_conf': oracle_preds,
        'sniper_score': sniper_preds,
        'l1_score': l1_scores_final,
        'risk_hydra_crash': hydra_risk_preds,
        'time_hydra_crash': hydra_time_preds,
        'risk_legacy_v2': legacy_risk_preds,
        'time_legacy_panic': legacy_time_preds
    })
    dt = time.time() - t0
    if not ai_df.empty:
        ai_df.to_pickle(scores_file)
        print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
    # Free the large intermediates before the next symbol is processed.
    del frames, fast_1m, numpy_htf, global_v2_probs, global_hydra_static
    gc.collect()
async def generate_truth_data(self):
    """Phase 1: download and score every target coin for the forced window.

    No-op unless set_date_range() was called first. Failures on one symbol
    are logged and skipped so the remaining coins still get processed.
    """
    # Guard clause: refuse to run without an explicit date window.
    if not (self.force_start_date and self.force_end_date):
        return
    fmt = "%Y-%m-%d"
    dt_start = datetime.strptime(self.force_start_date, fmt).replace(tzinfo=timezone.utc)
    dt_end = datetime.strptime(self.force_end_date, fmt).replace(tzinfo=timezone.utc)
    start_time_ms = int(dt_start.timestamp() * 1000)
    end_time_ms = int(dt_end.timestamp() * 1000)
    print(f"\n🚜 [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
    for sym in self.TARGET_COINS:
        try:
            candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
            if candles:
                await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
        except Exception as e:
            print(f" ❌ SKIP {sym}: {e}", flush=True)
        # Reclaim memory between symbols regardless of success.
        gc.collect()
@staticmethod
def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
print(f" ⏳ [System] Loading {len(scores_files)} datasets into memory...", flush=True)
all_data = []
for fp in scores_files:
try:
df = pd.read_pickle(fp)
if not df.empty: all_data.append(df)
except: pass
if not all_data: return []
global_df = pd.concat(all_data)
global_df.sort_values('timestamp', inplace=True)
# 🚀 Numpy Conversion 🚀
arr_ts = global_df['timestamp'].values
arr_close = global_df['close'].values.astype(np.float64)
arr_symbol = global_df['symbol'].values
arr_oracle = global_df['oracle_conf'].values.astype(np.float64)
arr_sniper = global_df['sniper_score'].values.astype(np.float64)
arr_hydra_risk = global_df['risk_hydra_crash'].values.astype(np.float64)
arr_hydra_time = global_df['time_hydra_crash'].values.astype(np.int64)
arr_titan = global_df['real_titan'].values.astype(np.float64)
arr_l1 = global_df['l1_score'].values.astype(np.float64)
unique_syms = np.unique(arr_symbol)
sym_map = {s: i for i, s in enumerate(unique_syms)}
arr_sym_int = np.array([sym_map[s] for s in arr_symbol], dtype=np.int32)
total_len = len(arr_ts)
print(f" 🚀 [System] Starting Optimized Grid Search on {len(combinations_batch)} combos...", flush=True)
results = []
for idx, config in enumerate(combinations_batch):
# No Annoying Progress Logs
wallet_bal = initial_capital
wallet_alloc = 0.0
positions = {}
trades_log = []
oracle_thresh = config.get('oracle_thresh', 0.6)
sniper_thresh = config.get('sniper_thresh', 0.4)
hydra_thresh = config['hydra_thresh']
l1_thresh = config.get('l1_thresh', 15.0)
mask_buy = (arr_l1 >= l1_thresh) & (arr_oracle >= oracle_thresh) & (arr_sniper >= sniper_thresh)
peak_bal = initial_capital
max_dd = 0.0
for i in range(total_len):
ts = arr_ts[i]
sym_id = arr_sym_int[i]
price = arr_close[i]
# Exits
if sym_id in positions:
pos = positions[sym_id]
entry = pos[0]; h_risk = pos[2]; h_time = pos[3]
is_crash = (h_risk > hydra_thresh) and (h_time > 0) and (ts >= h_time)
pnl = (price - entry) / entry
if is_crash or pnl > 0.04 or pnl < -0.02:
wallet_bal += pos[1] * (1 + pnl - (fees_pct*2))
wallet_alloc -= pos[1]
trades_log.append((pnl, pos[4]))
del positions[sym_id]
tot = wallet_bal + wallet_alloc
if tot > peak_bal: peak_bal = tot
else:
dd = (peak_bal - tot) / peak_bal
if dd > max_dd: max_dd = dd
# Entries
if len(positions) < max_slots:
if mask_buy[i]:
if sym_id not in positions:
if wallet_bal >= 5.0:
cons_score = (arr_titan[i] + arr_oracle[i] + arr_sniper[i]) / 3.0
size = min(10.0, wallet_bal * 0.98)
positions[sym_id] = [price, size, arr_hydra_risk[i], arr_hydra_time[i], cons_score]
wallet_bal -= size
wallet_alloc += size
# Stats
final_bal = wallet_bal + wallet_alloc
net_profit = final_bal - initial_capital
total_t = len(trades_log)
win_count = sum(1 for p, _ in trades_log if p > 0)
loss_count = total_t - win_count
win_rate = (win_count / total_t * 100) if total_t > 0 else 0.0
hc_count = sum(1 for _, s in trades_log if s > 0.65)
hc_wins = sum(1 for p, s in trades_log if s > 0.65 and p > 0)
hc_win_rate = (hc_wins/hc_count*100) if hc_count > 0 else 0.0
hc_avg_pnl = (sum(p for p, s in trades_log if s > 0.65)/hc_count*100) if hc_count > 0 else 0.0
agree_rate = (hc_count / total_t * 100) if total_t > 0 else 0.0
# ✅ FIX: Ensure 'thresh' key exists for AdaptiveHub compatibility
config['thresh'] = l1_thresh
results.append({
'config': config, 'final_balance': final_bal, 'net_profit': net_profit,
'total_trades': total_t, 'win_count': win_count, 'loss_count': loss_count,
'win_rate': win_rate, 'max_drawdown': max_dd * 100,
'consensus_agreement_rate': agree_rate,
'high_consensus_win_rate': hc_win_rate,
'high_consensus_avg_pnl': hc_avg_pnl
})
print("")
return results
async def run_optimization(self, target_regime="RANGE"):
    """Phase 2: build the truth cache, grid-search it, report the champion.

    Returns (best_config, best_stats), or (None, None) when the worker
    produced no results.
    """
    await self.generate_truth_data()
    d = self.GRID_DENSITY
    oracle_range = np.linspace(0.45, 0.8, d).tolist()
    sniper_range = np.linspace(0.35, 0.7, d).tolist()
    hydra_range = np.linspace(0.70, 0.95, d).tolist()
    l1_range = [10.0, 15.0, 20.0, 25.0]
    titan_range = [0.4, 0.6]
    pattern_range = [0.2, 0.4]
    # Cartesian product over every threshold/weight axis.
    combinations = [
        {
            'w_titan': wt, 'w_struct': wp, 'l1_thresh': l1,
            'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h,
            'legacy_thresh': 0.95
        }
        for o, s, h, l1, wt, wp in itertools.product(
            oracle_range, sniper_range, hydra_range, l1_range, titan_range, pattern_range
        )
    ]
    valid_files = [os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith('_scores.pkl')]
    print(f"\n🧩 [Phase 2] Optimizing {len(combinations)} Configs (Full Stack) for {target_regime}...")
    best_res = self._worker_optimize(combinations, valid_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
    if not best_res:
        return None, None
    # Champion = highest final balance (first one on ties).
    best = max(best_res, key=lambda r: r['final_balance'])
    sep = "=" * 60
    rule = "-" * 60
    print("\n" + sep)
    print(f"🏆 CHAMPION REPORT [{target_regime}]:")
    print(f" 💰 Final Balance: ${best['final_balance']:,.2f}")
    print(f" 🚀 Net PnL: ${best['net_profit']:,.2f}")
    print(rule)
    print(f" 📊 Total Trades: {best['total_trades']}")
    print(f" 📈 Win Rate: {best['win_rate']:.1f}%")
    print(f" 📉 Max Drawdown: {best['max_drawdown']:.1f}%")
    print(rule)
    print(f" 🧠 CONSENSUS ANALYTICS:")
    print(f" 🤝 Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
    print(f" 🌟 High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
    print(f" 💎 High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
    print(rule)
    print(f" ⚙️ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
    print(f" ⚖️ Weights: Titan={best['config']['w_titan']:.2f} | Patterns={best['config']['w_struct']:.2f} | L1={best['config']['l1_thresh']}")
    print(sep)
    return best['config'], best
async def run_strategic_optimization_task():
    # Top-level pipeline: boot the data/ML stack, then find one champion
    # config per market regime and publish each to the AdaptiveHub.
    print("\n🧪 [STRATEGIC BACKTEST] Hyper-Vectorized Mode...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    await dm.initialize(); await proc.initialize()
    # Silence Hydra's per-prediction output during the bulk scoring pass.
    if proc.guardian_hydra: proc.guardian_hydra.set_silent_mode(True)
    try:
        hub = AdaptiveHub(r2); await hub.initialize()
        optimizer = HeavyDutyBacktester(dm, proc)
        # Hand-picked historical eras representing each market regime.
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])
            best_cfg, best_stats = await optimizer.run_optimization(target_regime=target)
            if best_cfg:
                # Submit the regime champion and persist hub state immediately.
                hub.submit_challenger(target, best_cfg, best_stats)
                await hub._save_state_to_r2()
        print("✅ [System] ALL Strategic DNA Updated & Saved.")
    finally:
        # Always release the exchange connection, even on failure.
        await dm.close()
if __name__ == "__main__":
    # Script entry point: run the full multi-regime optimization pipeline.
    asyncio.run(run_strategic_optimization_task())