# ml_engine/titan_engine.py # (V1.2 - Titan Inference Engine - Infinity Safety Patch) import os import joblib import numpy as np import pandas as pd import pandas_ta as ta import xgboost as xgb import json import traceback class TitanEngine: def __init__(self, model_dir="ml_models/layer2"): self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json") self.features_path = os.path.join(model_dir, "Titan_Features.pkl") self.model = None self.feature_names = None self.initialized = False async def initialize(self): """تحميل النموذج وقائمة الميزات من القرص""" print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...") try: if os.path.exists(self.model_path) and os.path.exists(self.features_path): # تحميل نموذج XGBoost self.model = xgb.Booster() self.model.load_model(self.model_path) # تحميل قائمة الميزات لضمان الترتيب الصحيح self.feature_names = joblib.load(self.features_path) self.initialized = True print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.") else: print(f"❌ [Titan] ملفات النموذج مفقودة!") except Exception as e: print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}") def apply_inverted_pyramid(self, df, tf): """نفس منطق هندسة الميزات المستخدم في التدريب تماماً مع تحصين ضد الأخطاء""" df = df.copy().sort_values('timestamp').reset_index(drop=True) # تعيين الفهرس للسهولة في pandas_ta df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms'))) try: # --- المستوى 1: دقيق (5m, 15m) --- if tf in ['5m', '15m']: df['RSI'] = ta.rsi(df['close'], length=14) macd = ta.macd(df['close']) if macd is not None and not macd.empty: df['MACD'] = macd.iloc[:, 0] df['MACD_h'] = macd.iloc[:, 1] else: df['MACD'] = np.nan df['MACD_h'] = np.nan df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20) adx = ta.adx(df['high'], df['low'], df['close'], length=14) df['ADX'] = adx.iloc[:, 0] if adx is not None and not adx.empty else np.nan for p in [9, 21, 50, 200]: ema = ta.ema(df['close'], length=p) # تحصين ضد القسمة على None if ema is not None and not ema.empty: df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1 else: df[f'EMA_{p}_dist'] = np.nan bb = ta.bbands(df['close'], length=20, std=2.0) if bb is not None and not bb.empty: df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1] df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]) else: df['BB_w'] = np.nan df['BB_p'] = np.nan df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14) vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume']) if vwap is not None and not vwap.empty: df['VWAP_dist'] = (df['close'] / vwap) - 1 else: df['VWAP_dist'] = np.nan # --- المستوى 2: تكتيكي (1h, 4h) --- elif tf in ['1h', '4h']: df['RSI'] = ta.rsi(df['close'], length=14) macd = ta.macd(df['close']) df['MACD_h'] = macd.iloc[:, 1] if macd is not None and not macd.empty else np.nan ema50 = ta.ema(df['close'], length=50) df['EMA_50_dist'] = (df['close'] / ema50) - 1 if ema50 is not None and not ema50.empty else np.nan ema200 = ta.ema(df['close'], length=200) df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan atr = ta.atr(df['high'], df['low'], df['close'], length=14) df['ATR_pct'] = (atr / df['close']) if atr is not None and not atr.empty else np.nan # --- المستوى 3: استراتيجي (1d) --- elif tf == '1d': df['RSI'] = ta.rsi(df['close'], length=14) ema200 = ta.ema(df['close'], length=200) df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan adx = ta.adx(df['high'], df['low'], df['close']) if adx is not None and not adx.empty: df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0) else: df['Trend_Strong'] = 0 except Exception as e: # print(f"⚠️ [Titan Warning] Error calculating indicators for {tf}: {e}") # في حال حدوث خطأ، نترك الأعمدة كما هي (ستكون NaN إذا لم يتم إنشاؤها) pass return df.reset_index(drop=True) def predict(self, ohlcv_data: dict) -> dict: """ استقبال البيانات الخام، تجهيزها، ثم استدعاء النموذج للتنبؤ. """ if not self.initialized or not self.model: return {'score': 0.0, 'error': 'Titan not initialized'} try: # 1. تجهيز البيانات لكل إطار processed_tfs = {} for tf, data in ohlcv_data.items(): if not data: continue # تحويل القوائم إلى DataFrame إذا لزم الأمر if isinstance(data, list): df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) else: df = data.copy() if df.empty: continue # ========================================================== # 🛡️ GEM-ARCHITECT FIX: Force Float Conversion # ========================================================== # هذا التعديل يضمن أن المكتبات الحسابية تعمل على أرقام وليس نصوص # في حال كانت البيانات القادمة من API بصيغة String cols_to_convert = ['open', 'high', 'low', 'close', 'volume'] for col in cols_to_convert: if col in df.columns: df[col] = df[col].astype(float) # ========================================================== # تطبيق المؤشرات حسب الإطار df = self.apply_inverted_pyramid(df, tf) processed_tfs[tf] = df # 2. الدمج (Alignment) if '5m' not in processed_tfs or processed_tfs['5m'].empty: return {'score': 0.0, 'error': 'Missing 5m base timeframe'} latest_5m = processed_tfs['5m'].iloc[-1:].copy() latest_ts = latest_5m['timestamp'].iloc[0] base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'}) # دمج باقي الأطر for tf, df in processed_tfs.items(): if tf == '5m' or df.empty: continue relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy() if relevant_row.empty: continue cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']] for col in cols: base_row[f"{tf}_{col}"] = relevant_row[col].values[0] # 3. تجهيز شعاع الإدخال (Feature Vector) مع التنظيف input_data = [] for feat in self.feature_names: val = base_row.get(feat, np.nan) if isinstance(val, (pd.Series, np.ndarray)): val = val.iloc[0] if len(val) > 0 else np.nan # 🛡️ [CRITICAL FIX] تنظيف القيم اللانهائية (Inf) # XGBoost ينهار إذا وجد inf، لذلك نحولها إلى nan if np.isinf(val): val = np.nan input_data.append(val) # 4. التنبؤ # تحويل إلى DMatrix dtest = xgb.DMatrix([input_data], feature_names=self.feature_names, missing=np.nan) prediction = self.model.predict(dtest)[0] return { 'score': float(prediction), 'timestamp': int(latest_ts), 'status': 'OK' } except Exception as e: # print(f"❌ [Titan Error] {e}") # traceback.print_exc() return {'score': 0.0, 'error': str(e)}