# ============================================================
# 🧠 ml_engine/processor.py (V36.0 - GEM-Architect: The Cybernetic Processor)
# ============================================================
import asyncio
import logging
import os
import sys
import traceback
from typing import Any, Dict, List, Optional

import numpy as np

# --- Engine imports (unchanged) ---
# Every engine is optional: when its module cannot be imported the name is
# bound to None, and MLProcessor skips that engine.
try:
    from .titan_engine import TitanEngine
except ImportError:
    TitanEngine = None

try:
    from .patterns import ChartPatternAnalyzer
except ImportError:
    ChartPatternAnalyzer = None

try:
    from .monte_carlo import MonteCarloEngine
except ImportError:
    MonteCarloEngine = None

try:
    from .oracle_engine import OracleEngine
except ImportError:
    OracleEngine = None

try:
    from .sniper_engine import SniperEngine
except ImportError:
    SniperEngine = None

try:
    from .hybrid_guardian import HybridDeepSteward
except ImportError:
    HybridDeepSteward = None

try:
    from .guardian_hydra import GuardianHydra
except ImportError:
    GuardianHydra = None

# ============================================================
# 📂 Model paths
# ============================================================
# BASE_DIR is the parent of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODELS_L2_DIR = os.path.join(BASE_DIR, "ml_models", "layer2")
MODELS_PATTERN_DIR = os.path.join(BASE_DIR, "ml_models", "xgboost_pattern2")
MODELS_UNIFIED_DIR = os.path.join(BASE_DIR, "ml_models", "Unified_Models_V1")
MODELS_SNIPER_DIR = os.path.join(BASE_DIR, "ml_models", "guard_v2")
MODELS_HYDRA_DIR = os.path.join(BASE_DIR, "ml_models", "guard_v1")
MODEL_V2_PATH = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V2_Production.json")
MODEL_V3_PATH = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V3_Production.json")
MODEL_V3_FEAT = os.path.join(BASE_DIR, "ml_models", "DeepSteward_V3_Features.json")

# ============================================================
# 🎛️ SYSTEM LIMITS & THRESHOLDS (Dynamic DNA Container)
# ============================================================
class SystemLimits:
    """
    GEM-Architect: The Dynamic Constitution.

    Class-level container of thresholds and weights read by MLProcessor.
    The values are meant to be overwritten at runtime (via
    ``update_from_dict``) by AdaptiveHub according to the market regime
    (Bull/Bear/etc).
    """

    # --- Layer 1 (Data Manager Control) ---
    L1_MIN_AFFINITY_SCORE = 15.0  # will be overwritten from the DNA

    # --- Layer 2 Weights (Dynamic) ---
    # These weights change with model performance (Tactical Loop) and
    # market state.
    L2_WEIGHT_TITAN = 0.40
    L2_WEIGHT_PATTERNS = 0.30
    L2_WEIGHT_MC = 0.10
    # (Hydra/Sniper may contribute to L2 or L4 depending on the design)

    # Pattern settings (change with the strategy)
    PATTERN_TF_WEIGHTS = {'15m': 0.40, '1h': 0.30, '5m': 0.20, '4h': 0.10, '1d': 0.00}
    PATTERN_THRESH_BULLISH = 0.60
    PATTERN_THRESH_BEARISH = 0.40

    # --- Layer 3 (Oracle) ---
    L3_CONFIDENCE_THRESHOLD = 0.65
    L3_WHALE_IMPACT_MAX = 0.10
    L3_NEWS_IMPACT_MAX = 0.05
    L3_MC_ADVANCED_MAX = 0.10

    # --- Layer 4 (Sniper & Execution) ---
    L4_ENTRY_THRESHOLD = 0.40
    # Internal Sniper weights (ML vs OrderBook)
    L4_WEIGHT_ML = 0.60
    L4_WEIGHT_OB = 0.40
    # Allowed order-book wall ratio (differs sharply between Bull and Bear)
    L4_OB_WALL_RATIO = 0.40

    # --- Layer 0: Hydra & Guardian Thresholds ---
    HYDRA_CRASH_THRESH = 0.60
    HYDRA_GIVEBACK_THRESH = 0.70
    HYDRA_STAGNATION_THRESH = 0.50

    # Legacy Guard Thresholds
    LEGACY_V2_PANIC_THRESH = 0.95
    LEGACY_V3_HARD_THRESH = 0.95
    LEGACY_V3_SOFT_THRESH = 0.85
    LEGACY_V3_ULTRA_THRESH = 0.98

    @classmethod
    def to_dict(cls) -> Dict[str, Any]:
        """Return a snapshot of the configuration attributes defined on ``cls``.

        Dunder entries, plain callables and method descriptors are excluded.
        ``classmethod`` objects stored in ``cls.__dict__`` are not callable,
        so they have to be filtered by type or they would leak into the
        snapshot.
        """
        return {
            name: value
            for name, value in cls.__dict__.items()
            if not name.startswith('__')
            and not callable(value)
            and not isinstance(value, (classmethod, staticmethod, property))
        }

    @classmethod
    def update_from_dict(cls, config: Dict[str, Any]):
        """Update values from AdaptiveHub.

        Only keys that name an existing, non-callable, non-underscore
        attribute are applied; anything else is ignored so a stray key
        cannot overwrite a method or a dunder. ``None`` or an empty mapping
        is a no-op.
        """
        if not config:
            return
        for key, value in config.items():
            if not isinstance(key, str) or key.startswith('_'):
                continue
            if not hasattr(cls, key) or callable(getattr(cls, key)):
                continue
            setattr(cls, key, value)
