Trad / ml_engine /oracle_engine.py
Riy777's picture
Update ml_engine/oracle_engine.py
160a8a2 verified
# ==============================================================================
# 🧠 ml_engine/oracle_engine.py (V5.0 - Precision Feature Mapping)
# ==============================================================================
# GEM-Architect Approved
# - Fixes Shape Mismatch (6 vs 22) using the ACTUAL feature map.
# - Calculates: ATR, Vol Spike, Spread, Amihud, Wick Ratios, Body Range, etc.
# ==============================================================================
import os
import joblib
import numpy as np
import pandas as pd
import lightgbm as lgb
import warnings
import traceback
warnings.filterwarnings('ignore')
class OracleEngine:
def __init__(self, model_dir="ml_models/Unified_Models_V1"):
self.model_path = os.path.join(model_dir, "oracle_lgbm.txt")
self.model = None
self.initialized = False
# πŸ† THE GOLDEN CONFIGURATION
self.CONFIDENCE_THRESHOLD = 0.005
# Exact Feature List from oracle_meta.json
self.EXPECTED_FEATURES = [
"cnn_prob_neutral", "cnn_prob_loss", "cnn_prob_win",
"ctx_1h_atr_pct_signal", "ctx_1h_vol_spike", "ctx_1h_proxy_spread",
"ctx_1h_amihud", "ctx_1h_avg_ticket_usd", "ctx_1h_upper_wick_ratio",
"ctx_1h_lower_wick_ratio", "ctx_1h_body_to_range",
"ctx_4h_atr_pct_signal", "ctx_4h_vol_spike", "ctx_4h_proxy_spread",
"ctx_4h_amihud", "ctx_4h_avg_ticket_usd", "ctx_4h_upper_wick_ratio",
"ctx_4h_lower_wick_ratio", "ctx_4h_body_to_range",
"ret_var_30", "ret_skew_30", "ret_kurt_30"
]
async def initialize(self):
"""Load LightGBM Model"""
if self.initialized: return True
print(f"🧠 [Oracle] Loading Strategic Brain from {self.model_path}...")
try:
if not os.path.exists(self.model_path):
print(f"❌ [Oracle] Model missing: {self.model_path}")
return False
self.model = lgb.Booster(model_file=self.model_path)
self.initialized = True
print(f"βœ… [Oracle] Online. Strategy Threshold: >{self.CONFIDENCE_THRESHOLD*100:.1f}%.")
return True
except Exception as e:
print(f"❌ [Oracle] Init Error: {e}")
return False
def _prepare_dataframe(self, data_input):
"""Helper to safely convert List or DF to DataFrame"""
try:
if data_input is None: return None
if isinstance(data_input, pd.DataFrame):
return data_input if not data_input.empty else None
if isinstance(data_input, list):
if len(data_input) == 0: return None
df = pd.DataFrame(data_input, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
return df
return None
except Exception:
return None
def _calc_metrics(self, df):
"""Calculates specific context metrics for a given DataFrame"""
try:
if len(df) < 20: return np.zeros(8) # Return 8 zeros if not enough data
c = df['close'].values.astype(float)
h = df['high'].values.astype(float)
l = df['low'].values.astype(float)
o = df['open'].values.astype(float)
v = df['volume'].values.astype(float)
# 1. ATR % Signal
tr = np.maximum(h - l, np.abs(h - np.roll(c, 1)))
atr = pd.Series(tr).rolling(14).mean().iloc[-1]
atr_pct = (atr / c[-1]) if c[-1] > 0 else 0
# 2. Vol Spike
vol_ma = pd.Series(v).rolling(20).mean().iloc[-1]
vol_spike = (v[-1] / vol_ma) if vol_ma > 0 else 1.0
# 3. Proxy Spread
raw_spread = (h - l) / np.maximum(c, 1e-9)
proxy_spread = np.median(raw_spread[-14:]) * 0.5
# 4. Amihud
ret = np.abs(np.diff(np.log(c + 1e-9)))
dollar_vol = c[1:] * v[1:]
amihud = np.mean(ret[-20:] / np.maximum(dollar_vol[-20:], 1.0))
# 5. Avg Ticket (Estimate)
avg_ticket = np.mean(dollar_vol[-20:]) / 1000.0 # Rough proxy
# 6. Wick Ratios
rng = np.maximum(h[-1] - l[-1], 1e-9)
upper_wick = (h[-1] - np.maximum(o[-1], c[-1])) / rng
lower_wick = (np.minimum(o[-1], c[-1]) - l[-1]) / rng
body_range = np.abs(c[-1] - o[-1]) / rng
return np.array([atr_pct, vol_spike, proxy_spread, amihud, avg_ticket, upper_wick, lower_wick, body_range])
except:
return np.zeros(8)
def _calc_ret_stats(self, df):
"""Calculate Return Statistics (Var, Skew, Kurt)"""
try:
if len(df) < 30: return np.zeros(3)
close = df['close'].values.astype(float)
prev_close = np.roll(close, 1); prev_close[0] = close[0]
log_ret = np.log(close / np.maximum(prev_close, 1e-9))
s = pd.Series(log_ret[-30:])
return np.array([s.var(), s.skew(), s.kurt()])
except:
return np.zeros(3)
async def predict(self, symbol_data: dict) -> dict:
"""
Decision Core with Full Feature Mapping.
"""
if not self.initialized:
return {'action': 'WAIT', 'reason': 'Oracle Not Init', 'oracle_score': 0.0}
try:
# 1. Get Inputs
titan_probs = symbol_data.get('pattern_probs') or symbol_data.get('titan_probs')
if not titan_probs or len(titan_probs) != 3:
return {'action': 'WAIT', 'reason': 'No Pattern Input', 'oracle_score': 0.0}
# 2. Get DataFrames (15m is used for 1h proxy if 1h missing, logic below)
raw_ohlcv = symbol_data.get('ohlcv', {})
df_15m = self._prepare_dataframe(raw_ohlcv.get('15m'))
df_1h = self._prepare_dataframe(raw_ohlcv.get('1h'))
# Fallback if 1h is missing but 15m exists
if df_1h is None and df_15m is not None: df_1h = df_15m
# Fallback if 4h is missing (use 1h)
df_4h = df_1h
if df_15m is None:
return {'action': 'WAIT', 'reason': 'No Market Data', 'oracle_score': 0.0}
# 3. Calculate Feature Groups
# Group 1: Titan [3]
f_titan = np.array(titan_probs)
# Group 2: Context 1H [8]
f_1h = self._calc_metrics(df_1h) # Uses 1H data
# Group 3: Context 4H [8]
f_4h = self._calc_metrics(df_4h) # Uses 4H (or 1H fallback) data
# Group 4: Return Stats (from 15m for sensitivity) [3]
f_stats = self._calc_ret_stats(df_15m)
# 4. Concatenate All 22 Features
# Order MUST match: Titan(3) + 1H(8) + 4H(8) + Stats(3) = 22
input_vector = np.concatenate([f_titan, f_1h, f_4h, f_stats])
# Reshape for LightGBM
input_vector = input_vector.reshape(1, -1)
# 5. Predict
predicted_pnl = float(self.model.predict(input_vector)[0])
# 6. Decision Logic
cnn_win_prob = titan_probs[2]
is_buy = False
reason = ""
if predicted_pnl > self.CONFIDENCE_THRESHOLD:
is_buy = True
reason = f"Golden Setup (Exp: {predicted_pnl*100:.2f}%)"
elif predicted_pnl > (self.CONFIDENCE_THRESHOLD * 0.5) and cnn_win_prob > 0.7:
is_buy = True
reason = f"High Certainty (Win: {cnn_win_prob:.2f})"
else:
reason = f"Weak (Exp: {predicted_pnl*100:.2f}%)"
result = {
'confidence': float(cnn_win_prob),
'oracle_score': float(predicted_pnl),
'target_class': "TP2" if predicted_pnl > 0.01 else "TP1",
'action_type': 'BUY',
'analysis_summary': f"Oracle: {predicted_pnl*100:.2f}% | Pattern: {cnn_win_prob:.2f}"
}
if is_buy:
result['action'] = 'WATCH'
else:
result['action'] = 'WAIT'
result['reason'] = reason
return result
except Exception as e:
# traceback.print_exc()
return {'action': 'WAIT', 'reason': f'Oracle Err: {str(e)[:20]}', 'oracle_score': 0.0}