Spaces:
Paused
Paused
File size: 9,602 Bytes
172cf4d a20afeb 172cf4d 6973033 11a5219 088dd38 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 11a5219 6973033 11a5219 a20afeb 11a5219 a20afeb 088dd38 172cf4d 6973033 a20afeb 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 172cf4d 6973033 11a5219 5264c00 a20afeb 6973033 172cf4d a20afeb 11a5219 172cf4d 6973033 a20afeb 6973033 11a5219 6973033 a20afeb 6973033 a20afeb 6973033 a20afeb 6973033 a20afeb 11a5219 a20afeb 6973033 172cf4d a20afeb 172cf4d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# ml_engine/titan_engine.py
# (V1.2 - Titan Inference Engine - Infinity Safety Patch)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import json
import traceback
class TitanEngine:
def __init__(self, model_dir="ml_models/layer2"):
self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json")
self.features_path = os.path.join(model_dir, "Titan_Features.pkl")
self.model = None
self.feature_names = None
self.initialized = False
async def initialize(self):
"""تحميل النموذج وقائمة الميزات من القرص"""
print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...")
try:
if os.path.exists(self.model_path) and os.path.exists(self.features_path):
# تحميل نموذج XGBoost
self.model = xgb.Booster()
self.model.load_model(self.model_path)
# تحميل قائمة الميزات لضمان الترتيب الصحيح
self.feature_names = joblib.load(self.features_path)
self.initialized = True
print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.")
else:
print(f"❌ [Titan] ملفات النموذج مفقودة!")
except Exception as e:
print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}")
def apply_inverted_pyramid(self, df, tf):
"""نفس منطق هندسة الميزات المستخدم في التدريب تماماً مع تحصين ضد الأخطاء"""
df = df.copy().sort_values('timestamp').reset_index(drop=True)
# تعيين الفهرس للسهولة في pandas_ta
df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms')))
try:
# --- المستوى 1: دقيق (5m, 15m) ---
if tf in ['5m', '15m']:
df['RSI'] = ta.rsi(df['close'], length=14)
macd = ta.macd(df['close'])
if macd is not None and not macd.empty:
df['MACD'] = macd.iloc[:, 0]
df['MACD_h'] = macd.iloc[:, 1]
else:
df['MACD'] = np.nan
df['MACD_h'] = np.nan
df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20)
adx = ta.adx(df['high'], df['low'], df['close'], length=14)
df['ADX'] = adx.iloc[:, 0] if adx is not None and not adx.empty else np.nan
for p in [9, 21, 50, 200]:
ema = ta.ema(df['close'], length=p)
# تحصين ضد القسمة على None
if ema is not None and not ema.empty:
df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1
else:
df[f'EMA_{p}_dist'] = np.nan
bb = ta.bbands(df['close'], length=20, std=2.0)
if bb is not None and not bb.empty:
df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]
df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])
else:
df['BB_w'] = np.nan
df['BB_p'] = np.nan
df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14)
vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
if vwap is not None and not vwap.empty:
df['VWAP_dist'] = (df['close'] / vwap) - 1
else:
df['VWAP_dist'] = np.nan
# --- المستوى 2: تكتيكي (1h, 4h) ---
elif tf in ['1h', '4h']:
df['RSI'] = ta.rsi(df['close'], length=14)
macd = ta.macd(df['close'])
df['MACD_h'] = macd.iloc[:, 1] if macd is not None and not macd.empty else np.nan
ema50 = ta.ema(df['close'], length=50)
df['EMA_50_dist'] = (df['close'] / ema50) - 1 if ema50 is not None and not ema50.empty else np.nan
ema200 = ta.ema(df['close'], length=200)
df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan
atr = ta.atr(df['high'], df['low'], df['close'], length=14)
df['ATR_pct'] = (atr / df['close']) if atr is not None and not atr.empty else np.nan
# --- المستوى 3: استراتيجي (1d) ---
elif tf == '1d':
df['RSI'] = ta.rsi(df['close'], length=14)
ema200 = ta.ema(df['close'], length=200)
df['EMA_200_dist'] = (df['close'] / ema200) - 1 if ema200 is not None and not ema200.empty else np.nan
adx = ta.adx(df['high'], df['low'], df['close'])
if adx is not None and not adx.empty:
df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0)
else:
df['Trend_Strong'] = 0
except Exception as e:
# print(f"⚠️ [Titan Warning] Error calculating indicators for {tf}: {e}")
# في حال حدوث خطأ، نترك الأعمدة كما هي (ستكون NaN إذا لم يتم إنشاؤها)
pass
return df.reset_index(drop=True)
def predict(self, ohlcv_data: dict) -> dict:
"""
استقبال البيانات الخام، تجهيزها، ثم استدعاء النموذج للتنبؤ.
"""
if not self.initialized or not self.model:
return {'score': 0.0, 'error': 'Titan not initialized'}
try:
# 1. تجهيز البيانات لكل إطار
processed_tfs = {}
for tf, data in ohlcv_data.items():
if not data: continue
# تحويل القوائم إلى DataFrame إذا لزم الأمر
if isinstance(data, list):
df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
else:
df = data.copy()
if df.empty: continue
# ==========================================================
# 🛡️ GEM-ARCHITECT FIX: Force Float Conversion
# ==========================================================
# هذا التعديل يضمن أن المكتبات الحسابية تعمل على أرقام وليس نصوص
# في حال كانت البيانات القادمة من API بصيغة String
cols_to_convert = ['open', 'high', 'low', 'close', 'volume']
for col in cols_to_convert:
if col in df.columns:
df[col] = df[col].astype(float)
# ==========================================================
# تطبيق المؤشرات حسب الإطار
df = self.apply_inverted_pyramid(df, tf)
processed_tfs[tf] = df
# 2. الدمج (Alignment)
if '5m' not in processed_tfs or processed_tfs['5m'].empty:
return {'score': 0.0, 'error': 'Missing 5m base timeframe'}
latest_5m = processed_tfs['5m'].iloc[-1:].copy()
latest_ts = latest_5m['timestamp'].iloc[0]
base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'})
# دمج باقي الأطر
for tf, df in processed_tfs.items():
if tf == '5m' or df.empty: continue
relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy()
if relevant_row.empty: continue
cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']]
for col in cols:
base_row[f"{tf}_{col}"] = relevant_row[col].values[0]
# 3. تجهيز شعاع الإدخال (Feature Vector) مع التنظيف
input_data = []
for feat in self.feature_names:
val = base_row.get(feat, np.nan)
if isinstance(val, (pd.Series, np.ndarray)):
val = val.iloc[0] if len(val) > 0 else np.nan
# 🛡️ [CRITICAL FIX] تنظيف القيم اللانهائية (Inf)
# XGBoost ينهار إذا وجد inf، لذلك نحولها إلى nan
if np.isinf(val):
val = np.nan
input_data.append(val)
# 4. التنبؤ
# تحويل إلى DMatrix
dtest = xgb.DMatrix([input_data], feature_names=self.feature_names, missing=np.nan)
prediction = self.model.predict(dtest)[0]
return {
'score': float(prediction),
'timestamp': int(latest_ts),
'status': 'OK'
}
except Exception as e:
# print(f"❌ [Titan Error] {e}")
# traceback.print_exc()
return {'score': 0.0, 'error': str(e)} |