import os
import warnings
from typing import Any, Dict, List, Optional

import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
import pandas_ta as ta

# --- [ System settings ] ---
warnings.filterwarnings('ignore', category=FutureWarning)


class OracleEngine:
    """Oracle V4.2: config-injectable inference engine built on LightGBM boosters.

    Loads a feature-column list and up to three boosters (direction, target
    class, strength) from ``model_dir`` and turns multi-timeframe OHLCV data
    plus three external scores into a spot (long-only) trade decision dict.
    """

    # Column layout assumed for every raw OHLCV row handed to the engine.
    _OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Per-timeframe feature suffixes produced by _calculate_snapshot_features.
    _SNAPSHOT_SUFFIXES = ['slope', 'rsi', 'atr_pct', 'vol_z']
    # Target-class index -> label, and label -> take-profit distance in ATRs.
    _TP_LABELS = ['TP1', 'TP2', 'TP3', 'TP4']
    _TP_ATR_MULTIPLIERS = {'TP1': 1.0, 'TP2': 1.8, 'TP3': 2.8, 'TP4': 4.5}
    # Stop-loss distance below the current price, in ATRs.
    _SL_ATR_MULTIPLIER = 1.2
    # ATR fraction used when the 1h ATR feature is unavailable or non-positive.
    _DEFAULT_ATR_PCT = 0.02

    def __init__(self, model_dir: str = "ml_models/Unified_Models_V1"):
        """Create an engine instance; no artifacts are loaded until ``initialize``.

        Args:
            model_dir: Directory containing ``feature_columns.pkl`` and the
                ``lgbm_*.txt`` model files.
        """
        self.model_dir = model_dir
        self.model_direction = None
        self.model_target = None
        self.model_strength = None

        # Default value; expected to be overridden through set_threshold().
        self.confidence_threshold = 0.65

        self.feature_cols = []
        self.initialized = False
        print("🧠 [Oracle V4.2] Engine Instance Created.")

    def set_threshold(self, threshold: float):
        """Inject the minimum long-confidence required by ``predict``."""
        self.confidence_threshold = threshold

    async def initialize(self):
        """Load the feature-column list and the LightGBM boosters.

        The feature list and the direction model are mandatory; the target and
        strength models are loaded only when their files exist.

        Returns:
            True when the engine is ready (or was already initialized), False
            when a mandatory artifact is missing or loading raised.
        """
        if self.initialized:
            return True

        print(f"🧠 [Oracle V4] Loading artifacts from {self.model_dir}...")
        try:
            feat_path = os.path.join(self.model_dir, "feature_columns.pkl")
            if not os.path.exists(feat_path):
                print(f"❌ [Oracle] Feature map missing: {feat_path}")
                return False
            # NOTE(review): joblib.load unpickles the file, so model_dir must
            # only ever contain trusted artifacts.
            self.feature_cols = joblib.load(feat_path)

            dir_path = os.path.join(self.model_dir, "lgbm_direction.txt")
            tgt_path = os.path.join(self.model_dir, "lgbm_target_class.txt")
            str_path = os.path.join(self.model_dir, "lgbm_strength.txt")

            if not os.path.exists(dir_path):
                # Report the failure like the missing-feature-map case does,
                # instead of returning False without any diagnostic.
                print(f"❌ [Oracle] Direction model missing: {dir_path}")
                return False
            self.model_direction = lgb.Booster(model_file=dir_path)

            if os.path.exists(tgt_path):
                self.model_target = lgb.Booster(model_file=tgt_path)
            if os.path.exists(str_path):
                self.model_strength = lgb.Booster(model_file=str_path)

            self.initialized = True
            print(f"✅ [Oracle V4] Ready. Dynamic Threshold: {self.confidence_threshold}")
            return True
        except Exception as e:
            print(f"❌ [Oracle] Init Error: {e}")
            return False

    def _calculate_snapshot_features(self, df: pd.DataFrame, tf_prefix: str) -> pd.DataFrame:
        """Compute slope, RSI, ATR% and volume z-score columns for one timeframe.

        Args:
            df: OHLCV frame with at least ``high``, ``low``, ``close`` and
                ``volume`` columns.
            tf_prefix: Prefix for the generated column names (e.g. ``"1h"``).

        Returns:
            A frame with the four ``{tf_prefix}_*`` columns (NaNs forward/back
            filled, then zero filled), or an empty frame when fewer than 15
            rows are supplied or the indicator computation fails.
        """
        if len(df) < 15:
            return pd.DataFrame()

        df = df.copy()
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)

        try:
            df[f'{tf_prefix}_slope'] = ta.slope(df['close'], length=7)
            df[f'{tf_prefix}_rsi'] = ta.rsi(df['close'], length=14)

            atr = ta.atr(df['high'], df['low'], df['close'], length=14)
            df[f'{tf_prefix}_atr_pct'] = atr / df['close']

            vol_mean = df['volume'].rolling(20).mean()
            vol_std = df['volume'].rolling(20).std()
            # The epsilon keeps the division finite when volume is constant.
            df[f'{tf_prefix}_vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)

            cols = [f'{tf_prefix}_{suffix}' for suffix in self._SNAPSHOT_SUFFIXES]
            return df[cols].ffill().bfill().fillna(0)
        except Exception as e:
            # Best effort: the caller substitutes zeros for missing columns,
            # but the failure is reported instead of being silently dropped.
            print(f"❌ [Oracle] Snapshot feature error ({tf_prefix}): {e}")
            return pd.DataFrame()

    def _zero_snapshot(self, tf_prefix: str) -> pd.DataFrame:
        """Return a one-row all-zero feature frame for a timeframe without data."""
        cols = [f'{tf_prefix}_{suffix}' for suffix in self._SNAPSHOT_SUFFIXES]
        return pd.DataFrame(np.zeros((1, len(cols))), columns=cols)

    def _latest_snapshot(self, df: pd.DataFrame, tf_prefix: str) -> pd.DataFrame:
        """Return the most recent feature row for ``df`` (empty if computation failed)."""
        return self._calculate_snapshot_features(df, tf_prefix).iloc[-1:].reset_index(drop=True)

    def _create_feature_vector(self, ohlcv_data: Dict[str, Any], titan_score: float,
                               mc_score: float, pattern_score: float) -> Optional[pd.DataFrame]:
        """Assemble the single-row model input in ``self.feature_cols`` order.

        Args:
            ohlcv_data: Mapping of timeframe (``'1h'``, ``'15m'``, ``'4h'``) to
                rows of ``[timestamp, open, high, low, close, volume]``.
            titan_score: Stored as ``sim_titan_score``.
            mc_score: Stored as ``sim_mc_score``.
            pattern_score: Stored as ``sim_pattern_score``.

        Returns:
            A one-row float DataFrame whose columns are exactly
            ``self.feature_cols`` (unknown or NaN features become 0.0), or
            None when 1h data has fewer than 30 rows or construction fails.
        """
        try:
            if not ohlcv_data:
                return None

            raw_1h = ohlcv_data.get('1h')
            if not raw_1h or len(raw_1h) < 30:
                return None

            df_1h = pd.DataFrame(raw_1h, columns=self._OHLCV_COLUMNS)
            df_15m = pd.DataFrame(ohlcv_data.get('15m', []), columns=self._OHLCV_COLUMNS)
            df_4h = pd.DataFrame(ohlcv_data.get('4h', []), columns=self._OHLCV_COLUMNS)

            feats_1h = self._latest_snapshot(df_1h, "1h")
            # Secondary timeframes are optional: zeros stand in when too short.
            feats_15m = self._latest_snapshot(df_15m, "15m") if len(df_15m) > 20 else self._zero_snapshot("15m")
            feats_4h = self._latest_snapshot(df_4h, "4h") if len(df_4h) > 20 else self._zero_snapshot("4h")

            vector = pd.concat([feats_1h, feats_15m, feats_4h], axis=1)
            vector['sim_titan_score'] = float(titan_score)
            vector['sim_mc_score'] = float(mc_score)
            vector['sim_pattern_score'] = float(pattern_score)

            # Build the row in one pass instead of filling an empty frame cell
            # by cell; features the model expects but we lack default to 0.0.
            row = vector.iloc[0]
            values = {}
            for col in self.feature_cols:
                val = row[col] if col in row.index else 0.0
                values[col] = 0.0 if pd.isna(val) else float(val)

            return pd.DataFrame([values], columns=list(self.feature_cols)).astype(float)

        except Exception as e:
            print(f"❌ Vector Creation Error: {e}")
            return None

    # ==========================================================================
    # 🔮 Prediction (Inference Logic - SPOT MODE)
    # ==========================================================================
    async def predict(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate an opportunity using the injected confidence threshold.

        Args:
            symbol_data: Mapping read for the keys ``ohlcv``, ``current_price``,
                ``titan_score``, ``mc_score`` and ``patterns_score``.

        Returns:
            A dict whose ``action`` is ``'WAIT'`` (with a ``reason``) when the
            engine is not initialized, features cannot be built, the model
            favours short, confidence is below the threshold, the price is not
            a positive finite number, or an error occurs. Otherwise ``action``
            is ``'WATCH'`` with ``action_type`` ``'BUY'`` plus confidence,
            strength, target class, take-profit map and stop-loss price.
        """
        if not self.initialized:
            return {'action': 'WAIT', 'reason': 'Not initialized', 'confidence': 0.0}

        try:
            ohlcv = symbol_data.get('ohlcv')
            current_price = symbol_data.get('current_price', 0.0)
            titan = symbol_data.get('titan_score', 0.0)
            mc = symbol_data.get('mc_score', 0.0)
            patt = symbol_data.get('patterns_score', 0.0)

            features = self._create_feature_vector(ohlcv, titan, mc, patt)
            if features is None:
                return {'action': 'WAIT', 'reason': 'Features failed', 'confidence': 0.0}

            # 1. Direction prediction.
            # NOTE(review): a vector output is read as [long, short], whereas a
            # scalar output is read as the short probability - confirm this
            # matches how the direction model was trained.
            dir_probs = self.model_direction.predict(features)[0]
            if isinstance(dir_probs, (np.ndarray, list)):
                prob_long = float(dir_probs[0])
                prob_short = float(dir_probs[1])
            else:
                prob_short = float(dir_probs)
                prob_long = 1.0 - prob_short

            if prob_short > prob_long:
                return {
                    'action': 'WAIT',
                    'reason': f'Bearish (Short Prob: {prob_short:.2f})',
                    'direction': 'SHORT',
                    'confidence': prob_long,
                    'short_confidence': prob_short
                }

            confidence = prob_long

            # 2. Confidence gate, using the injected dynamic threshold rather
            # than a hard-coded constant.
            if confidence < self.confidence_threshold:
                return {
                    'action': 'WAIT',
                    'reason': f'Low Confidence ({confidence:.2f} < {self.confidence_threshold})',
                    'direction': 'LONG',
                    'confidence': confidence,
                    'short_confidence': prob_short
                }

            # Price levels are meaningless without a positive finite price; a
            # missing price defaults to 0.0 and previously produced a BUY
            # signal with TP and SL both at zero.
            price = float(current_price)
            if not np.isfinite(price) or price <= 0:
                return {
                    'action': 'WAIT',
                    'reason': 'Invalid price',
                    'direction': 'LONG',
                    'confidence': confidence,
                    'short_confidence': prob_short
                }

            # 3. Strength and target-class prediction (both models optional).
            strength = 0.5
            if self.model_strength:
                strength = float(self.model_strength.predict(features)[0])
                strength = max(0.0, min(1.0, strength))

            tp_class_idx = 1
            if self.model_target:
                tgt_probs = self.model_target.predict(features)[0]
                tp_class_idx = int(np.argmax(tgt_probs))

            target_profile = self._TP_LABELS[tp_class_idx]

            # 4. Level calculation from the 1h ATR fraction. Fall back to the
            # default when the model's feature list has no 1h ATR column or
            # the value is not positive.
            atr_pct_val = 0.0
            if '1h_atr_pct' in features.columns:
                atr_pct_val = float(features['1h_atr_pct'].iloc[0])
            if atr_pct_val <= 0:
                atr_pct_val = self._DEFAULT_ATR_PCT

            atr_abs = atr_pct_val * price

            tp_map = {
                label: float(price + (mult * atr_abs))
                for label, mult in self._TP_ATR_MULTIPLIERS.items()
            }

            primary_tp = tp_map[target_profile]
            sl_price = price - (self._SL_ATR_MULTIPLIER * atr_abs)

            return {
                'action': 'WATCH',
                'action_type': 'BUY',
                'confidence': float(confidence),
                'strength': float(strength),
                'target_class': target_profile,
                'primary_tp': float(primary_tp),
                'sl_price': float(sl_price),
                'tp_map': tp_map,
                'analysis_summary': f"SPOT BUY (Conf: {confidence:.0%}) | Str: {strength:.2f} | Aim: {target_profile}"
            }

        except Exception as e:
            print(f"❌ [Oracle] Prediction Error: {e}")
            return {'action': 'WAIT', 'reason': 'Error', 'confidence': 0.0}