# ============================================================
# 🧠 MLProcessor Class
# ============================================================
class MLProcessor:
    """Facade that wires the optional ML engines together.

    Each engine is instantiated only when its class was importable;
    otherwise the corresponding attribute is ``None`` and the matching
    method returns a neutral/"WAIT" style result. Thresholds and weights
    are read from ``SystemLimits`` right before each engine call, so
    runtime changes to ``SystemLimits`` take effect on the next call.
    """

    def __init__(self, data_manager=None):
        """Instantiate every available engine.

        Args:
            data_manager: object used by ``run_advanced_monte_carlo`` to
                fetch candles through ``get_latest_ohlcv``; may be ``None``.
        """
        self.data_manager = data_manager
        self.initialized = False

        self.titan = TitanEngine(model_dir=MODELS_L2_DIR) if TitanEngine else None
        self.pattern_engine = (
            ChartPatternAnalyzer(models_dir=MODELS_PATTERN_DIR) if ChartPatternAnalyzer else None
        )
        self.mc_analyzer = MonteCarloEngine() if MonteCarloEngine else None
        self.oracle = OracleEngine(model_dir=MODELS_UNIFIED_DIR) if OracleEngine else None
        self.sniper = SniperEngine(models_dir=MODELS_SNIPER_DIR) if SniperEngine else None

        self.guardian_hydra = None
        if GuardianHydra:
            self.guardian_hydra = GuardianHydra(model_dir=MODELS_HYDRA_DIR)

        self.guardian_legacy = None
        if HybridDeepSteward:
            self.guardian_legacy = HybridDeepSteward(
                v2_model_path=MODEL_V2_PATH,
                v3_model_path=MODEL_V3_PATH,
                v3_features_map_path=MODEL_V3_FEAT,
            )

        print("🧠 [MLProcessor V36.0] Cybernetic Control Active.")

    # --------------------------------------------------------
    # Private helpers: push the current SystemLimits into an engine
    # --------------------------------------------------------
    def _apply_pattern_config(self):
        """Send the current pattern weights/thresholds to the pattern engine."""
        self.pattern_engine.configure_thresholds(
            weights=SystemLimits.PATTERN_TF_WEIGHTS,
            bull_thresh=SystemLimits.PATTERN_THRESH_BULLISH,
            bear_thresh=SystemLimits.PATTERN_THRESH_BEARISH,
        )

    def _apply_oracle_config(self):
        """Send the current L3 confidence threshold to the oracle, if it supports it."""
        if hasattr(self.oracle, 'set_threshold'):
            self.oracle.set_threshold(SystemLimits.L3_CONFIDENCE_THRESHOLD)

    def _apply_sniper_config(self):
        """Send the current L4 settings to the sniper, if it supports it."""
        if hasattr(self.sniper, 'configure_settings'):
            self.sniper.configure_settings(
                threshold=SystemLimits.L4_ENTRY_THRESHOLD,
                wall_ratio=SystemLimits.L4_OB_WALL_RATIO,
                w_ml=SystemLimits.L4_WEIGHT_ML,
                w_ob=SystemLimits.L4_WEIGHT_OB,
            )

    def _apply_legacy_config(self):
        """Send the current legacy guard thresholds to the legacy guardian."""
        self.guardian_legacy.configure_thresholds(
            v2_panic=SystemLimits.LEGACY_V2_PANIC_THRESH,
            v3_hard=SystemLimits.LEGACY_V3_HARD_THRESH,
            v3_soft=SystemLimits.LEGACY_V3_SOFT_THRESH,
            v3_ultra=SystemLimits.LEGACY_V3_ULTRA_THRESH,
        )

    async def initialize(self):
        """Initialize every available engine once.

        Titan, pattern, oracle and sniper initializers are awaited together;
        the guardians are initialized afterwards. Any exception is printed
        with its traceback and swallowed, leaving ``self.initialized`` False
        so a later call retries.
        """
        if self.initialized:
            return
        print("⚙️ [Processor] Initializing Neural Grid...")
        try:
            tasks = []
            if self.titan:
                tasks.append(self.titan.initialize())

            if self.pattern_engine:
                # Initial configuration (refreshed dynamically before each analysis)
                self._apply_pattern_config()
                tasks.append(self.pattern_engine.initialize())

            if self.oracle:
                self._apply_oracle_config()
                tasks.append(self.oracle.initialize())

            if self.sniper:
                # Initial configuration
                self._apply_sniper_config()
                tasks.append(self.sniper.initialize())

            if tasks:
                await asyncio.gather(*tasks)

            if self.guardian_hydra:
                self.guardian_hydra.initialize()
                print(" 🛡️ [Guard 1] Hydra X-Ray: Active")

            if self.guardian_legacy:
                # The legacy guardian's initialize may be sync or async.
                if asyncio.iscoroutinefunction(self.guardian_legacy.initialize):
                    await self.guardian_legacy.initialize()
                else:
                    self.guardian_legacy.initialize()
                # Apply the initial thresholds
                self._apply_legacy_config()
                print(" 🛡️ [Guard 2] Legacy Steward: Active")

            self.initialized = True
            print("✅ [Processor] All Systems Operational.")
        except Exception as e:
            print(f"❌ [Processor FATAL] Init failed: {e}")
            traceback.print_exc()

    async def process_compound_signal(self, raw_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        L2 Processing: blend the engine scores using the current dynamic
        weights from SystemLimits.

        Args:
            raw_data: mapping read for ``'symbol'``, ``'ohlcv'`` and
                ``'current_price'``. ``'ohlcv'`` is indexed by timeframe
                (``'1h'`` is used for the light Monte Carlo check, where
                element 4 of each candle is taken as the close).

        Returns:
            A result dict with the hybrid score and per-engine details, or
            ``None`` when symbol/ohlcv are missing or any engine raises.
        """
        if not self.initialized:
            await self.initialize()

        symbol = raw_data.get('symbol')
        ohlcv_data = raw_data.get('ohlcv')
        current_price = raw_data.get('current_price', 0.0)
        if not symbol or not ohlcv_data:
            return None

        try:
            # 1. Titan Engine (0.5 is the neutral score when the engine is absent)
            score_titan = 0.5
            titan_res = {}
            if self.titan:
                titan_res = await asyncio.to_thread(self.titan.predict, ohlcv_data)
                score_titan = titan_res.get('score', 0.5)

            # 2. Pattern Engine (inject dynamic config first)
            score_patterns = 0.5
            pattern_res = {}
            pattern_name = "Neutral"
            if self.pattern_engine:
                # Refresh the configuration before analysis so the latest
                # strategy weights are used.
                self._apply_pattern_config()
                pattern_res = await self.pattern_engine.detect_chart_patterns(ohlcv_data)
                score_patterns = pattern_res.get('pattern_confidence', 0.5)
                pattern_name = pattern_res.get('pattern_detected', 'Neutral')

            # 3. Monte Carlo (Light)
            mc_score = 0.5
            if self.mc_analyzer and '1h' in ohlcv_data:
                closes = [c[4] for c in ohlcv_data['1h']]
                raw_mc = self.mc_analyzer.run_light_check(closes)
                # Re-center the raw value around 0.5 and clamp to [0, 1].
                mc_score = max(0.0, min(1.0, 0.5 + (raw_mc * 5.0)))

            # 4. Hybrid calculation (dynamic weights from SystemLimits,
            #    which are updated by AdaptiveHub)
            w_titan = SystemLimits.L2_WEIGHT_TITAN
            w_patt = SystemLimits.L2_WEIGHT_PATTERNS
            w_mc = SystemLimits.L2_WEIGHT_MC

            # Normalize by the weight sum; guard against a non-positive sum.
            total_w = w_titan + w_patt + w_mc
            if total_w <= 0:
                total_w = 1.0

            hybrid_score = (
                (score_titan * w_titan) + (score_patterns * w_patt) + (mc_score * w_mc)
            ) / total_w

            return {
                'symbol': symbol,
                'current_price': current_price,
                'enhanced_final_score': hybrid_score,
                'titan_score': score_titan,
                'patterns_score': score_patterns,
                'mc_score': mc_score,
                'components': {
                    'titan_score': score_titan,
                    'patterns_score': score_patterns,
                    'mc_score': mc_score,
                },
                'pattern_name': pattern_name,
                'ohlcv': ohlcv_data,
                'titan_details': titan_res,
                'pattern_details': pattern_res.get('details', {}),
            }
        except Exception as e:
            print(f"❌ [Processor] Error processing {symbol}: {e}")
            return None

    async def consult_oracle(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        L3 Processing: the Oracle uses the current threshold from SystemLimits.

        Returns the oracle's decision dict. A ``WATCH``/``BUY`` action whose
        confidence is below the threshold is downgraded to ``WAIT``. When no
        oracle is available a ``WAIT`` decision is returned.
        """
        if not self.initialized:
            await self.initialize()

        if not self.oracle:
            return {'action': 'WAIT', 'reason': 'Oracle Engine Missing'}

        # Refresh the threshold dynamically
        self._apply_oracle_config()

        decision = await self.oracle.predict(symbol_data)
        conf = decision.get('confidence', 0.0)

        # Extra (redundant) safety layer
        limit = SystemLimits.L3_CONFIDENCE_THRESHOLD
        if decision.get('action') in ['WATCH', 'BUY'] and conf < limit:
            decision['action'] = 'WAIT'
            decision['reason'] = f"Processor Veto: Conf {conf:.2f} < Limit {limit}"
        return decision

    async def check_sniper_entry(self, ohlcv_1m_data: List, order_book_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        L4 Processing: the Sniper receives the order-book settings (DNA)
        before making its decision.

        Returns the sniper's result, or a ``WAIT`` signal when no sniper
        engine is available.
        """
        if not self.initialized:
            await self.initialize()

        if not self.sniper:
            return {'signal': 'WAIT', 'reason': 'Sniper Engine Missing'}

        # Dynamic injection of the Sniper settings before execution
        self._apply_sniper_config()
        return await self.sniper.check_entry_signal_async(ohlcv_1m_data, order_book_data)

    def consult_dual_guardians(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context, order_book_snapshot=None):
        """
        L0 Guardians: both guards use the dynamic (panic) thresholds, which
        may differ between Bull and Bear.

        Hydra is consulted first and takes priority in arbitration; the
        legacy guard can only trigger ``EXIT_HARD``/``EXIT_SOFT`` when Hydra
        did not request an action.

        Returns:
            dict with ``action``, ``reason``, ``detailed_log``, ``probs``
            (Hydra probabilities) and ``scores`` (legacy scores).
        """
        # 1. Hydra
        hydra_result = {'action': 'HOLD', 'reason': 'Disabled', 'probs': {}}
        if self.guardian_hydra and self.guardian_hydra.initialized:
            hydra_result = self.guardian_hydra.analyze_position(
                symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context
            )
            live_probs = hydra_result.get('probs', {})
            p_crash = live_probs.get('crash', 0.0)
            p_giveback = live_probs.get('giveback', 0.0)

            # Apply the live thresholds only when Hydra itself said HOLD
            # (a result without an 'action' key is treated as HOLD).
            if hydra_result.get('action', 'HOLD') == 'HOLD':
                if p_crash >= SystemLimits.HYDRA_CRASH_THRESH:
                    hydra_result['action'] = 'EXIT_HARD'
                    hydra_result['reason'] = f"Hydra Crash Risk {p_crash:.2f}"
                elif p_giveback >= SystemLimits.HYDRA_GIVEBACK_THRESH:
                    hydra_result['action'] = 'EXIT_SOFT'
                    hydra_result['reason'] = f"Hydra Giveback Risk {p_giveback:.2f}"

        # 2. Legacy (Volume-Aware Veto)
        legacy_result = {'action': 'HOLD', 'reason': 'Disabled', 'scores': {}}
        if self.guardian_legacy and self.guardian_legacy.initialized:
            # Refresh thresholds before the analysis
            self._apply_legacy_config()

            # `or 0.0` keeps float() from failing on an explicit None.
            entry_price = float(trade_context.get('entry_price') or 0.0)
            vol_30m = trade_context.get('volume_30m_usd', 0.0)

            legacy_result = self.guardian_legacy.analyze_position(
                ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price,
                order_book=order_book_snapshot,
                volume_30m_usd=vol_30m,
            )

        # 3. Final Arbitration & Display
        h_probs = hydra_result.get('probs', {})
        l_scores = legacy_result.get('scores', {})

        h_c = h_probs.get('crash', 0.0)
        h_g = h_probs.get('giveback', 0.0)
        h_s = h_probs.get('stagnation', 0.0)
        l_v2 = l_scores.get('v2', 0.0)
        l_v3 = l_scores.get('v3', 0.0)

        stamp_str = f"🐲[C:{h_c:.0%}|G:{h_g:.0%}|S:{h_s:.0%}] 🕸️[V2:{l_v2:.0%}|V3:{l_v3:.0%}]"

        hydra_action = hydra_result.get('action', 'HOLD')
        legacy_action = legacy_result.get('action', 'HOLD')

        final_action = 'HOLD'
        final_reason = f"Safe. {stamp_str}"

        if hydra_action in ['EXIT_HARD', 'EXIT_SOFT', 'TIGHTEN_SL', 'TRAIL_SL']:
            final_action = hydra_action
            final_reason = f"🐲 HYDRA: {hydra_result.get('reason', 'N/A')} | {stamp_str}"
        elif legacy_action in ['EXIT_HARD', 'EXIT_SOFT']:
            final_action = legacy_action
            final_reason = f"🕸️ LEGACY: {legacy_result.get('reason', 'N/A')} | {stamp_str}"

        return {
            'action': final_action,
            'reason': final_reason,
            'detailed_log': f"{final_action} | {stamp_str}",
            'probs': h_probs,
            'scores': l_scores,
        }

    async def run_advanced_monte_carlo(self, symbol, timeframe='1h'):
        """Run the advanced Monte Carlo simulation on the latest candles.

        Fetches up to 300 candles through the data manager and feeds
        element 4 of each candle to the analyzer. Best-effort: returns
        ``0.0`` when the analyzer or data manager is missing, when no
        candles come back, or when anything raises (the error is printed).
        """
        if not (self.mc_analyzer and self.data_manager):
            return 0.0
        try:
            ohlcv = await self.data_manager.get_latest_ohlcv(symbol, timeframe, limit=300)
            if ohlcv:
                return self.mc_analyzer.run_advanced_simulation([c[4] for c in ohlcv])
        except Exception as e:
            # Deliberately non-fatal, but no longer silent.
            print(f"⚠️ [Processor] Advanced Monte Carlo failed for {symbol}: {e}")
        return 0.0