# Hosting-page header (kept for provenance):
#   Tradtesting / ml_engine/hybrid_guardian.py
#   Riy777 - "Update ml_engine/hybrid_guardian.py" - 1406547 (verified)
# ml_engine/hybrid_guardian.py
# (V35.4 - GEM-Architect: Configurable Thresholds)
import os
import json
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import traceback
class HybridDeepSteward:
    """Position guardian combining two XGBoost models with an order-book veto.

    The V2 model consumes a flat 36-value vector of 1m/5m/15m features plus
    lagged 1m features; the V3 model consumes a single-row DataFrame aligned
    to the feature names loaded from ``v3_features_map_path``. A hard exit
    proposed by either model can be vetoed when the visible bid wall is large
    relative to the last 30 minutes of traded volume.

    Thresholds have defaults but are meant to be overwritten through
    :meth:`configure_thresholds`.
    """

    # Lags (in 1m candles) whose 1m features are appended to the V2 vector.
    _V2_LAGS = (1, 2, 3, 5, 10, 20)
    # 4 (1m) + 4 (5m) + 4 (15m) + 6 lags * 4 (1m) features.
    _V2_VECTOR_SIZE = 36
    # Number of top-of-book bid levels summed into the "buy wall".
    _OB_DEPTH = 15
    # Bid-wall / 30m-volume ratio above which a hard exit is vetoed.
    _OB_ABSORPTION_VETO = 0.05

    def __init__(self, v2_model_path, v3_model_path, v3_features_map_path):
        """
        The Hybrid Guardian (Configurable & Time-Aware)
        - Thresholds are now controlled by Processor/SystemLimits.

        Args:
            v2_model_path: Path of the V2 XGBoost model file.
            v3_model_path: Path of the V3 XGBoost model file.
            v3_features_map_path: Path of the JSON file holding the V3
                feature names.

        No file is touched here; call :meth:`initialize` to load the models.
        """
        self.v2_path = v2_model_path
        self.v3_path = v3_model_path
        self.v3_features_path = v3_features_map_path

        self.model_v2 = None
        self.model_v3 = None
        self.v3_feature_names = []
        self.initialized = False

        self.FEATS_1M = ['log_ret_1m', 'rsi_1m', 'fib_pos_1m', 'volatility_1m']
        self.FEATS_5M = ['log_ret_5m', 'rsi_5m', 'fib_pos_5m', 'trend_slope_5m']
        self.FEATS_15M = ['log_ret_15m', 'rsi_15m', 'dist_fib618_15m', 'trend_slope_15m']

        # Default Thresholds (Will be overwritten by configure_thresholds)
        self.V2_PANIC_TRIGGER = 0.95
        self.V3_SOFT_EXIT = 0.85
        self.V3_HARD_EXIT = 0.95
        self.V3_ULTRA_CONF = 0.98

    def initialize(self):
        """Load whichever model/feature files exist on disk.

        Missing files are skipped, so the guardian can be initialized with
        only one (or neither) model available.

        Returns:
            True when loading finished without an error (``initialized`` is
            then set), False when any load raised.
        """
        try:
            if os.path.exists(self.v2_path):
                # Assign only after a successful load so a failure never
                # leaves a half-initialized Booster on the instance.
                booster_v2 = xgb.Booster()
                booster_v2.load_model(self.v2_path)
                self.model_v2 = booster_v2
            if os.path.exists(self.v3_path):
                booster_v3 = xgb.Booster()
                booster_v3.load_model(self.v3_path)
                self.model_v3 = booster_v3
            if os.path.exists(self.v3_features_path):
                with open(self.v3_features_path, 'r', encoding='utf-8') as f:
                    self.v3_feature_names = json.load(f)
            self.initialized = True
            return True
        except Exception as e:
            print(f"❌ Guardian Init Error: {e}")
            return False

    # ✅ GEM-Architect: External Control Method
    def configure_thresholds(self, v2_panic, v3_hard, v3_soft, v3_ultra):
        """Overwrite the four decision thresholds.

        Every value is converted with ``float`` before anything is stored, so
        an invalid value raises without leaving a partial update behind.
        """
        new_v2_panic = float(v2_panic)
        new_v3_hard = float(v3_hard)
        new_v3_soft = float(v3_soft)
        new_v3_ultra = float(v3_ultra)

        self.V2_PANIC_TRIGGER = new_v2_panic
        self.V3_HARD_EXIT = new_v3_hard
        self.V3_SOFT_EXIT = new_v3_soft
        self.V3_ULTRA_CONF = new_v3_ultra
        print(f" 🕸️ [Legacy Guard] Thresholds Updated: V2_PANIC={v2_panic}, V3_HARD={v3_hard}")

    def _prepare_df(self, ohlcv_data):
        """Turn raw 6-column OHLCV rows into a numeric DataFrame.

        Price/volume columns are coerced to numbers (unparseable values become
        NaN) and rows without a usable ``close`` are dropped. Any failure, or
        falsy input, yields an empty DataFrame.
        """
        if not ohlcv_data:
            return pd.DataFrame()
        try:
            df = pd.DataFrame(
                ohlcv_data,
                columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
            )
            for col in ('open', 'high', 'low', 'close', 'volume'):
                df[col] = pd.to_numeric(df[col], errors='coerce')
            return df.dropna(subset=['close'])
        except Exception:
            # Best effort: malformed input is treated as "no data".
            return pd.DataFrame()

    def _calc_legacy_row(self, df, suffix):
        """Compute the V2 feature columns for one timeframe; return the last row.

        Args:
            df: Frame produced by :meth:`_prepare_df`.
            suffix: ``'1m'``, ``'5m'`` or ``'15m'``; selects which extra
                features are computed and is appended to every column name.

        Returns:
            The last row of the enriched frame, or an empty Series when fewer
            than 20 rows are available.
        """
        if len(df) < 20:
            return pd.Series(dtype=float)

        df = df.copy()
        close = df['close']
        high = df['high']
        low = df['low']

        df[f'log_ret_{suffix}'] = np.log(close / close.shift(1)).fillna(0)
        df[f'rsi_{suffix}'] = (ta.rsi(close, 14) / 100.0).fillna(0.5)

        # Position of the close inside the 50-bar high/low range.
        roll_max = high.rolling(50).max()
        roll_min = low.rolling(50).min()
        span = (roll_max - roll_min).replace(0, 1e-9)  # avoid division by zero
        df[f'fib_pos_{suffix}'] = ((close - roll_min) / span).fillna(0.5)

        if suffix == '1m':
            df[f'volatility_{suffix}'] = (ta.atr(high, low, close, 14) / close).fillna(0)
        elif suffix in ['5m', '15m']:
            ema = ta.ema(close, 20)
            ema_prev = ema.shift(5)
            df[f'trend_slope_{suffix}'] = ((ema - ema_prev) / ema_prev).fillna(0)
            if suffix == '15m':
                fib618 = roll_max - (span * 0.382)
                df[f'dist_fib618_{suffix}'] = ((close - fib618) / close).fillna(0)

        return df.iloc[-1]

    def _engineer_legacy_v2_vector(self, ohlcv_1m, ohlcv_5m, ohlcv_15m):
        """Build the ``(1, 36)`` feature array expected by the V2 model.

        Layout: current 1m, 5m and 15m features (4 each), followed by the 1m
        features recomputed on the 1m frame truncated by each lag in
        ``_V2_LAGS``. Features that are unavailable are filled with 0.

        Returns:
            The array, or None when fewer than 30 usable 1m rows exist, the
            vector has an unexpected size, or feature computation fails.
        """
        try:
            df1 = self._prepare_df(ohlcv_1m)
            df5 = self._prepare_df(ohlcv_5m)
            df15 = self._prepare_df(ohlcv_15m)
            if len(df1) < 30:
                return None

            vec = []
            timeframes = (
                (df1, '1m', self.FEATS_1M),
                (df5, '5m', self.FEATS_5M),
                (df15, '15m', self.FEATS_15M),
            )
            for frame, suffix, names in timeframes:
                row = self._calc_legacy_row(frame, suffix)
                vec.extend(float(row.get(name, 0)) for name in names)

            for lag in self._V2_LAGS:
                if len(df1) > lag + 1:
                    lag_row = self._calc_legacy_row(df1.iloc[:-lag], '1m')
                    vec.extend(float(lag_row.get(name, 0)) for name in self.FEATS_1M)
                else:
                    vec.extend(0.0 for _ in self.FEATS_1M)

            final_vec = np.array(vec)
            if len(final_vec) != self._V2_VECTOR_SIZE:
                return None
            return final_vec.reshape(1, -1)
        except Exception:
            # Best effort: a failed vector simply means "no V2 score".
            return None

    @staticmethod
    def _indicator_or_default(indicator, default):
        """Fill NaNs in an indicator, tolerating a ``None`` indicator.

        pandas_ta returns ``None`` (rather than a NaN series) when the input
        is shorter than the indicator length; in that case ``default`` itself
        is used so the caller's fallback still applies.
        """
        if indicator is None:
            return default
        return indicator.fillna(default)

    def _engineer_v3_dataframe(self, ohlcv_1m, ohlcv_5m, ohlcv_15m):
        """Build the single-row DataFrame expected by the V3 model.

        Each timeframe gets rsi / ema_50 / ema_200 / dist_ema50 /
        dist_ema200 / log_ret columns. The last 1m row is used as-is, the
        last 5m and 15m rows are added with ``_5m`` / ``_15m`` key suffixes,
        and the result is aligned to ``self.v3_feature_names`` (names with no
        matching value are filled with 0.0).

        Returns:
            A float DataFrame with one row, or None when fewer than 50 usable
            1m rows exist, a higher timeframe is empty, or anything fails.
        """
        try:
            df1 = self._prepare_df(ohlcv_1m)
            df5 = self._prepare_df(ohlcv_5m)
            df15 = self._prepare_df(ohlcv_15m)
            if len(df1) < 50:
                return None
            if df5.empty or df15.empty:
                return None

            for frame in (df1, df5, df15):
                close = frame['close']
                # Short history falls back to neutral values: RSI 50 and
                # EMA == close (hence a zero distance).
                frame['rsi'] = self._indicator_or_default(ta.rsi(close, 14), 50)
                frame['ema_50'] = self._indicator_or_default(ta.ema(close, 50), close)
                frame['ema_200'] = self._indicator_or_default(ta.ema(close, 200), close)
                frame['dist_ema50'] = (close - frame['ema_50']) / close
                frame['dist_ema200'] = (close - frame['ema_200']) / close
                frame['log_ret'] = np.log(close / close.shift(1)).fillna(0)

            feats = df1.iloc[-1].to_dict()
            for key, value in df5.iloc[-1].items():
                feats[f"{key}_5m"] = value
            for key, value in df15.iloc[-1].items():
                feats[f"{key}_15m"] = value

            names = list(self.v3_feature_names)
            aligned = pd.DataFrame(
                [[feats.get(name, 0.0) for name in names]],
                columns=names,
            )
            return aligned.astype(float)
        except Exception:
            # Best effort: a failed frame simply means "no V3 score".
            return None

    # ==========================================================================
    # 🔍 GEM-Architect: The Time-Weighted Logic (30-Minute Check)
    # ==========================================================================
    def _validate_with_time_weighted_ob(self, order_book, signal_type, volume_30m_usd):
        """Decide whether an exit signal may proceed given the bid wall.

        The notional of the first ``_OB_DEPTH`` bid levels (first element
        times second element of each level) is divided by ``volume_30m_usd``.
        An ``'EXIT_HARD'`` signal is vetoed when that ratio exceeds
        ``_OB_ABSORPTION_VETO``.

        Returns:
            False to veto the exit, True otherwise. Missing order book,
            missing bids, non-positive volume and any parsing error all
            allow the exit.
        """
        if not order_book:
            return True
        if volume_30m_usd <= 0:
            return True
        try:
            bids = order_book.get('bids', [])
            if not bids:
                return True

            bid_wall_usd = sum(
                float(level[0]) * float(level[1]) for level in bids[:self._OB_DEPTH]
            )
            absorption_ratio = bid_wall_usd / volume_30m_usd

            if signal_type == 'EXIT_HARD' and absorption_ratio > self._OB_ABSORPTION_VETO:
                return False  # VETO THE EXIT
            return True
        except Exception:
            return True

    @staticmethod
    def _extract_v2_score(pred):
        """Pick the V2 score out of a prediction array.

        Index 2 is read from a 2-D output (first row) or from a 1-D output
        with more than one value; a single-value output is used directly.
        """
        if len(pred.shape) > 1:
            return float(pred[0][2])
        if len(pred) > 1:
            return float(pred[2])
        return float(pred[0])

    # ==========================================================================
    # 🛡️ MAIN ANALYZE (UPDATED)
    # ==========================================================================
    def analyze_position(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price, order_book=None, volume_30m_usd=0.0):
        """Score the position with both models and return an action.

        Args:
            ohlcv_1m, ohlcv_5m, ohlcv_15m: Raw OHLCV rows per timeframe.
            entry_price: Accepted for interface compatibility; not used here.
            order_book: Optional dict with a ``'bids'`` list, used for the veto.
            volume_30m_usd: Traded volume used as the veto denominator.

        Returns:
            A dict with ``'action'`` (``'HOLD'`` or ``'EXIT_HARD'``) and
            ``'scores'`` (``{'v2': float, 'v3': float}``), plus ``'reason'``
            unless the guardian is not initialized. A model that is not
            loaded, or whose features cannot be built, contributes 0.0.
            Any unexpected error results in ``HOLD`` with reason ``'Error'``.
        """
        scores = {'v2': 0.0, 'v3': 0.0}
        if not self.initialized:
            return {'action': 'HOLD', 'scores': scores}

        try:
            # 1. Calculate Models
            # initialize() tolerates a missing V2 file, so guard against None
            # instead of letting the missing model abort the V3 evaluation.
            if self.model_v2 is not None:
                vec_v2 = self._engineer_legacy_v2_vector(ohlcv_1m, ohlcv_5m, ohlcv_15m)
                if vec_v2 is not None:
                    pred_v2 = self.model_v2.predict(xgb.DMatrix(vec_v2))
                    scores['v2'] = self._extract_v2_score(pred_v2)

            if self.model_v3 is not None and self.v3_feature_names:
                df_v3 = self._engineer_v3_dataframe(ohlcv_1m, ohlcv_5m, ohlcv_15m)
                if df_v3 is not None:
                    pred_v3 = self.model_v3.predict(xgb.DMatrix(df_v3))
                    if isinstance(pred_v3, np.ndarray):
                        scores['v3'] = float(pred_v3[0])
                    else:
                        scores['v3'] = float(pred_v3)

            # 2. Decision & VETO
            v2 = scores['v2']
            v3 = scores['v3']
            allow_exit = self._validate_with_time_weighted_ob(order_book, 'EXIT_HARD', volume_30m_usd)

            # --- V2 CHECK ---
            if v2 >= self.V2_PANIC_TRIGGER:
                if allow_exit:
                    return {'action': 'EXIT_HARD', 'reason': f'🚨 V2 PANIC ({v2:.2f})', 'scores': scores}
                return {'action': 'HOLD', 'reason': '🛡️ V2 BLOCKED: Strong Buy Wall (30m Context)', 'scores': scores}

            # --- V3 CHECK ---
            if v3 >= self.V3_HARD_EXIT:
                if allow_exit:
                    return {'action': 'EXIT_HARD', 'reason': f'🔥 V3 KILL ({v3:.2f})', 'scores': scores}
                return {'action': 'HOLD', 'reason': '🛡️ V3 BLOCKED: Strong Buy Wall (30m Context)', 'scores': scores}

            return {'action': 'HOLD', 'reason': f'Monitor (V2:{v2:.2f} V3:{v3:.2f})', 'scores': scores}
        except Exception as e:
            print(f"❌ Analysis Error: {e}")
            return {'action': 'HOLD', 'reason': 'Error', 'scores': scores}