Spaces:
Sleeping
Sleeping
| # ============================================================================== | |
| # ๐ก๏ธ trade_manager.py (V82.5 - GEM-Architect: Self-Healing Accounting) | |
| # ============================================================================== | |
| # - Fix: Forces Atomic R2 updates on trade exit. | |
| # - Feature: "Self-Healing" logic detects accounting drift and auto-corrects stats. | |
| # - Fix: Ensures 'smart_portfolio' state is persisted immediately after exit. | |
| # ============================================================================== | |
| import asyncio | |
| import uuid | |
| import time | |
| import traceback | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional | |
| # --- Core Modules --- | |
| from smart_portfolio import SmartPortfolio | |
| from ml_engine.processor import SystemLimits | |
| from governance_engine import GovernanceEngine | |
| # --- Logger Setup --- | |
| logger = logging.getLogger("TradeManager") | |
| class TradeManager: | |
| """ | |
| The Central Nervous System of the Trading Operation. | |
| Manages Entries, Exits, Position Monitoring, State Persistence, and Learning Feedback. | |
| """ | |
| def __init__(self, r2_service, data_manager, processor): | |
| self.r2 = r2_service | |
| self.data_manager = data_manager | |
| self.processor = processor | |
| # --- External Connections --- | |
| self.learning_hub = None | |
| self.agency_director = None | |
| # --- Internal Engines --- | |
| self.smart_portfolio = SmartPortfolio(r2_service, data_manager) | |
| self.governance = GovernanceEngine() | |
| # --- State Management --- | |
| self.open_positions: Dict[str, Dict] = {} | |
| self.watchlist: Dict[str, Dict] = {} | |
| self.sentry_tasks: Dict[str, asyncio.Task] = {} | |
| # --- Advanced Tracking --- | |
| self.pending_verifications = [] | |
| self.shadow_watchlist = [] | |
| # --- System Flags --- | |
| self.running = True | |
| self.execution_lock = asyncio.Lock() | |
| self.latest_guardian_log = "๐ก๏ธ Guardian & Governance Systems Online." | |
| # --- Constants --- | |
| self.FEE_RATE = 0.001 # 0.1% per trade | |
| # --- Statistics Containers --- | |
| self.ai_stats = { | |
| "hybrid": {"total":0, "good":0, "saved":0.0, "missed":0.0}, | |
| "crash": {"total":0, "good":0, "saved":0.0, "missed":0.0}, | |
| "giveback": {"total":0, "good":0, "saved":0.0, "missed":0.0}, | |
| "stagnation": {"total":0, "good":0, "saved":0.0, "missed":0.0} | |
| } | |
| self.type_stats = { | |
| "SAFE_BOTTOM": {"wins":0, "losses":0, "profit_usd":0.0, "loss_usd":0.0}, | |
| "MOMENTUM_LAUNCH": {"wins":0, "losses":0, "profit_usd":0.0, "loss_usd":0.0}, | |
| "ACCUMULATION_SQUEEZE": {"wins":0, "losses":0, "profit_usd":0.0, "loss_usd":0.0}, | |
| "NORMAL": {"wins":0, "losses":0, "profit_usd":0.0, "loss_usd":0.0} | |
| } | |
| print(f"๐ก๏ธ [TradeManager V82.5] Accounting Integrity Systems Active.") | |
| # ========================================================================== | |
| # ๐๏ธ SECTION 1: SELECTION & GOVERNANCE | |
| # ========================================================================== | |
| async def select_and_execute_best_signal(self, candidates: List[Dict]): | |
| if not candidates: return | |
| print(f"๐ [TradeManager] Validating {len(candidates)} candidates from Sniper L4...") | |
| best_candidate = None | |
| for cand in candidates: | |
| symbol = cand['symbol'] | |
| # 1. Sniper Threshold | |
| s_score = cand.get('sniper_score', 0.0) | |
| entry_threshold = SystemLimits.L4_ENTRY_THRESHOLD | |
| if s_score < entry_threshold: | |
| print(f" ๐ {symbol}: Sniper Score {s_score:.2f} < {entry_threshold} (Skipped)") | |
| self._add_to_shadow_watchlist(cand, f"Sniper Low Score ({s_score:.2f})") | |
| continue | |
| # 2. Portfolio Constraints | |
| if symbol in self.open_positions: | |
| print(f" โ ๏ธ {symbol}: Already in position.") | |
| continue | |
| if not await self.smart_portfolio.can_trade(): | |
| print(f" โ Portfolio Halted or Max Trades Reached.") | |
| return | |
| # 3. Governance Engine | |
| print(f" ๐๏ธ {symbol}: Consulting Governance Engine...") | |
| try: | |
| # Governance needs moderate history | |
| t1m = await self.data_manager.get_latest_ohlcv(symbol, '1m', 150) | |
| ob = await self.data_manager.get_order_book_snapshot(symbol) | |
| t15m = cand.get('ohlcv', {}).get('15m') | |
| if not t15m: | |
| t15m = await self.data_manager.get_latest_ohlcv(symbol, '15m', 150) | |
| ohlcv_dict = { | |
| '1m': t1m, | |
| '5m': await self.data_manager.get_latest_ohlcv(symbol, '5m', 150), | |
| '15m': t15m, | |
| '1h': cand.get('ohlcv', {}).get('1h', []), | |
| 'daily': await self.data_manager.get_latest_ohlcv(symbol, '1d', 60) | |
| } | |
| gov_decision = await self.governance.evaluate_trade( | |
| symbol=symbol, | |
| ohlcv_data=ohlcv_dict, | |
| order_book=ob, | |
| strategy_type=cand.get('strategy_type', 'NORMAL') | |
| ) | |
| cand['decision_data'] = gov_decision | |
| gov_status = gov_decision['status'] | |
| gov_score = gov_decision['governance_score'] | |
| if gov_status == "REJECTED": | |
| print(f" โ Governance Veto: {gov_decision.get('reason')} (Score: {gov_score:.1f})") | |
| self._add_to_shadow_watchlist(cand, f"Governance Veto: {gov_decision.get('reason')}") | |
| continue | |
| print(f" โ Governance APPROVED (Score: {gov_score:.1f})") | |
| best_candidate = cand | |
| break | |
| except Exception as e: | |
| print(f" โ Governance Error for {symbol}: {e}") | |
| traceback.print_exc() | |
| continue | |
| if best_candidate: | |
| await self._execute_entry(best_candidate) | |
| else: | |
| print("-> ๐ No candidates passed final Governance checks.") | |
| # ========================================================================== | |
| # ๐ SECTION 2: EXECUTION | |
| # ========================================================================== | |
| async def _execute_entry(self, candidate): | |
| symbol = candidate['symbol'] | |
| async with self.execution_lock: | |
| allocation = self.smart_portfolio.allocate_capital(candidate) | |
| amount_usd = allocation['amount_usd'] | |
| if amount_usd < 5.0: | |
| print(f"โ ๏ธ Insufficient capital allocated: ${amount_usd:.2f}") | |
| return | |
| print(f"๐ [EXECUTING] BUY {symbol} | Allocated: ${amount_usd:.2f}...") | |
| entry_price = float(candidate.get('sniper_entry_price', 0) or candidate.get('current_price', 0)) | |
| if entry_price <= 0: | |
| entry_price = await self.data_manager.get_latest_price_async(symbol) | |
| atr = candidate.get('atr_value', entry_price * 0.02) | |
| if atr == 0: atr = entry_price * 0.02 | |
| gov_data = candidate.get('decision_data', {}) | |
| reward_ratio = gov_data.get('reward_ratio', 2.0) | |
| sl_price = entry_price - (atr * 1.5) | |
| tp_price = entry_price + (atr * 1.5 * reward_ratio) | |
| if sl_price <= 0: sl_price = entry_price * 0.95 | |
| trade_id = str(uuid.uuid4())[:8] | |
| trade_record = { | |
| "trade_id": trade_id, | |
| "symbol": symbol, | |
| "entry_time": datetime.now().isoformat(), | |
| "entry_price": entry_price, | |
| "entry_capital": amount_usd, | |
| "tp_price": tp_price, | |
| "sl_price": sl_price, | |
| "quantity": amount_usd / entry_price, | |
| "strategy": candidate.get('strategy_type', 'NORMAL'), | |
| "decision_data": gov_data, | |
| "sniper_data": candidate.get('sniper_data', {}), | |
| "ai_scores": { | |
| "l2_score": candidate.get('l2_score', 0), | |
| "oracle": candidate.get('oracle_score', 0), | |
| "pattern": candidate.get('pattern_score', 0), | |
| "mc": candidate.get('mc_score', 0) | |
| }, | |
| "highest_price": entry_price, | |
| "status": "OPEN", | |
| "logs": [f"Trade opened at {entry_price}"] | |
| } | |
| self.open_positions[symbol] = trade_record | |
| self.smart_portfolio.register_trade_entry(amount_usd) | |
| # FORCE SYNC ON ENTRY | |
| await self._save_active_trades() | |
| if hasattr(self.smart_portfolio, '_save_state_to_r2'): | |
| await self.smart_portfolio._save_state_to_r2() | |
| if symbol not in self.sentry_tasks: | |
| self.sentry_tasks[symbol] = asyncio.create_task(self._sentry_guard(symbol)) | |
| print(f"โ Trade {trade_id} Active. TP: {tp_price:.4f} | SL: {sl_price:.4f}") | |
| if self.agency_director: | |
| event_payload = { | |
| "type": "TRADE_OPEN", | |
| "symbol": symbol, | |
| "price": entry_price, | |
| "size": amount_usd, | |
| "reason": gov_data.get('reason', 'Sniper Signal') | |
| } | |
| asyncio.create_task(self.agency_director.process_event("TRADE_OPEN", event_payload)) | |
| # ========================================================================== | |
| # ๐ก๏ธ SECTION 3: MONITORING (The Sentry) | |
| # ========================================================================== | |
| async def _sentry_guard(self, symbol): | |
| """ | |
| Continuous monitoring loop for an active trade. | |
| Updated V82.5: Fetches DEEP history for EMA200/V3 Model stability. | |
| """ | |
| logger.info(f"๐ก๏ธ Sentry started for {symbol}") | |
| try: | |
| while symbol in self.open_positions and self.running: | |
| trade = self.open_positions[symbol] | |
| # 1. Update Market Data | |
| curr_price = await self.data_manager.get_latest_price_async(symbol) | |
| if curr_price <= 0: | |
| await asyncio.sleep(5) | |
| continue | |
| if curr_price > trade['highest_price']: | |
| trade['highest_price'] = curr_price | |
| trade['current_price'] = curr_price | |
| # 2. Check Hard Exits | |
| if curr_price >= trade['tp_price']: | |
| await self._execute_exit(symbol, curr_price, "TAKE_PROFIT_HARD") | |
| break | |
| if curr_price <= trade['sl_price']: | |
| await self._execute_exit(symbol, curr_price, "STOP_LOSS_HARD") | |
| break | |
| # 3. Consult Guardians (AI Exit Logic) | |
| # โก CRITICAL FIX: Ensure enough data for EMA_200 (Requires >200 candles) | |
| t1m = await self.data_manager.get_latest_ohlcv(symbol, '1m', 500) | |
| t5m = await self.data_manager.get_latest_ohlcv(symbol, '5m', 300) | |
| t15m = await self.data_manager.get_latest_ohlcv(symbol, '15m', 300) | |
| if not t1m or len(t1m) < 100: | |
| await asyncio.sleep(5) | |
| continue | |
| start_time = datetime.fromisoformat(trade['entry_time']) | |
| duration = (datetime.now() - start_time).total_seconds() / 60 | |
| trade_context = trade.copy() | |
| trade_context['time_in_trade_mins'] = duration | |
| trade_context['volume_30m_usd'] = 0 | |
| guardian_res = self.processor.consult_guardians( | |
| symbol, t1m, t5m, t15m, trade_context | |
| ) | |
| action = guardian_res.get('action', 'HOLD') | |
| if action in ['EXIT_HARD', 'EXIT_SOFT']: | |
| reason = guardian_res.get('reason', 'Guardian Exit') | |
| await self._execute_exit(symbol, curr_price, f"GUARDIAN_{action}: {reason}") | |
| break | |
| # Live X-Ray Logging | |
| scores = guardian_res.get('scores', {}) | |
| probs = guardian_res.get('probs', {}) | |
| v2 = scores.get('v2', 0.0) | |
| v3 = scores.get('v3', 0.0) | |
| p_crash = probs.get('crash', 0.0) | |
| p_giveback = probs.get('giveback', 0.0) | |
| p_stag = probs.get('stagnation', 0.0) | |
| pnl_pct = ((curr_price - trade['entry_price']) / trade['entry_price']) * 100 | |
| self.latest_guardian_log = ( | |
| f"๐ก๏ธ {symbol}: {action} ({pnl_pct:+.2f}%) | " | |
| f"Hybrid[V2:{v2:.2f}|V3:{v3:.2f}] | " | |
| f"Hydra[๐ฅ:{p_crash:.2f}|๐ธ:{p_giveback:.2f}|๐:{p_stag:.2f}]" | |
| ) | |
| await asyncio.sleep(3) | |
| except asyncio.CancelledError: | |
| logger.info(f"๐ก๏ธ Sentry for {symbol} stopped.") | |
| except Exception as e: | |
| logger.error(f"โ Sentry Error {symbol}: {e}") | |
| traceback.print_exc() | |
| await asyncio.sleep(10) | |
| # ========================================================================== | |
| # ๐ช SECTION 4: EXITS & POST-ANALYSIS (ATOMIC & MATH-VERIFIED) | |
| # ========================================================================== | |
| async def _execute_exit(self, symbol, exit_price, reason): | |
| async with self.execution_lock: | |
| if symbol not in self.open_positions: return | |
| # 1. Retrieve & Calculate | |
| trade = self.open_positions[symbol] | |
| entry_price = trade['entry_price'] | |
| quantity = trade['quantity'] | |
| entry_capital = trade['entry_capital'] | |
| raw_pnl = (exit_price - entry_price) * quantity | |
| total_fees = (entry_capital * self.FEE_RATE) + ((exit_price * quantity) * self.FEE_RATE) | |
| net_pnl = raw_pnl - total_fees | |
| pnl_pct = (net_pnl / entry_capital) * 100 | |
| print(f"๐ฐ [EXIT] {symbol} | Net PnL: ${net_pnl:.4f} (Fees: ${total_fees:.4f}) | {reason}") | |
| # 2. Update Trade Record | |
| trade['exit_price'] = exit_price | |
| trade['exit_time'] = datetime.now().isoformat() | |
| trade['net_pnl'] = net_pnl | |
| trade['exit_reason'] = reason | |
| trade['status'] = "CLOSED" | |
| trade['result'] = "WIN" if net_pnl > 0 else "LOSS" | |
| # 3. ATOMIC R2 UPDATE SEQUENCE (WITH MATH INTEGRITY CHECK) | |
| try: | |
| # Step A: Save History First | |
| await self.r2.save_trade_history_async(trade) | |
| # Step B: Remove from Active & Sync | |
| self.open_positions.pop(symbol) | |
| await self._save_active_trades() | |
| # Step C: Update Internal Wallet (Source of Truth for Capital) | |
| self.smart_portfolio.register_trade_exit( | |
| capital_returned=(entry_capital + net_pnl), | |
| net_pnl=net_pnl, | |
| is_win=(net_pnl > 0) | |
| ) | |
| # Step D: Force R2 Sync for Portfolio State | |
| if hasattr(self.smart_portfolio, '_save_state_to_r2'): | |
| await self.smart_portfolio._save_state_to_r2() | |
| # Step E: Update Global Counters & ENFORCE MATH INTEGRITY | |
| stats = await self.r2.get_portfolio_state_async() | |
| # 1. Update counters based on current trade | |
| stats['total_trades'] = int(stats.get('total_trades', 0)) + 1 | |
| if net_pnl > 0: | |
| stats['winning_trades'] = int(stats.get('winning_trades', 0)) + 1 | |
| else: | |
| stats['losing_trades'] = int(stats.get('losing_trades', 0)) + 1 | |
| # 2. Get ACTUAL capital from SmartPortfolio (The Truth) | |
| actual_current_capital = self.smart_portfolio.state.get('current_capital', 10.0) | |
| stats['current_capital_usd'] = actual_current_capital | |
| # 3. ACCOUNTING EQUATION CHECK: Current = Initial + Profits - Losses | |
| initial_cap = stats.get('initial_capital_usd', 10.0) | |
| current_total_profit = float(stats.get('total_profit_usd', 0.0)) + (net_pnl if net_pnl > 0 else 0.0) | |
| current_total_loss = float(stats.get('total_loss_usd', 0.0)) + (abs(net_pnl) if net_pnl < 0 else 0.0) | |
| # Calculate discrepancy | |
| calculated_capital = initial_cap + current_total_profit - current_total_loss | |
| discrepancy = actual_current_capital - calculated_capital | |
| # 4. SELF-HEALING LOGIC | |
| if abs(discrepancy) > 0.0001: | |
| print(f"โ ๏ธ [Math Fix] Drift detected: ${discrepancy:.4f}. Auto-correcting stats...") | |
| # If actual < calculated, we have missing losses (fees/slippage) | |
| if discrepancy < 0: | |
| current_total_loss += abs(discrepancy) | |
| # If actual > calculated, we have missing profits | |
| else: | |
| current_total_profit += discrepancy | |
| # Set corrected values | |
| stats['total_profit_usd'] = current_total_profit | |
| stats['total_loss_usd'] = current_total_loss | |
| # Recalculate Win Rate | |
| total = stats['total_trades'] | |
| wins = stats['winning_trades'] | |
| stats['win_rate'] = (wins / total * 100) if total > 0 else 0.0 | |
| await self.r2.save_portfolio_state_async(stats) | |
| print(f"๐ [Stats] Integrity Verified. Cap: ${actual_current_capital:.2f}") | |
| self._update_stats(trade, reason, net_pnl) | |
| if symbol in self.sentry_tasks: | |
| self.sentry_tasks[symbol].cancel() | |
| del self.sentry_tasks[symbol] | |
| self.latest_guardian_log = f"โ Closed {symbol} ({reason})" | |
| except Exception as e: | |
| print(f"โ Critical Save Error during Exit for {symbol}: {e}") | |
| traceback.print_exc() | |
| if self.agency_director: | |
| event_payload = { | |
| "type": "TRADE_CLOSED", | |
| "symbol": symbol, | |
| "pnl_usd": net_pnl, | |
| "pnl_pct": pnl_pct, | |
| "reason": reason, | |
| "duration_mins": (datetime.now() - datetime.fromisoformat(trade['entry_time'])).total_seconds() / 60 | |
| } | |
| asyncio.create_task(self.agency_director.process_event("TRADE_CLOSED", event_payload)) | |
| self._launch_post_exit_analysis(symbol, exit_price, trade['exit_time'], entry_capital, trade.get('ai_scores', {}), trade) | |
| def _update_stats(self, trade, reason, net_pnl): | |
| strat_type = trade.get('strategy', 'NORMAL') | |
| if strat_type in self.type_stats: | |
| if net_pnl > 0: | |
| self.type_stats[strat_type]['wins'] += 1 | |
| self.type_stats[strat_type]['profit_usd'] += net_pnl | |
| else: | |
| self.type_stats[strat_type]['losses'] += 1 | |
| self.type_stats[strat_type]['loss_usd'] += abs(net_pnl) | |
| if "GUARDIAN" in reason: | |
| key = "hybrid" | |
| if "Crash" in reason: key = "crash" | |
| elif "Giveback" in reason: key = "giveback" | |
| elif "Stagnation" in reason: key = "stagnation" | |
| self.ai_stats[key]["total"] += 1 | |
| if net_pnl > 0: | |
| self.ai_stats[key]["good"] += 1 | |
| self.ai_stats[key]["saved"] += net_pnl | |
| else: | |
| self.ai_stats[key]["missed"] += abs(net_pnl) | |
| asyncio.create_task(self.r2.save_guardian_stats_async(self.ai_stats)) | |
| # ========================================================================== | |
| # ๐ง SECTION 5: LEARNING & SHADOW TRACKING | |
| # ========================================================================== | |
| def _launch_post_exit_analysis(self, symbol, exit_price, exit_time, capital, ai_scores, full_trade_record): | |
| if not self.learning_hub: return | |
| try: | |
| sample = { | |
| "symbol": symbol, | |
| "entry_time": full_trade_record['entry_time'], | |
| "exit_time": exit_time, | |
| "final_pnl": full_trade_record['net_pnl'], | |
| "ai_inputs": ai_scores, | |
| "market_conditions": full_trade_record.get('decision_data', {}) | |
| } | |
| if hasattr(self.learning_hub, 'ingest_trade_result'): | |
| self.learning_hub.ingest_trade_result(sample) | |
| print(f"๐ง [Learning] Ingested trade result for {symbol}.") | |
| except Exception as e: | |
| print(f"โ ๏ธ Learning Ingest Error: {e}") | |
| def _add_to_shadow_watchlist(self, candidate, reason): | |
| entry = { | |
| "symbol": candidate['symbol'], | |
| "time": time.time(), | |
| "price_at_rejection": candidate.get('current_price', 0), | |
| "reason": reason, | |
| "scores": { | |
| "l2": candidate.get('l2_score', 0), | |
| "sniper": candidate.get('sniper_score', 0) | |
| } | |
| } | |
| self.shadow_watchlist.append(entry) | |
| if len(self.shadow_watchlist) > 100: | |
| self.shadow_watchlist.pop(0) | |
| # ========================================================================== | |
| # ๐พ SECTION 6: STATE PERSISTENCE & UTILS | |
| # ========================================================================== | |
| async def _save_active_trades(self): | |
| await self.r2.save_active_trades_async(self.open_positions) | |
| async def sync_internal_state_with_r2(self): | |
| print("๐ก๏ธ [TradeManager] Syncing State from R2...") | |
| saved_trades = await self.r2.load_active_trades_async() | |
| if saved_trades: | |
| self.open_positions = saved_trades | |
| print(f" -> [Sync] Recovered {len(self.open_positions)} active trades.") | |
| for sym in self.open_positions: | |
| if sym not in self.sentry_tasks: | |
| self.sentry_tasks[sym] = asyncio.create_task(self._sentry_guard(sym)) | |
| else: | |
| self.open_positions = {} | |
| print(" -> [Sync] No active trades found.") | |
| g_stats = await self.r2.get_guardian_stats_async() | |
| if g_stats: | |
| self.ai_stats = g_stats | |
| print(" -> [Sync] Guardian Stats loaded.") | |
| await self.smart_portfolio.sync_state() | |
| async def force_exit_by_manager(self, symbol, reason): | |
| p = await self.data_manager.get_latest_price_async(symbol) | |
| await self._execute_exit(symbol, p, reason) | |
| async def ensure_active_guardians(self): | |
| active = [t for t in self.sentry_tasks.values() if not t.done()] | |
| return f"Active Guardians: {len(active)}" | |
| async def initialize_sentry_exchanges(self): | |
| pass | |
| async def start_sentry_loops(self): | |
| await self.ensure_active_guardians() | |
| async def stop_sentry_loops(self): | |
| self.running = False | |
| for t in self.sentry_tasks.values(): t.cancel() |