| | import os |
| | import joblib |
| | import numpy as np |
| | import pandas as pd |
| | import pandas_ta as ta |
| | import lightgbm as lgb |
| | import warnings |
| | from typing import Dict, Any, List, Optional |
| |
|
| | |
| | warnings.filterwarnings('ignore', category=FutureWarning) |
| |
|
class OracleEngine:
    """LightGBM-backed scorer that turns multi-timeframe candles into a long-side signal.

    The engine loads up to three boosters from ``model_dir`` (direction is
    mandatory; target-class and strength are optional), builds a single-row
    feature vector from 1h/15m/4h candles plus three externally supplied
    scores, and returns a plain dict describing the decision.
    """

    # Column layout applied to every candle list in the ``ohlcv`` payload.
    _OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Indicator suffixes produced per timeframe by _calculate_snapshot_features.
    _SNAPSHOT_FEATURES = ('slope', 'rsi', 'atr_pct', 'vol_z')
    # Take-profit profiles ordered by target-class index, with their ATR multiples.
    _TP_ATR_MULTIPLIERS = (('TP1', 1.0), ('TP2', 1.8), ('TP3', 2.8), ('TP4', 4.5))
    # Stop-loss distance below the current price, in ATR multiples.
    _SL_ATR_MULTIPLIER = 1.2
    # Profile index used when no target-class model is loaded (i.e. 'TP2').
    _DEFAULT_TP_INDEX = 1
    # ATR fraction of price used when the 1h ATR feature is missing or unusable.
    _FALLBACK_ATR_PCT = 0.02

    def __init__(self, model_dir: str = "ml_models/Unified_Models_V1"):
        """
        Oracle V4.2: Config-Injectable Strategic Brain

        Args:
            model_dir: Directory holding ``feature_columns.pkl`` and the
                ``lgbm_*.txt`` model files read by :meth:`initialize`.
        """
        self.model_dir = model_dir
        self.model_direction = None
        self.model_target = None
        self.model_strength = None

        # Minimum long probability required before a signal is emitted.
        self.confidence_threshold = 0.65

        self.feature_cols = []
        self.initialized = False
        print("🧠 [Oracle V4.2] Engine Instance Created.")

    def set_threshold(self, threshold: float):
        """Override the minimum long probability used by :meth:`predict`."""
        self.confidence_threshold = threshold

    async def initialize(self):
        """Load the models and the feature map.

        Returns:
            True when the feature map and the direction model were loaded (or
            the engine was already initialized); False when a required
            artifact is missing or loading raised.
        """
        if self.initialized:
            return True

        print(f"🧠 [Oracle V4] Loading artifacts from {self.model_dir}...")
        try:
            feat_path = os.path.join(self.model_dir, "feature_columns.pkl")
            if not os.path.exists(feat_path):
                print(f"❌ [Oracle] Feature map missing: {feat_path}")
                return False
            # NOTE(security): joblib.load unpickles the file, which can execute
            # arbitrary code. Only point model_dir at trusted artifacts.
            self.feature_cols = joblib.load(feat_path)

            dir_path = os.path.join(self.model_dir, "lgbm_direction.txt")
            tgt_path = os.path.join(self.model_dir, "lgbm_target_class.txt")
            str_path = os.path.join(self.model_dir, "lgbm_strength.txt")

            # The direction model is the only mandatory booster.
            if not os.path.exists(dir_path):
                print(f"❌ [Oracle] Direction model missing: {dir_path}")
                return False
            self.model_direction = lgb.Booster(model_file=dir_path)

            # Optional boosters: predict() falls back to defaults without them.
            if os.path.exists(tgt_path):
                self.model_target = lgb.Booster(model_file=tgt_path)

            if os.path.exists(str_path):
                self.model_strength = lgb.Booster(model_file=str_path)

            self.initialized = True
            print(f"✅ [Oracle V4] Ready. Dynamic Threshold: {self.confidence_threshold}")
            return True

        except Exception as e:
            print(f"❌ [Oracle] Init Error: {e}")
            return False

    def _calculate_snapshot_features(self, df, tf_prefix):
        """Compute slope, RSI, ATR% and volume z-score columns for one timeframe.

        Args:
            df: Candle frame with at least ``high``, ``low``, ``close`` and
                ``volume`` columns.
            tf_prefix: Prefix for the produced column names (e.g. ``"1h"``).

        Returns:
            A frame with the four ``{tf_prefix}_*`` columns, gaps filled
            forward, then backward, then with 0. An empty frame is returned
            when fewer than 15 rows are supplied or an indicator fails.
        """
        if len(df) < 15:
            return pd.DataFrame()

        df = df.copy()
        df['close'] = df['close'].astype(float)
        df['volume'] = df['volume'].astype(float)

        try:
            # ATR consumes high/low as well, so they need the same numeric
            # cast as close (candles may arrive as strings).
            df['high'] = df['high'].astype(float)
            df['low'] = df['low'].astype(float)

            df[f'{tf_prefix}_slope'] = ta.slope(df['close'], length=7)
            df[f'{tf_prefix}_rsi'] = ta.rsi(df['close'], length=14)
            atr = ta.atr(df['high'], df['low'], df['close'], length=14)
            df[f'{tf_prefix}_atr_pct'] = atr / df['close']
            vol_mean = df['volume'].rolling(20).mean()
            vol_std = df['volume'].rolling(20).std()
            # The epsilon keeps a flat-volume window from dividing by zero.
            df[f'{tf_prefix}_vol_z'] = (df['volume'] - vol_mean) / (vol_std + 1e-9)

            cols = [f'{tf_prefix}_{name}' for name in self._SNAPSHOT_FEATURES]
            # A zero close yields +/-inf in atr_pct; treat it as missing so it
            # is filled like any other gap instead of reaching the model.
            cleaned = df[cols].replace([np.inf, -np.inf], np.nan)
            return cleaned.ffill().bfill().fillna(0)
        except Exception as e:
            print(f"❌ [Oracle] Snapshot features failed for {tf_prefix}: {e}")
            return pd.DataFrame()

    def _latest_snapshot(self, candles, tf_prefix, min_rows):
        """Return the newest snapshot-feature row for a timeframe as a 1-row frame.

        Falls back to a row of zeros when fewer than ``min_rows`` candles are
        available or the indicator computation produced nothing, so the
        caller always receives exactly one row with the expected columns.
        """
        zero_cols = [f'{tf_prefix}_{name}' for name in self._SNAPSHOT_FEATURES]
        zero_row = pd.DataFrame(np.zeros((1, len(zero_cols))), columns=zero_cols)

        df = pd.DataFrame(candles, columns=self._OHLCV_COLUMNS)
        if len(df) < min_rows:
            return zero_row

        feats = self._calculate_snapshot_features(df, tf_prefix)
        if feats.empty:
            return zero_row
        return feats.iloc[-1:].reset_index(drop=True)

    def _create_feature_vector(self, ohlcv_data: Dict[str, Any], titan_score: float, mc_score: float, pattern_score: float) -> Optional[pd.DataFrame]:
        """Assemble the single-row model input in ``self.feature_cols`` order.

        Args:
            ohlcv_data: Mapping of timeframe (``'1h'``, ``'15m'``, ``'4h'``)
                to candle rows laid out as timestamp, open, high, low, close,
                volume.
            titan_score: Stored as the ``sim_titan_score`` feature.
            mc_score: Stored as the ``sim_mc_score`` feature.
            pattern_score: Stored as the ``sim_pattern_score`` feature.

        Returns:
            A one-row float frame whose columns are exactly
            ``self.feature_cols`` (unknown or NaN features become 0.0), or
            None when fewer than 30 1h candles are supplied or assembly fails.
        """
        try:
            if not ohlcv_data:
                return None

            raw_1h = ohlcv_data.get('1h')
            # An explicit None/len check keeps numpy arrays and DataFrames
            # usable here; their truthiness is ambiguous and would raise.
            if raw_1h is None or len(raw_1h) < 30:
                return None

            # 15m and 4h are optional: they need more than 20 candles,
            # otherwise their features are zero-filled.
            frames = [
                self._latest_snapshot(raw_1h, "1h", min_rows=30),
                self._latest_snapshot(ohlcv_data.get('15m', []), "15m", min_rows=21),
                self._latest_snapshot(ohlcv_data.get('4h', []), "4h", min_rows=21),
            ]
            vector = pd.concat(frames, axis=1)

            vector['sim_titan_score'] = float(titan_score)
            vector['sim_mc_score'] = float(mc_score)
            vector['sim_pattern_score'] = float(pattern_score)

            # Project onto the trained feature order; anything the model
            # expects but this engine does not compute is fed as 0.0.
            feature_cols = list(self.feature_cols)
            latest = vector.iloc[0]
            values = []
            for col in feature_cols:
                val = latest.get(col, 0.0)
                values.append(0.0 if pd.isna(val) else float(val))

            return pd.DataFrame([values], columns=feature_cols, dtype=float)

        except Exception as e:
            print(f"❌ Vector Creation Error: {e}")
            return None

    @staticmethod
    def _positive_float(value):
        """Return ``value`` as a finite float greater than zero, else None."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        if not np.isfinite(number) or number <= 0:
            return None
        return number

    async def predict(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze the opportunity using the dynamic threshold.

        Args:
            symbol_data: Mapping read for ``ohlcv``, ``current_price``,
                ``titan_score``, ``mc_score`` and ``patterns_score``.

        Returns:
            A dict that always carries ``action`` and ``confidence``.
            ``action`` is ``'WAIT'`` (with a ``reason``) when the engine is
            not initialized, features cannot be built, the short probability
            exceeds the long one, confidence is below the threshold, the
            price is unusable, or an error occurs. Otherwise it is
            ``'WATCH'`` with strength, target class and ATR-based TP/SL
            prices.
        """
        if not self.initialized:
            return {'action': 'WAIT', 'reason': 'Not initialized', 'confidence': 0.0}

        try:
            ohlcv = symbol_data.get('ohlcv')

            # A key present with value None is treated like a missing score.
            titan = symbol_data.get('titan_score') or 0.0
            mc = symbol_data.get('mc_score') or 0.0
            patt = symbol_data.get('patterns_score') or 0.0

            features = self._create_feature_vector(ohlcv, titan, mc, patt)
            if features is None:
                return {'action': 'WAIT', 'reason': 'Features failed', 'confidence': 0.0}

            dir_probs = self.model_direction.predict(features)[0]

            if isinstance(dir_probs, (np.ndarray, list)):
                # Per-class output: index 0 is read as long, index 1 as short.
                prob_long = float(dir_probs[0])
                prob_short = float(dir_probs[1])
            else:
                # Scalar output is read as the short probability.
                prob_short = float(dir_probs)
                prob_long = 1.0 - prob_short

            if prob_short > prob_long:
                return {
                    'action': 'WAIT',
                    'reason': f'Bearish (Short Prob: {prob_short:.2f})',
                    'direction': 'SHORT',
                    'confidence': prob_long,
                    'short_confidence': prob_short
                }

            confidence = prob_long

            if confidence < self.confidence_threshold:
                return {
                    'action': 'WAIT',
                    'reason': f'Low Confidence ({confidence:.2f} < {self.confidence_threshold})',
                    'direction': 'LONG',
                    'confidence': confidence,
                    'short_confidence': prob_short
                }

            # Price levels are multiples of the price, so a missing, zero or
            # non-finite price would produce a signal with meaningless TP/SL.
            current_price = self._positive_float(symbol_data.get('current_price'))
            if current_price is None:
                return {
                    'action': 'WAIT',
                    'reason': 'Invalid price',
                    'direction': 'LONG',
                    'confidence': confidence,
                    'short_confidence': prob_short
                }

            strength = 0.5
            if self.model_strength is not None:
                strength = float(self.model_strength.predict(features)[0])
                strength = max(0.0, min(1.0, strength))

            tp_class_idx = self._DEFAULT_TP_INDEX
            if self.model_target is not None:
                tgt_probs = self.model_target.predict(features)[0]
                tp_class_idx = int(np.argmax(tgt_probs))
            # Clamp so a model with more classes than profiles cannot index
            # past the last profile.
            tp_class_idx = min(max(tp_class_idx, 0), len(self._TP_ATR_MULTIPLIERS) - 1)
            target_profile = self._TP_ATR_MULTIPLIERS[tp_class_idx][0]

            atr_pct_val = 0.0
            if '1h_atr_pct' in features.columns:
                atr_pct_val = float(features['1h_atr_pct'].iloc[0])
            if not np.isfinite(atr_pct_val) or atr_pct_val <= 0:
                atr_pct_val = self._FALLBACK_ATR_PCT

            atr_abs = atr_pct_val * current_price

            # Plain floats keep the result free of numpy scalars.
            tp_map = {
                label: float(current_price + (multiple * atr_abs))
                for label, multiple in self._TP_ATR_MULTIPLIERS
            }

            primary_tp = tp_map[target_profile]
            sl_price = current_price - (self._SL_ATR_MULTIPLIER * atr_abs)

            return {
                'action': 'WATCH',
                'action_type': 'BUY',
                'confidence': float(confidence),
                'strength': float(strength),
                'target_class': target_profile,
                'primary_tp': float(primary_tp),
                'sl_price': float(sl_price),
                'tp_map': tp_map,
                'analysis_summary': f"SPOT BUY (Conf: {confidence:.0%}) | Str: {strength:.2f} | Aim: {target_profile}"
            }

        except Exception as e:
            print(f"❌ [Oracle] Prediction Error: {e}")
            return {'action': 'WAIT', 'reason': 'Error', 'confidence': 0.0}