# ============================================================================== # 🧠 ml_engine/oracle_engine.py (V5.0 - Precision Feature Mapping) # ============================================================================== # GEM-Architect Approved # - Fixes Shape Mismatch (6 vs 22) using the ACTUAL feature map. # - Calculates: ATR, Vol Spike, Spread, Amihud, Wick Ratios, Body Range, etc. # ============================================================================== import os import joblib import numpy as np import pandas as pd import lightgbm as lgb import warnings import traceback warnings.filterwarnings('ignore') class OracleEngine: def __init__(self, model_dir="ml_models/Unified_Models_V1"): self.model_path = os.path.join(model_dir, "oracle_lgbm.txt") self.model = None self.initialized = False # 🏆 THE GOLDEN CONFIGURATION self.CONFIDENCE_THRESHOLD = 0.005 # Exact Feature List from oracle_meta.json self.EXPECTED_FEATURES = [ "cnn_prob_neutral", "cnn_prob_loss", "cnn_prob_win", "ctx_1h_atr_pct_signal", "ctx_1h_vol_spike", "ctx_1h_proxy_spread", "ctx_1h_amihud", "ctx_1h_avg_ticket_usd", "ctx_1h_upper_wick_ratio", "ctx_1h_lower_wick_ratio", "ctx_1h_body_to_range", "ctx_4h_atr_pct_signal", "ctx_4h_vol_spike", "ctx_4h_proxy_spread", "ctx_4h_amihud", "ctx_4h_avg_ticket_usd", "ctx_4h_upper_wick_ratio", "ctx_4h_lower_wick_ratio", "ctx_4h_body_to_range", "ret_var_30", "ret_skew_30", "ret_kurt_30" ] async def initialize(self): """Load LightGBM Model""" if self.initialized: return True print(f"🧠 [Oracle] Loading Strategic Brain from {self.model_path}...") try: if not os.path.exists(self.model_path): print(f"❌ [Oracle] Model missing: {self.model_path}") return False self.model = lgb.Booster(model_file=self.model_path) self.initialized = True print(f"✅ [Oracle] Online. Strategy Threshold: >{self.CONFIDENCE_THRESHOLD*100:.1f}%.") return True except Exception as e: print(f"❌ [Oracle] Init Error: {e}") return False def _prepare_dataframe(self, data_input): """Helper to safely convert List or DF to DataFrame""" try: if data_input is None: return None if isinstance(data_input, pd.DataFrame): return data_input if not data_input.empty else None if isinstance(data_input, list): if len(data_input) == 0: return None df = pd.DataFrame(data_input, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) return df return None except Exception: return None def _calc_metrics(self, df): """Calculates specific context metrics for a given DataFrame""" try: if len(df) < 20: return np.zeros(8) # Return 8 zeros if not enough data c = df['close'].values.astype(float) h = df['high'].values.astype(float) l = df['low'].values.astype(float) o = df['open'].values.astype(float) v = df['volume'].values.astype(float) # 1. ATR % Signal tr = np.maximum(h - l, np.abs(h - np.roll(c, 1))) atr = pd.Series(tr).rolling(14).mean().iloc[-1] atr_pct = (atr / c[-1]) if c[-1] > 0 else 0 # 2. Vol Spike vol_ma = pd.Series(v).rolling(20).mean().iloc[-1] vol_spike = (v[-1] / vol_ma) if vol_ma > 0 else 1.0 # 3. Proxy Spread raw_spread = (h - l) / np.maximum(c, 1e-9) proxy_spread = np.median(raw_spread[-14:]) * 0.5 # 4. Amihud ret = np.abs(np.diff(np.log(c + 1e-9))) dollar_vol = c[1:] * v[1:] amihud = np.mean(ret[-20:] / np.maximum(dollar_vol[-20:], 1.0)) # 5. Avg Ticket (Estimate) avg_ticket = np.mean(dollar_vol[-20:]) / 1000.0 # Rough proxy # 6. Wick Ratios rng = np.maximum(h[-1] - l[-1], 1e-9) upper_wick = (h[-1] - np.maximum(o[-1], c[-1])) / rng lower_wick = (np.minimum(o[-1], c[-1]) - l[-1]) / rng body_range = np.abs(c[-1] - o[-1]) / rng return np.array([atr_pct, vol_spike, proxy_spread, amihud, avg_ticket, upper_wick, lower_wick, body_range]) except: return np.zeros(8) def _calc_ret_stats(self, df): """Calculate Return Statistics (Var, Skew, Kurt)""" try: if len(df) < 30: return np.zeros(3) close = df['close'].values.astype(float) prev_close = np.roll(close, 1); prev_close[0] = close[0] log_ret = np.log(close / np.maximum(prev_close, 1e-9)) s = pd.Series(log_ret[-30:]) return np.array([s.var(), s.skew(), s.kurt()]) except: return np.zeros(3) async def predict(self, symbol_data: dict) -> dict: """ Decision Core with Full Feature Mapping. """ if not self.initialized: return {'action': 'WAIT', 'reason': 'Oracle Not Init', 'oracle_score': 0.0} try: # 1. Get Inputs titan_probs = symbol_data.get('pattern_probs') or symbol_data.get('titan_probs') if not titan_probs or len(titan_probs) != 3: return {'action': 'WAIT', 'reason': 'No Pattern Input', 'oracle_score': 0.0} # 2. Get DataFrames (15m is used for 1h proxy if 1h missing, logic below) raw_ohlcv = symbol_data.get('ohlcv', {}) df_15m = self._prepare_dataframe(raw_ohlcv.get('15m')) df_1h = self._prepare_dataframe(raw_ohlcv.get('1h')) # Fallback if 1h is missing but 15m exists if df_1h is None and df_15m is not None: df_1h = df_15m # Fallback if 4h is missing (use 1h) df_4h = df_1h if df_15m is None: return {'action': 'WAIT', 'reason': 'No Market Data', 'oracle_score': 0.0} # 3. Calculate Feature Groups # Group 1: Titan [3] f_titan = np.array(titan_probs) # Group 2: Context 1H [8] f_1h = self._calc_metrics(df_1h) # Uses 1H data # Group 3: Context 4H [8] f_4h = self._calc_metrics(df_4h) # Uses 4H (or 1H fallback) data # Group 4: Return Stats (from 15m for sensitivity) [3] f_stats = self._calc_ret_stats(df_15m) # 4. Concatenate All 22 Features # Order MUST match: Titan(3) + 1H(8) + 4H(8) + Stats(3) = 22 input_vector = np.concatenate([f_titan, f_1h, f_4h, f_stats]) # Reshape for LightGBM input_vector = input_vector.reshape(1, -1) # 5. Predict predicted_pnl = float(self.model.predict(input_vector)[0]) # 6. Decision Logic cnn_win_prob = titan_probs[2] is_buy = False reason = "" if predicted_pnl > self.CONFIDENCE_THRESHOLD: is_buy = True reason = f"Golden Setup (Exp: {predicted_pnl*100:.2f}%)" elif predicted_pnl > (self.CONFIDENCE_THRESHOLD * 0.5) and cnn_win_prob > 0.7: is_buy = True reason = f"High Certainty (Win: {cnn_win_prob:.2f})" else: reason = f"Weak (Exp: {predicted_pnl*100:.2f}%)" result = { 'confidence': float(cnn_win_prob), 'oracle_score': float(predicted_pnl), 'target_class': "TP2" if predicted_pnl > 0.01 else "TP1", 'action_type': 'BUY', 'analysis_summary': f"Oracle: {predicted_pnl*100:.2f}% | Pattern: {cnn_win_prob:.2f}" } if is_buy: result['action'] = 'WATCH' else: result['action'] = 'WAIT' result['reason'] = reason return result except Exception as e: # traceback.print_exc() return {'action': 'WAIT', 'reason': f'Oracle Err: {str(e)[:20]}', 'oracle_score': 0.0}