# ============================================================
# 🧪 backtest_engine.py (V118.5 - GEM-Architect: Hyper-Vectorized)
# ============================================================
import asyncio
import pandas as pd
import numpy as np
import pandas_ta as ta
import time
import logging
import itertools
import os
import glob
import gc
import sys
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, List
# Import the core engines. A failed import is reported but deliberately not
# fatal here, so the names below are undefined at call time if a dependency
# is missing.
try:
    from ml_engine.processor import MLProcessor, SystemLimits
    from ml_engine.data_manager import DataManager
    from learning_hub.adaptive_hub import StrategyDNA, AdaptiveHub
    from r2 import R2Service
    import ccxt.async_support as ccxt
    import xgboost as xgb
except ImportError:
    print("β [Import Error] Critical ML modules missing.")

# Keep the ML engine's own logger quiet during long backtest runs.
logging.getLogger('ml_engine').setLevel(logging.WARNING)

# Directory holding the per-symbol, per-period score pickles.
CACHE_DIR = "backtest_real_scores"
class HeavyDutyBacktester:
    """Two-phase backtester: score historical candles, then grid-search thresholds.

    Phase 1 (``generate_truth_data``) downloads 1m candles for every target
    symbol, derives indicators, runs the available models over each candidate
    candle and caches the scores as pickles under ``CACHE_DIR``.
    Phase 2 (``run_optimization``) replays the cached scores through a simple
    wallet simulation for every threshold combination and reports the best.
    """

    # Number of 1m candles scanned after a candidate entry by the risk models.
    _LOOKAHEAD_BARS = 240

    def __init__(self, data_manager, processor):
        """Store collaborators, set simulation defaults and flush the cache.

        Args:
            data_manager: object exposing ``exchange.fetch_ohlcv`` (async).
            processor: object exposing the ``titan``, ``oracle``, ``sniper``,
                ``guardian_hydra`` and ``guardian_legacy`` attributes.
        """
        self.dm = data_manager
        self.proc = processor
        # Number of grid points per tuned threshold in run_optimization().
        self.GRID_DENSITY = 3
        # Simulated wallet settings.
        self.INITIAL_CAPITAL = 10.0
        self.TRADING_FEES = 0.001
        self.MAX_SLOTS = 4
        self.TARGET_COINS = [
            'SOL/USDT', 'XRP/USDT', 'DOGE/USDT'
        ]
        self.force_start_date = None
        self.force_end_date = None
        # Every instance starts from an empty score cache.
        if os.path.exists(CACHE_DIR):
            files = glob.glob(os.path.join(CACHE_DIR, "*"))
            print(f"π§Ή [System] Flushing Cache: Deleting {len(files)} old files...", flush=True)
            for f in files:
                try:
                    os.remove(f)
                except OSError:
                    # Best-effort cleanup: a file that cannot be removed is left behind.
                    pass
        else:
            os.makedirs(CACHE_DIR, exist_ok=True)
        print(f"π§ͺ [Backtest V118.5] Hyper-Vectorized Mode. Models: {self._check_models_status()}")

    def _check_models_status(self):
        """Return a '+'-joined list of the models that look loaded, or 'None'."""
        status = []
        if self.proc.titan:
            status.append("Titan")
        if self.proc.oracle and getattr(self.proc.oracle, 'model_direction', None):
            status.append("Oracle")
        if self.proc.sniper and getattr(self.proc.sniper, 'models', None):
            status.append("Sniper")
        if self.proc.guardian_hydra:
            status.append("Hydra")
        return "+".join(status) if status else "None"

    def set_date_range(self, start_str, end_str):
        """Set the backtest period; both strings are parsed later as ``%Y-%m-%d``."""
        self.force_start_date = start_str
        self.force_end_date = end_str

    def _period_bounds_ms(self):
        """Return ``(start_ms, end_ms)`` for the forced date range, or ``None``.

        Dates are interpreted as UTC midnights. ``None`` is returned when
        either bound has not been set.
        """
        if not (self.force_start_date and self.force_end_date):
            return None
        dt_start = datetime.strptime(self.force_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        dt_end = datetime.strptime(self.force_end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return int(dt_start.timestamp() * 1000), int(dt_end.timestamp() * 1000)

    # ==============================================================
    # FAST DATA DOWNLOADER
    # ==============================================================
    async def _fetch_all_data_fast(self, sym, start_ms, end_ms):
        """Download 1m OHLCV candles for ``sym`` within ``[start_ms, end_ms]``.

        Requests are issued in batches of 1000 candles, at most 10 in flight,
        each retried up to three times. Returns the candles de-duplicated by
        timestamp and sorted ascending, or ``None`` when nothing was fetched.
        """
        print(f" β‘ [Network] Downloading {sym}...", flush=True)
        limit = 1000
        duration_per_batch = limit * 60 * 1000
        batch_starts = []
        current = start_ms
        while current < end_ms:
            batch_starts.append(current)
            current += duration_per_batch

        sem = asyncio.Semaphore(10)

        async def _fetch_batch(timestamp):
            async with sem:
                for _ in range(3):
                    try:
                        return await self.dm.exchange.fetch_ohlcv(sym, '1m', since=timestamp, limit=limit)
                    except Exception:
                        # `except Exception` (not a bare except) so that task
                        # cancellation is not swallowed by the retry loop.
                        await asyncio.sleep(1)
                return []

        all_candles = []
        chunk_size = 20
        for i in range(0, len(batch_starts), chunk_size):
            results = await asyncio.gather(*(_fetch_batch(ts) for ts in batch_starts[i:i + chunk_size]))
            for res in results:
                if res:
                    all_candles.extend(res)
        if not all_candles:
            return None

        # Keep the first candle seen for each timestamp inside the window.
        by_ts = {}
        for c in all_candles:
            if start_ms <= c[0] <= end_ms:
                by_ts.setdefault(c[0], c)
        unique_candles = [by_ts[ts] for ts in sorted(by_ts)]
        print(f" ✅ Downloaded {len(unique_candles)} candles.", flush=True)
        return unique_candles

    # ==============================================================
    # VECTORIZED INDICATORS (Robust)
    # ==============================================================
    def _calculate_indicators_vectorized(self, df, timeframe='1m'):
        """Add indicator columns to ``df`` in place and return it.

        All timeframes get RSI/EMA/ATR and relative volume. The 1m/5m/15m
        frames additionally get Bollinger width, volume z-score and the L1
        score; the 1m frame also gets returns, range position, lags and a
        500-bar volume z-score. Remaining NaNs are replaced by 0.

        NOTE(review): the source file lost its indentation; the nesting of
        the two ``if timeframe`` sections below is a reconstruction — confirm
        against the original.
        """
        for col in ['close', 'high', 'low', 'volume', 'open']:
            df[col] = df[col].astype(float)
        df['rsi'] = ta.rsi(df['close'], length=14)
        df['ema20'] = ta.ema(df['close'], length=20)
        df['ema50'] = ta.ema(df['close'], length=50)
        df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
        # Computed for every timeframe.
        df['vol_ma50'] = df['volume'].rolling(50).mean()
        df['rel_vol'] = df['volume'] / (df['vol_ma50'] + 1e-9)

        if timeframe in ['1m', '5m', '15m']:
            sma20 = df['close'].rolling(20).mean()
            std20 = df['close'].rolling(20).std()
            df['bb_width'] = ((sma20 + 2*std20) - (sma20 - 2*std20)) / sma20
            vol_mean = df['volume'].rolling(20).mean()
            vol_std = df['volume'].rolling(20).std()
            df['vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)
            df['atr_pct'] = df['atr'] / df['close']
            # L1 score: volume and volatility reward, minus an overbought penalty.
            rsi_penalty = np.where(df['rsi'] > 70, (df['rsi'] - 70) * 2, 0)
            l1_score_raw = (df['rel_vol'] * 10) + (df['atr_pct'] * 1000) - rsi_penalty
            df['l1_score'] = l1_score_raw.fillna(0)

        if timeframe == '1m':
            df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
            df['ret'] = df['close'].pct_change()
            roll_max = df['high'].rolling(50).max()
            roll_min = df['low'].rolling(50).min()
            diff = (roll_max - roll_min).replace(0, 1e-9)
            df['fib_pos'] = (df['close'] - roll_min) / diff
            df['volatility'] = df['atr'] / df['close']
            df['trend_slope'] = (df['ema20'] - df['ema20'].shift(5)) / df['ema20'].shift(5)
            for lag in [1, 2, 3, 5, 10, 20]:
                df[f'log_ret_lag_{lag}'] = df['log_ret'].shift(lag).fillna(0)
                df[f'rsi_lag_{lag}'] = (df['rsi'].shift(lag).fillna(50) / 100.0)
                df[f'fib_pos_lag_{lag}'] = df['fib_pos'].shift(lag).fillna(0.5)
                df[f'volatility_lag_{lag}'] = df['volatility'].shift(lag).fillna(0)
            # The column is named "_50" but the window used here is 500 bars.
            r = df['volume'].rolling(500).mean()
            s = df['volume'].rolling(500).std()
            df['vol_zscore_50'] = ((df['volume'] - r) / s).fillna(0)

        df.fillna(0, inplace=True)
        return df

    @staticmethod
    def _align_to_htf(htf_ts, ts_1m):
        """Map each 1m timestamp to the index of the HTF bar that contains it.

        ``htf_ts`` holds ascending bar-open timestamps. ``side='right'`` minus
        one selects the last bar opening at or before the 1m candle; the
        default ``side='left'`` would select the *next* bar, which has not
        started yet. Indices are clipped into the valid range.
        """
        idx = np.searchsorted(htf_ts, ts_1m, side='right') - 1
        return np.clip(idx, 0, max(len(htf_ts) - 1, 0))

    @staticmethod
    def _build_hydra_window(sl_static, entry_p, time_vec):
        """Build the per-candle Hydra feature rows for one trade window.

        Args:
            sl_static: ``(n, 7)`` slice with columns rsi_1m, rsi_5m, rsi_15m,
                bb_width, rel_vol, atr, close (the order assembled by
                ``_process_data_in_memory``).
            entry_p: entry price of the simulated trade.
            time_vec: length-``n`` vector placed in feature column 11.

        Returns:
            ``(n, 16)`` array in the column order used by the original code.
        """
        sl_close = sl_static[:, 6]
        sl_atr = sl_static[:, 5]
        # Stop distance: 1.5 ATR, floored at 1.5% of the entry price.
        sl_dist = np.maximum(1.5 * sl_atr, entry_p * 0.015)
        sl_norm_pnl = (sl_close - entry_p) / sl_dist
        # Best price seen so far in the window, never below the entry price.
        sl_cum_max = np.maximum(np.maximum.accumulate(sl_close), entry_p)
        sl_max_pnl_r = (sl_cum_max - entry_p) / sl_dist
        sl_atr_pct = sl_atr / sl_close
        n = len(sl_close)
        zeros = np.zeros(n)
        ones = np.ones(n)
        return np.column_stack([
            sl_static[:, 0], sl_static[:, 1], sl_static[:, 2],
            sl_static[:, 3], sl_static[:, 4],
            zeros, sl_atr_pct, sl_norm_pnl, sl_max_pnl_r,
            zeros, zeros, time_vec,
            zeros, ones*0.6, ones*0.7, ones*3.0
        ])

    # ==============================================================
    # CPU PROCESSING (HYPER-VECTORIZED)
    # ==============================================================
    async def _process_data_in_memory(self, sym, candles, start_ms, end_ms):
        """Score every candidate 1m candle of ``sym`` and cache the result.

        Builds indicator frames for 1m and higher timeframes, selects candles
        whose L1 score is at least 5 (excluding a 500-bar warm-up and a
        245-bar tail), runs the Oracle, Sniper, Hydra and legacy models that
        are available on ``self.proc`` and pickles one row per candidate to
        ``CACHE_DIR``. Does nothing if the scores file already exists or no
        candidate qualifies.
        """
        safe_sym = sym.replace('/', '_')
        period_suffix = f"{start_ms}_{end_ms}"
        scores_file = f"{CACHE_DIR}/{safe_sym}_{period_suffix}_scores.pkl"
        if os.path.exists(scores_file):
            print(f" π [{sym}] Data Exists -> Skipping.")
            return
        print(f" βοΈ [CPU] Analyzing {sym} (Hyper-Vectorized Mode)...", flush=True)
        t0 = time.time()
        horizon = self._LOOKAHEAD_BARS

        # 1. Data prep
        df_1m = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_1m['datetime'] = pd.to_datetime(df_1m['timestamp'], unit='ms')
        df_1m.set_index('datetime', inplace=True)
        df_1m = df_1m.sort_index()
        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        frame_1m = self._calculate_indicators_vectorized(df_1m.copy(), timeframe='1m')
        frame_1m['timestamp'] = frame_1m.index.floor('1min').astype(np.int64) // 10**6
        fast_1m = {col: frame_1m[col].values for col in frame_1m.columns}
        numpy_htf = {}
        # 'min' replaces the deprecated 'T' minute alias.
        for tf_str, tf_code in [('5m', '5min'), ('15m', '15min'), ('1h', '1h'), ('4h', '4h'), ('1d', '1D')]:
            resampled = df_1m.resample(tf_code).agg(agg_dict).dropna()
            resampled = self._calculate_indicators_vectorized(resampled, timeframe=tf_str)
            resampled['timestamp'] = resampled.index.astype(np.int64) // 10**6
            numpy_htf[tf_str] = {col: resampled[col].values for col in resampled.columns}

        # 2. Time alignment: index of the HTF bar containing each 1m candle.
        # NOTE(review): that bar's indicators still include its own later
        # minutes, so some intra-bar look-ahead remains.
        map_1m_to_1h = self._align_to_htf(numpy_htf['1h']['timestamp'], fast_1m['timestamp'])
        map_1m_to_5m = self._align_to_htf(numpy_htf['5m']['timestamp'], fast_1m['timestamp'])
        map_1m_to_15m = self._align_to_htf(numpy_htf['15m']['timestamp'], fast_1m['timestamp'])
        map_1m_to_4h = self._align_to_htf(numpy_htf['4h']['timestamp'], fast_1m['timestamp'])

        # 3. Model access
        oracle_dir_model = getattr(self.proc.oracle, 'model_direction', None)
        sniper_models = getattr(self.proc.sniper, 'models', [])
        hydra_models = getattr(self.proc.guardian_hydra, 'models', {}) if self.proc.guardian_hydra else {}
        legacy_v2 = getattr(self.proc.guardian_legacy, 'model_v2', None)

        # 4. Pre-compute legacy V2 probabilities for every 1m candle.
        global_v2_probs = np.zeros(len(fast_1m['close']))
        if legacy_v2:
            try:
                # NOTE(review): _calculate_indicators_vectorized only produces
                # log_ret/fib_pos/trend_slope for the 1m frame and never
                # produces dist_fib618, so these 5m/15m lookups raise KeyError
                # and the legacy probabilities stay at zero. The feature
                # definitions must come from the model's training code.
                l_log = fast_1m['log_ret']
                l_rsi = fast_1m['rsi'] / 100.0
                l_fib = fast_1m['fib_pos']
                l_vol = fast_1m['volatility']
                l5_log = numpy_htf['5m']['log_ret'][map_1m_to_5m]
                l5_rsi = numpy_htf['5m']['rsi'][map_1m_to_5m] / 100.0
                l5_fib = numpy_htf['5m']['fib_pos'][map_1m_to_5m]
                l5_trd = numpy_htf['5m']['trend_slope'][map_1m_to_5m]
                l15_log = numpy_htf['15m']['log_ret'][map_1m_to_15m]
                l15_rsi = numpy_htf['15m']['rsi'][map_1m_to_15m] / 100.0
                l15_fib618 = numpy_htf['15m']['dist_fib618'][map_1m_to_15m]
                l15_trd = numpy_htf['15m']['trend_slope'][map_1m_to_15m]
                lag_cols = []
                for lag in [1, 2, 3, 5, 10, 20]:
                    lag_cols.extend([
                        fast_1m[f'log_ret_lag_{lag}'], fast_1m[f'rsi_lag_{lag}'],
                        fast_1m[f'fib_pos_lag_{lag}'], fast_1m[f'volatility_lag_{lag}']
                    ])
                X_GLOBAL_V2 = np.column_stack([
                    l_log, l_rsi, l_fib, l_vol,
                    l5_log, l5_rsi, l5_fib, l5_trd,
                    l15_log, l15_rsi, l15_fib618, l15_trd,
                    *lag_cols
                ])
                global_v2_probs = legacy_v2.predict(xgb.DMatrix(X_GLOBAL_V2))
                if len(global_v2_probs.shape) > 1:
                    global_v2_probs = global_v2_probs[:, 2]
            except Exception as e:
                # Reported instead of silently swallowed; scores fall back to zero.
                print(f"Legacy V2 Error: {e}")
                global_v2_probs = np.zeros(len(fast_1m['close']))

        # 5. Pre-assemble the Hydra inputs that do not depend on the entry.
        global_hydra_static = None
        if hydra_models:
            try:
                global_hydra_static = np.column_stack([
                    fast_1m['rsi'],
                    numpy_htf['5m']['rsi'][map_1m_to_5m],
                    numpy_htf['15m']['rsi'][map_1m_to_15m],
                    fast_1m['bb_width'],
                    fast_1m['rel_vol'],
                    fast_1m['atr'],
                    fast_1m['close'],
                ])
            except Exception as e:
                print(f"Hydra Static Error: {e}")

        # 6. Candidate filtering
        valid_indices = np.where(fast_1m['l1_score'] >= 5.0)[0]
        # Skip the indicator warm-up and keep room for the look-ahead window.
        mask_bounds = (valid_indices > 500) & (valid_indices < len(fast_1m['close']) - 245)
        final_valid_indices = valid_indices[mask_bounds]
        print(f" π― Raw Candidates (Score > 5): {len(final_valid_indices)}. Vectorized Scoring...", flush=True)
        num_candidates = len(final_valid_indices)
        if num_candidates == 0:
            return

        l1_scores_final = fast_1m['l1_score'][final_valid_indices]
        # Titan proxy derived from the L1 score.
        titan_scores = np.clip(l1_scores_final / 40.0, 0.1, 0.95)

        # --- A. Oracle: one feature matrix for all candidates ---
        oracle_preds = np.full(num_candidates, 0.5)
        if oracle_dir_model:
            try:
                htf_index = {
                    '1h_': ('1h', map_1m_to_1h[final_valid_indices]),
                    '15m_': ('15m', map_1m_to_15m[final_valid_indices]),
                    '4h_': ('4h', map_1m_to_4h[final_valid_indices]),
                }
                oracle_features = []
                for col in getattr(self.proc.oracle, 'feature_cols', []):
                    prefix = next((p for p in htf_index if col.startswith(p)), None)
                    if prefix is not None:
                        tf, idx = htf_index[prefix]
                        c = col[len(prefix):]
                        # Unknown HTF columns are fed as zeros.
                        oracle_features.append(numpy_htf[tf][c][idx] if c in numpy_htf[tf] else np.zeros(num_candidates))
                    elif col == 'sim_titan_score':
                        oracle_features.append(titan_scores)
                    elif col in ('sim_mc_score', 'sim_pattern_score'):
                        oracle_features.append(np.full(num_candidates, 0.5))
                    else:
                        oracle_features.append(np.zeros(num_candidates))
                X_oracle_big = np.column_stack(oracle_features)
                preds = oracle_dir_model.predict(X_oracle_big)
                if len(preds.shape) > 1 and preds.shape[1] > 1:
                    oracle_preds = preds[:, 1]  # probability of class 1
                else:
                    # Assumes predict() yields a probability or regression
                    # score rather than a hard 0/1 class — TODO confirm.
                    oracle_preds = preds.flatten()
            except Exception as e:
                print(f"Oracle Error: {e}")

        # --- B. Sniper: ensemble average over all models ---
        sniper_preds = np.full(num_candidates, 0.5)
        if sniper_models:
            try:
                sniper_features = []
                for col in getattr(self.proc.sniper, 'feature_names', []):
                    if col in fast_1m:
                        sniper_features.append(fast_1m[col][final_valid_indices])
                    elif col == 'L_score':
                        sniper_features.append(
                            fast_1m.get('vol_zscore_50', np.zeros(len(fast_1m['close'])))[final_valid_indices]
                        )
                    else:
                        sniper_features.append(np.zeros(num_candidates))
                X_sniper_big = np.column_stack(sniper_features)
                preds_list = [m.predict(X_sniper_big) for m in sniper_models]
                sniper_preds = np.mean(preds_list, axis=0)
            except Exception as e:
                print(f"Sniper Error: {e}")

        # --- C. Hydra: windowed features, processed in memory-bounded chunks ---
        hydra_risk_preds = np.zeros(num_candidates)
        # int64: millisecond timestamps overflow a 32-bit default int.
        hydra_time_preds = np.zeros(num_candidates, dtype=np.int64)
        if hydra_models and global_hydra_static is not None:
            # The original code referenced an undefined `time_vec`; minutes
            # elapsed since entry (1..240) is assumed here — TODO confirm
            # against the Hydra training features.
            time_vec = np.arange(1, horizon + 1)
            chunk_size = 5000
            for i in range(0, num_candidates, chunk_size):
                chunk_indices = final_valid_indices[i:i + chunk_size]
                batch_X = [
                    self._build_hydra_window(
                        global_hydra_static[idx + 1:idx + 1 + horizon],
                        fast_1m['close'][idx],
                        time_vec,
                    )
                    for idx in chunk_indices
                ]
                if not batch_X:
                    continue
                try:
                    big_X = np.array(batch_X)  # (batch, horizon, features)
                    # Assumes the crash model takes 2-D input, one row per candle.
                    big_X_flat = big_X.reshape(-1, big_X.shape[-1])
                    preds_flat = hydra_models['crash'].predict_proba(big_X_flat)[:, 1]
                    preds_batch = preds_flat.reshape(len(batch_X), horizon)
                    hydra_risk_preds[i:i + len(batch_X)] = np.max(preds_batch, axis=1)
                    # First candle whose crash probability exceeds 0.6.
                    over_thresh = preds_batch > 0.6
                    crash_rows = np.flatnonzero(over_thresh.any(axis=1))
                    crash_offsets = np.argmax(over_thresh, axis=1)[crash_rows]
                    hydra_time_preds[i + crash_rows] = fast_1m['timestamp'][
                        chunk_indices[crash_rows] + 1 + crash_offsets
                    ]
                except Exception as e:
                    print(f"Hydra Error: {e}")

        # --- D. Legacy V2: max risk over the look-ahead window ---
        legacy_risk_preds = np.zeros(num_candidates)
        legacy_time_preds = np.zeros(num_candidates, dtype=np.int64)
        if legacy_v2:
            for k, idx in enumerate(final_valid_indices):
                start = idx + 1
                if start + horizon < len(global_v2_probs):
                    legacy_risk_preds[k] = np.max(global_v2_probs[start:start + horizon])

        # --- E. Final per-candidate table ---
        ai_df = pd.DataFrame({
            'timestamp': fast_1m['timestamp'][final_valid_indices],
            'symbol': sym,
            'close': fast_1m['close'][final_valid_indices],
            'real_titan': titan_scores,
            'oracle_conf': oracle_preds,
            'sniper_score': sniper_preds,
            'l1_score': l1_scores_final,
            'risk_hydra_crash': hydra_risk_preds,
            'time_hydra_crash': hydra_time_preds,
            'risk_legacy_v2': legacy_risk_preds,
            'time_legacy_panic': legacy_time_preds
        })
        dt = time.time() - t0
        if not ai_df.empty:
            ai_df.to_pickle(scores_file)
            print(f" ✅ [{sym}] Completed {len(ai_df)} signals in {dt:.2f} seconds.", flush=True)
        del frame_1m, fast_1m, numpy_htf, global_v2_probs, global_hydra_static
        gc.collect()

    async def generate_truth_data(self):
        """Phase 1: download and score every target coin for the forced period.

        Returns immediately when no date range has been set. A failure on one
        symbol is reported and does not stop the others.
        """
        bounds = self._period_bounds_ms()
        if bounds is None:
            return
        start_time_ms, end_time_ms = bounds
        print(f"\nπ [Phase 1] Processing Era: {self.force_start_date} -> {self.force_end_date}")
        for sym in self.TARGET_COINS:
            try:
                candles = await self._fetch_all_data_fast(sym, start_time_ms, end_time_ms)
                if candles:
                    await self._process_data_in_memory(sym, candles, start_time_ms, end_time_ms)
            except Exception as e:
                print(f" β SKIP {sym}: {e}", flush=True)
        gc.collect()

    @staticmethod
    def _worker_optimize(combinations_batch, scores_files, initial_capital, fees_pct, max_slots):
        """Simulate every config over the cached scores and return its stats.

        Args:
            combinations_batch: list of config dicts. ``hydra_thresh`` is
                required; ``oracle_thresh``, ``sniper_thresh`` and
                ``l1_thresh`` default to 0.6, 0.4 and 15.0. Each dict gets a
                ``'thresh'`` key added (set to its L1 threshold).
            scores_files: paths of score pickles; unreadable files are skipped.
            initial_capital: starting wallet balance.
            fees_pct: fee per side, charged twice on exit.
            max_slots: maximum number of simultaneously open positions.

        Returns:
            One result dict per config, in input order; ``[]`` if no data.
        """
        print(f" β³ [System] Loading {len(scores_files)} datasets into memory...", flush=True)
        all_data = []
        for fp in scores_files:
            try:
                # Pickle is acceptable here only because these files are
                # written by _process_data_in_memory; never load foreign files.
                df = pd.read_pickle(fp)
            except Exception as e:
                print(f"Scores Load Error ({fp}): {e}", flush=True)
                continue
            if not df.empty:
                all_data.append(df)
        if not all_data:
            return []
        global_df = pd.concat(all_data)
        global_df.sort_values('timestamp', inplace=True)

        # Plain numpy arrays for the per-row simulation loop.
        arr_ts = global_df['timestamp'].values
        arr_close = global_df['close'].values.astype(np.float64)
        arr_oracle = global_df['oracle_conf'].values.astype(np.float64)
        arr_sniper = global_df['sniper_score'].values.astype(np.float64)
        arr_hydra_risk = global_df['risk_hydra_crash'].values.astype(np.float64)
        arr_hydra_time = global_df['time_hydra_crash'].values.astype(np.int64)
        arr_titan = global_df['real_titan'].values.astype(np.float64)
        arr_l1 = global_df['l1_score'].values.astype(np.float64)
        arr_symbol = global_df['symbol'].values
        sym_map = {s: i for i, s in enumerate(np.unique(arr_symbol))}
        arr_sym_int = np.array([sym_map[s] for s in arr_symbol], dtype=np.int32)
        # Average of the three model scores, stored with each trade.
        arr_cons = (arr_titan + arr_oracle + arr_sniper) / 3.0

        def _simulate(oracle_thresh, sniper_thresh, hydra_thresh, l1_thresh):
            """Run one wallet simulation and return its statistics."""
            wallet_bal = initial_capital
            wallet_alloc = 0.0
            positions = {}
            trades_log = []
            peak_bal = initial_capital
            max_dd = 0.0
            mask_buy = (arr_l1 >= l1_thresh) & (arr_oracle >= oracle_thresh) & (arr_sniper >= sniper_thresh)
            rows = zip(arr_ts, arr_sym_int, arr_close, mask_buy, arr_hydra_risk, arr_hydra_time, arr_cons)
            for ts, sym_id, price, can_buy, row_risk, row_time, cons_score in rows:
                # Exits: Hydra crash signal, +4% take-profit or -2% stop-loss.
                pos = positions.get(sym_id)
                if pos is not None:
                    entry, size, h_risk, h_time, entry_cons = pos
                    is_crash = (h_risk > hydra_thresh) and (h_time > 0) and (ts >= h_time)
                    pnl = (price - entry) / entry
                    if is_crash or pnl > 0.04 or pnl < -0.02:
                        wallet_bal += size * (1 + pnl - (fees_pct*2))
                        wallet_alloc -= size
                        trades_log.append((pnl, entry_cons))
                        del positions[sym_id]
                        # Equity only changes on exits (open positions are
                        # carried at cost), so drawdown is tracked here.
                        tot = wallet_bal + wallet_alloc
                        if tot > peak_bal:
                            peak_bal = tot
                        else:
                            dd = (peak_bal - tot) / peak_bal
                            if dd > max_dd:
                                max_dd = dd
                # Entries
                if (len(positions) < max_slots and can_buy
                        and sym_id not in positions and wallet_bal >= 5.0):
                    size = min(10.0, wallet_bal * 0.98)
                    positions[sym_id] = [price, size, row_risk, row_time, cons_score]
                    wallet_bal -= size
                    wallet_alloc += size

            final_bal = wallet_bal + wallet_alloc
            total_t = len(trades_log)
            win_count = sum(1 for p, _ in trades_log if p > 0)
            win_rate = (win_count / total_t * 100) if total_t > 0 else 0.0
            # "High consensus" trades: average model score above 0.65.
            hc_pnls = [p for p, s in trades_log if s > 0.65]
            hc_count = len(hc_pnls)
            hc_wins = sum(1 for p in hc_pnls if p > 0)
            hc_win_rate = (hc_wins/hc_count*100) if hc_count > 0 else 0.0
            hc_avg_pnl = (sum(hc_pnls)/hc_count*100) if hc_count > 0 else 0.0
            agree_rate = (hc_count / total_t * 100) if total_t > 0 else 0.0
            return {
                'final_balance': final_bal, 'net_profit': final_bal - initial_capital,
                'total_trades': total_t, 'win_count': win_count, 'loss_count': total_t - win_count,
                'win_rate': win_rate, 'max_drawdown': max_dd * 100,
                'consensus_agreement_rate': agree_rate,
                'high_consensus_win_rate': hc_win_rate,
                'high_consensus_avg_pnl': hc_avg_pnl
            }

        print(f" π [System] Starting Optimized Grid Search on {len(combinations_batch)} combos...", flush=True)
        results = []
        # Only these four thresholds influence the simulation, so configs
        # that differ in other keys (e.g. weights) reuse the same run.
        sim_cache = {}
        for config in combinations_batch:
            oracle_thresh = config.get('oracle_thresh', 0.6)
            sniper_thresh = config.get('sniper_thresh', 0.4)
            hydra_thresh = config['hydra_thresh']
            l1_thresh = config.get('l1_thresh', 15.0)
            key = (oracle_thresh, sniper_thresh, hydra_thresh, l1_thresh)
            if key not in sim_cache:
                sim_cache[key] = _simulate(*key)
            # Ensure the 'thresh' key exists for AdaptiveHub compatibility.
            config['thresh'] = l1_thresh
            results.append({'config': config, **sim_cache[key]})
        print("")
        return results

    async def run_optimization(self, target_regime="RANGE"):
        """Run both phases for the current date range and report the champion.

        Returns:
            ``(best_config, best_stats)`` for the config with the highest
            final balance, or ``(None, None)`` when no scores are available.
        """
        await self.generate_truth_data()
        d = self.GRID_DENSITY
        oracle_range = np.linspace(0.45, 0.8, d).tolist()
        sniper_range = np.linspace(0.35, 0.7, d).tolist()
        hydra_range = np.linspace(0.70, 0.95, d).tolist()
        l1_range = [10.0, 15.0, 20.0, 25.0]
        titan_range = [0.4, 0.6]
        pattern_range = [0.2, 0.4]
        combinations = [
            {
                'w_titan': wt, 'w_struct': wp, 'l1_thresh': l1,
                'oracle_thresh': o, 'sniper_thresh': s, 'hydra_thresh': h,
                'legacy_thresh': 0.95
            }
            for o, s, h, l1, wt, wp in itertools.product(
                oracle_range, sniper_range, hydra_range, l1_range, titan_range, pattern_range
            )
        ]
        # Only use the score files of the current period; the cache also
        # holds files written for previously processed date ranges.
        bounds = self._period_bounds_ms()
        suffix = '_scores.pkl' if bounds is None else f"_{bounds[0]}_{bounds[1]}_scores.pkl"
        valid_files = [os.path.join(CACHE_DIR, f) for f in os.listdir(CACHE_DIR) if f.endswith(suffix)]
        print(f"\nπ§© [Phase 2] Optimizing {len(combinations)} Configs (Full Stack) for {target_regime}...")
        best_res = self._worker_optimize(combinations, valid_files, self.INITIAL_CAPITAL, self.TRADING_FEES, self.MAX_SLOTS)
        if not best_res:
            return None, None
        # max() keeps the first of equally good configs, like a stable sort.
        best = max(best_res, key=lambda x: x['final_balance'])
        print("\n" + "="*60)
        print(f"π CHAMPION REPORT [{target_regime}]:")
        print(f" π° Final Balance: ${best['final_balance']:,.2f}")
        print(f" π Net PnL: ${best['net_profit']:,.2f}")
        print("-" * 60)
        print(f" π Total Trades: {best['total_trades']}")
        print(f" π Win Rate: {best['win_rate']:.1f}%")
        print(f" π Max Drawdown: {best['max_drawdown']:.1f}%")
        print("-" * 60)
        print(f" π§ CONSENSUS ANALYTICS:")
        print(f" π€ Model Agreement Rate: {best['consensus_agreement_rate']:.1f}%")
        print(f" π High-Consensus Win Rate: {best['high_consensus_win_rate']:.1f}%")
        print(f" π High-Consensus Avg PnL: {best['high_consensus_avg_pnl']:.2f}%")
        print("-" * 60)
        print(f" βοΈ Oracle={best['config']['oracle_thresh']:.2f} | Sniper={best['config']['sniper_thresh']:.2f} | Hydra={best['config']['hydra_thresh']:.2f}")
        print(f" βοΈ Weights: Titan={best['config']['w_titan']:.2f} | Patterns={best['config']['w_struct']:.2f} | L1={best['config']['l1_thresh']}")
        print("="*60)
        return best['config'], best
async def run_strategic_optimization_task():
    """Optimize one strategy config per market regime and persist the results.

    For each hard-coded scenario (regime name plus date range) the backtester
    is pointed at the period and ``run_optimization`` is run; the winning
    config, if any, is submitted to the AdaptiveHub. The hub state is saved
    once after all scenarios. The data manager is always closed on exit.
    """
    print("\nπ§ͺ [STRATEGIC BACKTEST] Hyper-Vectorized Mode...")
    r2 = R2Service()
    dm = DataManager(None, None, r2)
    proc = MLProcessor(dm)
    await dm.initialize()
    try:
        # Inside the try so dm is closed even if the processor fails to start.
        await proc.initialize()
        if proc.guardian_hydra:
            proc.guardian_hydra.set_silent_mode(True)
        hub = AdaptiveHub(r2)
        await hub.initialize()
        optimizer = HeavyDutyBacktester(dm, proc)
        scenarios = [
            {"regime": "BULL", "start": "2024-01-01", "end": "2024-03-30"},
            {"regime": "BEAR", "start": "2023-08-01", "end": "2023-09-15"},
            {"regime": "DEAD", "start": "2023-06-01", "end": "2023-08-01"},
            {"regime": "RANGE", "start": "2024-07-01", "end": "2024-09-30"}
        ]
        for scen in scenarios:
            target = scen["regime"]
            optimizer.set_date_range(scen["start"], scen["end"])
            best_cfg, best_stats = await optimizer.run_optimization(target_regime=target)
            if best_cfg:
                hub.submit_challenger(target, best_cfg, best_stats)
        await hub._save_state_to_r2()
        print("✅ [System] ALL Strategic DNA Updated & Saved.")
    finally:
        await dm.close()
# Script entry point: run the full multi-regime optimization once.
if __name__ == "__main__":
    asyncio.run(run_strategic_optimization_task())