# Source: Tradtesting / trade_manager.py (uploaded by Riy777)
# Commit 1b9d614 (verified): "Update trade_manager.py"
# ============================================================
# 🛡️ trade_manager.py (V64.0 - GEM-Architect: Ultimate Integrity)
# ============================================================
import asyncio
import uuid
import time
import traceback
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any
# Import the core project components
from smart_portfolio import SmartPortfolio
from ml_engine.processor import SystemLimits
from governance_engine import GovernanceEngine
class TradeManager:
    """Manage open positions from entry to exit.

    The manager selects and executes entries, runs one guardian task per open
    symbol, executes exits, and records audit statistics through the injected
    ``r2_service``.  Market data comes from ``data_manager`` and model
    decisions from ``processor``; none of those collaborators are defined in
    this file, so their exact contracts are assumed from the call sites.
    """

    # Models whose "yes" vote at entry is audited one hour later.
    AUDITED_MODELS = (
        "Titan", "Patterns", "Oracle", "Sniper",
        "MonteCarlo_L", "MonteCarlo_A", "Governance",
    )

    def __init__(self, r2_service, data_manager, processor):
        """Wire collaborators and reset all in-memory state.

        Args:
            r2_service: Persistence service (open trades, stats, diagnostics).
            data_manager: Market data provider (prices, OHLCV, order book).
            processor: Model facade (sniper entry, guardians, oracle).
        """
        self.r2 = r2_service
        self.data_manager = data_manager
        self.processor = processor

        # Injected from outside to close the learning loop; None disables it.
        self.learning_hub = None

        # Portfolio and governance engines.
        self.smart_portfolio = SmartPortfolio(r2_service, data_manager)
        self.governance = GovernanceEngine()

        self.open_positions = {}
        self.watchlist = {}
        self.sentry_tasks = {}

        # Pending verification tickets (entry audits not yet due).
        self.pending_verifications = []

        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

        self.running = True
        self.latest_guardian_log = "🛡️ Guardian & Governance Systems Online."

        self.FEE_RATE = 0.001
        self.ORACLE_CHECK_INTERVAL = 900

        # Guardian (exit decision) statistics.
        self.ai_stats = {
            "hybrid": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "crash": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "giveback": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "stagnation": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}
        }

        # Per strategy-type performance statistics.
        self.type_stats = {
            "SAFE_BOTTOM": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0},
            "MOMENTUM_LAUNCH": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0}
        }

        self.execution_lock = asyncio.Lock()
        print("🛡️ [TradeManager V64.0] Full Systems Online (Entry Audit + Exit Audit).")

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        # setdefault keeps this usable on instances built without __init__.
        registry = self.__dict__.setdefault('_background_tasks', set())
        registry.add(task)
        task.add_done_callback(registry.discard)
        return task

    async def initialize_sentry_exchanges(self):
        """Initialise the portfolio, restore persisted state and start the audit loop."""
        print("🛡️ [TradeManager] Syncing state & Initializing Portfolio...")
        await self.smart_portfolio.initialize()
        await self.sync_internal_state_with_r2()

        # Restore guardian statistics; a failure here is non-fatal.
        try:
            saved_stats = await self.r2.get_guardian_stats_async()
            if saved_stats:
                self.ai_stats = saved_stats
                print(" -> [Stats] Guardian metrics loaded from R2.")
        except Exception as e:
            print(f"⚠️ [Stats] Failed to load guardian metrics: {e}")

        # Restore per-type statistics.
        await self._load_type_stats_from_r2()

        # Restore pending verification tickets and start their background engine.
        await self._load_pending_verifications()
        self._spawn_background(self._verification_engine_loop())

    # ============================================================
    # 🕵️ Persistent Verification Engine (The Truth Machine - Entry)
    # ============================================================
    async def _load_pending_verifications(self):
        """Load verification tickets that were still pending at the last save."""
        try:
            data = await self.r2.get_file_json_async("diagnostics/pending_verifications.json")
            if data:
                self.pending_verifications = data
                print(f" 🕵️ [Verification] Loaded {len(self.pending_verifications)} pending audits.")
        except Exception:
            self.pending_verifications = []

    async def _save_pending_verifications(self):
        """Persist the current tickets so they survive a restart."""
        try:
            await self.r2.upload_json_async(self.pending_verifications, "diagnostics/pending_verifications.json")
        except Exception as e:
            print(f"❌ Error saving verifications: {e}")

    async def _register_verification_ticket(self, symbol, entry_price, size_usd, votes):
        """Create and persist an audit ticket when a trade is entered.

        Args:
            symbol: Traded symbol.
            entry_price: Entry price of the trade.
            size_usd: Position size in USD.
            votes: Mapping of model name to whether it voted to buy.
        """
        created_at = datetime.now()
        ticket = {
            "id": str(uuid.uuid4()),
            "symbol": symbol,
            "entry_price": float(entry_price),
            "size_usd": float(size_usd),
            "entry_time": created_at.isoformat(),
            "votes": votes,
            # The verdict is due one hour after entry.
            "target_time": (created_at + timedelta(hours=1)).isoformat()
        }
        self.pending_verifications.append(ticket)
        await self._save_pending_verifications()
        print(f" 🎫 [Verification] Ticket created for {symbol}. Result in 1h.")

    async def _verification_engine_loop(self):
        """Background loop: once a minute, settle tickets whose audit is due."""
        print(" ⚙️ [Verification Engine] Started background audit loop...")
        while self.running:
            try:
                await asyncio.sleep(60)  # check once a minute
                if not self.pending_verifications:
                    continue
                await self._settle_due_verifications(datetime.now())
            except asyncio.CancelledError:
                raise
            except Exception as e:
                print(f"❌ [Verification Loop Error] {e}")
                await asyncio.sleep(60)

    async def _settle_due_verifications(self, now):
        """Audit every ticket due at ``now`` and drop the settled ones.

        Settled tickets are removed from the *live* list rather than replacing
        it with a snapshot, so tickets registered while this pass was awaiting
        are not lost.  Each ticket is handled in isolation so that one
        malformed ticket cannot cause earlier ones to be counted again.

        Returns:
            Number of tickets settled in this pass.
        """
        settled = []
        for ticket in list(self.pending_verifications):
            try:
                if now < datetime.fromisoformat(ticket['target_time']):
                    continue  # not due yet
                if await self._audit_ticket(ticket):
                    settled.append(ticket)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                print(f"❌ [Verification Loop Error] {e}")

        if settled:
            settled_keys = {id(t) for t in settled}
            self.pending_verifications = [
                t for t in self.pending_verifications if id(t) not in settled_keys
            ]
            await self._save_pending_verifications()
        return len(settled)

    async def _audit_ticket(self, ticket):
        """Score one due ticket against the current price.

        Returns:
            True when the ticket is finished (scored or unusable), False when
            the price is unavailable and the ticket must be retried later.
        """
        symbol = ticket['symbol']
        entry_p = float(ticket['entry_price'])
        size = float(ticket['size_usd'])
        votes = ticket.get('votes') or {}

        # The current price is used whether the trade is still open or closed.
        curr_p = await self.data_manager.get_latest_price_async(symbol)
        if not curr_p or curr_p <= 0:
            return False  # price unavailable: retry on the next cycle

        if entry_p <= 0:
            # A non-positive entry price can never be scored; drop the ticket.
            print(f" ⚠️ [Verification] Dropping ticket for {symbol}: invalid entry price.")
            return True

        pnl_pct = (curr_p - entry_p) / entry_p
        pnl_usd = pnl_pct * size
        is_win = pnl_pct > 0
        result_str = "WIN" if is_win else "LOSS"
        # pnl_pct is a fraction, so the '%' presentation type scales it.
        print(f" 🕵️ [Audit Complete] {symbol}: {result_str} after 1h ({pnl_pct:+.2%})")

        # Credit or debit every model that voted "yes" at entry.
        model_updates = {
            model: {
                "wins": 1 if is_win else 0,
                "losses": 1 if not is_win else 0,
                "pnl": pnl_usd
            }
            for model in self.AUDITED_MODELS
            if votes.get(model, False)
        }
        if model_updates:
            await self.r2.update_diagnostic_stats_async(model_updates)
        return True

    # ============================================================
    # 📊 Type Stats & R2 Management
    # ============================================================
    async def _load_type_stats_from_r2(self):
        """Load per strategy-type statistics from R2, keeping defaults on failure."""
        try:
            saved_stats = await self.r2.get_file_json_async("stats/coin_type_performance_v1.json")
            if saved_stats:
                self.type_stats = saved_stats
                print(" 📊 [Stats] Coin Type performance loaded.")
            else:
                print(" ℹ️ [Stats] No existing type stats found. Starting fresh.")
        except Exception:
            print(" ⚠️ [Stats] Error loading type stats.")

    async def _save_type_stats_to_r2(self):
        """Persist per strategy-type statistics to R2."""
        try:
            await self.r2.upload_json_async(self.type_stats, "stats/coin_type_performance_v1.json")
        except Exception as e:
            print(f"❌ Failed to save type stats: {e}")

    async def sync_internal_state_with_r2(self):
        """Restore open trades from R2 and recompute the allocated capital."""
        try:
            open_trades_list = await self.r2.get_open_trades_async() or []
            self.open_positions = {trade['symbol']: trade for trade in open_trades_list}
            print(f" -> [Sync] Recovered {len(self.open_positions)} active trades.")

            total_allocated = sum(
                float(t.get('entry_capital', 0.0)) for t in self.open_positions.values()
            )
            self.smart_portfolio.state["allocated_capital_usd"] = total_allocated
        except Exception as e:
            print(f"❌ [TradeManager] R2 Sync Failed: {e}")
            self.open_positions = {}

    async def ensure_active_guardians(self):
        """Make sure every open position has a live guardian task.

        Returns:
            A short human-readable status string.
        """
        active_symbols = list(self.open_positions.keys())
        if not active_symbols:
            return "💤 No active trades."

        restored_count = 0
        status_msgs = []
        for symbol in active_symbols:
            task = self.sentry_tasks.get(symbol)
            is_alive = task and not task.done()
            if not is_alive:
                print(f"🚨 [Watchdog] Found DEAD guardian for {symbol}. Resurrecting...")
                self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol))
                restored_count += 1
                status_msgs.append(f"♻️ Resurrected {symbol}")
            else:
                status_msgs.append(f"✅ {symbol} Running")

        if restored_count > 0:
            self.latest_guardian_log = f"⚠️ Watchdog restored: {', '.join(status_msgs)}"
            return f"⚠️ Watchdog restored {restored_count} guardians."
        return "✅ All guardians active."

    # ============================================================
    # 🧠 Logic Layers
    # ============================================================
    def _snapshot_model_votes(self, signal_data: Dict[str, Any]) -> Dict[str, bool]:
        """Record which models voted "buy" at entry time.

        Thresholds come from ``signal_data['dynamic_limits']`` when present and
        fall back to the ``SystemLimits`` defaults otherwise.
        """
        # Both sub-dicts may be missing or None in the incoming signal.
        limits = signal_data.get('dynamic_limits') or {}
        comps = signal_data.get('components') or {}

        oracle_thresh = limits.get('l3_oracle_thresh')
        if oracle_thresh is None:
            oracle_thresh = SystemLimits.L3_CONFIDENCE_THRESHOLD
        sniper_thresh = limits.get('l4_sniper_thresh')
        if sniper_thresh is None:
            sniper_thresh = SystemLimits.L4_ENTRY_THRESHOLD

        return {
            'Titan': comps.get('titan_score', signal_data.get('titan_score', 0)) > 0.5,
            'Patterns': comps.get('patterns_score', signal_data.get('patterns_score', 0)) > 0.5,
            'Oracle': signal_data.get('confidence', 0) >= oracle_thresh,
            'Sniper': signal_data.get('sniper_score', 0) >= sniper_thresh,
            'MonteCarlo_L': comps.get('mc_score', 0.5) > 0.5,
            'MonteCarlo_A': signal_data.get('mc_advanced_score', 0) > 0,
            'Governance': signal_data.get('governance_grade', 'REJECT') != 'REJECT',
        }

    async def select_and_execute_best_signal(self, oracle_approved_signals: List[Dict[str, Any]]):
        """Run the sniper check on each BUY signal and execute the best one.

        Candidates are ranked by oracle confidence plus sniper score; only the
        top candidate is attempted.
        """
        if not self.processor.initialized:
            await self.processor.initialize()

        sniper_candidates = []
        print(f"\n🔎 [Sniper] Scanning {len(oracle_approved_signals)} candidates...")

        for signal in oracle_approved_signals:
            symbol = signal['symbol']
            if signal.get('action_type') != 'BUY':
                continue
            if symbol in self.open_positions:
                continue

            ohlcv_1m, order_book = await asyncio.gather(
                self.data_manager.get_latest_ohlcv(symbol, '1m', 1000),
                self.data_manager.get_order_book_snapshot(symbol),
            )
            if not ohlcv_1m or len(ohlcv_1m) < 100:
                print(f" -> ⚠️ [Skip] {symbol}: Insufficient 1m data.")
                continue

            # The full signal is passed along as context.
            sniper_result = await self.processor.check_sniper_entry(
                ohlcv_1m, order_book, context_data=signal
            ) or {}
            sniper_signal = sniper_result.get('signal', 'WAIT')
            final_conf = sniper_result.get('confidence_prob', 0.0)
            reason_str = sniper_result.get('reason', 'N/A')

            print(f" -> 🔭 {symbol:<6} | Decision: {sniper_signal} | Score: {final_conf:.2f} | Reason: {reason_str}")

            if sniper_signal == 'BUY':
                signal['sniper_entry_price'] = sniper_result.get('entry_price', 0)
                signal['sniper_score'] = final_conf
                # 'components' may be absent or None.
                if not signal.get('components'):
                    signal['components'] = {}
                signal['components']['sniper_score'] = final_conf
                sniper_candidates.append(signal)

        if not sniper_candidates:
            print(" -> 📉 No candidates passed the Sniper L4 check.")
            return

        # max() returns the first of equal maxima, like a stable descending sort.
        best_signal = max(
            sniper_candidates,
            key=lambda x: x.get('confidence', 0) + x.get('sniper_score', 0),
        )

        async with self.execution_lock:
            print(f"🚀 [EXECUTING] Attempting entry for best candidate: {best_signal['symbol']}")
            await self._execute_entry_from_signal(best_signal['symbol'], best_signal)

    async def _execute_entry_from_signal(self, symbol, signal_data):
        """Perform the entry: governance vote, portfolio approval, bookkeeping.

        On success the position is stored, persisted, given a guardian task
        and an entry verification ticket.  Any exception is logged and
        swallowed so the caller keeps running.
        """
        try:
            print(f" 🏛️ [Governance] Convening Senate for {symbol}...")
            t15, t1h, ob = await asyncio.gather(
                self.data_manager.get_latest_ohlcv(symbol, '15m', 200),
                self.data_manager.get_latest_ohlcv(symbol, '1h', 200),
                self.data_manager.get_order_book_snapshot(symbol),
            )
            ohlcv_dict = {'15m': t15, '1h': t1h}
            strategy_type = signal_data.get('strategy_type', 'NORMAL')

            # Governance evaluation.
            gov_decision = await self.governance.evaluate_trade(
                symbol, ohlcv_dict, ob, strategy_type=strategy_type
            )
            if gov_decision['grade'] == 'REJECT':
                print(f"⛔ [Governance VETO] {symbol} Rejected. Grade: REJECT")
                return

            print(f" ✅ [Governance PASS] Grade: {gov_decision['grade']} | Score: {gov_decision['governance_score']:.1f}")
            signal_data['governance_grade'] = gov_decision['grade']
            signal_data['governance_score'] = gov_decision['governance_score']
            signal_data['governance_details'] = gov_decision['components']

            # Financial approval.
            is_approved, plan = await self.smart_portfolio.request_entry_approval(
                signal_data, len(self.open_positions)
            )
            if not is_approved:
                print(f"⛔ [Portfolio Rejection] {symbol}: {plan.get('reason')}")
                return

            approved_size_usd = plan['approved_size_usd']
            approved_tp = plan['approved_tp']
            trade_id = str(uuid.uuid4())

            current_price = float(signal_data.get('sniper_entry_price', 0.0))
            if current_price <= 0.0:
                current_price = await self.data_manager.get_latest_price_async(symbol)
            if not current_price or current_price <= 0:
                # A trade stored with entry price 0 breaks every later PnL ratio.
                print(f"⛔ [Entry Aborted] {symbol}: no valid market price available.")
                return

            entry_fee_usd = approved_size_usd * self.FEE_RATE

            # Snapshot of which models approved this trade.
            model_votes = self._snapshot_model_votes(signal_data)

            decision_snapshot = {
                'components': signal_data.get('components', {}),
                'oracle_conf': signal_data.get('confidence', 0),
                'governance_grade': gov_decision['grade'],
                'governance_score': gov_decision['governance_score'],
                'governance_details': gov_decision['components'],
                'system_confidence': plan.get('system_confidence', 0.5),
                'market_mood': plan.get('market_mood', 'N/A'),
                'regime_at_entry': getattr(SystemLimits, 'CURRENT_REGIME', 'UNKNOWN'),
                'dynamic_limits': signal_data.get('dynamic_limits', {}),
                'asset_regime': signal_data.get('asset_regime', 'UNKNOWN')
            }

            new_trade = {
                'id': trade_id,
                'symbol': symbol,
                'entry_price': current_price,
                'direction': 'LONG',
                'entry_time': datetime.now().isoformat(),
                'status': 'OPEN',
                'tp_price': approved_tp,
                'sl_price': float(signal_data.get('sl_price', current_price * 0.95)),
                'last_update': datetime.now().isoformat(),
                'last_oracle_check': datetime.now().isoformat(),
                'strategy': 'OracleV4_Governance_Hydra',
                'entry_capital': approved_size_usd,
                'entry_fee_usd': entry_fee_usd,
                'decision_data': decision_snapshot,
                'highest_price': current_price,
                'strategy_type': strategy_type,
                'model_votes': model_votes
            }

            self.open_positions[symbol] = new_trade
            if self.watchlist:
                self.watchlist.clear()

            # Reserve the funds.
            await self.smart_portfolio.register_new_position(approved_size_usd)

            # Update persisted state in R2.
            portfolio_state = await self.r2.get_portfolio_state_async()
            if portfolio_state.get('first_trade_timestamp') is None:
                portfolio_state['first_trade_timestamp'] = new_trade['entry_time']
                await self.r2.save_portfolio_state_async(portfolio_state)
            await self.r2.save_open_trades_async(list(self.open_positions.values()))

            # Start (or restart) the guardian for this symbol.
            if symbol in self.sentry_tasks:
                self.sentry_tasks[symbol].cancel()
            self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol))

            # Issue an independent entry verification ticket.
            await self._register_verification_ticket(symbol, current_price, approved_size_usd, model_votes)

            print(f"✅ [ENTRY] {symbol} @ {current_price} | Type: {strategy_type} | Grade: {gov_decision['grade']} | Size: ${approved_size_usd:.2f}")
        except Exception as e:
            print(f"❌ [Entry Error] {symbol}: {e}")
            traceback.print_exc()

    async def _guardian_loop(self, symbol: str):
        """Watch one open position until it is closed or the manager stops.

        Every second: check hard TP/SL.  Every minute: consult the dual
        guardians.  Every ``ORACLE_CHECK_INTERVAL`` seconds: re-consult the
        oracle.
        """
        print(f"🛡️ [Dual-Core] STARTING WATCH for {symbol}...")
        last_ai_check_time = 0

        while self.running:
            if symbol not in self.open_positions:
                break
            try:
                await asyncio.sleep(1)
                trade = self.open_positions.get(symbol)
                if not trade:
                    break

                current_ticker_price = await self.data_manager.get_latest_price_async(symbol)
                # Other call sites treat 0 as "price unavailable".  Without
                # this guard a missing price would satisfy `price <= sl_price`
                # and trigger a false stop-loss exit.
                if not current_ticker_price or current_ticker_price <= 0:
                    continue

                # Track the highest price seen (used for trailing logic).
                if 'highest_price' not in trade:
                    trade['highest_price'] = float(trade['entry_price'])
                if current_ticker_price > float(trade['highest_price']):
                    trade['highest_price'] = current_ticker_price

                # 1. Hard take-profit / stop-loss.
                if current_ticker_price >= trade['tp_price']:
                    print(f"🎯 [TP HIT] {symbol} @ {current_ticker_price}")
                    async with self.execution_lock:
                        await self._execute_exit(symbol, trade['tp_price'], "TP_HIT")
                    break
                if current_ticker_price <= trade['sl_price']:
                    print(f"🛑 [SL HIT] {symbol} @ {current_ticker_price}")
                    async with self.execution_lock:
                        await self._execute_exit(symbol, trade['sl_price'], "SL_HIT")
                    break

                # 2. AI guardians (once a minute).
                if time.time() - last_ai_check_time > 60:
                    try:
                        d1, d5, d15, d_ob = await asyncio.gather(
                            self.data_manager.get_latest_ohlcv(symbol, '1m', 1000),
                            self.data_manager.get_latest_ohlcv(symbol, '5m', 300),
                            self.data_manager.get_latest_ohlcv(symbol, '15m', 200),
                            self.data_manager.get_order_book_snapshot(symbol),
                        )
                    except asyncio.CancelledError:
                        # Cancellation must propagate (a bare except swallowed it).
                        raise
                    except Exception as fetch_err:
                        print(f"⚠️ [Sentry] {symbol}: guardian data fetch failed: {fetch_err}")
                        continue  # retry on the next tick

                    if d1 and d5 and d15 and len(d5) >= 6:
                        # Candle index 5 * index 4 is summed over the last six
                        # 5m candles — presumably volume * close; confirm the
                        # OHLCV column layout against data_manager.
                        vol_30m_sum = sum(float(c[5]) * float(c[4]) for c in d5[-6:])
                        decision_info = trade.get('decision_data', {})
                        context_data = {
                            'entry_price': trade['entry_price'],
                            'tp_price': trade['tp_price'],
                            'sl_price': trade['sl_price'],
                            'entry_time': trade['entry_time'],
                            'oracle_conf': decision_info.get('oracle_conf', 0.8),
                            'system_conf': decision_info.get('system_confidence', 0.8),
                            'highest_price': float(trade['highest_price']),
                            'time_in_trade_mins': (datetime.now() - datetime.fromisoformat(trade['entry_time'])).total_seconds() / 60,
                            'volume_30m_usd': vol_30m_sum
                        }

                        decision = self.processor.consult_dual_guardians(
                            symbol, d1, d5, d15, context_data, order_book_snapshot=d_ob
                        )
                        action = decision.get('action', 'HOLD')
                        reason = decision.get('reason', '')
                        ai_metrics = decision.get('probs') or decision.get('scores') or {}
                        self.latest_guardian_log = f"🛡️ {action} | {reason}"

                        if action in ['EXIT_HARD', 'EXIT_SOFT']:
                            print(f"🐲 [Dual-Core Trigger] {action}: {reason}")
                            async with self.execution_lock:
                                await self._execute_exit(
                                    symbol, current_ticker_price,
                                    f"DualGuard_{action}", ai_scores=ai_metrics
                                )
                            break
                        elif action in ['TIGHTEN_SL', 'TRAIL_SL']:
                            await self._handle_sl_update(symbol, action, trade, current_ticker_price)

                    last_ai_check_time = time.time()
                    self.open_positions[symbol]['last_update'] = datetime.now().isoformat()

                # 3. Periodic oracle re-check.
                last_oracle = datetime.fromisoformat(
                    trade.get('last_oracle_check', datetime.now().isoformat())
                )
                if (datetime.now() - last_oracle).total_seconds() > self.ORACLE_CHECK_INTERVAL:
                    self.open_positions[symbol]['last_oracle_check'] = datetime.now().isoformat()
                    await self._consult_oracle_strategy_update(symbol, trade)

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"❌ [Sentry Error] {symbol}: {e}")
                traceback.print_exc()
                await asyncio.sleep(5)

    async def _handle_sl_update(self, symbol, action, trade, current_price):
        """Raise the stop-loss as instructed by the guardians.

        ``TIGHTEN_SL`` moves the stop to the entry price if it is still below
        it.  ``TRAIL_SL`` moves it to halfway between the entry and current
        price when that is higher than the current stop.  The stop is never
        lowered.
        """
        entry_p = float(trade['entry_price'])
        current_sl = float(trade['sl_price'])

        if action == 'TIGHTEN_SL':
            if current_sl < entry_p:
                print(f"🛡️ [Dual-Core] TIGHTEN_SL -> Entry {entry_p}")
                self.open_positions[symbol]['sl_price'] = entry_p
                await self.r2.save_open_trades_async(list(self.open_positions.values()))
        elif action == 'TRAIL_SL':
            if current_price > entry_p:
                potential_sl = entry_p + ((current_price - entry_p) * 0.5)
                if potential_sl > current_sl:
                    print(f"🛡️ [Dual-Core] TRAIL_SL -> {potential_sl:.4f}")
                    self.open_positions[symbol]['sl_price'] = potential_sl
                    await self.r2.save_open_trades_async(list(self.open_positions.values()))

    async def _consult_oracle_strategy_update(self, symbol, trade):
        """Re-run the oracle on fresh data and exit if its outlook turned bearish.

        Best-effort: failures are logged and the position is left untouched.
        """
        try:
            timeframes = ["15m", "1h", "4h"]
            results = await asyncio.gather(
                *[self.data_manager.get_latest_ohlcv(symbol, tf, limit=100) for tf in timeframes]
            )
            ohlcv_data = {tf: res for tf, res in zip(timeframes, results) if res}
            if '1h' not in ohlcv_data:
                return

            curr_p = await self.data_manager.get_latest_price_async(symbol)
            decision_data = trade.get('decision_data', {})
            raw_input = {
                'symbol': symbol,
                'ohlcv': ohlcv_data,
                'current_price': curr_p,
                'dynamic_limits': decision_data.get('dynamic_limits', {}),
                'asset_regime': decision_data.get('asset_regime', 'UNKNOWN')
            }

            l2 = await self.processor.process_compound_signal(raw_input)
            if not l2:
                return

            oracle = await self.processor.consult_oracle(l2)
            if not oracle:
                return
            if oracle.get('action') == 'WAIT' or oracle.get('direction') == 'SHORT':
                print(f"🚨 [Oracle] Outlook Bearish (Re-Check). Exiting {symbol}...")
                await self.force_exit_by_manager(symbol, reason="Oracle_Bearish_Flip")
        except Exception as e:
            # Previously swallowed silently; keep best-effort but leave a trace.
            print(f"⚠️ [Oracle Re-Check Error] {symbol}: {e}")

    # ============================================================
    # 🕵️ Post-Exit Analysis (The Guardian Auditor)
    # ============================================================
    def _launch_post_exit_analysis(self, symbol, exit_price, exit_time, position_size_usd, ai_scores=None, trade_obj=None):
        """Schedule the background audit of an exit decision."""
        self._spawn_background(
            self._analyze_after_exit_task(
                symbol, exit_price, exit_time, position_size_usd, ai_scores, trade_obj
            )
        )

    def _update_specific_stat(self, key, is_good, usd_impact):
        """Record one audited exit under ``ai_stats[key]``; unknown keys are ignored."""
        stats = self.ai_stats.get(key)
        if stats is None:
            return
        stats["total"] += 1
        if is_good:
            stats["good"] += 1
            stats["saved"] += abs(usd_impact)
        else:
            stats["missed"] += abs(usd_impact)

    async def _analyze_after_exit_task(self, symbol, exit_price, exit_time, position_size_usd, ai_scores, trade_obj):
        """Audit an exit 15 minutes later: was leaving the position correct?

        The exit counts as good when the price is lower than the exit price
        after the wait.  ``exit_time``, ``ai_scores`` and ``trade_obj`` are
        accepted for interface compatibility and are not used here.
        """
        await asyncio.sleep(900)  # wait 15 minutes
        try:
            curr = await self.data_manager.get_latest_price_async(symbol)
            # No usable price, or no exit price to compare against: skip.
            if not curr or curr <= 0 or not exit_price:
                return

            change_pct = (curr - exit_price) / exit_price
            usd_impact = change_pct * position_size_usd
            is_good_exit = change_pct < 0

            self._update_specific_stat("hybrid", is_good_exit, usd_impact)

            record = {
                "symbol": symbol,
                "exit_price": exit_price,
                "price_15m": curr,
                "usd_impact": usd_impact,
                "verdict": "SUCCESS" if is_good_exit else "MISS"
            }
            await self.r2.append_deep_steward_audit(record)
            # Persist guardian statistics after the update.
            await self.r2.save_guardian_stats_async(self.ai_stats)
        except Exception as e:
            print(f"⚠️ [Exit Audit Error] {symbol}: {e}")

    async def _execute_exit(self, symbol, price, reason, ai_scores=None):
        """Close a position, settle fees/PnL and update every statistic.

        Args:
            symbol: Symbol of the open position; unknown symbols are a no-op.
            price: Exit price.
            reason: Exit reason stored on the trade record.
            ai_scores: Optional guardian scores forwarded to the exit audit.

        On any exception the position is restored into ``open_positions``.
        """
        if symbol not in self.open_positions:
            return
        trade = None
        try:
            trade = self.open_positions.pop(symbol)
            entry_price = float(trade['entry_price'])
            exit_price = float(price)
            entry_capital = float(trade.get('entry_capital', 100.0))
            entry_fee = float(trade.get('entry_fee_usd', 0.0))

            exit_val_gross = (exit_price / entry_price) * entry_capital
            exit_fee = exit_val_gross * self.FEE_RATE
            total_fees = entry_fee + exit_fee
            gross_pnl_usd = exit_val_gross - entry_capital
            true_net_pnl_usd = gross_pnl_usd - total_fees
            true_net_pct = (true_net_pnl_usd / entry_capital) * 100

            # Update the portfolio.
            await self.smart_portfolio.register_closed_position(entry_capital, gross_pnl_usd, total_fees)

            trade.update({
                'status': 'CLOSED', 'exit_price': exit_price, 'exit_reason': reason,
                'profit_pct': true_net_pct, 'net_pnl_usd': true_net_pnl_usd, 'fees_paid_usd': total_fees,
                'exit_time': datetime.now().isoformat()
            })

            # 1. Global statistics.
            portfolio = await self.r2.get_portfolio_state_async()
            portfolio['total_trades'] = portfolio.get('total_trades', 0) + 1
            if true_net_pnl_usd >= 0:
                portfolio['winning_trades'] = portfolio.get('winning_trades', 0) + 1
                portfolio['total_profit_usd'] = portfolio.get('total_profit_usd', 0) + true_net_pnl_usd
                trade['result'] = 'WIN'
            else:
                portfolio['losing_trades'] = portfolio.get('losing_trades', 0) + 1
                portfolio['total_loss_usd'] = portfolio.get('total_loss_usd', 0) + abs(true_net_pnl_usd)
                trade['result'] = 'LOSS'

            # 2. Persistent per-type statistics.
            strat_type = trade.get('strategy_type', 'UNKNOWN')
            if strat_type in self.type_stats:
                bucket = self.type_stats[strat_type]
                if true_net_pnl_usd >= 0:
                    bucket['wins'] += 1
                    bucket['profit_usd'] += true_net_pnl_usd
                else:
                    bucket['losses'] += 1
                    # NOTE(review): this adds a negative number, whereas the
                    # portfolio's total_loss_usd above accumulates abs().
                    # Left unchanged because persisted stats already use this
                    # sign; confirm which convention consumers expect.
                    bucket['loss_usd'] += true_net_pnl_usd
                self._spawn_background(self._save_type_stats_to_r2())

            await self.r2.save_portfolio_state_async(portfolio)
            await self.r2.save_open_trades_async(list(self.open_positions.values()))
            await self.r2.append_to_closed_trades_history(trade)

            print(f"✅ [EXIT] {symbol} | Type: {strat_type} | PnL: {true_net_pct:.2f}% (${true_net_pnl_usd:.2f}) | {reason}")

            # ==========================================================
            # 🏛️ Send the record to the governance training file
            # ==========================================================
            try:
                decision_data = trade.get('decision_data', {})
                if 'governance_grade' in decision_data:
                    training_record = {
                        "symbol": symbol,
                        "entry_time": trade['entry_time'],
                        "exit_time": trade['exit_time'],
                        "governance_grade": decision_data['governance_grade'],
                        "governance_score": decision_data.get('governance_score', 0),
                        # Per-indicator governance details captured at entry.
                        "governance_components": decision_data.get('governance_details', {}),
                        "entry_price": trade['entry_price'],
                        "exit_price": trade['exit_price'],
                        "profit_pct": true_net_pct,
                        "result": trade['result'],
                        "strategy_type": strat_type
                    }
                    self._spawn_background(self.r2.append_governance_training_data(training_record))
            except Exception as ge:
                print(f"⚠️ [Learning Error] Failed to save governance training data: {ge}")

            # ==========================================================
            # 🧠 THE TACTICAL LEARNING LINK
            # ==========================================================
            if self.learning_hub:
                self._spawn_background(self.learning_hub.register_trade_outcome(trade))

            # 3. Post-exit analysis audits the *exit*; the *entry* is audited
            #    separately through the verification tickets.
            self._launch_post_exit_analysis(
                symbol, exit_price, trade.get('exit_time'), entry_capital, ai_scores, trade
            )

            self.latest_guardian_log = f"✅ Closed {symbol} ({reason})"
            if symbol in self.sentry_tasks:
                self.sentry_tasks[symbol].cancel()
                del self.sentry_tasks[symbol]

        except Exception as e:
            print(f"❌ [Exit Error] {e}")
            traceback.print_exc()
            # Put the position back so it is not silently lost.
            if trade is not None and symbol not in self.open_positions:
                self.open_positions[symbol] = trade

    async def force_exit_by_manager(self, symbol, reason):
        """Exit ``symbol`` at the latest market price for the given reason."""
        p = await self.data_manager.get_latest_price_async(symbol)
        if not p or p <= 0:
            # Exiting at price 0 would book a -100% loss; refuse instead.
            print(f"⚠️ [Force Exit] {symbol}: no valid price available, exit skipped ({reason}).")
            return
        async with self.execution_lock:
            await self._execute_exit(symbol, p, reason)

    async def start_sentry_loops(self):
        """Start (or revive) a guardian task for every open position."""
        await self.ensure_active_guardians()

    async def stop_sentry_loops(self):
        """Stop all loops: clear the running flag and cancel guardian tasks."""
        self.running = False
        for task in self.sentry_tasks.values():
            task.cancel()