| |
| |
| |
|
|
| import asyncio |
| import uuid |
| import time |
| import traceback |
| import json |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Any |
|
|
| |
| from smart_portfolio import SmartPortfolio |
| from ml_engine.processor import SystemLimits |
| from governance_engine import GovernanceEngine |
|
|
| class TradeManager: |
| def __init__(self, r2_service, data_manager, processor): |
| self.r2 = r2_service |
| self.data_manager = data_manager |
| self.processor = processor |
| |
| |
| self.learning_hub = None |
| |
| |
| self.smart_portfolio = SmartPortfolio(r2_service, data_manager) |
| self.governance = GovernanceEngine() |
| |
| self.open_positions = {} |
| self.watchlist = {} |
| self.sentry_tasks = {} |
| |
| |
| self.pending_verifications = [] |
| |
| self.running = True |
| self.latest_guardian_log = "🛡️ Guardian & Governance Systems Online." |
| self.FEE_RATE = 0.001 |
| self.ORACLE_CHECK_INTERVAL = 900 |
| |
| |
| self.ai_stats = { |
| "hybrid": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}, |
| "crash": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}, |
| "giveback": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}, |
| "stagnation": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0} |
| } |
|
|
| |
| self.type_stats = { |
| "SAFE_BOTTOM": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0}, |
| "MOMENTUM_LAUNCH": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0}, |
| "ACCUMULATION_SQUEEZE": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0} |
| } |
| |
| self.execution_lock = asyncio.Lock() |
| print(f"🛡️ [TradeManager V67.4] Full Systems Online (Split PnL Audit).") |
|
|
| async def initialize_sentry_exchanges(self): |
| """تهيئة المحفظة واستعادة كافة البيانات""" |
| print("🛡️ [TradeManager] Syncing state & Initializing Portfolio...") |
| await self.smart_portfolio.initialize() |
| await self.sync_internal_state_with_r2() |
| |
| |
| try: |
| saved_stats = await self.r2.get_guardian_stats_async() |
| if saved_stats: self.ai_stats = saved_stats |
| print(" -> [Stats] Guardian metrics loaded from R2.") |
| except Exception: pass |
|
|
| |
| await self._load_type_stats_from_r2() |
| |
| |
| await self._load_pending_verifications() |
| asyncio.create_task(self._verification_engine_loop()) |
|
|
| |
| |
| |
| async def _load_pending_verifications(self): |
| """تحميل التذاكر التي لم تكتمل مدتها من R2""" |
| try: |
| data = await self.r2.get_file_json_async("diagnostics/pending_verifications.json") |
| if data: |
| self.pending_verifications = data |
| print(f" 🕵️ [Verification] Loaded {len(self.pending_verifications)} pending audits.") |
| except Exception: |
| self.pending_verifications = [] |
|
|
| async def _save_pending_verifications(self): |
| """حفظ التذاكر الحالية لضمان عدم ضياعها عند إعادة التشغيل""" |
| try: |
| await self.r2.upload_json_async(self.pending_verifications, "diagnostics/pending_verifications.json") |
| except Exception as e: |
| print(f"❌ Error saving verifications: {e}") |
|
|
| async def _register_verification_ticket(self, symbol, entry_price, size_usd, votes): |
| """إنشاء تذكرة جديدة عند دخول الصفقة""" |
| ticket = { |
| "id": str(uuid.uuid4()), |
| "symbol": symbol, |
| "entry_price": float(entry_price), |
| "size_usd": float(size_usd), |
| "entry_time": datetime.now().isoformat(), |
| "votes": votes, |
| "target_time": (datetime.now() + timedelta(hours=1)).isoformat() |
| } |
| self.pending_verifications.append(ticket) |
| await self._save_pending_verifications() |
| print(f" 🎫 [Verification] Ticket created for {symbol}. Result in 1h.") |
|
|
| async def _verification_engine_loop(self): |
| """المحرك الخلفي: يعمل بشكل مستقل لفحص التذاكر المستحقة (Entry Models Audit)""" |
| print(" ⚙️ [Verification Engine] Started background audit loop...") |
| |
| |
| models_to_track = ["Titan", "Patterns", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"] |
|
|
| while self.running: |
| try: |
| await asyncio.sleep(60) |
| if not self.pending_verifications: continue |
| |
| now = datetime.now() |
| updated = False |
| remaining_tickets = [] |
| |
| |
| tickets_to_process = list(self.pending_verifications) |
| model_updates = {} |
| |
| for ticket in tickets_to_process: |
| target_time = datetime.fromisoformat(ticket['target_time']) |
| |
| if now >= target_time: |
| |
| symbol = ticket['symbol'] |
| entry_p = ticket['entry_price'] |
| size = ticket['size_usd'] |
| votes = ticket['votes'] |
| |
| |
| curr_p = await self.data_manager.get_latest_price_async(symbol) |
| |
| if curr_p > 0: |
| pnl_pct = (curr_p - entry_p) / entry_p |
| pnl_usd = pnl_pct * size |
| |
| is_win = pnl_pct > 0 |
| result_str = "WIN" if is_win else "LOSS" |
| |
| print(f" 🕵️ [Audit Complete] {symbol}: {result_str} after 1h ({pnl_pct:+.2f}%) -> Net: ${pnl_usd:.2f}") |
| |
| |
| for model in models_to_track: |
| if votes.get(model, False): |
| if model not in model_updates: |
| model_updates[model] = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0} |
| |
| if is_win: |
| model_updates[model]["wins"] += 1 |
| model_updates[model]["profit_accum"] += pnl_usd |
| else: |
| model_updates[model]["losses"] += 1 |
| model_updates[model]["loss_accum"] += abs(pnl_usd) |
| |
| model_updates[model]["pnl"] += pnl_usd |
| |
| updated = True |
| else: |
| |
| remaining_tickets.append(ticket) |
| else: |
| |
| remaining_tickets.append(ticket) |
| |
| if updated: |
| |
| if model_updates: |
| current_stats = await self.r2.get_diagnostic_stats_async() |
| for model, change in model_updates.items(): |
| if model not in current_stats: |
| current_stats[model] = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0} |
| |
| current_stats[model]["wins"] += change["wins"] |
| current_stats[model]["losses"] += change["losses"] |
| current_stats[model]["pnl"] += change["pnl"] |
| |
| current_stats[model]["profit_accum"] = current_stats[model].get("profit_accum", 0.0) + change["profit_accum"] |
| current_stats[model]["loss_accum"] = current_stats[model].get("loss_accum", 0.0) + change["loss_accum"] |
|
|
| await self.r2.update_diagnostic_stats_async(model_updates) |
|
|
| self.pending_verifications = remaining_tickets |
| await self._save_pending_verifications() |
| |
| except Exception as e: |
| print(f"❌ [Verification Loop Error] {e}") |
| await asyncio.sleep(60) |
|
|
| |
| |
| |
| async def _load_type_stats_from_r2(self): |
| """تحميل إحصائيات أنواع العملات من R2""" |
| try: |
| saved_stats = await self.r2.get_file_json_async("stats/coin_type_performance_v1.json") |
| if saved_stats: |
| self.type_stats = saved_stats |
| print(" 📊 [Stats] Coin Type performance loaded.") |
| else: |
| print(" ℹ️ [Stats] No existing type stats found. Starting fresh.") |
| except Exception: |
| print(" ⚠️ [Stats] Error loading type stats.") |
|
|
| async def _save_type_stats_to_r2(self): |
| """حفظ إحصائيات الأنواع إلى R2""" |
| try: |
| await self.r2.upload_json_async(self.type_stats, "stats/coin_type_performance_v1.json") |
| except Exception as e: |
| print(f"❌ Failed to save type stats: {e}") |
|
|
| async def sync_internal_state_with_r2(self): |
| """استرجاع الصفقات المفتوحة من R2""" |
| try: |
| open_trades_list = await self.r2.get_open_trades_async() |
| self.open_positions = {trade['symbol']: trade for trade in open_trades_list} |
| print(f" -> [Sync] Recovered {len(self.open_positions)} active trades.") |
| |
| total_allocated = sum(float(t.get('entry_capital', 0.0)) for t in self.open_positions.values()) |
| self.smart_portfolio.state["allocated_capital_usd"] = total_allocated |
| except Exception as e: |
| print(f"❌ [TradeManager] R2 Sync Failed: {e}") |
| self.open_positions = {} |
|
|
| async def ensure_active_guardians(self): |
| """التأكد من أن كل صفقة مفتوحة لها حارس""" |
| active_symbols = list(self.open_positions.keys()) |
| if not active_symbols: return "💤 No active trades." |
| |
| restored_count = 0 |
| status_msgs = [] |
| |
| for symbol in active_symbols: |
| task = self.sentry_tasks.get(symbol) |
| is_alive = task and not task.done() |
| |
| if not is_alive: |
| print(f"🚨 [Watchdog] Found DEAD guardian for {symbol}. Resurrecting...") |
| self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol)) |
| restored_count += 1 |
| status_msgs.append(f"♻️ Resurrected {symbol}") |
| else: |
| status_msgs.append(f"✅ {symbol} Running") |
| |
| if restored_count > 0: |
| self.latest_guardian_log = f"⚠️ Watchdog restored: {', '.join(status_msgs)}" |
| return f"⚠️ Watchdog restored {restored_count} guardians." |
| return "✅ All guardians active." |
|
|
| |
| |
| |
| def _snapshot_model_votes(self, signal_data: Dict[str, Any]) -> Dict[str, bool]: |
| """ |
| تحدد أي النماذج صوّتت بـ 'شراء' وقت الدخول. (Updated for News & MC_A) |
| """ |
| votes = {} |
| limits = signal_data.get('dynamic_limits', {}) |
| comps = signal_data.get('components', {}) or {} |
|
|
| votes['Titan'] = comps.get('titan_score', signal_data.get('titan_score', 0)) > 0.5 |
| votes['Patterns'] = comps.get('patterns_score', signal_data.get('patterns_score', 0)) > 0.5 |
| |
| oracle_thresh = limits.get('l3_oracle_thresh', SystemLimits.L3_CONFIDENCE_THRESHOLD) |
| votes['Oracle'] = signal_data.get('confidence', 0) >= oracle_thresh |
|
|
| sniper_thresh = limits.get('l4_sniper_thresh', SystemLimits.L4_ENTRY_THRESHOLD) |
| votes['Sniper'] = signal_data.get('sniper_score', 0) >= sniper_thresh |
|
|
| |
| votes['MonteCarlo_L'] = comps.get('mc_score', 0.5) > 0.5 |
| votes['MonteCarlo_A'] = signal_data.get('mc_advanced_score', 0) > 0.0 |
| |
| |
| votes['News'] = signal_data.get('news_score', 0) > 0.02 |
|
|
| votes['Governance'] = signal_data.get('governance_grade', 'REJECT') != 'REJECT' |
|
|
| return votes |
|
|
| async def select_and_execute_best_signal(self, oracle_approved_signals: List[Dict[str, Any]]): |
| """اختيار أفضل إشارة وتنفيذها""" |
| if not self.processor.initialized: await self.processor.initialize() |
| sniper_candidates = [] |
| print(f"\n🔎 [Sniper] Scanning {len(oracle_approved_signals)} candidates...") |
|
|
| for signal in oracle_approved_signals: |
| symbol = signal['symbol'] |
| if signal.get('action_type') != 'BUY': continue |
| if symbol in self.open_positions: continue |
|
|
| ohlcv_task = self.data_manager.get_latest_ohlcv(symbol, '1m', 1000) |
| ob_task = self.data_manager.get_order_book_snapshot(symbol) |
| ohlcv_1m, order_book = await asyncio.gather(ohlcv_task, ob_task) |
| |
| if not ohlcv_1m or len(ohlcv_1m) < 100: |
| print(f" -> ⚠️ [Skip] {symbol}: Insufficient 1m data.") |
| continue |
| |
| |
| sniper_result = await self.processor.check_sniper_entry(ohlcv_1m, order_book, context_data=signal) |
| |
| sniper_signal = sniper_result.get('signal', 'WAIT') |
| final_conf = sniper_result.get('confidence_prob', 0.0) |
| reason_str = sniper_result.get('reason', 'N/A') |
|
|
| |
| log_msg = (f" -> 🔭 {symbol:<6} | Decision: {sniper_signal} | Score: {final_conf:.2f} | Reason: {reason_str}") |
| print(log_msg) |
|
|
| if sniper_signal == 'BUY': |
| signal['sniper_entry_price'] = sniper_result.get('entry_price', 0) |
| signal['sniper_score'] = final_conf |
| |
| if 'components' not in signal: signal['components'] = {} |
| signal['components']['sniper_score'] = final_conf |
| |
| sniper_candidates.append(signal) |
|
|
| if not sniper_candidates: |
| print(" -> 📉 No candidates passed the Sniper L4 check.") |
| return |
|
|
| sniper_candidates.sort(key=lambda x: (x.get('confidence', 0) + x.get('sniper_score', 0)), reverse=True) |
| best_signal = sniper_candidates[0] |
| |
| async with self.execution_lock: |
| print(f"🚀 [EXECUTING] Attempting entry for best candidate: {best_signal['symbol']}") |
| await self._execute_entry_from_signal(best_signal['symbol'], best_signal) |
|
|
| async def _execute_entry_from_signal(self, symbol, signal_data): |
| """تنفيذ الدخول الفعلي""" |
| try: |
| print(f" 🏛️ [Governance] Convening Senate for {symbol}...") |
| |
| t15_task = self.data_manager.get_latest_ohlcv(symbol, '15m', 200) |
| t1h_task = self.data_manager.get_latest_ohlcv(symbol, '1h', 200) |
| ob_task = self.data_manager.get_order_book_snapshot(symbol) |
| |
| t15, t1h, ob = await asyncio.gather(t15_task, t1h_task, ob_task) |
| ohlcv_dict = {'15m': t15, '1h': t1h} |
| |
| strategy_type = signal_data.get('strategy_type', 'NORMAL') |
| |
| |
| gov_decision = await self.governance.evaluate_trade(symbol, ohlcv_dict, ob, strategy_type=strategy_type) |
| |
| if gov_decision['grade'] == 'REJECT': |
| print(f"⛔ [Governance VETO] {symbol} Rejected. Grade: REJECT") |
| return |
|
|
| print(f" ✅ [Governance PASS] Grade: {gov_decision['grade']} | Score: {gov_decision['governance_score']:.1f}") |
| |
| signal_data['governance_grade'] = gov_decision['grade'] |
| signal_data['governance_score'] = gov_decision['governance_score'] |
| signal_data['governance_details'] = gov_decision['components'] |
|
|
| |
| is_approved, plan = await self.smart_portfolio.request_entry_approval(signal_data, len(self.open_positions)) |
| |
| if not is_approved: |
| print(f"⛔ [Portfolio Rejection] {symbol}: {plan.get('reason')}") |
| return |
|
|
| approved_size_usd = plan['approved_size_usd'] |
| approved_tp = plan['approved_tp'] |
| |
| trade_id = str(uuid.uuid4()) |
| current_price = float(signal_data.get('sniper_entry_price', 0.0)) |
| if current_price <= 0.0: current_price = await self.data_manager.get_latest_price_async(symbol) |
| entry_fee_usd = approved_size_usd * self.FEE_RATE |
|
|
| |
| model_votes = self._snapshot_model_votes(signal_data) |
|
|
| decision_snapshot = { |
| 'components': signal_data.get('components', {}), |
| 'oracle_conf': signal_data.get('confidence', 0), |
| 'governance_grade': gov_decision['grade'], |
| 'governance_score': gov_decision['governance_score'], |
| 'governance_details': gov_decision['components'], |
| 'system_confidence': plan.get('system_confidence', 0.5), |
| 'market_mood': plan.get('market_mood', 'N/A'), |
| 'regime_at_entry': getattr(SystemLimits, 'CURRENT_REGIME', 'UNKNOWN'), |
| |
| 'dynamic_limits': signal_data.get('dynamic_limits', {}), |
| 'asset_regime': signal_data.get('asset_regime', 'UNKNOWN') |
| } |
|
|
| new_trade = { |
| 'id': trade_id, |
| 'symbol': symbol, |
| 'entry_price': current_price, |
| 'direction': 'LONG', |
| 'entry_time': datetime.now().isoformat(), |
| 'status': 'OPEN', |
| 'tp_price': approved_tp, |
| 'sl_price': float(signal_data.get('sl_price', current_price * 0.95)), |
| 'last_update': datetime.now().isoformat(), |
| 'last_oracle_check': datetime.now().isoformat(), |
| 'strategy': 'OracleV4_Governance_Hydra', |
| 'entry_capital': approved_size_usd, |
| 'entry_fee_usd': entry_fee_usd, |
| 'decision_data': decision_snapshot, |
| 'highest_price': current_price, |
| 'strategy_type': strategy_type, |
| 'model_votes': model_votes |
| } |
| |
| self.open_positions[symbol] = new_trade |
| if self.watchlist: self.watchlist.clear() |
| |
| |
| await self.smart_portfolio.register_new_position(approved_size_usd) |
| |
| |
| portfolio_state = await self.r2.get_portfolio_state_async() |
| if portfolio_state.get('first_trade_timestamp') is None: |
| portfolio_state['first_trade_timestamp'] = new_trade['entry_time'] |
| await self.r2.save_portfolio_state_async(portfolio_state) |
| |
| await self.r2.save_open_trades_async(list(self.open_positions.values())) |
| |
| |
| if symbol in self.sentry_tasks: self.sentry_tasks[symbol].cancel() |
| self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol)) |
| |
| |
| await self._register_verification_ticket(symbol, current_price, approved_size_usd, model_votes) |
| |
| print(f"✅ [ENTRY] {symbol} @ {current_price} | Type: {strategy_type} | Grade: {gov_decision['grade']} | Size: ${approved_size_usd:.2f}") |
|
|
| except Exception as e: |
| print(f"❌ [Entry Error] {symbol}: {e}") |
| traceback.print_exc() |
|
|
| async def _guardian_loop(self, symbol: str): |
| """حلقة الحراسة المستمرة (Guardian Loop)""" |
| print(f"🛡️ [Dual-Core] STARTING WATCH for {symbol}...") |
| last_ai_check_time = 0 |
| |
| while self.running: |
| if symbol not in self.open_positions: break |
| try: |
| await asyncio.sleep(1) |
| trade = self.open_positions.get(symbol) |
| if not trade: break |
| |
| current_ticker_price = await self.data_manager.get_latest_price_async(symbol) |
| |
| |
| if 'highest_price' not in trade: trade['highest_price'] = float(trade['entry_price']) |
| if current_ticker_price > float(trade['highest_price']): trade['highest_price'] = current_ticker_price |
|
|
| |
| if current_ticker_price >= trade['tp_price']: |
| print(f"🎯 [TP HIT] {symbol} @ {current_ticker_price}") |
| async with self.execution_lock: await self._execute_exit(symbol, trade['tp_price'], "TP_HIT") |
| break |
| if current_ticker_price <= trade['sl_price']: |
| print(f"🛑 [SL HIT] {symbol} @ {current_ticker_price}") |
| async with self.execution_lock: await self._execute_exit(symbol, trade['sl_price'], "SL_HIT") |
| break |
|
|
| |
| if time.time() - last_ai_check_time > 60: |
| t1 = self.data_manager.get_latest_ohlcv(symbol, '1m', 1000) |
| t5 = self.data_manager.get_latest_ohlcv(symbol, '5m', 300) |
| t15 = self.data_manager.get_latest_ohlcv(symbol, '15m', 200) |
| tob = self.data_manager.get_order_book_snapshot(symbol) |
| |
| try: d1, d5, d15, d_ob = await asyncio.gather(t1, t5, t15, tob) |
| except: continue |
| |
| if d1 and d5 and d15 and len(d5) >= 6: |
| last_6_5m = d5[-6:] |
| vol_30m_sum = sum([float(c[5]) * float(c[4]) for c in last_6_5m]) |
| |
| |
| highest_price = float(trade['highest_price']) |
| time_in_trade_mins = (datetime.now() - datetime.fromisoformat(trade['entry_time'])).total_seconds() / 60 |
| |
| context_data = { |
| 'entry_price': trade['entry_price'], |
| 'tp_price': trade['tp_price'], |
| 'sl_price': trade['sl_price'], |
| 'entry_time': trade['entry_time'], |
| 'oracle_conf': trade.get('decision_data', {}).get('oracle_conf', 0.8), |
| 'system_conf': trade.get('decision_data', {}).get('system_confidence', 0.8), |
| 'highest_price': highest_price, |
| 'time_in_trade_mins': time_in_trade_mins, |
| 'volume_30m_usd': vol_30m_sum, |
| |
| 'dynamic_limits': trade.get('decision_data', {}).get('dynamic_limits', {}) |
| } |
|
|
| |
| decision = self.processor.consult_dual_guardians(symbol, d1, d5, d15, context_data, order_book_snapshot=d_ob) |
| action = decision.get('action', 'HOLD') |
| reason = decision.get('reason', '') |
| ai_metrics = decision.get('probs') or decision.get('scores') or {} |
|
|
| self.latest_guardian_log = f"🛡️ {action} | {reason}" |
| |
| if action in ['EXIT_HARD', 'EXIT_SOFT']: |
| print(f"🐲 [Dual-Core Trigger] {action}: {reason}") |
| async with self.execution_lock: |
| await self._execute_exit(symbol, current_ticker_price, f"DualGuard_{action}", ai_scores=ai_metrics) |
| break |
| elif action in ['TIGHTEN_SL', 'TRAIL_SL']: |
| await self._handle_sl_update(symbol, action, trade, current_ticker_price) |
|
|
| last_ai_check_time = time.time() |
| self.open_positions[symbol]['last_update'] = datetime.now().isoformat() |
| |
| |
| last_oracle = datetime.fromisoformat(trade.get('last_oracle_check', datetime.now().isoformat())) |
| if (datetime.now() - last_oracle).total_seconds() > self.ORACLE_CHECK_INTERVAL: |
| self.open_positions[symbol]['last_oracle_check'] = datetime.now().isoformat() |
| await self._consult_oracle_strategy_update(symbol, trade) |
|
|
| except asyncio.CancelledError: break |
| except Exception as e: |
| print(f"❌ [Sentry Error] {symbol}: {e}"); traceback.print_exc(); await asyncio.sleep(5) |
|
|
| async def _handle_sl_update(self, symbol, action, trade, current_price): |
| """تحديث وقف الخسارة ديناميكياً""" |
| if action == 'TIGHTEN_SL': |
| entry_p = float(trade['entry_price']) |
| if float(trade['sl_price']) < entry_p: |
| print(f"🛡️ [Dual-Core] TIGHTEN_SL -> Entry {entry_p}") |
| self.open_positions[symbol]['sl_price'] = entry_p |
| await self.r2.save_open_trades_async(list(self.open_positions.values())) |
|
|
| elif action == 'TRAIL_SL': |
| entry_p = float(trade['entry_price']) |
| if current_price > entry_p: |
| potential_sl = entry_p + ((current_price - entry_p) * 0.5) |
| if potential_sl > float(trade['sl_price']): |
| print(f"🛡️ [Dual-Core] TRAIL_SL -> {potential_sl:.4f}") |
| self.open_positions[symbol]['sl_price'] = potential_sl |
| await self.r2.save_open_trades_async(list(self.open_positions.values())) |
|
|
| async def _consult_oracle_strategy_update(self, symbol, trade): |
| try: |
| tasks = [self.data_manager.get_latest_ohlcv(symbol, tf, limit=100) for tf in ["15m", "1h", "4h"]] |
| results = await asyncio.gather(*tasks) |
| ohlcv_data = {tf: res for tf, res in zip(["15m", "1h", "4h"], results) if res} |
| if '1h' not in ohlcv_data: return |
| |
| curr_p = await self.data_manager.get_latest_price_async(symbol) |
| |
| decision_data = trade.get('decision_data', {}) |
| saved_limits = decision_data.get('dynamic_limits', {}) |
| saved_regime = decision_data.get('asset_regime', 'UNKNOWN') |
|
|
| raw_input = { |
| 'symbol': symbol, |
| 'ohlcv': ohlcv_data, |
| 'current_price': curr_p, |
| 'dynamic_limits': saved_limits, |
| 'asset_regime': saved_regime |
| } |
|
|
| l2 = await self.processor.process_compound_signal(raw_input) |
| if not l2: return |
| |
| oracle = await self.processor.consult_oracle(l2) |
| if oracle.get('action') == 'WAIT' or oracle.get('direction') == 'SHORT': |
| print(f"🚨 [Oracle] Outlook Bearish (Re-Check). Exiting {symbol}...") |
| await self.force_exit_by_manager(symbol, reason="Oracle_Bearish_Flip") |
| return |
| except Exception: pass |
|
|
| |
| |
| |
| def _launch_post_exit_analysis(self, symbol, exit_price, exit_time, position_size_usd, ai_scores=None, trade_obj=None): |
| """إطلاق مهمة التدقيق الخلفي (Audit) لقرار الخروج""" |
| asyncio.create_task(self._analyze_after_exit_task(symbol, exit_price, exit_time, position_size_usd, ai_scores, trade_obj)) |
|
|
| def _update_specific_stat(self, key, is_good, usd_impact): |
| if key not in self.ai_stats: return |
| self.ai_stats[key]["total"] += 1 |
| if is_good: self.ai_stats[key]["good"] += 1; self.ai_stats[key]["saved"] += abs(usd_impact) |
| else: self.ai_stats[key]["missed"] += abs(usd_impact) |
|
|
| async def _analyze_after_exit_task(self, symbol, exit_price, exit_time, position_size_usd, ai_scores, trade_obj): |
| """مهمة التدقيق: هل كان الخروج صحيحاً؟""" |
| await asyncio.sleep(900) |
| try: |
| curr = await self.data_manager.get_latest_price_async(symbol) |
| if curr == 0: return |
| |
| change_pct = (curr - exit_price) / exit_price |
| usd_impact = change_pct * position_size_usd |
| is_good_exit = change_pct < 0 |
| |
| self._update_specific_stat("hybrid", is_good_exit, usd_impact) |
| |
| record = {"symbol": symbol, "exit_price": exit_price, "price_15m": curr, "usd_impact": usd_impact, "verdict": "SUCCESS" if is_good_exit else "MISS"} |
| await self.r2.append_deep_steward_audit(record) |
| |
| |
| await self.r2.save_guardian_stats_async(self.ai_stats) |
| |
| except Exception: pass |
|
|
| async def _execute_exit(self, symbol, price, reason, ai_scores=None): |
| """تنفيذ الخروج وتحديث الإحصائيات""" |
| if symbol not in self.open_positions: return |
| try: |
| trade = self.open_positions.pop(symbol) |
| entry_price = float(trade['entry_price']); exit_price = float(price) |
| entry_capital = float(trade.get('entry_capital', 100.0)); entry_fee = float(trade.get('entry_fee_usd', 0.0)) |
| |
| exit_val_gross = (exit_price / entry_price) * entry_capital |
| exit_fee = exit_val_gross * self.FEE_RATE |
| total_fees = entry_fee + exit_fee |
| |
| gross_pnl_usd = exit_val_gross - entry_capital |
| true_net_pnl_usd = gross_pnl_usd - total_fees |
| true_net_pct = (true_net_pnl_usd / entry_capital) * 100 |
| |
| |
| await self.smart_portfolio.register_closed_position(entry_capital, gross_pnl_usd, total_fees) |
| |
| trade.update({ |
| 'status': 'CLOSED', 'exit_price': exit_price, 'exit_reason': reason, |
| 'profit_pct': true_net_pct, 'net_pnl_usd': true_net_pnl_usd, 'fees_paid_usd': total_fees, |
| 'exit_time': datetime.now().isoformat() |
| }) |
| |
| |
| portfolio = await self.r2.get_portfolio_state_async() |
| portfolio['total_trades'] = portfolio.get('total_trades', 0) + 1 |
| is_win = true_net_pnl_usd >= 0 |
|
|
| if is_win: |
| portfolio['winning_trades'] = portfolio.get('winning_trades', 0) + 1 |
| portfolio['total_profit_usd'] = portfolio.get('total_profit_usd', 0) + true_net_pnl_usd |
| trade['result'] = 'WIN' |
| else: |
| portfolio['losing_trades'] = portfolio.get('losing_trades', 0) + 1 |
| portfolio['total_loss_usd'] = portfolio.get('total_loss_usd', 0) + abs(true_net_pnl_usd) |
| trade['result'] = 'LOSS' |
|
|
| |
| strat_type = trade.get('strategy_type', 'UNKNOWN') |
| if strat_type in self.type_stats: |
| if is_win: |
| self.type_stats[strat_type]['wins'] += 1 |
| self.type_stats[strat_type]['profit_usd'] += true_net_pnl_usd |
| else: |
| self.type_stats[strat_type]['losses'] += 1 |
| self.type_stats[strat_type]['loss_usd'] += true_net_pnl_usd |
| |
| asyncio.create_task(self._save_type_stats_to_r2()) |
|
|
| await self.r2.save_portfolio_state_async(portfolio) |
| await self.r2.save_open_trades_async(list(self.open_positions.values())) |
| await self.r2.append_to_closed_trades_history(trade) |
| |
| print(f"✅ [EXIT] {symbol} | Type: {strat_type} | PnL: {true_net_pct:.2f}% (${true_net_pnl_usd:.2f}) | {reason}") |
| |
| |
| |
| |
| try: |
| decision_data = trade.get('decision_data', {}) |
| if 'governance_grade' in decision_data: |
| training_record = { |
| "symbol": symbol, |
| "entry_time": trade['entry_time'], |
| "exit_time": trade['exit_time'], |
| "governance_grade": decision_data['governance_grade'], |
| "governance_score": decision_data.get('governance_score', 0), |
| "governance_components": decision_data.get('governance_details', {}), |
| "entry_price": trade['entry_price'], |
| "exit_price": trade['exit_price'], |
| "profit_pct": true_net_pct, |
| "result": trade['result'], |
| "strategy_type": strat_type |
| } |
| asyncio.create_task(self.r2.append_governance_training_data(training_record)) |
| except Exception as ge: |
| print(f"⚠️ [Learning Error] Failed to save governance training data: {ge}") |
|
|
| |
| |
| |
| if self.learning_hub and hasattr(self.learning_hub, 'tuner') and self.learning_hub.tuner: |
| print(f"🔧 [Tuner] Registering result for {strat_type}") |
| |
| asyncio.create_task(self.learning_hub.tuner.register_trade_result(strat_type, is_win)) |
| |
| |
| self._launch_post_exit_analysis(symbol, exit_price, trade.get('exit_time'), entry_capital, ai_scores, trade) |
| |
| self.latest_guardian_log = f"✅ Closed {symbol} ({reason})" |
| if symbol in self.sentry_tasks: self.sentry_tasks[symbol].cancel(); del self.sentry_tasks[symbol] |
|
|
| except Exception as e: |
| print(f"❌ [Exit Error] {e}"); traceback.print_exc() |
| if symbol not in self.open_positions: self.open_positions[symbol] = trade |
|
|
| async def force_exit_by_manager(self, symbol, reason): |
| p = await self.data_manager.get_latest_price_async(symbol) |
| async with self.execution_lock: await self._execute_exit(symbol, p, reason) |
|
|
| async def start_sentry_loops(self): |
| await self.ensure_active_guardians() |
|
|
| async def stop_sentry_loops(self): |
| self.running = False |
| for task in self.sentry_tasks.values(): task.cancel() |