Spaces:
Paused
Paused
Auto-Fix by Architect: Auto-Fix by Agency: REQUEST_CODE_MODIFICATION | The Quant Warning indicates a critical flaw in the model's response logic, requiring architectural review to resolve ambiguity in trade signal interpretation. This aligns with the "Zero Crashes" objective by preventing faulty execution from unclear model outputs.
4d5bcd7 verified | import os | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import pandas_ta as ta | |
| import xgboost as xgb | |
| from collections import deque, defaultdict | |
| import traceback | |
| import sys | |
| # ✅ GEM-FIX: استيراد Mixin لضمان التوافق (احتياطي) | |
| from sklearn.base import ClassifierMixin | |
class GuardianHydra:
    """
    GuardianHydra V1.7 (Quant Warning Resolution)
    - Fixed: `_estimator_type` undefined error during load_model.
    - Added: Manual type injection for XGBClassifier.
    - Enhanced: Quant Warning handling with fallback and diagnostics.
    - Resolved: Ambiguity in trade signal interpretation via structured confidence mapping.

    Loads one XGBoost model per "head" (crash, giveback, stagnation) from
    ``model_dir`` and maps the three predicted probabilities for an open
    position to a single action: ``CLOSE``, ``TAKE_PROFIT`` or ``HOLD``.

    Every result returned by :meth:`analyze_position` has the same keys:
    ``action``, ``confidence``, ``reason``, ``probs`` and ``scores``.
    """

    # Model heads, in decision-priority order.
    HEADS = ('crash', 'giveback', 'stagnation')
    # Column layout used when turning raw candle rows into a DataFrame.
    OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Minimum probability at which each head triggers its action.
    CRASH_THRESHOLD = 0.6
    GIVEBACK_THRESHOLD = 0.55
    STAGNATION_THRESHOLD = 0.5
    # Minimum number of 1m candles required to build a feature vector.
    MIN_HISTORY = 50

    def __init__(self, model_dir):
        """Create an unloaded instance; call :meth:`initialize` before use.

        Args:
            model_dir: Directory holding ``hydra_features_list.pkl`` and the
                ``hydra_head_<name>_raw.json`` model files.
        """
        self.model_dir = model_dir
        self.initialized = False
        self.models = {}
        self.feature_cols = []
        self.verbose = True
        # Per-key rolling buffers (3 slots per head); not read by this class.
        self.smoothing_buffer = defaultdict(lambda: {
            'crash': deque(maxlen=3),
            'giveback': deque(maxlen=3),
            'stagnation': deque(maxlen=3)
        })
        self.ATR_PERIOD = 14
        # Set once the "Quant Silence" recovery re-initialisation has been tried.
        self._reinit_attempted = False
        if self.verbose: print("🐲 [Hydra X-RAY] Instance Created. Waiting for data...")

    def set_silent_mode(self, silent=True):
        """ Control Logging Output """
        self.verbose = not silent

    def initialize(self):
        """Load the feature list and all model heads from ``self.model_dir``.

        State is swapped in only after everything loaded, so a failed call
        (including a failed re-initialisation) leaves previously loaded models
        and ``self.initialized`` untouched.

        Returns:
            bool: True when the feature list and all heads were loaded.
        """
        if self.verbose: print(f"🐲 [Hydra X-RAY] Loading from: {self.model_dir}")
        try:
            if not os.path.exists(self.model_dir):
                if self.verbose: print(f"❌ [FATAL] Directory missing: {self.model_dir}")
                return False

            # 1. Load Features
            feat_path = os.path.join(self.model_dir, "hydra_features_list.pkl")
            if not os.path.exists(feat_path):
                if self.verbose: print(f"❌ Feature list missing: {feat_path}")
                return False
            # NOTE(review): joblib.load unpickles the file; only point
            # model_dir at trusted locations.
            feature_cols = joblib.load(feat_path)
            if self.verbose: print(f"✅ Features List Loaded ({len(feature_cols)} items)")

            # 2. Load Models (RAW with Type Injection)
            loaded = {}
            for h in self.HEADS:
                model_path = os.path.join(self.model_dir, f"hydra_head_{h}_raw.json")
                if not os.path.exists(model_path):
                    if self.verbose: print(f"❌ Model missing: {model_path}")
                    return False
                loaded[h] = self._load_head(h, model_path)

            # Commit atomically: nothing above has touched instance state.
            self.feature_cols = feature_cols
            self.models = loaded
            self.initialized = True
            if self.verbose: print(f"✅ [Hydra X-RAY] System Ready.")
            return True
        except Exception as e:
            if self.verbose: print(f"❌ [Hydra Init Error] {e}")
            traceback.print_exc()
            return False

    def _load_head(self, head, model_path):
        """Load one head as an XGBClassifier, falling back to a raw Booster.

        Errors from the Booster fallback propagate to the caller.
        """
        # Create the wrapper object.
        clf = xgb.XGBClassifier()
        # =========================================================
        # 💉 GEM-ARCHITECT PATCH: FORCE ESTIMATOR TYPE
        # =========================================================
        # Forcibly tell XGBoost that this object is a classifier, to bypass
        # the internal loader validation that raises the
        # "`_estimator_type` undefined" error.
        clf._estimator_type = "classifier"
        # =========================================================
        try:
            clf.load_model(model_path)
        except Exception as load_err:
            if self.verbose:
                print(f"⚠️ Failed to load {head} with XGBClassifier ({load_err}). Trying raw Booster...")
            # Fallback to raw booster if the classifier wrapper fails completely;
            # analyze_position handles Booster objects through a DMatrix.
            bst = xgb.Booster()
            bst.load_model(model_path)
            return bst
        if self.verbose: print(f"✅ Loaded Head: {head}")
        return clf

    def _ohlcv_frame(self, candles):
        """Return candle rows as a DataFrame; empty frame for None/empty input."""
        if candles is None or len(candles) == 0:
            return pd.DataFrame(columns=self.OHLCV_COLUMNS)
        return pd.DataFrame(candles, columns=self.OHLCV_COLUMNS)

    @staticmethod
    def _last_rsi(df, length=14, default=50):
        """Return the latest RSI of ``df['close']`` or ``default`` when history is too short."""
        if len(df) <= length:
            return default
        series = ta.rsi(df['close'], length=length)
        return default if series is None else series.iloc[-1]

    def _engineer_features(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context):
        """Build the single-row feature frame fed to the model heads.

        Args:
            ohlcv_1m, ohlcv_5m, ohlcv_15m: Candle rows laid out as
                ``OHLCV_COLUMNS``. The 1m series needs at least
                ``MIN_HISTORY`` rows; the 5m/15m series are optional and fall
                back to neutral defaults when missing or too short.
            trade_context: Mapping that may contain ``entry_price``,
                ``time_in_trade_mins`` and ``highest_price``.

        Returns:
            A one-row float DataFrame with columns ``self.feature_cols``
            (columns not computed here are filled with 0.0), or None when the
            1m data is insufficient or any step raises.
        """
        try:
            # 1. Check Raw Data Inputs
            df_1m = self._ohlcv_frame(ohlcv_1m)
            if df_1m.empty:
                if self.verbose: print("⚠️ [X-RAY] 1m Data is EMPTY!")
                return None
            if len(df_1m) < self.MIN_HISTORY:
                if self.verbose: print(f"⚠️ [X-RAY] Not enough history: {len(df_1m)} < 50")
                return None
            last_close = df_1m['close'].iloc[-1]

            # 2. Indicator Calculation
            df_1m['atr'] = ta.atr(df_1m['high'], df_1m['low'], df_1m['close'], length=self.ATR_PERIOD)
            df_1m['rsi'] = ta.rsi(df_1m['close'], length=14)
            last_rsi = df_1m['rsi'].iloc[-1]
            last_atr = df_1m['atr'].iloc[-1]

            bb = ta.bbands(df_1m['close'], length=20, std=2)
            if bb is not None:
                # The bandwidth column is located by its 'BBB' name fragment.
                w_col = [c for c in bb.columns if 'BBB' in c]
                df_1m['bb_width'] = bb[w_col[0]] if w_col else 0.0
            else:
                df_1m['bb_width'] = 0.0

            vol_ma = df_1m['volume'].rolling(50).mean()
            df_1m['rel_vol'] = df_1m['volume'] / (vol_ma + 1e-9)

            # HTF Mocking if missing
            df_5m = self._ohlcv_frame(ohlcv_5m)
            df_15m = self._ohlcv_frame(ohlcv_15m)
            rsi_5m = self._last_rsi(df_5m)
            rsi_15m = self._last_rsi(df_15m)

            dist_ema20_1h = 0.0
            if len(df_15m) > 4 and last_close != 0:
                # span=80 on 15m closes stands in for a 1h EMA20.
                ema20_1h_approx = df_15m['close'].ewm(span=80, adjust=False).mean().iloc[-1]
                dist_ema20_1h = (last_close - ema20_1h_approx) / last_close

            # Trade Context (missing or None values fall back to defaults)
            ctx = trade_context or {}
            entry_price = float(ctx.get('entry_price') or 0.0)
            if entry_price == 0: entry_price = last_close

            # A NaN/None/non-positive ATR falls back to 1% of the entry price.
            has_atr = last_atr is not None and last_atr > 0
            atr_val = last_atr if has_atr else (entry_price * 0.01)
            sl_dist_unit = 1.5 * atr_val

            pnl_amt = last_close - entry_price
            norm_pnl_r = pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0

            duration_mins = ctx.get('time_in_trade_mins')
            if duration_mins is None: duration_mins = 10

            highest_price = float(ctx.get('highest_price') or entry_price)
            if highest_price < entry_price: highest_price = entry_price
            max_pnl_amt = highest_price - entry_price
            max_pnl_r = max_pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0

            # Assemble Vector
            feat_dict = {
                'rsi_1m': last_rsi,
                'rsi_5m': rsi_5m,
                'rsi_15m': rsi_15m,
                'bb_width': df_1m['bb_width'].iloc[-1],
                'rel_vol': df_1m['rel_vol'].iloc[-1],
                'dist_ema20_1h': dist_ema20_1h,
                'atr_pct': atr_val / last_close if last_close != 0 else 0.0,
                'norm_pnl_r': norm_pnl_r,
                'max_pnl_r': max_pnl_r,
                'dist_tp_atr': 0.0,
                'dist_sl_atr': 0.0,
                'time_in_trade': float(duration_mins),
                # NOTE(review): the four values below are fixed placeholders,
                # not taken from trade_context — confirm this matches training.
                'entry_type': 0.0,
                'oracle_conf': 0.8,
                'l2_score': 0.7,
                'target_class': 3.0
            }

            # Order columns as the models expect; unknown features become 0.0
            # and any NaN indicator value is replaced by 0.
            vector = pd.DataFrame([feat_dict]).reindex(columns=self.feature_cols, fill_value=0.0)
            return vector.fillna(0).astype(float)
        except Exception as e:
            if self.verbose:
                print(f"❌ [X-RAY] Feature Error: {e}")
                traceback.print_exc()
            return None

    def _predict_head(self, model, features):
        """Return the positive-class probability of one head for ``features``."""
        # Check if it's a Raw Booster or Sklearn Wrapper
        if isinstance(model, xgb.Booster):
            # A raw booster needs a DMatrix; its first output value is used
            # directly as the probability.
            return model.predict(xgb.DMatrix(features))[0]
        # Sklearn Wrapper
        full_pred = model.predict_proba(features)
        return full_pred[0][1] if len(full_pred[0]) > 1 else 0.0

    def _decide(self, probs):
        """Map head probabilities to ``(action, reason, confidence)``.

        Heads are checked in priority order: crash, giveback, stagnation.
        """
        crash_prob = probs.get('crash', 0.0)
        giveback_prob = probs.get('giveback', 0.0)
        stagnation_prob = probs.get('stagnation', 0.0)

        if crash_prob >= self.CRASH_THRESHOLD:
            return 'CLOSE', 'High Crash Risk Detected', crash_prob
        if giveback_prob >= self.GIVEBACK_THRESHOLD:
            return 'TAKE_PROFIT', 'Giveback Signal Strong', giveback_prob
        if stagnation_prob >= self.STAGNATION_THRESHOLD:
            return 'HOLD', 'Stagnation Detected', stagnation_prob
        # Default fallback when no strong signal
        return 'HOLD', 'No Clear Signal', max(crash_prob, giveback_prob, stagnation_prob)

    def _hold(self, reason):
        """Build a zero-confidence HOLD result with the standard result keys."""
        return self._pkg('HOLD', 0.0, reason, dict.fromkeys(self.HEADS, 0.0))

    def analyze_position(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data):
        """Score an open position and return the recommended action.

        Args:
            symbol: Identifier used only in log output.
            ohlcv_1m, ohlcv_5m, ohlcv_15m: Candle rows (see ``_engineer_features``).
            trade_data: Trade context mapping (see ``_engineer_features``).

        Returns:
            dict with keys ``action`` ('CLOSE', 'TAKE_PROFIT' or 'HOLD'),
            ``confidence``, ``reason``, ``probs`` and ``scores``. Any failure
            yields a zero-confidence HOLD; this method does not raise.
        """
        if not self.initialized:
            return self._hold('Not Init')
        try:
            features = self._engineer_features(ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data)
            if features is None:
                return self._hold('Feat Fail')

            probs = {}
            if self.verbose: print(f"🔬 [X-RAY] Predicting for {symbol}...")
            for h in self.HEADS:
                try:
                    raw_prob = float(self._predict_head(self.models[h], features))
                    if not np.isfinite(raw_prob):
                        # A NaN/inf output must not reach the decision logic.
                        if self.verbose: print(f"   ⚠️ {h.upper()} returned a non-finite probability; using 0.0")
                        raw_prob = 0.0
                    probs[h] = raw_prob
                    if raw_prob > 0.0 and self.verbose:
                        print(f"   🔥 {h.upper()} Non-Zero Prob: {raw_prob:.4f}")
                except Exception as e:
                    if self.verbose: print(f"   ❌ Error predicting {h}: {e}")
                    probs[h] = 0.0

            # Handle case where all probabilities are zero (Quant Silence)
            if sum(probs.values()) <= 0.0:
                if self.verbose:
                    print("🚨 [QUANT SILENCE] All model outputs are zero. Possible bug or data issue.")
                # Instead of returning HOLD, attempt to re-initialize models once before giving up
                if not getattr(self, '_reinit_attempted', False):
                    if self.verbose:
                        print("🔄 Attempting one-time reinitialization due to Quant Silence...")
                    # Set before recursing so the retry cannot loop.
                    self._reinit_attempted = True
                    if self.initialize():
                        return self.analyze_position(symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data)
                return self._hold('Quant Silence')

            # ✅ QUANT WARNING RESOLUTION: Clear Signal Mapping
            action, reason, confidence = self._decide(probs)
            return self._pkg(action, confidence, reason, probs)
        except Exception as e:
            if self.verbose: print(f"❌ [X-RAY] Analyze Error: {e}")
            return self._hold('Error')

    def _pkg(self, action, conf, reason, probs):
        """Package a decision into the result dict, using plain ``float`` values.

        ``scores`` mirrors the crash ('v2') and giveback ('v3') probabilities.
        """
        return {
            'action': action,
            'confidence': float(conf),
            'reason': reason,
            'probs': {k: float(v) for k, v in probs.items()},
            'scores': {
                'v2': float(probs.get('crash', 0)),
                'v3': float(probs.get('giveback', 0)),
            }
        }
# ✅ GEM-FIX: Ensure estimator compatibility at module level.
# Setting the attribute on the class (rather than on one instance) means every
# XGBClassifier created after this module is imported reports itself as a
# "classifier", including models that are loaded dynamically later on.
setattr(xgb.XGBClassifier, "_estimator_type", "classifier")