# ml_engine/guard_engine.py # (V1.4 - GEM-Architect Fix: Full Visibility, Async Wrapper & Column Fix) import os import joblib import numpy as np import pandas as pd import pandas_ta as ta import xgboost as xgb import logging import traceback import asyncio # إعداد تسجيل بسيط للأخطاء logger = logging.getLogger("GuardEngine") class GuardEngine: def __init__(self, models_dir="ml_models/guard_v2"): self.models_dir = models_dir self.exit_model = None self.exit_features = None self.initialized = False # إعدادات الحارس (V2 Settings) self.EXIT_THRESHOLD = 0.80 # العتبة الذهبية async def initialize(self): """تحميل نموذج الحماية وقائمة ميزاته""" if self.initialized: return try: print(f"🛡️ [GuardEngine] Loading V2 models from {self.models_dir}...") model_path = os.path.join(self.models_dir, "Guard_Exit_V2.json") feat_path = os.path.join(self.models_dir, "Guard_Exit_V2_features.pkl") if os.path.exists(model_path) and os.path.exists(feat_path): self.exit_model = xgb.Booster() self.exit_model.load_model(model_path) self.exit_features = joblib.load(feat_path) self.initialized = True print(f"✅ [GuardEngine] Exit Guard V2 Loaded! (Threshold: {self.EXIT_THRESHOLD})") else: print(f"❌ [GuardEngine] CRITICAL: Model files missing in {self.models_dir}") except Exception as e: print(f"❌ [GuardEngine] Initialization failed: {e}") traceback.print_exc() def _engineer_features(self, df_raw): """مصنع الميزات الحي (مع إصلاح الأعمدة المفقودة)""" df = df_raw.copy() # 1. ميزات تيتان (Titan Features) df['RSI_14'] = ta.rsi(df['close'], length=14) df['MFI_14'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14) macd = ta.macd(df['close']) if macd is not None: df['MACD'] = macd.iloc[:, 0] df['MACD_Hist'] = macd.iloc[:, 1] df['ADX_14'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0] for p in [9, 21, 50, 200]: ema = ta.ema(df['close'], length=p) df[f'EMA_{p}'] = ema df[f'Dist_EMA_{p}'] = (df['close'] / ema) - 1 df['ATR_14'] = ta.atr(df['high'], df['low'], df['close'], length=14) df['ATR_Pct'] = df['ATR_14'] / df['close'] # --- [ FIX START: Bollinger Bands Robustness ] --- # حساب البولنجر باند bb = ta.bbands(df['close'], length=20, std=2) if bb is not None: # 1. الحسابات المشتقة df['BB_Width'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1] df['BB_Pos'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]) # 2. دمج الأعمدة الخام (لأن النموذج يطلبها) df = pd.concat([df, bb], axis=1) # 3. التحوط ضد التسميات الغريبة (Double Suffix Fix) # النموذج يطلب 'BBU_20_2.0_2.0' بدلاً من 'BBU_20_2.0' # نقوم بنسخ الأعمدة بالأسماء المتوقعة لتجنب الانهيار if 'BBU_20_2.0' in df.columns: df['BBU_20_2.0_2.0'] = df['BBU_20_2.0'] df['BBL_20_2.0_2.0'] = df['BBL_20_2.0'] df['BBM_20_2.0_2.0'] = df['BBM_20_2.0'] # --- [ FIX END ] --- # 2. ميزات الأنماط (Pattern Features) for i in range(1, 13): df[f'Close_R_{i}'] = df['close'].shift(i) / df['close'] df[f'Vol_R_{i}'] = df['volume'].shift(i) / (df['volume'] + 1e-9) df['Body_Size'] = abs(df['close'] - df['open']) / df['open'] df['Upper_Wick'] = (df['high'] - np.maximum(df['close'], df['open'])) / df['open'] df['Lower_Wick'] = (np.minimum(df['close'], df['open']) - df['low']) / df['open'] return df def check_exit_signal(self, ohlcv_5m): """ الفحص الحي (Synchronous) """ if not self.initialized or not self.exit_model: return {'action': 'HOLD', 'confidence': 0.0, 'reason': 'Guard not ready'} try: if len(ohlcv_5m) < 200: return {'action': 'HOLD', 'confidence': 0.0, 'reason': f'Not enough data ({len(ohlcv_5m)})'} # [Log Stamp] طباعة للتأكد من ان الحارس يعمل # print(f"🛡️ [Guard Internal] Processing {len(ohlcv_5m)} candles...") df = pd.DataFrame(ohlcv_5m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df = self._engineer_features(df) latest_row = df.iloc[-1:].copy() # التأكد من وجود جميع الميزات المطلوبة # يتم ملء أي ميزة مفقودة بـ 0 لتجنب الانهيار التام، لكن السجل سيظهر تحذيراً إذا تكرر try: dmatrix = xgb.DMatrix(latest_row[self.exit_features]) except KeyError as e: # محاولة أخيرة: طباعة الأعمدة المفقودة بدقة للمعالجة missing = list(set(self.exit_features) - set(latest_row.columns)) print(f"⚠️ [Guard Fix Attempt] Missing cols: {missing}. Filling with 0.") for col in missing: latest_row[col] = 0.0 dmatrix = xgb.DMatrix(latest_row[self.exit_features]) prob = self.exit_model.predict(dmatrix)[0] # [Log Stamp] طباعة النتيجة # print(f"🛡️ [Guard Internal] Result: {prob:.4f} (Threshold: {self.EXIT_THRESHOLD})") if prob >= self.EXIT_THRESHOLD: return { 'action': 'EXIT_NOW', 'confidence': float(prob), 'reason': f'Guard V2 Exit Signal (Conf: {prob:.2f} >= {self.EXIT_THRESHOLD})' } else: return { 'action': 'HOLD', 'confidence': float(prob), 'reason': f'Guard V2 Secure (Conf: {prob:.2f})' } except Exception as e: print(f"❌ [Guard Internal Error] {e}") return {'action': 'ERROR', 'confidence': 0.0, 'reason': str(e)} async def check_exit_signal_async(self, ohlcv_5m): """ Wrapper غير متزامن يحل مشكلة التجمد والـ AttributeError. """ return await asyncio.to_thread(self.check_exit_signal, ohlcv_5m)