# Repository path: Trad / ml_engine/guardian_hydra.py
# Riy777 -- "Update ml_engine/guardian_hydra.py"
# Revision: ba7da31 (verified)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
from collections import deque, defaultdict
import traceback
import sys
class GuardianHydra:
    """
    GuardianHydra V1.4 (Configurable Verbosity)

    Loads three XGBoost classifier "heads" (crash, giveback, stagnation)
    plus the list of feature columns they expect, builds a single-row
    feature vector from 1m/5m/15m OHLCV candles and the trade context, and
    reports each head's positive-class probability.

    This is the diagnostic ("X-RAY") build: ``analyze_position`` always
    returns a ``HOLD`` action and only exposes the raw probabilities.

    - ``set_silent_mode()`` suppresses logs during backtesting.
    """

    # Model heads, in the order they are loaded and evaluated.
    HEADS = ('crash', 'giveback', 'stagnation')
    # Column layout assumed for every OHLCV row passed in.
    OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Minimum number of 1m candles required (the relative-volume average
    # uses a 50-candle rolling window).
    MIN_1M_CANDLES = 50

    def __init__(self, model_dir, verbose=True):
        """Create an un-initialized instance; call ``initialize()`` to load.

        Args:
            model_dir: Directory holding ``hydra_features_list.pkl`` and the
                ``hydra_head_<name>_raw.json`` model files.
            verbose: When True (default, intended for the live system) the
                instance prints diagnostic logs, including the creation
                message below. Pass False to construct silently.
        """
        self.model_dir = model_dir
        self.initialized = False
        self.models = {}
        self.feature_cols = []
        self.verbose = bool(verbose)
        # Per-key rolling buffers (last 3 values per head). Not read by any
        # method of this class; kept because it is a public attribute.
        self.smoothing_buffer = defaultdict(lambda: {
            'crash': deque(maxlen=3),
            'giveback': deque(maxlen=3),
            'stagnation': deque(maxlen=3)
        })
        self.ATR_PERIOD = 14
        if self.verbose:
            print("🐲 [Hydra X-RAY] Instance Created. Waiting for data...")

    def set_silent_mode(self, silent=True):
        """Control logging output (True = no logs, False = X-RAY logs)."""
        self.verbose = not silent

    def initialize(self):
        """Load the feature list and every head model from ``model_dir``.

        State is committed only after everything loaded successfully, so a
        failed call never leaves the instance with a partial set of models.

        Returns:
            bool: True when the feature list and all heads were loaded,
            False when a file is missing or loading raised.
        """
        if self.verbose:
            print(f"🐲 [Hydra X-RAY] Loading from: {self.model_dir}")

        if not os.path.exists(self.model_dir):
            if self.verbose:
                print(f"❌ [FATAL] Directory missing: {self.model_dir}")
            return False

        try:
            # 1. Load features
            feat_path = os.path.join(self.model_dir, "hydra_features_list.pkl")
            if not os.path.exists(feat_path):
                if self.verbose:
                    print(f"❌ Feature list missing: {feat_path}")
                return False
            # NOTE(security): joblib.load unpickles the file, which can run
            # arbitrary code. Only point model_dir at trusted artifacts.
            feature_cols = joblib.load(feat_path)
            if self.verbose:
                print(f"βœ… Features List Loaded ({len(feature_cols)} items)")

            # 2. Load models (raw)
            models = {}
            for h in self.HEADS:
                model_path = os.path.join(self.model_dir, f"hydra_head_{h}_raw.json")
                if not os.path.exists(model_path):
                    if self.verbose:
                        print(f"❌ Model missing: {model_path}")
                    return False
                clf = xgb.XGBClassifier()
                clf.load_model(model_path)
                models[h] = clf
                if self.verbose:
                    print(f"βœ… Loaded Head: {h}")

            # Commit atomically now that nothing can fail any more.
            self.feature_cols = feature_cols
            self.models = models
            self.initialized = True
            if self.verbose:
                print(f"βœ… [Hydra X-RAY] System Ready.")
            return True
        except Exception as e:
            # Traceback is gated on verbose as well, so silent mode is
            # actually silent (matches _engineer_features).
            if self.verbose:
                print(f"❌ [Hydra Init Error] {e}")
                traceback.print_exc()
            return False

    @staticmethod
    def _has_rows(candles):
        """Return True when ``candles`` is not None and has at least one row.

        Uses ``len`` instead of truthiness so NumPy arrays / DataFrames do
        not raise an ambiguous-truth-value error.
        """
        return candles is not None and len(candles) > 0

    def _to_frame(self, candles):
        """Build an OHLCV DataFrame, or an empty frame when no rows given."""
        if not self._has_rows(candles):
            return pd.DataFrame()
        return pd.DataFrame(candles, columns=self.OHLCV_COLUMNS)

    @staticmethod
    def _htf_rsi(frame):
        """Last RSI(14) of a higher-timeframe frame; neutral 50 if too short."""
        if len(frame) > 14:
            return ta.rsi(frame['close'], length=14).iloc[-1]
        return 50

    def _engineer_features(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context):
        """Build the single-row feature vector fed to the head models.

        Args:
            ohlcv_1m: 1-minute candles, rows laid out as ``OHLCV_COLUMNS``.
                At least ``MIN_1M_CANDLES`` rows are required.
            ohlcv_5m: 5-minute candles (optional; RSI falls back to 50).
            ohlcv_15m: 15-minute candles (optional; RSI falls back to 50 and
                the EMA distance to 0.0).
            trade_context: Mapping that may provide ``entry_price``,
                ``highest_price`` and ``time_in_trade_mins``. Missing or
                None values fall back to the last close / entry price /
                10 minutes respectively.

        Returns:
            pandas.DataFrame | None: One float row with exactly
            ``self.feature_cols`` as columns, or None when the input is
            unusable or any step raised.
        """
        try:
            # 1. Check raw data inputs
            if not self._has_rows(ohlcv_1m):
                if self.verbose:
                    print("⚠️ [X-RAY] 1m Data is EMPTY!")
                return None

            df_1m = self._to_frame(ohlcv_1m)

            # [DIAGNOSTIC 1] Print input sample
            last_close = df_1m['close'].iloc[-1]
            if self.verbose:
                print(f"πŸ” [Input Check] Last Close: {last_close} | Candles: {len(df_1m)}")

            if len(df_1m) < self.MIN_1M_CANDLES:
                if self.verbose:
                    print(f"⚠️ [X-RAY] Not enough history: {len(df_1m)} < {self.MIN_1M_CANDLES}")
                return None

            # last_close is used as a divisor below; a missing or
            # non-positive price would yield inf/NaN features.
            if pd.isna(last_close) or last_close <= 0:
                if self.verbose:
                    print(f"⚠️ [X-RAY] Invalid last close: {last_close}")
                return None

            # 2. Indicator calculation
            df_1m['atr'] = ta.atr(df_1m['high'], df_1m['low'], df_1m['close'], length=self.ATR_PERIOD)
            df_1m['rsi'] = ta.rsi(df_1m['close'], length=14)

            # [DIAGNOSTIC 2] Check indicators
            last_rsi = df_1m['rsi'].iloc[-1]
            last_atr = df_1m['atr'].iloc[-1]
            if self.verbose and (pd.isna(last_rsi) or pd.isna(last_atr)):
                print(f"⚠️ [X-RAY] Indicators are NaN! RSI: {last_rsi}, ATR: {last_atr}")

            # Bollinger band width: the column whose name contains 'BBB'.
            bb = ta.bbands(df_1m['close'], length=20, std=2)
            if bb is not None:
                width_cols = [c for c in bb.columns if 'BBB' in c]
                df_1m['bb_width'] = bb[width_cols[0]] if width_cols else 0.0
            else:
                df_1m['bb_width'] = 0.0

            # Volume relative to its 50-candle mean (epsilon avoids /0).
            vol_ma = df_1m['volume'].rolling(50).mean()
            df_1m['rel_vol'] = df_1m['volume'] / (vol_ma + 1e-9)

            # Higher timeframes: neutral defaults when data is missing.
            df_5m = self._to_frame(ohlcv_5m)
            rsi_5m = self._htf_rsi(df_5m)

            df_15m = self._to_frame(ohlcv_15m)
            rsi_15m = self._htf_rsi(df_15m)

            # EMA(80) on 15m closes stands in for a 1h EMA20.
            dist_ema20_1h = 0.0
            if len(df_15m) > 4:
                ema20_1h_approx = df_15m['close'].ewm(span=80, adjust=False).mean().iloc[-1]
                dist_ema20_1h = (last_close - ema20_1h_approx) / last_close

            # Trade context (tolerate a missing mapping and None values).
            context = trade_context or {}
            entry_price = float(context.get('entry_price') or 0.0)
            if entry_price == 0:
                entry_price = last_close  # Fallback

            # Fall back to 1% of entry when ATR is missing or non-positive.
            if pd.notna(last_atr) and last_atr > 0:
                atr_val = last_atr
            else:
                atr_val = entry_price * 0.01
            sl_dist_unit = 1.5 * atr_val

            pnl_amt = last_close - entry_price
            norm_pnl_r = pnl_amt / sl_dist_unit

            duration_mins = context.get('time_in_trade_mins', 10)
            if duration_mins is None:
                duration_mins = 10

            highest_price = float(context.get('highest_price') or entry_price)
            if highest_price < entry_price:
                highest_price = entry_price
            max_pnl_amt = highest_price - entry_price
            max_pnl_r = max_pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0

            # Assemble vector. The trailing entries are fixed placeholder
            # values, not derived from the inputs.
            feat_dict = {
                'rsi_1m': last_rsi,
                'rsi_5m': rsi_5m,
                'rsi_15m': rsi_15m,
                'bb_width': df_1m['bb_width'].iloc[-1],
                'rel_vol': df_1m['rel_vol'].iloc[-1],
                'dist_ema20_1h': dist_ema20_1h,
                'atr_pct': atr_val / last_close,
                'norm_pnl_r': norm_pnl_r,
                'max_pnl_r': max_pnl_r,
                'dist_tp_atr': 0.0,
                'dist_sl_atr': 0.0,
                'time_in_trade': float(duration_mins),
                'entry_type': 0.0,
                'oracle_conf': 0.8,
                'l2_score': 0.7,
                'target_class': 3.0
            }

            vector = pd.DataFrame([feat_dict])
            # Any model feature not computed above is zero-filled.
            for col in self.feature_cols:
                if col not in vector.columns:
                    vector[col] = 0.0

            if vector.isnull().values.any():
                if self.verbose:
                    print("⚠️ [X-RAY] Final Vector contains NaNs! Model will fail or output 0.")
                    print(vector.iloc[0].to_dict())
                vector = vector.fillna(0)

            return vector[self.feature_cols].astype(float)
        except Exception as e:
            if self.verbose:
                print(f"❌ [X-RAY] Feature Error: {e}")
                traceback.print_exc()
            return None

    def analyze_position(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data):
        """Run every head on the current position and report probabilities.

        Args:
            symbol: Identifier used only in log messages.
            ohlcv_1m, ohlcv_5m, ohlcv_15m: Candle rows, see
                ``_engineer_features``.
            trade_data: Trade context mapping, see ``_engineer_features``.

        Returns:
            dict: Always shaped like ``_pkg`` output with ``action`` set to
            ``'HOLD'``. ``reason`` is ``'Not Init'``, ``'Feat Fail'`` or
            ``'Error'`` (with empty ``probs``) on failure paths, otherwise
            ``'X-RAY DIAGNOSTIC'`` with one probability per head. A head
            whose prediction raises is reported as 0.0.
        """
        if not self.initialized:
            return self._pkg('HOLD', 0.0, 'Not Init', {})

        try:
            features = self._engineer_features(ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data)
            if features is None:
                if self.verbose:
                    print(f"🚫 [X-RAY] {symbol}: Feature Engineering Failed.")
                return self._pkg('HOLD', 0.0, 'Feat Fail', {})

            probs = {}
            if self.verbose:
                print(f"πŸ”¬ [X-RAY] Predicting for {symbol}...")

            for h in self.HEADS:
                try:
                    full_pred = self.models[h].predict_proba(features)
                    # Row 0 is our single sample; column 1 is the
                    # positive class.
                    raw_prob = full_pred[0][1]
                    probs[h] = raw_prob
                    if raw_prob > 0.0 and self.verbose:
                        print(f" πŸ”₯ {h.upper()} Non-Zero Prob: {raw_prob:.4f}")
                except Exception as e:
                    # Best effort: one failing head must not hide the others.
                    if self.verbose:
                        print(f" ❌ Error predicting {h}: {e}")
                    probs[h] = 0.0

            return self._pkg('HOLD', 0.0, "X-RAY DIAGNOSTIC", probs)
        except Exception as e:
            if self.verbose:
                print(f"❌ [X-RAY] Analyze Error: {e}")
            return self._pkg('HOLD', 0.0, 'Error', {})

    def _pkg(self, action, conf, reason, probs):
        """Package a decision into the result dict returned to callers.

        Args:
            action: Action label (e.g. ``'HOLD'``).
            conf: Confidence value; coerced to ``float``.
            reason: Short human-readable reason string.
            probs: Mapping of head name to probability; may be empty.

        Returns:
            dict: ``action``, ``confidence``, ``reason``, ``probs`` (all
            values as plain floats) and ``scores`` where ``v2`` mirrors the
            crash probability and ``v3`` the giveback probability
            (0.0 when absent).
        """
        return {
            'action': action,
            'confidence': float(conf),
            'reason': reason,
            'probs': {k: float(v) for k, v in probs.items()},
            'scores': {
                'v2': float(probs.get('crash', 0)),
                'v3': float(probs.get('giveback', 0)),
            }
        }