# Tradtesting / ml_engine / oracle_engine.py
# Last commit: "Update ml_engine/oracle_engine.py" by Riy777 (e212774, verified)
# File size: 13.8 kB
import os
import numpy as np
import pandas as pd
import pandas_ta as ta
import lightgbm as lgb
from scipy.signal import find_peaks
import warnings
from typing import Dict, Any, List, Optional
# --- [ 0. Settings ] ---
# Silence FutureWarning only; every other warning category is still shown.
warnings.filterwarnings(action='ignore', category=FutureWarning)

# Tunables for the feature pipeline.
PIPELINE_SETTINGS = dict(
    SWING_PROMINENCE_PCT=0.02,  # 2%
)

# Minimum ensemble confidence a signal needs before it is acted upon.
DECISION_CONFIDENCE_THRESHOLD = 0.30

# Number of "lgbm_strategy_fold_{i}.txt" files the strategy ensemble loads.
N_STRATEGY_MODELS = 11

# Class index predicted by the strategy models -> strategy label.
STRATEGY_MAP = dict(enumerate((
    'WAIT',
    'SWING_LONG',
    'SCALP_LONG',
    'SWING_SHORT',
    'SCALP_SHORT',
)))

# (Timeframes the models were trained on.)
TIMEBYTES_TO_PROCESS = ['15m', '1h', '4h']
class OracleEngine:
    """Probabilistic decision engine ("L3 Oracle") backed by LightGBM models.

    A set of strategy boosters is averaged into per-strategy probabilities for
    each timeframe, and a set of quantile boosters supplies take-profit /
    stop-loss percentages for the winning signal.  The engine runs in
    spot / long-only mode: a winning strategy whose name contains "SHORT" is
    rejected in ``predict``.
    """

    def __init__(self, model_dir: str = "ml_models/Unified_Models_V1"):
        """Create an engine with no models loaded yet.

        Args:
            model_dir: Directory that holds the ``lgbm_*.txt`` model files
                read by ``initialize``.
        """
        self.model_dir = model_dir
        self.strategy_boosters: List[lgb.Booster] = []
        self.quantile_boosters: Dict[str, lgb.Booster] = {}
        self.feature_names: List[str] = []
        self.initialized = False
        # (version bump)
        print("🧠 [OracleEngine V2.3] تم الإنشاء (Spot/Long-Only Mode). جاهز للتهيئة.")

    async def initialize(self):
        """Load the strategy and quantile models into memory.

        Models are loaded into local containers and only published on the
        instance once every file has been read, so a failed attempt leaves
        the engine untouched and a retry cannot append duplicate boosters.

        Returns:
            True if the engine is (or already was) initialized, False when a
            model file is missing or loading raised an exception.
        """
        if self.initialized:
            return True
        print(f"🧠 [OracleEngine V2.3] جاري تحميل 15 نموذجاً من {self.model_dir}...")
        try:
            # 1. Load the "decision committee" (strategy ensemble).
            strategy_boosters = []
            for i in range(N_STRATEGY_MODELS):
                model_file = os.path.join(self.model_dir, f"lgbm_strategy_fold_{i}.txt")
                if not os.path.exists(model_file):
                    print(f"❌ [Oracle Error] ملف نموذج مفقود: {model_file}")
                    return False
                strategy_boosters.append(lgb.Booster(model_file=model_file))
            print(f" ✅ تم تحميل {len(strategy_boosters)} نماذج استراتيجية.")

            # 2. Load the "targets committee" (quantile models).
            quantile_boosters = {}
            for name in ('tp_p20', 'tp_p50', 'tp_p80', 'sl_p80'):
                model_file = os.path.join(self.model_dir, f"lgbm_{name}.txt")
                if not os.path.exists(model_file):
                    print(f"❌ [Oracle Error] ملف نموذج مفقود: {model_file}")
                    return False
                quantile_boosters[name] = lgb.Booster(model_file=model_file)
            print(f" ✅ تم تحميل {len(quantile_boosters)} نماذج أهداف.")

            # 3. Remember the feature list (taken from the first strategy model).
            feature_names = strategy_boosters[0].feature_name()

            # Publish everything at once, only after all loads succeeded.
            self.strategy_boosters = strategy_boosters
            self.quantile_boosters = quantile_boosters
            self.feature_names = feature_names
            self.initialized = True
            print(f"✅ [OracleEngine V2.3] جاهز. (Threshold: {DECISION_CONFIDENCE_THRESHOLD*100}%)")
            print(f" -> سيعمل على الأطر: {TIMEBYTES_TO_PROCESS}")
            return True
        except Exception as e:
            print(f"❌ [OracleEngine V2.3] فشل فادح أثناء التهيئة: {e}")
            self.initialized = False
            return False

    # --- [ Feature-engineering functions (must match training 100%) ] ---

    def _calculate_base_ta(self, df: pd.DataFrame) -> pd.DataFrame:
        """Append the base technical indicators to ``df`` in place.

        Uses the pandas_ta DataFrame accessor for RSI(14), ADX(14),
        MACD(12/26/9), Bollinger Bands(20, 2) and ATR(14); the appended column
        names are chosen by pandas_ta.  EMA columns are added explicitly as
        ``EMA_<length>``.

        Returns:
            The same DataFrame object, with the indicator columns added.
        """
        df.ta.rsi(length=14, append=True)
        df.ta.adx(length=14, append=True)
        df.ta.macd(fast=12, slow=26, signal=9, append=True)
        df.ta.bbands(length=20, std=2, append=True)
        df.ta.atr(length=14, append=True)
        for length in [9, 21, 50, 100, 200]:
            df[f'EMA_{length}'] = ta.ema(df['close'], length=length)
        return df

    def _calculate_market_structure(self, df: pd.DataFrame, prominence_pct: float) -> pd.DataFrame:
        """Add swing high/low levels and break-of-structure flags in place.

        Args:
            df: Frame with ``high``, ``low`` and ``close`` columns.
            prominence_pct: Fraction of the mean close used as the minimum
                peak prominence for a swing point.

        Returns:
            ``df`` with ``last_SH_price``, ``last_SL_price``, ``BOS_Long`` and
            ``BOS_Short`` added.
        """
        prominence_value = df['close'].mean() * prominence_pct
        high_peaks_idx, _ = find_peaks(df['high'], prominence=prominence_value)
        # Swing lows are the peaks of the negated low series.
        low_peaks_idx, _ = find_peaks(-df['low'], prominence=prominence_value)
        # Carry each swing price forward; bfill covers the rows before the
        # first detected swing.
        df['last_SH_price'] = df.iloc[high_peaks_idx]['high'].reindex(df.index).ffill().bfill()
        df['last_SL_price'] = df.iloc[low_peaks_idx]['low'].reindex(df.index).ffill().bfill()
        # Compare against the previous row's level (shift(1)).
        df['BOS_Long'] = np.where(df['close'] > df['last_SH_price'].shift(1), 1, 0)
        df['BOS_Short'] = np.where(df['low'] < df['last_SL_price'].shift(1), 1, 0)
        return df

    def _calculate_fibonacci_matrix(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add Fibonacci levels derived from the last swing high/low in place.

        Requires ``close``, ``last_SH_price`` and ``last_SL_price`` columns.
        Infinite values anywhere in the frame are replaced by NaN.

        Returns:
            ``df`` with the ``fibo_*``, ``dist_to_*`` and
            ``is_in_golden_pocket`` columns added.
        """
        wave_range = df['last_SH_price'] - df['last_SL_price']
        df['fibo_0.382'] = df['last_SH_price'] - (wave_range * 0.382)
        df['fibo_0.500'] = df['last_SH_price'] - (wave_range * 0.500)
        df['fibo_0.618'] = df['last_SL_price'] + (wave_range * 0.618)
        df['fibo_ext_1.618'] = df['last_SH_price'] + (wave_range * 1.618)
        # 1e-9 guards against division by a zero close.
        df['dist_to_0.618_pct'] = (df['close'] - df['fibo_0.618']) / (df['close'] + 1e-9)
        df['dist_to_1.618_pct'] = (df['fibo_ext_1.618'] - df['close']) / (df['close'] + 1e-9)
        # NOTE(review): whenever last_SH_price > last_SL_price, fibo_0.618
        # (SL + 0.618 * range) lies above fibo_0.500 (SH - 0.5 * range), so
        # this condition cannot hold and the flag is 0.  Left unchanged on
        # purpose: the features must match what the models were trained on.
        df['is_in_golden_pocket'] = np.where(
            (df['close'] < df['fibo_0.500']) & (df['close'] > df['fibo_0.618']), 1, 0
        )
        df.replace([np.inf, -np.inf], np.nan, inplace=True)
        return df

    def _calculate_alpha_strategies(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add volume, trend and volatility features in place.

        Requires ``volume``, ``close``, ``EMA_200``, ``ADX_14`` and ``ATRr_14``
        columns.  The Bollinger-band features are only computed when the
        upper/lower/middle band columns are all present.

        Returns:
            ``df`` with ``volume_zscore``, ``dist_from_EMA200_pct``,
            ``BBW_pct``, ``is_squeeze``, ``is_trending`` and ``ATR_pct`` added.
        """
        df['volume_zscore'] = (df['volume'] - df['volume'].rolling(50).mean()) / (df['volume'].rolling(50).std() + 1e-9)
        df['dist_from_EMA200_pct'] = (df['close'] - df['EMA_200']) / (df['EMA_200'] + 1e-9)
        # Substring match, so band columns carrying an extra suffix still hit.
        bbu_col = next((col for col in df.columns if 'BBU_20_2.0' in str(col)), None)
        bbl_col = next((col for col in df.columns if 'BBL_20_2.0' in str(col)), None)
        bbm_col = next((col for col in df.columns if 'BBM_20_2.0' in str(col)), None)
        if all([bbu_col, bbl_col, bbm_col]):
            df['BBW_pct'] = (df[bbu_col] - df[bbl_col]) / (df[bbm_col] + 1e-9)
            # NOTE(review): the rolling window includes the current row, so a
            # strict "<" against the rolling minimum looks like it can never
            # be True.  Left unchanged to stay identical to training.
            df['is_squeeze'] = np.where(df['BBW_pct'] < df['BBW_pct'].rolling(100).min(), 1, 0)
        else:
            df['BBW_pct'] = np.nan
            df['is_squeeze'] = 0
        df['is_trending'] = np.where(df['ADX_14'] > 25, 1, 0)
        df['ATR_pct'] = (df['ATRr_14'] / df['close']) * 100
        return df

    def _create_feature_vector(self, ohlcv_tf_data: List) -> Optional[pd.DataFrame]:
        """Run the full feature pipeline on the candles of one timeframe.

        Args:
            ohlcv_tf_data: Rows of ``[timestamp, open, high, low, close,
                volume]`` with the timestamp in milliseconds.

        Returns:
            A one-row DataFrame holding ``self.feature_names`` for the latest
            candle, or None when fewer than 200 rows are supplied or a model
            feature is missing from the computed columns.
        """
        if ohlcv_tf_data is None or len(ohlcv_tf_data) < 200:
            return None
        df = pd.DataFrame(ohlcv_tf_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df = df.astype(float)
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
        df = df.set_index('datetime')

        # Run the pipeline.
        df = self._calculate_base_ta(df)
        df = self._calculate_market_structure(df, PIPELINE_SETTINGS['SWING_PROMINENCE_PCT'])
        df = self._calculate_fibonacci_matrix(df)
        df = self._calculate_alpha_strategies(df)

        # Fill indicator warm-up NaNs from neighbouring rows.
        df = df.ffill().bfill()

        # Only the most recent row is scored.
        latest_features = df.iloc[-1:]
        try:
            feature_vector = latest_features[self.feature_names]
        except Exception as e:
            print(f"❌ [Oracle Error] عدم تطابق الميزات: {e}")
            return None
        # Columns that are NaN for every row survive ffill/bfill; zero them.
        # No NaN can remain afterwards, so no further NaN check is needed.
        return feature_vector.fillna(0)

    async def predict(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyse a candidate symbol and return a complete decision.

        Args:
            symbol_data: Mapping with ``ohlcv`` (timeframe -> candle rows),
                ``current_price`` and, optionally, ``symbol`` (used for log
                output only).

        Returns:
            A dict whose ``action`` is ``'WAIT'`` (engine not ready, missing
            input, or an exception), ``'IGNORE'`` (no usable / short /
            low-confidence signal, or non-positive targets) or ``'WATCH'``
            (long signal accepted, with ``tp_price`` / ``sl_price`` attached).
        """
        if not self.initialized:
            return {'action': 'WAIT', 'reason': 'Oracle Engine not initialized'}

        ohlcv_data = symbol_data.get('ohlcv')
        current_price = symbol_data.get('current_price')
        if not ohlcv_data or not current_price:
            return {'action': 'WAIT', 'reason': 'Missing OHLCV or price data'}

        # Only needed for the log line below; a missing key must not abort
        # the whole prediction.
        symbol = symbol_data.get('symbol', 'UNKNOWN')

        try:
            all_tf_decisions = []

            # --- [ Step 1: analyse each timeframe separately ] ---
            for tf in TIMEBYTES_TO_PROCESS:
                feature_vector = self._create_feature_vector(ohlcv_data.get(tf))
                if feature_vector is None:
                    print(f" -> {symbol} @ {tf}: Skipped (Insufficient data or NaN)")
                    continue

                all_probs = [
                    booster.predict(feature_vector, num_iteration=booster.best_iteration)
                    for booster in self.strategy_boosters
                ]
                # Average over the ensemble; [0] selects the single input row.
                ensemble_probs = np.mean(all_probs, axis=0)[0]

                # Mask out WAIT (index 0) so the best actionable class wins.
                actionable_probs = ensemble_probs.copy()
                actionable_probs[0] = 0.0
                predicted_strategy_idx = np.argmax(actionable_probs)
                confidence = actionable_probs[predicted_strategy_idx]
                strategy_name = STRATEGY_MAP.get(predicted_strategy_idx, 'WAIT')

                all_tf_decisions.append({
                    'timeframe': tf,
                    'strategy': strategy_name,
                    'confidence': float(confidence),
                    'feature_vector': feature_vector
                })

            if not all_tf_decisions:
                return {'action': 'IGNORE', 'reason': 'Feature calculation failed for all TFs'}

            # --- [ Step 2: pick the best decision (highest confidence) ] ---
            best_decision = max(all_tf_decisions, key=lambda x: x['confidence'])
            strategy_name = best_decision['strategy']
            confidence = best_decision['confidence']
            best_tf = best_decision['timeframe']

            # --- [ Spot-only filter: a winning SHORT strategy is dropped ] ---
            if "SHORT" in strategy_name:
                return {
                    'action': 'IGNORE',
                    'reason': f"Spot Mode: Ignored {strategy_name} signal (Shorts disabled)",
                    'confidence': confidence,
                    'strategy': strategy_name
                }

            # --- [ Step 3: apply the confidence filter ] ---
            if confidence < DECISION_CONFIDENCE_THRESHOLD:
                return {
                    'action': 'IGNORE',
                    'reason': f"Best Actionable Signal ({strategy_name} @ {best_tf}) confidence ({confidence:.2f}) is below threshold ({DECISION_CONFIDENCE_THRESHOLD})",
                    'confidence': confidence,
                    'strategy': strategy_name
                }

            # --- [ Step 4: confidence passed - run the "targets committee" ] ---
            winning_feature_vector = best_decision['feature_vector']
            preds_quantile = {}
            for name, booster in self.quantile_boosters.items():
                preds_quantile[name] = booster.predict(winning_feature_vector, num_iteration=booster.best_iteration)[0]

            # --- [ Step 5: derive the final targets ] ---
            tp_pct = preds_quantile['tp_p50']  # (the realistic target)
            sl_pct = preds_quantile['sl_p80']  # (the safe stop-loss)
            if tp_pct <= 0 or sl_pct <= 0:
                return {'action': 'IGNORE', 'reason': f'Quantile model predicted negative TP/SL ({tp_pct=}, {sl_pct=})'}

            if "LONG" in strategy_name:
                tp_price = current_price * (1 + tp_pct)
                sl_price = current_price * (1 - sl_pct)
                action_type = "BUY"
            else:
                # SHORT was filtered above, so this is only reached for a
                # strategy name that is neither LONG nor SHORT (e.g. WAIT).
                return {'action': 'IGNORE', 'reason': 'Strategy not actionable (Unknown type)'}

            # --- [ Step 6: return the complete decision ] ---
            return {
                'action': 'WATCH',
                'confidence': confidence,
                'analysis_summary': f"Oracle Consensus @ {best_tf}: {strategy_name} (Conf: {confidence:.2%})",
                'strategy': strategy_name,
                'action_type': action_type,
                'tp_price': float(tp_price),
                'sl_price': float(sl_price),
                'quantile_tp_pct': float(tp_pct),
                'quantile_sl_pct': float(sl_pct)
            }
        except Exception as e:
            print(f"❌ [OracleEngine V2.3] فشل فادح أثناء التنبؤ: {e}")
            import traceback
            traceback.print_exc()
            return {'action': 'WAIT', 'reason': f'Exception: {e}'}