# Tradtesting / ml_engine/oracle_engine.py
# Riy777 - "Update ml_engine/oracle_engine.py" (commit dc313fd, verified)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import lightgbm as lgb
import warnings
from typing import Dict, Any, List, Optional
# --- [ System settings ] ---
# Suppress every FutureWarning process-wide so they do not clutter the console output.
warnings.filterwarnings('ignore', category=FutureWarning)
class OracleEngine:
    """Oracle V4.2: Config-Injectable Strategic Brain.

    Wraps up to three LightGBM boosters loaded from ``model_dir``:

    * ``lgbm_direction.txt``    - required; long/short probabilities.
    * ``lgbm_target_class.txt`` - optional; picks one of the TP1..TP4 profiles.
    * ``lgbm_strength.txt``     - optional; strength score clamped to [0, 1].

    The confidence threshold that gates BUY signals is injected from outside
    through :meth:`set_threshold`.
    """

    # Column layout expected for every raw candle row.
    _OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Per-timeframe indicator suffixes produced by _calculate_snapshot_features.
    _SNAPSHOT_FEATURES = ('slope', 'rsi', 'atr_pct', 'vol_z')
    # Take-profit ladder: label -> distance from the current price in ATR units.
    _TP_LABELS = ('TP1', 'TP2', 'TP3', 'TP4')
    _TP_ATR_MULTIPLIERS = {'TP1': 1.0, 'TP2': 1.8, 'TP3': 2.8, 'TP4': 4.5}
    # Target profile used when no target model is loaded (index 1 -> 'TP2').
    _DEFAULT_TP_INDEX = 1
    # Stop-loss distance below the current price in ATR units.
    _SL_ATR_MULTIPLIER = 1.2
    # ATR (as a fraction of price) assumed when the 1h ATR feature is unusable.
    _FALLBACK_ATR_PCT = 0.02

    def __init__(self, model_dir: str = "ml_models/Unified_Models_V1"):
        """Create an engine instance without loading anything from disk.

        Args:
            model_dir: Directory holding ``feature_columns.pkl`` and the
                LightGBM model text files.
        """
        self.model_dir = model_dir

        self.model_direction = None
        self.model_target = None
        self.model_strength = None

        # Default value; the comment in the original says the Processor
        # overrides it later through set_threshold().
        self.confidence_threshold = 0.65

        self.feature_cols = []
        self.initialized = False
        print("🧠 [Oracle V4.2] Engine Instance Created.")

    def set_threshold(self, threshold: float):
        """Receive the centrally configured confidence threshold."""
        self.confidence_threshold = threshold

    async def initialize(self):
        """Load the feature map and the models.

        Returns:
            True when the feature map and the direction model were loaded (or
            the engine was already initialized); False when either file is
            missing or loading raised.
        """
        if self.initialized:
            return True

        print(f"🧠 [Oracle V4] Loading artifacts from {self.model_dir}...")
        try:
            feat_path = os.path.join(self.model_dir, "feature_columns.pkl")
            if not os.path.exists(feat_path):
                print(f"❌ [Oracle] Feature map missing: {feat_path}")
                return False
            # NOTE(review): joblib.load unpickles the file, which can execute
            # arbitrary code - only point model_dir at trusted artifacts.
            self.feature_cols = joblib.load(feat_path)

            dir_path = os.path.join(self.model_dir, "lgbm_direction.txt")
            tgt_path = os.path.join(self.model_dir, "lgbm_target_class.txt")
            str_path = os.path.join(self.model_dir, "lgbm_strength.txt")

            # The direction model is mandatory; report why we give up instead
            # of failing silently.
            if not os.path.exists(dir_path):
                print(f"❌ [Oracle] Direction model missing: {dir_path}")
                return False
            self.model_direction = lgb.Booster(model_file=dir_path)

            # Target and strength models are optional.
            if os.path.exists(tgt_path):
                self.model_target = lgb.Booster(model_file=tgt_path)
            if os.path.exists(str_path):
                self.model_strength = lgb.Booster(model_file=str_path)

            self.initialized = True
            print(f"✅ [Oracle V4] Ready. Dynamic Threshold: {self.confidence_threshold}")
            return True
        except Exception as e:
            print(f"❌ [Oracle] Init Error: {e}")
            return False

    def _calculate_snapshot_features(self, df, tf_prefix):
        """Compute slope, RSI, ATR% and volume z-score columns for one timeframe.

        Args:
            df: Candle frame with at least ``high``, ``low``, ``close`` and
                ``volume`` columns.
            tf_prefix: Timeframe label used as the column prefix (e.g. "1h").

        Returns:
            A frame with ``{tf_prefix}_slope``, ``_rsi``, ``_atr_pct`` and
            ``_vol_z`` columns, gaps filled forward, then backward, then with
            0. An empty frame is returned when there are fewer than 15 rows or
            when the indicator calculation fails.
        """
        df = df.copy()
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)

        if len(df) < 15:
            return pd.DataFrame()

        try:
            # Cast high/low as well so string-typed candles do not break ATR.
            high = df['high'].astype(float)
            low = df['low'].astype(float)
            close = df['close']
            volume = df['volume']

            df[f'{tf_prefix}_slope'] = ta.slope(close, length=7)
            df[f'{tf_prefix}_rsi'] = ta.rsi(close, length=14)

            atr = ta.atr(high, low, close, length=14)
            df[f'{tf_prefix}_atr_pct'] = atr / close

            vol_mean = volume.rolling(20).mean()
            vol_std = volume.rolling(20).std()
            # The epsilon keeps a flat-volume window from dividing by zero.
            df[f'{tf_prefix}_vol_z'] = (volume - vol_mean) / (vol_std + 1e-9)

            cols = [f'{tf_prefix}_{name}' for name in self._SNAPSHOT_FEATURES]
            # A zero close turns the ATR ratio into +/-inf; treat it as missing.
            cleaned = df[cols].replace([np.inf, -np.inf], np.nan)
            return cleaned.ffill().bfill().fillna(0)
        except Exception as e:
            print(f"⚠️ [Oracle] Feature calculation failed for {tf_prefix}: {e}")
            return pd.DataFrame()

    def _latest_features(self, candles, tf_prefix, allow_zero_fallback=True):
        """Return the most recent feature row for one timeframe.

        With ``allow_zero_fallback`` set, timeframes holding 20 rows or fewer
        yield a single all-zero row instead of computed indicators.
        """
        df = pd.DataFrame(candles, columns=self._OHLCV_COLUMNS)
        if allow_zero_fallback and len(df) <= 20:
            zero_cols = [f'{tf_prefix}_{name}' for name in self._SNAPSHOT_FEATURES]
            return pd.DataFrame(np.zeros((1, len(zero_cols))), columns=zero_cols)
        snapshot = self._calculate_snapshot_features(df, tf_prefix)
        return snapshot.iloc[-1:].reset_index(drop=True)

    def _create_feature_vector(self, ohlcv_data: Dict[str, Any], titan_score: float, mc_score: float, pattern_score: float) -> Optional[pd.DataFrame]:
        """Assemble the single-row model input in ``self.feature_cols`` order.

        Args:
            ohlcv_data: Mapping of timeframe ("1h", "15m", "4h") to candle
                rows laid out as timestamp, open, high, low, close, volume.
            titan_score: Stored as the ``sim_titan_score`` feature.
            mc_score: Stored as the ``sim_mc_score`` feature.
            pattern_score: Stored as the ``sim_pattern_score`` feature.

        Returns:
            A one-row float DataFrame whose columns are ``self.feature_cols``
            (features that could not be computed are 0.0), or None when fewer
            than 30 one-hour candles are available or assembly fails.
        """
        try:
            raw_1h = ohlcv_data.get('1h')
            # Explicit None/length test: truthiness raises on NumPy arrays.
            if raw_1h is None or len(raw_1h) < 30:
                return None

            feats_1h = self._latest_features(raw_1h, "1h", allow_zero_fallback=False)
            feats_15m = self._latest_features(ohlcv_data.get('15m', []), "15m")
            feats_4h = self._latest_features(ohlcv_data.get('4h', []), "4h")

            vector = pd.concat([feats_1h, feats_15m, feats_4h], axis=1)
            vector['sim_titan_score'] = float(titan_score)
            vector['sim_mc_score'] = float(mc_score)
            vector['sim_pattern_score'] = float(pattern_score)

            # Align to the training column order in one step; columns missing
            # from the vector come back as NaN and are zero-filled. iloc[[0]]
            # raises if no row could be built, which routes to the handler.
            aligned = vector.iloc[[0]].reindex(columns=list(self.feature_cols))
            return aligned.fillna(0.0).astype(float)
        except Exception as e:
            print(f"❌ Vector Creation Error: {e}")
            return None

    # ==========================================================================
    # 🔮 Prediction (Inference Logic - SPOT MODE)
    # ==========================================================================
    async def predict(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze an opportunity using the dynamic confidence threshold.

        Args:
            symbol_data: Mapping read for the keys ``ohlcv``,
                ``current_price``, ``titan_score``, ``mc_score`` and
                ``patterns_score``.

        Returns:
            A dict whose ``action`` is 'WAIT' (with a ``reason``) when the
            engine is not initialized, the price is missing or non-positive,
            features cannot be built, the short probability exceeds the long
            one, confidence is below the threshold, or an error occurs.
            Otherwise ``action`` is 'WATCH' with ``action_type`` 'BUY' plus
            confidence, strength, target class, take-profit map and stop-loss.
        """
        if not self.initialized:
            return {'action': 'WAIT', 'reason': 'Not initialized', 'confidence': 0.0}

        try:
            # A missing or non-positive price would otherwise produce a BUY
            # signal whose targets and stop-loss are all 0.
            try:
                current_price = float(symbol_data.get('current_price') or 0.0)
            except (TypeError, ValueError):
                current_price = 0.0
            if not np.isfinite(current_price) or current_price <= 0:
                return {'action': 'WAIT', 'reason': 'Invalid price', 'confidence': 0.0}

            features = self._create_feature_vector(
                symbol_data.get('ohlcv'),
                symbol_data.get('titan_score', 0.0),
                symbol_data.get('mc_score', 0.0),
                symbol_data.get('patterns_score', 0.0),
            )
            if features is None:
                return {'action': 'WAIT', 'reason': 'Features failed', 'confidence': 0.0}

            # 1. Predict the direction.
            dir_probs = self.model_direction.predict(features)[0]
            if isinstance(dir_probs, (np.ndarray, list)):
                # Per-class output: index 0 is read as long, index 1 as short.
                prob_long = float(dir_probs[0])
                prob_short = float(dir_probs[1])
            else:
                # Scalar output is read as the short probability.
                prob_short = float(dir_probs)
                prob_long = 1.0 - prob_short

            if prob_short > prob_long:
                return {
                    'action': 'WAIT',
                    'reason': f'Bearish (Short Prob: {prob_short:.2f})',
                    'direction': 'SHORT',
                    'confidence': prob_long,
                    'short_confidence': prob_short
                }

            confidence = prob_long

            # 2. Logic gate: uses the injected self.confidence_threshold
            #    rather than a hard-coded constant.
            if confidence < self.confidence_threshold:
                return {
                    'action': 'WAIT',
                    'reason': f'Low Confidence ({confidence:.2f} < {self.confidence_threshold})',
                    'direction': 'LONG',
                    'confidence': confidence,
                    'short_confidence': prob_short
                }

            # 3. Predict strength and target profile.
            strength = 0.5
            if self.model_strength is not None:
                raw_strength = float(self.model_strength.predict(features)[0])
                strength = max(0.0, min(1.0, raw_strength))

            tp_class_idx = self._DEFAULT_TP_INDEX
            if self.model_target is not None:
                tgt_probs = self.model_target.predict(features)[0]
                tp_class_idx = int(np.argmax(tgt_probs))
            # An index outside the four known profiles raises IndexError and
            # falls through to the generic error response below.
            target_profile = self._TP_LABELS[tp_class_idx]

            # 4. Compute price levels from the 1h ATR fraction.
            atr_pct_val = float(features['1h_atr_pct'].iloc[0])
            if not np.isfinite(atr_pct_val) or atr_pct_val <= 0:
                atr_pct_val = self._FALLBACK_ATR_PCT
            atr_abs = atr_pct_val * current_price

            tp_map = {
                label: float(current_price + (mult * atr_abs))
                for label, mult in self._TP_ATR_MULTIPLIERS.items()
            }
            primary_tp = tp_map[target_profile]
            sl_price = current_price - (self._SL_ATR_MULTIPLIER * atr_abs)

            return {
                'action': 'WATCH',
                'action_type': 'BUY',
                'confidence': float(confidence),
                'strength': float(strength),
                'target_class': target_profile,
                'primary_tp': float(primary_tp),
                'sl_price': float(sl_price),
                'tp_map': tp_map,
                'analysis_summary': f"SPOT BUY (Conf: {confidence:.0%}) | Str: {strength:.2f} | Aim: {target_profile}"
            }

        except Exception as e:
            print(f"❌ [Oracle] Prediction Error: {e}")
            return {'action': 'WAIT', 'reason': 'Error', 'confidence': 0.0}