Tradtesting / ml_engine /guard_engine.py
Riy777's picture
Update ml_engine/guard_engine.py
4829c8c verified
raw
history blame
7.05 kB
# ml_engine/guard_engine.py
# (V1.4 - GEM-Architect Fix: Full Visibility, Async Wrapper & Column Fix)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import logging
import traceback
import asyncio
# إعداد تسجيل بسيط للأخطاء
logger = logging.getLogger("GuardEngine")
class GuardEngine:
def __init__(self, models_dir="ml_models/guard_v2"):
self.models_dir = models_dir
self.exit_model = None
self.exit_features = None
self.initialized = False
# إعدادات الحارس (V2 Settings)
self.EXIT_THRESHOLD = 0.80 # العتبة الذهبية
async def initialize(self):
"""تحميل نموذج الحماية وقائمة ميزاته"""
if self.initialized: return
try:
print(f"🛡️ [GuardEngine] Loading V2 models from {self.models_dir}...")
model_path = os.path.join(self.models_dir, "Guard_Exit_V2.json")
feat_path = os.path.join(self.models_dir, "Guard_Exit_V2_features.pkl")
if os.path.exists(model_path) and os.path.exists(feat_path):
self.exit_model = xgb.Booster()
self.exit_model.load_model(model_path)
self.exit_features = joblib.load(feat_path)
self.initialized = True
print(f"✅ [GuardEngine] Exit Guard V2 Loaded! (Threshold: {self.EXIT_THRESHOLD})")
else:
print(f"❌ [GuardEngine] CRITICAL: Model files missing in {self.models_dir}")
except Exception as e:
print(f"❌ [GuardEngine] Initialization failed: {e}")
traceback.print_exc()
def _engineer_features(self, df_raw):
"""مصنع الميزات الحي (مع إصلاح الأعمدة المفقودة)"""
df = df_raw.copy()
# 1. ميزات تيتان (Titan Features)
df['RSI_14'] = ta.rsi(df['close'], length=14)
df['MFI_14'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14)
macd = ta.macd(df['close'])
if macd is not None:
df['MACD'] = macd.iloc[:, 0]
df['MACD_Hist'] = macd.iloc[:, 1]
df['ADX_14'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0]
for p in [9, 21, 50, 200]:
ema = ta.ema(df['close'], length=p)
df[f'EMA_{p}'] = ema
df[f'Dist_EMA_{p}'] = (df['close'] / ema) - 1
df['ATR_14'] = ta.atr(df['high'], df['low'], df['close'], length=14)
df['ATR_Pct'] = df['ATR_14'] / df['close']
# --- [ FIX START: Bollinger Bands Robustness ] ---
# حساب البولنجر باند
bb = ta.bbands(df['close'], length=20, std=2)
if bb is not None:
# 1. الحسابات المشتقة
df['BB_Width'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]
df['BB_Pos'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])
# 2. دمج الأعمدة الخام (لأن النموذج يطلبها)
df = pd.concat([df, bb], axis=1)
# 3. التحوط ضد التسميات الغريبة (Double Suffix Fix)
# النموذج يطلب 'BBU_20_2.0_2.0' بدلاً من 'BBU_20_2.0'
# نقوم بنسخ الأعمدة بالأسماء المتوقعة لتجنب الانهيار
if 'BBU_20_2.0' in df.columns:
df['BBU_20_2.0_2.0'] = df['BBU_20_2.0']
df['BBL_20_2.0_2.0'] = df['BBL_20_2.0']
df['BBM_20_2.0_2.0'] = df['BBM_20_2.0']
# --- [ FIX END ] ---
# 2. ميزات الأنماط (Pattern Features)
for i in range(1, 13):
df[f'Close_R_{i}'] = df['close'].shift(i) / df['close']
df[f'Vol_R_{i}'] = df['volume'].shift(i) / (df['volume'] + 1e-9)
df['Body_Size'] = abs(df['close'] - df['open']) / df['open']
df['Upper_Wick'] = (df['high'] - np.maximum(df['close'], df['open'])) / df['open']
df['Lower_Wick'] = (np.minimum(df['close'], df['open']) - df['low']) / df['open']
return df
def check_exit_signal(self, ohlcv_5m):
"""
الفحص الحي (Synchronous)
"""
if not self.initialized or not self.exit_model:
return {'action': 'HOLD', 'confidence': 0.0, 'reason': 'Guard not ready'}
try:
if len(ohlcv_5m) < 200:
return {'action': 'HOLD', 'confidence': 0.0, 'reason': f'Not enough data ({len(ohlcv_5m)})'}
# [Log Stamp] طباعة للتأكد من ان الحارس يعمل
# print(f"🛡️ [Guard Internal] Processing {len(ohlcv_5m)} candles...")
df = pd.DataFrame(ohlcv_5m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df = self._engineer_features(df)
latest_row = df.iloc[-1:].copy()
# التأكد من وجود جميع الميزات المطلوبة
# يتم ملء أي ميزة مفقودة بـ 0 لتجنب الانهيار التام، لكن السجل سيظهر تحذيراً إذا تكرر
try:
dmatrix = xgb.DMatrix(latest_row[self.exit_features])
except KeyError as e:
# محاولة أخيرة: طباعة الأعمدة المفقودة بدقة للمعالجة
missing = list(set(self.exit_features) - set(latest_row.columns))
print(f"⚠️ [Guard Fix Attempt] Missing cols: {missing}. Filling with 0.")
for col in missing:
latest_row[col] = 0.0
dmatrix = xgb.DMatrix(latest_row[self.exit_features])
prob = self.exit_model.predict(dmatrix)[0]
# [Log Stamp] طباعة النتيجة
# print(f"🛡️ [Guard Internal] Result: {prob:.4f} (Threshold: {self.EXIT_THRESHOLD})")
if prob >= self.EXIT_THRESHOLD:
return {
'action': 'EXIT_NOW',
'confidence': float(prob),
'reason': f'Guard V2 Exit Signal (Conf: {prob:.2f} >= {self.EXIT_THRESHOLD})'
}
else:
return {
'action': 'HOLD',
'confidence': float(prob),
'reason': f'Guard V2 Secure (Conf: {prob:.2f})'
}
except Exception as e:
print(f"❌ [Guard Internal Error] {e}")
return {'action': 'ERROR', 'confidence': 0.0, 'reason': str(e)}
async def check_exit_signal_async(self, ohlcv_5m):
"""
Wrapper غير متزامن يحل مشكلة التجمد والـ AttributeError.
"""
return await asyncio.to_thread(self.check_exit_signal, ohlcv_5m)