import os import numpy as np import pandas as pd import pandas_ta as ta import lightgbm as lgb from scipy.signal import find_peaks import warnings from typing import Dict, Any, List, Optional # --- [ 0. إعدادات ] --- warnings.filterwarnings('ignore', category=FutureWarning) PIPELINE_SETTINGS = { 'SWING_PROMINENCE_PCT': 0.02, # 2% } DECISION_CONFIDENCE_THRESHOLD = 0.30 N_STRATEGY_MODELS = 11 STRATEGY_MAP = { 0: 'WAIT', 1: 'SWING_LONG', 2: 'SCALP_LONG', 3: 'SWING_SHORT', 4: 'SCALP_SHORT' } # (الأطر الزمنية التي تم تدريب النماذج عليها) TIMEBYTES_TO_PROCESS = ['15m', '1h', '4h'] class OracleEngine: def __init__(self, model_dir: str = "ml_models/Unified_Models_V1"): """ تهيئة "العقل" الاحتمالي (L3 Oracle). """ self.model_dir = model_dir self.strategy_boosters: List[lgb.Booster] = [] self.quantile_boosters: Dict[str, lgb.Booster] = {} self.feature_names: List[str] = [] self.initialized = False # (تحديث الإصدار) print("🧠 [OracleEngine V2.3] تم الإنشاء (Spot/Long-Only Mode). جاهز للتهيئة.") async def initialize(self): """ تحميل جميع النماذج الـ 15 (11 استراتيجية + 4 أهداف) إلى الذاكرة. """ if self.initialized: return True print(f"🧠 [OracleEngine V2.3] جاري تحميل 15 نموذجاً من {self.model_dir}...") try: # 1. تحميل نماذج "لجنة القرار" (Strategy Ensemble) for i in range(N_STRATEGY_MODELS): model_file = os.path.join(self.model_dir, f"lgbm_strategy_fold_{i}.txt") if not os.path.exists(model_file): print(f"❌ [Oracle Error] ملف نموذج مفقود: {model_file}") return False booster = lgb.Booster(model_file=model_file) self.strategy_boosters.append(booster) print(f" ✅ تم تحميل {len(self.strategy_boosters)} نماذج استراتيجية.") # 2. تحميل نماذج "لجنة الأهداف" (Quantile Models) quantile_names = ['tp_p20', 'tp_p50', 'tp_p80', 'sl_p80'] for name in quantile_names: model_file = os.path.join(self.model_dir, f"lgbm_{name}.txt") if not os.path.exists(model_file): print(f"❌ [Oracle Error] ملف نموذج مفقود: {model_file}") return False booster = lgb.Booster(model_file=model_file) self.quantile_boosters[name] = booster print(f" ✅ تم تحميل {len(self.quantile_boosters)} نماذج أهداف.") # 3. حفظ قائمة الميزات self.feature_names = self.strategy_boosters[0].feature_name() self.initialized = True print(f"✅ [OracleEngine V2.3] جاهز. (Threshold: {DECISION_CONFIDENCE_THRESHOLD*100}%)") print(f" -> سيعمل على الأطر: {TIMEBYTES_TO_PROCESS}") return True except Exception as e: print(f"❌ [OracleEngine V2.3] فشل فادح أثناء التهيئة: {e}") self.initialized = False return False # --- [ دوال هندسة الميزات (مطابقة 100% للتدريب) ] --- def _calculate_base_ta(self, df: pd.DataFrame) -> pd.DataFrame: df.ta.rsi(length=14, append=True) df.ta.adx(length=14, append=True) df.ta.macd(fast=12, slow=26, signal=9, append=True) df.ta.bbands(length=20, std=2, append=True) df.ta.atr(length=14, append=True) for length in [9, 21, 50, 100, 200]: df[f'EMA_{length}'] = ta.ema(df['close'], length=length) return df def _calculate_market_structure(self, df: pd.DataFrame, prominence_pct: float) -> pd.DataFrame: prominence_value = df['close'].mean() * prominence_pct high_peaks_idx, _ = find_peaks(df['high'], prominence=prominence_value) low_peaks_idx, _ = find_peaks(-df['low'], prominence=prominence_value) df['last_SH_price'] = df.iloc[high_peaks_idx]['high'].reindex(df.index).ffill().bfill() df['last_SL_price'] = df.iloc[low_peaks_idx]['low'].reindex(df.index).ffill().bfill() df['BOS_Long'] = np.where(df['close'] > df['last_SH_price'].shift(1), 1, 0) df['BOS_Short'] = np.where(df['low'] < df['last_SL_price'].shift(1), 1, 0) return df def _calculate_fibonacci_matrix(self, df: pd.DataFrame) -> pd.DataFrame: wave_range = df['last_SH_price'] - df['last_SL_price'] df['fibo_0.382'] = df['last_SH_price'] - (wave_range * 0.382) df['fibo_0.500'] = df['last_SH_price'] - (wave_range * 0.500) df['fibo_0.618'] = df['last_SL_price'] + (wave_range * 0.618) df['fibo_ext_1.618'] = df['last_SH_price'] + (wave_range * 1.618) df['dist_to_0.618_pct'] = (df['close'] - df['fibo_0.618']) / (df['close'] + 1e-9) df['dist_to_1.618_pct'] = (df['fibo_ext_1.618'] - df['close']) / (df['close'] + 1e-9) df['is_in_golden_pocket'] = np.where( (df['close'] < df['fibo_0.500']) & (df['close'] > df['fibo_0.618']), 1, 0 ) df.replace([np.inf, -np.inf], np.nan, inplace=True) return df def _calculate_alpha_strategies(self, df: pd.DataFrame) -> pd.DataFrame: df['volume_zscore'] = (df['volume'] - df['volume'].rolling(50).mean()) / (df['volume'].rolling(50).std() + 1e-9) df['dist_from_EMA200_pct'] = (df['close'] - df['EMA_200']) / (df['EMA_200'] + 1e-9) bbu_col = next((col for col in df.columns if 'BBU_20_2.0' in str(col)), None) bbl_col = next((col for col in df.columns if 'BBL_20_2.0' in str(col)), None) bbm_col = next((col for col in df.columns if 'BBM_20_2.0' in str(col)), None) if all([bbu_col, bbl_col, bbm_col]): df['BBW_pct'] = (df[bbu_col] - df[bbl_col]) / (df[bbm_col] + 1e-9) df['is_squeeze'] = np.where(df['BBW_pct'] < df['BBW_pct'].rolling(100).min(), 1, 0) else: df['BBW_pct'] = np.nan df['is_squeeze'] = 0 df['is_trending'] = np.where(df['ADX_14'] > 25, 1, 0) df['ATR_pct'] = (df['ATRr_14'] / df['close']) * 100 return df def _create_feature_vector(self, ohlcv_tf_data: List) -> Optional[pd.DataFrame]: """ تشغيل خط أنابيب الميزات الكامل على بيانات إطار زمني واحد. """ if ohlcv_tf_data is None or len(ohlcv_tf_data) < 200: return None df = pd.DataFrame(ohlcv_tf_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df = df.astype(float) df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.set_index('datetime') # تشغيل خط الأنابيب df = self._calculate_base_ta(df) df = self._calculate_market_structure(df, PIPELINE_SETTINGS['SWING_PROMINENCE_PCT']) df = self._calculate_fibonacci_matrix(df) df = self._calculate_alpha_strategies(df) # ملء أي قيم NaN أولية df = df.ffill().bfill() # أخذ آخر صف فقط latest_features = df.iloc[-1:] try: feature_vector = latest_features[self.feature_names] # --- [ إصلاح المشكلة: NaN Bug ] --- feature_vector = feature_vector.fillna(0) # --- [ نهاية الإصلاح ] --- if feature_vector.isnull().values.any(): print("⚠️ [Oracle Warning] Feature vector still contains NaN after fill(0).") return None return feature_vector except Exception as e: print(f"❌ [Oracle Error] عدم تطابق الميزات: {e}") return None async def predict(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]: """ الدالة الرئيسية: تحليل إشارة مرشحة وإرجاع قرار كامل. """ if not self.initialized: return {'action': 'WAIT', 'reason': 'Oracle Engine not initialized'} ohlcv_data = symbol_data.get('ohlcv') current_price = symbol_data.get('current_price') if not ohlcv_data or not current_price: return {'action': 'WAIT', 'reason': 'Missing OHLCV or price data'} try: all_tf_decisions = [] # --- [ الخطوة 1: تحليل كل إطار زمني على حدة ] --- for tf in TIMEBYTES_TO_PROCESS: feature_vector = self._create_feature_vector(ohlcv_data.get(tf)) if feature_vector is None: print(f" -> {symbol_data['symbol']} @ {tf}: Skipped (Insufficient data or NaN)") continue all_probs = [ booster.predict(feature_vector, num_iteration=booster.best_iteration) for booster in self.strategy_boosters ] ensemble_probs = np.mean(all_probs, axis=0)[0] # --- [ إصلاح المشكلة: منطق WAIT ] --- actionable_probs = ensemble_probs.copy() actionable_probs[0] = 0.0 # (تجاهل WAIT) predicted_strategy_idx = np.argmax(actionable_probs) confidence = actionable_probs[predicted_strategy_idx] strategy_name = STRATEGY_MAP.get(predicted_strategy_idx, 'WAIT') # --- [ نهاية الإصلاح ] --- all_tf_decisions.append({ 'timeframe': tf, 'strategy': strategy_name, 'confidence': float(confidence), 'feature_vector': feature_vector }) if not all_tf_decisions: return {'action': 'IGNORE', 'reason': 'Feature calculation failed for all TFs'} # --- [ الخطوة 2: اختيار القرار الأفضل (أعلى ثقة) ] --- best_decision = max(all_tf_decisions, key=lambda x: x['confidence']) strategy_name = best_decision['strategy'] confidence = best_decision['confidence'] best_tf = best_decision['timeframe'] # --- [ 🛑 🛑 🛑 التعديل الجديد: فلتر Spot Only ] --- # إذا كانت الاستراتيجية تحتوي على "SHORT"، يتم تجاهلها فوراً if "SHORT" in strategy_name: return { 'action': 'IGNORE', 'reason': f"Spot Mode: Ignored {strategy_name} signal (Shorts disabled)", 'confidence': confidence, 'strategy': strategy_name } # --- [ نهاية التعديل ] --- # --- [ الخطوة 3: تطبيق فلتر الثقة ] --- if confidence < DECISION_CONFIDENCE_THRESHOLD: return { 'action': 'IGNORE', 'reason': f"Best Actionable Signal ({strategy_name} @ {best_tf}) confidence ({confidence:.2f}) is below threshold ({DECISION_CONFIDENCE_THRESHOLD})", 'confidence': confidence, 'strategy': strategy_name } # --- [ الخطوة 4: (نجحت الثقة) - تشغيل "لجنة الأهداف" ] --- winning_feature_vector = best_decision['feature_vector'] preds_quantile = {} for name, booster in self.quantile_boosters.items(): preds_quantile[name] = booster.predict(winning_feature_vector, num_iteration=booster.best_iteration)[0] # --- [ الخطوة 5: تحديد الأهداف النهائية ] --- tp_pct = preds_quantile['tp_p50'] # (الهدف الواقعي) sl_pct = preds_quantile['sl_p80'] # (وقف الخسارة الآمن) if tp_pct <= 0 or sl_pct <= 0: return {'action': 'IGNORE', 'reason': f'Quantile model predicted negative TP/SL ({tp_pct=}, {sl_pct=})'} if "LONG" in strategy_name: tp_price = current_price * (1 + tp_pct) sl_price = current_price * (1 - sl_pct) action_type = "BUY" else: # (لن نصل إلى هنا لأننا قمنا بتصفية SHORT مسبقاً) return {'action': 'IGNORE', 'reason': 'Strategy not actionable (Unknown type)'} # --- [ الخطوة 6: إرجاع القرار الكامل ] --- return { 'action': 'WATCH', 'confidence': confidence, 'analysis_summary': f"Oracle Consensus @ {best_tf}: {strategy_name} (Conf: {confidence:.2%})", 'strategy': strategy_name, 'action_type': action_type, 'tp_price': float(tp_price), 'sl_price': float(sl_price), 'quantile_tp_pct': float(tp_pct), 'quantile_sl_pct': float(sl_pct) } except Exception as e: print(f"❌ [OracleEngine V2.3] فشل فادح أثناء التنبؤ: {e}") import traceback traceback.print_exc() return {'action': 'WAIT', 'reason': f'Exception: {e}'}