Tradtesting / ml_engine /titan_engine.py
Riy777's picture
Update ml_engine/titan_engine.py
5264c00 verified
# ml_engine/titan_engine.py
# (V1.2 - Titan Inference Engine - Infinity Safety Patch)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import json
import traceback
class TitanEngine:
def __init__(self, model_dir="ml_models/layer2"):
self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json")
self.features_path = os.path.join(model_dir, "Titan_Features.pkl")
self.model = None
self.feature_names = None
self.initialized = False
async def initialize(self):
"""تحميل النموذج وقائمة الميزات من القرص"""
print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...")
try:
if os.path.exists(self.model_path) and os.path.exists(self.features_path):
# تحميل نموذج XGBoost
self.model = xgb.Booster()
self.model.load_model(self.model_path)
# تحميل قائمة الميزات لضمان الترتيب الصحيح
self.feature_names = joblib.load(self.features_path)
self.initialized = True
print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.")
else:
print(f"❌ [Titan] ملفات النموذج مفقودة!")
except Exception as e:
print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}")
def apply_inverted_pyramid(self, df, tf):
"""نفس منطق هندسة الميزات المستخدم في التدريب تماماً مع تحصين ضد الأخطاء"""
df = df.copy().sort_values('timestamp').reset_index(drop=True)
# تعيين الفهرس للسهولة في pandas_ta
df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms')))
try:
# --- المستوى 1: دقيق (5m, 15m) ---
if tf in ['5m', '15m']:
df['RSI'] = ta.rsi(df['close'], length=14)
macd = ta.macd(df['close'])
if macd is not None and not macd.empty:
df['MACD'] = macd.iloc[:, 0]
df['MACD_h'] = macd.iloc[:, 1]
else:
df['MACD'] = np.nan
df['MACD_h'] = np.nan
df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20)
adx = ta.adx(df['high'], df['low'], df['close'], length=14)
df['ADX'] = adx.iloc[:, 0] if adx is not None and not adx.empty else np.nan
for p in [9, 21, 50, 200]:
ema = ta.ema(df['close'], length=p)
# تحصين ضد القسمة على None
if ema is not None and not ema.empty:
df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1
else:
df[f'EMA_{p}_dist'] = np.nan
bb = ta.bbands(df['close'], length=20, std=2.0)
if bb is not None and not bb.empty:
df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]
df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])
else:
df['BB_w'] = np.nan
df['BB_p'] = np.nan
df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14)
vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
if vwap is not None and not vwap.empty:
df['VWAP_dist'] = (df['close'] / vwap) - 1
else:
df['VWAP_dist'] = np.nan
# --- المستوى 2: تكتيكي (1h, 4h) ---
elif tf in ['1h', '4h']:
df['RSI'] = ta.rsi(df['close'], length=14)
macd = ta.macd(df['close'])
df['MACD_h'] = macd.iloc[:, 1] if macd is not None and not macd.empty else np.nan
ema50 = ta.ema(df['close'], length=50)
df['EMA_50_dist'] = (df['close'] / ema50) - 1 if ema50 is not None and not ema50.empty else np.nan
ema200 = ta.ema(df['close'], length=200)
df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan
atr = ta.atr(df['high'], df['low'], df['close'], length=14)
df['ATR_pct'] = (atr / df['close']) if atr is not None and not atr.empty else np.nan
# --- المستوى 3: استراتيجي (1d) ---
elif tf == '1d':
df['RSI'] = ta.rsi(df['close'], length=14)
ema200 = ta.ema(df['close'], length=200)
df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan
adx = ta.adx(df['high'], df['low'], df['close'])
if adx is not None and not adx.empty:
df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0)
else:
df['Trend_Strong'] = 0
except Exception as e:
# print(f"⚠️ [Titan Warning] Error calculating indicators for {tf}: {e}")
# في حال حدوث خطأ، نترك الأعمدة كما هي (ستكون NaN إذا لم يتم إنشاؤها)
pass
return df.reset_index(drop=True)
def predict(self, ohlcv_data: dict) -> dict:
"""
استقبال البيانات الخام، تجهيزها، ثم استدعاء النموذج للتنبؤ.
"""
if not self.initialized or not self.model:
return {'score': 0.0, 'error': 'Titan not initialized'}
try:
# 1. تجهيز البيانات لكل إطار
processed_tfs = {}
for tf, data in ohlcv_data.items():
if not data: continue
# تحويل القوائم إلى DataFrame إذا لزم الأمر
if isinstance(data, list):
df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
else:
df = data.copy()
if df.empty: continue
# ==========================================================
# 🛡️ GEM-ARCHITECT FIX: Force Float Conversion
# ==========================================================
# هذا التعديل يضمن أن المكتبات الحسابية تعمل على أرقام وليس نصوص
# في حال كانت البيانات القادمة من API بصيغة String
cols_to_convert = ['open', 'high', 'low', 'close', 'volume']
for col in cols_to_convert:
if col in df.columns:
df[col] = df[col].astype(float)
# ==========================================================
# تطبيق المؤشرات حسب الإطار
df = self.apply_inverted_pyramid(df, tf)
processed_tfs[tf] = df
# 2. الدمج (Alignment)
if '5m' not in processed_tfs or processed_tfs['5m'].empty:
return {'score': 0.0, 'error': 'Missing 5m base timeframe'}
latest_5m = processed_tfs['5m'].iloc[-1:].copy()
latest_ts = latest_5m['timestamp'].iloc[0]
base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'})
# دمج باقي الأطر
for tf, df in processed_tfs.items():
if tf == '5m' or df.empty: continue
relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy()
if relevant_row.empty: continue
cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']]
for col in cols:
base_row[f"{tf}_{col}"] = relevant_row[col].values[0]
# 3. تجهيز شعاع الإدخال (Feature Vector) مع التنظيف
input_data = []
for feat in self.feature_names:
val = base_row.get(feat, np.nan)
if isinstance(val, (pd.Series, np.ndarray)):
val = val.iloc[0] if len(val) > 0 else np.nan
# 🛡️ [CRITICAL FIX] تنظيف القيم اللانهائية (Inf)
# XGBoost ينهار إذا وجد inf، لذلك نحولها إلى nan
if np.isinf(val):
val = np.nan
input_data.append(val)
# 4. التنبؤ
# تحويل إلى DMatrix
dtest = xgb.DMatrix([input_data], feature_names=self.feature_names, missing=np.nan)
prediction = self.model.predict(dtest)[0]
return {
'score': float(prediction),
'timestamp': int(latest_ts),
'status': 'OK'
}
except Exception as e:
# print(f"❌ [Titan Error] {e}")
# traceback.print_exc()
return {'score': 0.0, 'error': str(e)}