# Tradcloneai / ml_engine /guardian_hydra.py
# Riy777's picture
# Auto-Fix by Architect: Auto-Fix by Agency: REQUEST_CODE_MODIFICATION | The Quant Warning indicates a critical flaw in the model's response logic, requiring architectural review to resolve ambiguity in trade signal interpretation. This aligns with the "Zero Crashes" objective by preventing faulty execution from unclear model outputs.
# 4d5bcd7 verified
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
from collections import deque, defaultdict
import traceback
import sys
# ✅ GEM-FIX: import the Mixin to ensure compatibility (fallback)
from sklearn.base import ClassifierMixin
class GuardianHydra:
    """
    GuardianHydra V1.7 (Quant Warning Resolution)

    Position guard built on three XGBoost "heads" (``crash``, ``giveback``,
    ``stagnation``). For an open trade it builds a single-row feature vector
    from 1m/5m/15m OHLCV candles plus trade context, asks each head for a
    probability, and maps the probabilities to an action
    (``CLOSE`` / ``TAKE_PROFIT`` / ``HOLD``).

    PnL features are computed as ``last_close - entry_price`` and
    ``highest_price - entry_price``, i.e. long-side arithmetic.

    Changelog:
    - Fixed: `_estimator_type` undefined error during load_model.
    - Added: Manual type injection for XGBClassifier.
    - Enhanced: Quant Warning handling with fallback and diagnostics.
    - Resolved: Ambiguity in trade signal interpretation via structured confidence mapping.
    """

    # Model heads, in the priority order used when deciding on an action.
    HEADS = ('crash', 'giveback', 'stagnation')

    # Minimum probability for each head to trigger its action.
    CRASH_THRESHOLD = 0.6
    GIVEBACK_THRESHOLD = 0.55
    STAGNATION_THRESHOLD = 0.5

    # Column layout expected for every OHLCV row.
    _OHLCV_COLUMNS = ('timestamp', 'open', 'high', 'low', 'close', 'volume')

    def __init__(self, model_dir):
        """
        Create an un-initialized guard.

        Args:
            model_dir: Directory holding ``hydra_features_list.pkl`` and the
                ``hydra_head_<name>_raw.json`` model files. Nothing is read
                until :meth:`initialize` is called.
        """
        self.model_dir = model_dir
        self.initialized = False
        self.models = {}
        self.feature_cols = []
        self.verbose = True
        # Per-symbol rolling buffers (3 samples per head). Not consumed by any
        # method of this class; kept because it is a public attribute.
        self.smoothing_buffer = defaultdict(lambda: {
            'crash': deque(maxlen=3),
            'giveback': deque(maxlen=3),
            'stagnation': deque(maxlen=3)
        })
        self.ATR_PERIOD = 14
        # Set once the "Quant Silence" recovery path has tried a re-initialize,
        # so that recovery happens at most once per instance.
        self._reinit_attempted = False
        if self.verbose:
            print("🐲 [Hydra X-RAY] Instance Created. Waiting for data...")

    def set_silent_mode(self, silent=True):
        """ Control Logging Output """
        self.verbose = not silent

    def initialize(self):
        """
        Load the feature list and all model heads from ``self.model_dir``.

        State (``feature_cols``, ``models``, ``initialized``) is only updated
        after every artefact has loaded, so a failed (re-)initialization never
        leaves a mix of old and new models behind.

        Returns:
            bool: True when everything loaded; False when the directory, the
            feature list or any head file is missing, or loading raised.
        """
        if self.verbose:
            print(f"🐲 [Hydra X-RAY] Loading from: {self.model_dir}")
        if not os.path.exists(self.model_dir):
            if self.verbose:
                print(f"❌ [FATAL] Directory missing: {self.model_dir}")
            return False
        try:
            # 1. Load Features
            feat_path = os.path.join(self.model_dir, "hydra_features_list.pkl")
            if not os.path.exists(feat_path):
                if self.verbose:
                    print(f"❌ Feature list missing: {feat_path}")
                return False
            # SECURITY NOTE: joblib.load unpickles the file, which can execute
            # arbitrary code. model_dir must only ever point at trusted artefacts.
            feature_cols = joblib.load(feat_path)
            if self.verbose:
                print(f"✅ Features List Loaded ({len(feature_cols)} items)")

            # 2. Load Models (RAW with Type Injection)
            models = {}
            for h in self.HEADS:
                model_path = os.path.join(self.model_dir, f"hydra_head_{h}_raw.json")
                if not os.path.exists(model_path):
                    if self.verbose:
                        print(f"❌ Model missing: {model_path}")
                    return False
                models[h] = self._load_head(h, model_path)

            # 3. Commit atomically now that nothing can fail any more.
            self.feature_cols = feature_cols
            self.models = models
            self.initialized = True
            if self.verbose:
                print(f"✅ [Hydra X-RAY] System Ready.")
            return True
        except Exception as e:
            if self.verbose:
                print(f"❌ [Hydra Init Error] {e}")
            traceback.print_exc()
            return False

    def _load_head(self, head, model_path):
        """
        Load one head, preferring the sklearn wrapper over a raw Booster.

        Returns an ``xgb.XGBClassifier``, or an ``xgb.Booster`` when the
        wrapper fails to load the file. Raises whatever the Booster load
        raises if that fallback fails too.
        """
        clf = xgb.XGBClassifier()
        # GEM-ARCHITECT PATCH: force the estimator type on the instance so
        # XGBoost treats it as a classifier; this bypasses the internal loader
        # validation that otherwise raises during load_model.
        clf._estimator_type = "classifier"
        try:
            clf.load_model(model_path)
        except Exception as load_err:
            if self.verbose:
                print(f"⚠️ Failed to load {head} with XGBClassifier. Trying raw Booster...")
                print(f"   ↳ {load_err}")
            # Fallback to raw booster if the classifier wrapper fails completely.
            # analyze_position detects Booster instances and predicts via DMatrix.
            bst = xgb.Booster()
            bst.load_model(model_path)
            return bst
        if self.verbose:
            print(f"✅ Loaded Head: {head}")
        return clf

    @classmethod
    def _to_frame(cls, ohlcv):
        """Turn OHLCV rows into a DataFrame; empty/None input gives an empty frame."""
        if not ohlcv:
            return pd.DataFrame()
        return pd.DataFrame(ohlcv, columns=list(cls._OHLCV_COLUMNS))

    def _engineer_features(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context):
        """
        Build the single-row feature vector fed to the model heads.

        Args:
            ohlcv_1m: 1-minute candles as rows of
                ``[timestamp, open, high, low, close, volume]``; at least 50
                rows are required.
            ohlcv_5m, ohlcv_15m: Higher-timeframe candles in the same layout.
                When missing or too short (<= 14 rows) their RSI defaults to 50.
            trade_context: Mapping read for ``entry_price``, ``highest_price``
                and ``time_in_trade_mins``. Missing or ``None`` values fall
                back to the last close / entry price / 10 minutes.

        Returns:
            pandas.DataFrame | None: One float row with exactly
            ``self.feature_cols`` as columns (unknown columns filled with 0,
            NaN/inf replaced by 0), or None when data is insufficient or any
            step raises.
        """
        try:
            # 1. Check Raw Data Inputs
            if not ohlcv_1m or len(ohlcv_1m) < 1:
                if self.verbose:
                    print("⚠️ [X-RAY] 1m Data is EMPTY!")
                return None
            df_1m = self._to_frame(ohlcv_1m)
            last_close = df_1m['close'].iloc[-1]
            if len(df_1m) < 50:
                if self.verbose:
                    print(f"⚠️ [X-RAY] Not enough history: {len(df_1m)} < 50")
                return None

            # 2. Indicator Calculation
            df_1m['atr'] = ta.atr(df_1m['high'], df_1m['low'], df_1m['close'], length=self.ATR_PERIOD)
            df_1m['rsi'] = ta.rsi(df_1m['close'], length=14)
            last_rsi = df_1m['rsi'].iloc[-1]
            last_atr = df_1m['atr'].iloc[-1]

            bb = ta.bbands(df_1m['close'], length=20, std=2)
            if bb is not None:
                # Pick the band-width column by its 'BBB' name fragment.
                w_col = [c for c in bb.columns if 'BBB' in c]
                df_1m['bb_width'] = bb[w_col[0]] if w_col else 0.0
            else:
                df_1m['bb_width'] = 0.0

            vol_ma = df_1m['volume'].rolling(50).mean()
            # Epsilon keeps the ratio finite when the average volume is zero.
            df_1m['rel_vol'] = df_1m['volume'] / (vol_ma + 1e-9)

            # HTF mocking if missing: neutral RSI of 50 when there is no usable data.
            df_5m = self._to_frame(ohlcv_5m)
            rsi_5m = ta.rsi(df_5m['close'], length=14).iloc[-1] if len(df_5m) > 14 else 50
            df_15m = self._to_frame(ohlcv_15m)
            rsi_15m = ta.rsi(df_15m['close'], length=14).iloc[-1] if len(df_15m) > 14 else 50

            dist_ema20_1h = 0.0
            if len(df_15m) > 4:
                # An 80-span EMA on 15m candles stands in for a 20-period EMA on 1h.
                ema20_1h_approx = df_15m['close'].ewm(span=80, adjust=False).mean().iloc[-1]
                dist_ema20_1h = (last_close - ema20_1h_approx) / last_close

            # Trade Context (None values are treated like missing keys)
            entry_price = float(trade_context.get('entry_price') or 0.0)
            if entry_price == 0:
                entry_price = last_close
            # NaN ATR fails the "> 0" test and falls back to 1% of entry price.
            atr_val = last_atr if last_atr > 0 else (entry_price * 0.01)
            sl_dist_unit = 1.5 * atr_val
            pnl_amt = last_close - entry_price
            # Same zero guard as max_pnl_r below, so neither can become inf.
            norm_pnl_r = pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0

            duration_mins = trade_context.get('time_in_trade_mins')
            if duration_mins is None:
                duration_mins = 10

            highest_price = float(trade_context.get('highest_price') or entry_price)
            if highest_price < entry_price:
                highest_price = entry_price
            max_pnl_amt = highest_price - entry_price
            max_pnl_r = max_pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0

            # Assemble Vector
            feat_dict = {
                'rsi_1m': last_rsi,
                'rsi_5m': rsi_5m,
                'rsi_15m': rsi_15m,
                'bb_width': df_1m['bb_width'].iloc[-1],
                'rel_vol': df_1m['rel_vol'].iloc[-1],
                'dist_ema20_1h': dist_ema20_1h,
                'atr_pct': atr_val / last_close,
                'norm_pnl_r': norm_pnl_r,
                'max_pnl_r': max_pnl_r,
                # The remaining entries are fixed placeholder values.
                # NOTE(review): confirm these constants match what the heads
                # were trained with.
                'dist_tp_atr': 0.0,
                'dist_sl_atr': 0.0,
                'time_in_trade': float(duration_mins),
                'entry_type': 0.0,
                'oracle_conf': 0.8,
                'l2_score': 0.7,
                'target_class': 3.0
            }

            # Align to the trained column order; columns the model expects but
            # we do not compute are filled with 0.
            vector = pd.DataFrame([feat_dict]).reindex(columns=self.feature_cols, fill_value=0.0)
            vector = vector.astype(float)
            # Scrub NaN and +/-inf (e.g. from a zero last_close) so the models
            # always receive finite input.
            return vector.replace([np.inf, -np.inf], np.nan).fillna(0.0)
        except Exception as e:
            if self.verbose:
                print(f"❌ [X-RAY] Feature Error: {e}")
                traceback.print_exc()
            return None

    @staticmethod
    def _predict_head(model, features):
        """Return the positive-class probability of one head as a Python float."""
        # Check if it's a Raw Booster or Sklearn Wrapper
        if isinstance(model, xgb.Booster):
            # A raw booster needs a DMatrix and returns the probability directly.
            return float(model.predict(xgb.DMatrix(features))[0])
        # Sklearn wrapper: column 1 is the positive class when it exists.
        row = model.predict_proba(features)[0]
        return float(row[1]) if len(row) > 1 else 0.0

    def _decide_action(self, probs):
        """
        Map head probabilities to ``(action, confidence, reason)``.

        Heads are checked in priority order crash > giveback > stagnation;
        the first one at or above its threshold wins. With no head above its
        threshold the action is HOLD and the confidence is the largest
        probability.
        """
        crash_prob = probs.get('crash', 0.0)
        giveback_prob = probs.get('giveback', 0.0)
        stagnation_prob = probs.get('stagnation', 0.0)

        if crash_prob >= self.CRASH_THRESHOLD:
            return 'CLOSE', crash_prob, 'High Crash Risk Detected'
        if giveback_prob >= self.GIVEBACK_THRESHOLD:
            return 'TAKE_PROFIT', giveback_prob, 'Giveback Signal Strong'
        if stagnation_prob >= self.STAGNATION_THRESHOLD:
            return 'HOLD', stagnation_prob, 'Stagnation Detected'
        # Default fallback when no strong signal
        return 'HOLD', max(crash_prob, giveback_prob, stagnation_prob), 'No Clear Signal'

    def _hold(self, reason):
        """Build a HOLD result with zero confidence and no probabilities."""
        return self._pkg('HOLD', 0.0, reason, {})

    def analyze_position(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data):
        """
        Evaluate an open position and recommend an action.

        Args:
            symbol: Identifier used only in log output.
            ohlcv_1m, ohlcv_5m, ohlcv_15m: Candle rows, see
                :meth:`_engineer_features`.
            trade_data: Trade context mapping, see :meth:`_engineer_features`.

        Returns:
            dict: Always has ``action``, ``confidence``, ``reason``, ``probs``
            and ``scores``. ``action`` is ``HOLD`` whenever the guard is not
            initialized, features cannot be built, all heads return zero
            ("Quant Silence") or an unexpected error occurs. This method does
            not raise.
        """
        if not self.initialized:
            return self._hold('Not Init')
        try:
            features = self._engineer_features(ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data)
            if features is None:
                return self._hold('Feat Fail')

            probs = {}
            if self.verbose:
                print(f"🔬 [X-RAY] Predicting for {symbol}...")
            for h in self.HEADS:
                try:
                    raw_prob = self._predict_head(self.models[h], features)
                    probs[h] = raw_prob
                    if raw_prob > 0.0 and self.verbose:
                        print(f" 🔥 {h.upper()} Non-Zero Prob: {raw_prob:.4f}")
                except Exception as e:
                    # Best effort: a failing head counts as "no signal".
                    if self.verbose:
                        print(f" ❌ Error predicting {h}: {e}")
                    probs[h] = 0.0

            # Handle case where all probabilities are zero (Quant Silence)
            if sum(probs.values()) <= 0.0:
                if self.verbose:
                    print("🚨 [QUANT SILENCE] All model outputs are zero. Possible bug or data issue.")
                # Reload the models once per instance and retry before giving up.
                if not getattr(self, '_reinit_attempted', False):
                    if self.verbose:
                        print("🔄 Attempting one-time reinitialization due to Quant Silence...")
                    # Flag is set before the recursive call, so it recurses at most once.
                    self._reinit_attempted = True
                    if self.initialize():
                        return self.analyze_position(symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data)
                return self._pkg('HOLD', 0.0, 'Quant Silence', probs)

            # ✅ QUANT WARNING RESOLUTION: Clear Signal Mapping
            action, confidence, reason = self._decide_action(probs)
            return self._pkg(action, confidence, reason, probs)
        except Exception as e:
            if self.verbose:
                print(f"❌ [X-RAY] Analyze Error: {e}")
            return self._hold('Error')

    def _pkg(self, action, conf, reason, probs):
        """
        Package a decision into the result dict returned to callers.

        All numeric values are converted to built-in ``float`` so the result
        holds no numpy scalars. ``scores`` exposes the crash probability as
        ``v2`` and the giveback probability as ``v3`` (0.0 when absent).
        """
        return {
            'action': action,
            'confidence': float(conf),
            'reason': reason,
            'probs': {k: float(v) for k, v in probs.items()},
            'scores': {
                'v2': float(probs.get('crash', 0)),
                'v3': float(probs.get('giveback', 0)),
            },
        }
# ✅ GEM-FIX: Ensure estimator compatibility at module level.
# Runs once at import time and sets `_estimator_type` on the XGBClassifier
# class itself, so the value "classifier" is visible on every XGBClassifier
# instance that does not override it (GuardianHydra.initialize additionally
# sets the same value on each instance it creates).
# NOTE(review): presumably a workaround for an xgboost / scikit-learn version
# mismatch hit during load_model — confirm it is still needed with the
# installed versions, since it patches a third-party class globally.
xgb.XGBClassifier._estimator_type = "classifier"