Trad / ml_engine /processor.py
Riy777's picture
Update ml_engine/processor.py
5af05d7 verified
# ==============================================================================
# ๐Ÿง  ml_engine/processor.py (V70.7 - GEM-Architect: Gate Override Fix)
# ==============================================================================
# - Fix: Forces the Pattern Gate to respect the lower (more permissive) threshold.
# This ensures coins with ~40% pattern score pass to Oracle if SystemLimit is 0.25.
# ==============================================================================
import asyncio
import traceback
import os
import numpy as np
from typing import Dict, Any, List, Optional
# --- Imports ---
try: from .pattern_engine import PatternEngine
except ImportError: PatternEngine = None
try: from .monte_carlo import MonteCarloEngine
except ImportError: MonteCarloEngine = None
try: from .oracle_engine import OracleEngine
except ImportError: OracleEngine = None
try: from .sniper_engine import SniperEngine
except ImportError: SniperEngine = None
try: from .hybrid_guardian import HybridDeepSteward
except ImportError: HybridDeepSteward = None
try: from .guardian_hydra import GuardianHydra
except ImportError: GuardianHydra = None
# Base Paths
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODELS_UNIFIED_DIR = os.path.join(BASE_DIR, "ml_models", "Unified_Models_V1")
MODELS_SNIPER_DIR = os.path.join(BASE_DIR, "ml_models", "guard_v2")
MODELS_HYDRA_DIR = os.path.join(BASE_DIR, "ml_models", "guard_v1")
MODEL_V2_PATH = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V2_Production.json")
MODEL_V3_PATH = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V3_Production.json")
MODEL_V3_FEAT = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V3_Features.json")
# ============================================================
# ๐ŸŽ›๏ธ SYSTEM LIMITS
# ============================================================
class SystemLimits:
"""
GEM-Architect: Logic Gates Configuration (Data-Driven).
"""
# --- Layer 2: Pattern Net Gate ---
L2_GATE_PATTERN_NET = 0.25 # โœ… Permissive Gate to let Oracle decide
# --- Layer 2: Composite Score Weights ---
L2_WEIGHT_ORACLE = 0.30 # The King
L2_WEIGHT_PATTERN = 0.50 # The Scout
L2_WEIGHT_MC = 0.20 # The Anchor
L2_MIN_COMPOSITE_SCORE = 50.0
# --- Layer 3 & 4 Limits ---
L3_WHALE_IMPACT_MAX = 15.0
L3_NEWS_IMPACT_MAX = 10.0
L3_MC_ADVANCED_MAX = 10.0
L4_ENTRY_THRESHOLD = 0.40
L4_WEIGHT_ML = 0.60
L4_WEIGHT_OB = 0.40
L4_OB_WALL_RATIO = 0.35
# --- Guardians ---
HYDRA_CRASH_THRESH = 0.80
HYDRA_GIVEBACK_THRESH = 0.70
HYDRA_STAGNATION_THRESH = 0.60
LEGACY_V2_PANIC_THRESH = 0.98
LEGACY_V3_HARD_THRESH = 0.95
LEGACY_V3_SOFT_THRESH = 0.88
LEGACY_V3_ULTRA_THRESH = 0.99
@classmethod
def to_dict(cls) -> Dict[str, Any]:
return {k: v for k, v in cls.__dict__.items() if not k.startswith('__') and not callable(v)}
# ============================================================
# ๐Ÿง  MLProcessor Class
# ============================================================
class MLProcessor:
def __init__(self, data_manager=None):
self.data_manager = data_manager
self.initialized = False
self.initialization_attempted = False
self.pattern_net = PatternEngine(model_dir=MODELS_UNIFIED_DIR) if PatternEngine else None
self.oracle = OracleEngine(model_dir=MODELS_UNIFIED_DIR) if OracleEngine else None
self.mc_analyzer = MonteCarloEngine() if MonteCarloEngine else None
self.sniper = SniperEngine(models_dir=MODELS_SNIPER_DIR) if SniperEngine else None
self.guardian_hydra = GuardianHydra(model_dir=MODELS_HYDRA_DIR) if GuardianHydra else None
self.guardian_legacy = None
if HybridDeepSteward:
self.guardian_legacy = HybridDeepSteward(
v2_model_path=MODEL_V2_PATH,
v3_model_path=MODEL_V3_PATH,
v3_features_map_path=MODEL_V3_FEAT
)
print(f"๐Ÿง  [Processor V70.7] Gate Override Logic Active.")
async def initialize(self):
"""Async Initialization"""
if self.initialized: return True
if self.initialization_attempted: return self.initialized
self.initialization_attempted = True
print("โš™๏ธ [Processor] Starting Neural Grid...")
try:
tasks = []
if self.pattern_net: tasks.append(self.pattern_net.initialize())
if self.oracle: tasks.append(self.oracle.initialize())
if self.sniper:
self.sniper.configure_settings(
threshold=SystemLimits.L4_ENTRY_THRESHOLD,
wall_ratio=SystemLimits.L4_OB_WALL_RATIO,
w_ml=SystemLimits.L4_WEIGHT_ML,
w_ob=SystemLimits.L4_WEIGHT_OB
)
tasks.append(self.sniper.initialize())
if self.mc_analyzer and hasattr(self.mc_analyzer, 'initialize'):
if asyncio.iscoroutinefunction(self.mc_analyzer.initialize):
tasks.append(self.mc_analyzer.initialize())
if tasks: await asyncio.gather(*tasks)
if self.guardian_hydra: self.guardian_hydra.initialize()
if self.guardian_legacy:
if asyncio.iscoroutinefunction(self.guardian_legacy.initialize):
await self.guardian_legacy.initialize()
else:
self.guardian_legacy.initialize()
self.initialized = True
return True
except Exception as e:
print(f"โŒ [Processor] Init Error: {e}")
traceback.print_exc()
return False
# ============================================================
# ๐Ÿข LAYER 2: Pattern + Oracle + MC (Centered Logic)
# ============================================================
async def execute_layer2_analysis(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.initialized: await self.initialize()
symbol = raw_data.get('symbol')
ohlcv = raw_data.get('ohlcv')
limits = raw_data.get('dynamic_limits', {})
result = raw_data.copy()
result.update({
'is_valid': False,
'reason': 'Unknown',
'l2_score': 0.0,
'pattern_score': 0.0,
'oracle_score': 0.0,
'mc_score': 0.0,
'pattern_probs': [0, 0, 0]
})
try:
# 1. Pattern Net Analysis
pattern_res = {'score': 0.0, 'probs': [0,0,0]}
if self.pattern_net:
pattern_res = await asyncio.to_thread(self.pattern_net.predict, ohlcv)
nn_score = pattern_res.get('score', 0.0)
pattern_probs = pattern_res.get('probs', [0,0,0])
result['pattern_score'] = nn_score
result['pattern_probs'] = pattern_probs
# 2. Monte Carlo
mc_val = 0.5
if self.mc_analyzer and '1h' in ohlcv:
try:
closes = [c[4] for c in ohlcv['1h']]
raw_mc = self.mc_analyzer.run_light_check(closes)
mc_val = 0.5 + (raw_mc * 5.0)
mc_val = max(0.0, min(1.0, mc_val))
except: pass
result['mc_score'] = mc_val
# ๐Ÿ›‘ Soft Gate Check (The Fix)
# We use min() to ensure we pick the MOST PERMISSIVE threshold.
# If dynamic is 0.50 and system is 0.25, we use 0.25.
dynamic_gate = limits.get('l2_gate_pattern', 1.0)
gate_pattern = min(dynamic_gate, SystemLimits.L2_GATE_PATTERN_NET)
if nn_score < gate_pattern:
result['reason'] = f"Pattern Score {nn_score:.2f} < {gate_pattern}"
return result
# 3. Oracle Analysis (Now reachable!)
oracle_input = raw_data.copy()
oracle_input['titan_probs'] = pattern_probs
oracle_input['pattern_probs'] = pattern_probs
oracle_res = {'oracle_score': 0.0}
if self.oracle:
thresh = limits.get('l3_oracle_thresh', 0.005)
if hasattr(self.oracle, 'set_threshold'): self.oracle.set_threshold(thresh)
oracle_res = await self.oracle.predict(oracle_input)
raw_oracle_pred = oracle_res.get('oracle_score', 0.0)
result['oracle_score'] = raw_oracle_pred
# โš–๏ธ SCORING Logic
oracle_val = 50.0 + (raw_oracle_pred / 0.005) * 50.0
oracle_val = max(0.0, min(100.0, oracle_val))
oracle_val_norm = oracle_val / 100.0
# 4. Composite Scoring
composite_score = (
(oracle_val_norm * SystemLimits.L2_WEIGHT_ORACLE) +
(nn_score * SystemLimits.L2_WEIGHT_PATTERN) +
(mc_val * SystemLimits.L2_WEIGHT_MC)
) * 100
result['l2_score'] = composite_score
if composite_score < SystemLimits.L2_MIN_COMPOSITE_SCORE:
result['reason'] = f"Composite {composite_score:.1f} < {SystemLimits.L2_MIN_COMPOSITE_SCORE}"
return result
# โœ… PASSED
result['is_valid'] = True
result['reason'] = 'PASSED'
result['titan_score'] = nn_score * 100
result['titan_probs'] = pattern_probs
return result
except Exception as e:
print(f"โŒ [Layer 2] Error {symbol}: {e}")
result['reason'] = f"Error: {str(e)}"
return result
# ... (Rest of the class methods identical to V70.6) ...
async def execute_layer4_sniper(self, symbol: str, ohlcv_1m: List, order_book: Dict) -> Dict[str, Any]:
if not self.initialized: await self.initialize()
if not self.sniper: return {'signal': 'WAIT', 'confidence_prob': 0.0, 'reason': 'No Sniper'}
try:
self.sniper.configure_settings(
threshold=SystemLimits.L4_ENTRY_THRESHOLD,
wall_ratio=SystemLimits.L4_OB_WALL_RATIO,
w_ml=SystemLimits.L4_WEIGHT_ML,
w_ob=SystemLimits.L4_WEIGHT_OB
)
result = await self.sniper.check_entry_signal_async(ohlcv_1m, order_book, symbol=symbol)
return result
except Exception as e:
return {'signal': 'WAIT', 'confidence_prob': 0.0, 'reason': f"Sniper Error: {e}"}
def consult_guardians(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context, ob_snapshot=None):
if not self.initialized:
return {'action': 'HOLD', 'reason': 'System not initialized', 'probs': {}, 'scores': {}}
limits = trade_context.get('dynamic_limits', {})
h_crash_thresh = limits.get('hydra_crash', SystemLimits.HYDRA_CRASH_THRESH)
h_giveback_thresh = limits.get('hydra_giveback', SystemLimits.HYDRA_GIVEBACK_THRESH)
h_stag_thresh = limits.get('hydra_stagnation', SystemLimits.HYDRA_STAGNATION_THRESH)
entry_price = float(trade_context.get('entry_price', 0.0))
highest_price = trade_context.get('highest_price', entry_price)
max_pnl_pct = ((highest_price - entry_price) / entry_price) * 100 if entry_price > 0 else 0.0
time_in_trade_mins = trade_context.get('time_in_trade_mins', 0.0)
hydra_result = {'action': 'HOLD', 'reason': 'Disabled', 'probs': {}}
if self.guardian_hydra:
try:
hydra_result = self.guardian_hydra.analyze_position(symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context)
h_probs = hydra_result.get('probs', {})
p_crash = h_probs.get('crash', 0.0)
p_giveback = h_probs.get('giveback', 0.0)
p_stagnation = h_probs.get('stagnation', 0.0)
if p_crash >= h_crash_thresh:
hydra_result['action'] = 'EXIT_HARD'
hydra_result['reason'] = f"Hydra Crash Risk {p_crash:.2f}"
elif p_giveback >= h_giveback_thresh and max_pnl_pct >= 0.6:
hydra_result['action'] = 'EXIT_SOFT'
hydra_result['reason'] = f"Hydra Giveback {p_giveback:.2f}"
elif p_stagnation >= h_stag_thresh and time_in_trade_mins > 90:
hydra_result['action'] = 'EXIT_SOFT'
hydra_result['reason'] = f"Hydra Stagnation {p_stagnation:.2f}"
except Exception: pass
legacy_result = {'action': 'HOLD', 'reason': 'Disabled', 'scores': {}}
if self.guardian_legacy:
try:
vol_30m = trade_context.get('volume_30m_usd', 0.0)
legacy_result = self.guardian_legacy.analyze_position(
ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price,
order_book=ob_snapshot,
volume_30m_usd=vol_30m
)
except Exception: pass
final_action = 'HOLD'
final_reason = f"Safe."
hydra_act = hydra_result.get('action', 'HOLD')
legacy_act = legacy_result.get('action', 'HOLD')
if hydra_act in ['EXIT_HARD', 'EXIT_SOFT', 'TIGHTEN_SL', 'TRAIL_SL']:
final_action = hydra_act
final_reason = f"๐Ÿฒ {hydra_result.get('reason')}"
elif legacy_act in ['EXIT_HARD', 'EXIT_SOFT']:
final_action = legacy_act
final_reason = f"๐Ÿ•ธ๏ธ {legacy_result.get('reason')}"
return {
'action': final_action,
'reason': final_reason,
'probs': hydra_result.get('probs', {}),
'scores': legacy_result.get('scores', {})
}
async def run_advanced_monte_carlo(self, symbol, timeframe='1h'):
if self.mc_analyzer and self.data_manager:
try:
ohlcv = await self.data_manager.get_latest_ohlcv(symbol, timeframe, limit=300)
if ohlcv: return self.mc_analyzer.run_advanced_simulation([c[4] for c in ohlcv])
except: pass
return 0.0
async def consult_oracle(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
if self.oracle: return await self.oracle.predict(symbol_data)
return {'action': 'WAIT'}
async def check_sniper_entry(self, ohlcv_1m_data, order_book_data, context_data=None):
return await self.execute_layer4_sniper("UNKNOWN", ohlcv_1m_data, order_book_data)