# Trad / trade_manager.py
# Riy777's picture
# Update trade_manager.py
# 87795a6 verified
# ==============================================================================
# ๐Ÿ›ก๏ธ trade_manager.py (V82.5 - GEM-Architect: Self-Healing Accounting)
# ==============================================================================
# - Fix: Forces Atomic R2 updates on trade exit.
# - Feature: "Self-Healing" logic detects accounting drift and auto-corrects stats.
# - Fix: Ensures 'smart_portfolio' state is persisted immediately after exit.
# ==============================================================================
import asyncio
import uuid
import time
import traceback
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
# --- Core Modules ---
from smart_portfolio import SmartPortfolio
from ml_engine.processor import SystemLimits
from governance_engine import GovernanceEngine
# --- Logger Setup ---
logger = logging.getLogger("TradeManager")
class TradeManager:
    """
    The Central Nervous System of the Trading Operation.
    Manages Entries, Exits, Position Monitoring, State Persistence, and Learning Feedback.

    Collaborators are injected through the constructor: ``r2_service``
    (persistence), ``data_manager`` (prices, candles, order book) and
    ``processor`` (guardian exit models).  ``learning_hub`` and
    ``agency_director`` start as ``None`` and are only used when something
    else assigns them.
    """

    # Sentry timing, in seconds.  Kept as class attributes so that they can be
    # tuned (or overridden per instance) without editing the monitoring loop.
    SENTRY_POLL_SECONDS = 3            # pause between two normal passes
    SENTRY_DATA_RETRY_SECONDS = 5      # pause when price / candles are missing
    SENTRY_ERROR_BACKOFF_SECONDS = 10  # pause after an unexpected error

    def __init__(self, r2_service, data_manager, processor):
        """Wire the collaborators and initialise all in-memory state."""
        self.r2 = r2_service
        self.data_manager = data_manager
        self.processor = processor

        # --- External Connections ---
        self.learning_hub = None
        self.agency_director = None

        # --- Internal Engines ---
        self.smart_portfolio = SmartPortfolio(r2_service, data_manager)
        self.governance = GovernanceEngine()

        # --- State Management ---
        self.open_positions: Dict[str, Dict] = {}
        self.watchlist: Dict[str, Dict] = {}
        self.sentry_tasks: Dict[str, asyncio.Task] = {}

        # --- Advanced Tracking ---
        self.pending_verifications = []
        self.shadow_watchlist = []

        # --- System Flags ---
        self.running = True
        self.execution_lock = asyncio.Lock()
        self.latest_guardian_log = "๐Ÿ›ก๏ธ Guardian & Governance Systems Online."

        # Strong references to fire-and-forget tasks (see _spawn_background).
        self._background_tasks = set()

        # --- Constants ---
        self.FEE_RATE = 0.001  # 0.1% per trade

        # --- Statistics Containers ---
        self.ai_stats = {
            "hybrid": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "crash": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "giveback": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
            "stagnation": {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0},
        }
        self.type_stats = {
            "SAFE_BOTTOM": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0},
            "MOMENTUM_LAUNCH": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0},
            "ACCUMULATION_SQUEEZE": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0},
            "NORMAL": {"wins": 0, "losses": 0, "profit_usd": 0.0, "loss_usd": 0.0},
        }
        print(f"๐Ÿ›ก๏ธ [TradeManager V82.5] Accounting Integrity Systems Active.")

    # ==========================================================================
    # INTERNAL TASK HELPERS
    # ==========================================================================
    def _spawn_background(self, coro):
        """Schedule ``coro`` as a fire-and-forget task and keep a reference to it.

        The event loop only holds weak references to tasks, so a task whose
        result is discarded may be garbage-collected before it finishes.  The
        reference is dropped automatically once the task is done.
        """
        task = asyncio.create_task(coro)
        registry = getattr(self, "_background_tasks", None)
        if registry is None:
            registry = self._background_tasks = set()
        registry.add(task)
        task.add_done_callback(registry.discard)
        return task

    def _spawn_sentry(self, symbol):
        """Start a sentry task for ``symbol`` unless a live one already exists.

        A finished (crashed, cancelled or completed) task left in
        ``sentry_tasks`` is replaced instead of blocking the restart.
        """
        existing = self.sentry_tasks.get(symbol)
        if existing is not None and not existing.done():
            return existing
        task = asyncio.create_task(self._sentry_guard(symbol))
        self.sentry_tasks[symbol] = task
        return task

    # ==========================================================================
    # SECTION 1: SELECTION & GOVERNANCE
    # ==========================================================================
    async def select_and_execute_best_signal(self, candidates: List[Dict]):
        """Walk ``candidates`` in order and enter the first one that is approved.

        A candidate is skipped when its sniper score is below
        ``SystemLimits.L4_ENTRY_THRESHOLD``, when a position on its symbol is
        already open, or when the governance engine rejects it or raises.
        Rejected candidates are recorded in the shadow watchlist.  The whole
        scan stops as soon as the portfolio reports that it cannot trade.
        """
        if not candidates:
            return

        print(f"๐Ÿ”Ž [TradeManager] Validating {len(candidates)} candidates from Sniper L4...")
        entry_threshold = SystemLimits.L4_ENTRY_THRESHOLD
        best_candidate = None

        for cand in candidates:
            symbol = cand['symbol']

            # 1. Sniper Threshold
            s_score = cand.get('sniper_score', 0.0)
            if s_score < entry_threshold:
                print(f" ๐Ÿ“‰ {symbol}: Sniper Score {s_score:.2f} < {entry_threshold} (Skipped)")
                self._add_to_shadow_watchlist(cand, f"Sniper Low Score ({s_score:.2f})")
                continue

            # 2. Portfolio Constraints
            if symbol in self.open_positions:
                print(f" โš ๏ธ {symbol}: Already in position.")
                continue
            if not await self.smart_portfolio.can_trade():
                print(f" โ›” Portfolio Halted or Max Trades Reached.")
                return

            # 3. Governance Engine
            print(f" ๐Ÿ›๏ธ {symbol}: Consulting Governance Engine...")
            try:
                # Governance needs moderate history
                t1m = await self.data_manager.get_latest_ohlcv(symbol, '1m', 150)
                ob = await self.data_manager.get_order_book_snapshot(symbol)
                # Reuse candles shipped with the candidate when available.
                t15m = cand.get('ohlcv', {}).get('15m')
                if not t15m:
                    t15m = await self.data_manager.get_latest_ohlcv(symbol, '15m', 150)
                ohlcv_dict = {
                    '1m': t1m,
                    '5m': await self.data_manager.get_latest_ohlcv(symbol, '5m', 150),
                    '15m': t15m,
                    '1h': cand.get('ohlcv', {}).get('1h', []),
                    'daily': await self.data_manager.get_latest_ohlcv(symbol, '1d', 60),
                }
                gov_decision = await self.governance.evaluate_trade(
                    symbol=symbol,
                    ohlcv_data=ohlcv_dict,
                    order_book=ob,
                    strategy_type=cand.get('strategy_type', 'NORMAL'),
                )
                cand['decision_data'] = gov_decision
                gov_status = gov_decision['status']
                gov_score = gov_decision['governance_score']

                if gov_status == "REJECTED":
                    print(f" โ›” Governance Veto: {gov_decision.get('reason')} (Score: {gov_score:.1f})")
                    self._add_to_shadow_watchlist(cand, f"Governance Veto: {gov_decision.get('reason')}")
                    continue

                print(f" โœ… Governance APPROVED (Score: {gov_score:.1f})")
                best_candidate = cand
                break
            except Exception as e:
                # One bad candidate must not abort the scan of the others.
                print(f" โŒ Governance Error for {symbol}: {e}")
                traceback.print_exc()
                continue

        if best_candidate:
            await self._execute_entry(best_candidate)
        else:
            print("-> ๐Ÿ“‰ No candidates passed final Governance checks.")

    # ==========================================================================
    # SECTION 2: EXECUTION
    # ==========================================================================
    async def _execute_entry(self, candidate):
        """Open a position for ``candidate`` and start its sentry.

        Runs under ``execution_lock``.  The entry is abandoned (nothing is
        registered or persisted) when a position on the symbol already exists,
        when less than $5 is allocated, or when no positive entry price can be
        determined.  SL is placed 1.5 ATR below entry and TP at
        ``1.5 * ATR * reward_ratio`` above it; ATR falls back to 2% of price.
        """
        symbol = candidate['symbol']
        async with self.execution_lock:
            # Re-check under the lock: the caller's check ran before awaiting,
            # and overwriting an open record would lose its capital accounting.
            if symbol in self.open_positions:
                print(f"[TradeManager] {symbol}: entry skipped, position already open.")
                return

            allocation = self.smart_portfolio.allocate_capital(candidate)
            amount_usd = allocation['amount_usd']
            if amount_usd < 5.0:
                print(f"โš ๏ธ Insufficient capital allocated: ${amount_usd:.2f}")
                return

            print(f"๐Ÿš€ [EXECUTING] BUY {symbol} | Allocated: ${amount_usd:.2f}...")

            # Preferred price sources first, live price as the last resort.
            entry_price = float(
                candidate.get('sniper_entry_price') or candidate.get('current_price') or 0
            )
            if entry_price <= 0:
                entry_price = await self.data_manager.get_latest_price_async(symbol)
            if not entry_price or entry_price <= 0:
                # Without a price the quantity cannot be computed (division by zero).
                print(f"[TradeManager] {symbol}: entry aborted, no valid price available.")
                return

            atr = candidate.get('atr_value')
            if not atr or atr <= 0:
                atr = entry_price * 0.02

            gov_data = candidate.get('decision_data', {})
            reward_ratio = gov_data.get('reward_ratio', 2.0)
            sl_price = entry_price - (atr * 1.5)
            tp_price = entry_price + (atr * 1.5 * reward_ratio)
            if sl_price <= 0:
                sl_price = entry_price * 0.95

            trade_id = str(uuid.uuid4())[:8]
            trade_record = {
                "trade_id": trade_id,
                "symbol": symbol,
                "entry_time": datetime.now().isoformat(),
                "entry_price": entry_price,
                "entry_capital": amount_usd,
                "tp_price": tp_price,
                "sl_price": sl_price,
                "quantity": amount_usd / entry_price,
                "strategy": candidate.get('strategy_type', 'NORMAL'),
                "decision_data": gov_data,
                "sniper_data": candidate.get('sniper_data', {}),
                "ai_scores": {
                    "l2_score": candidate.get('l2_score', 0),
                    "oracle": candidate.get('oracle_score', 0),
                    "pattern": candidate.get('pattern_score', 0),
                    "mc": candidate.get('mc_score', 0),
                },
                "highest_price": entry_price,
                "status": "OPEN",
                "logs": [f"Trade opened at {entry_price}"],
            }
            self.open_positions[symbol] = trade_record
            self.smart_portfolio.register_trade_entry(amount_usd)

            # FORCE SYNC ON ENTRY
            await self._save_active_trades()
            if hasattr(self.smart_portfolio, '_save_state_to_r2'):
                await self.smart_portfolio._save_state_to_r2()

            self._spawn_sentry(symbol)
            print(f"โœ… Trade {trade_id} Active. TP: {tp_price:.4f} | SL: {sl_price:.4f}")

            if self.agency_director:
                event_payload = {
                    "type": "TRADE_OPEN",
                    "symbol": symbol,
                    "price": entry_price,
                    "size": amount_usd,
                    "reason": gov_data.get('reason', 'Sniper Signal'),
                }
                self._spawn_background(
                    self.agency_director.process_event("TRADE_OPEN", event_payload)
                )

    # ==========================================================================
    # SECTION 3: MONITORING (The Sentry)
    # ==========================================================================
    async def _sentry_guard(self, symbol):
        """
        Continuous monitoring loop for an active trade.
        Updated V82.5: Fetches DEEP history for EMA200/V3 Model stability.

        The loop runs while the position is open and ``self.running`` is set.
        An unexpected error in one pass is logged and retried after
        ``SENTRY_ERROR_BACKOFF_SECONDS`` instead of ending the loop, so an open
        position is never left without TP/SL supervision.
        """
        logger.info(f"๐Ÿ›ก๏ธ Sentry started for {symbol}")
        try:
            while symbol in self.open_positions and self.running:
                try:
                    delay = await self._sentry_tick(symbol)
                    if delay is None:
                        break
                    await asyncio.sleep(delay)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"โŒ Sentry Error {symbol}: {e}")
                    traceback.print_exc()
                    await asyncio.sleep(self.SENTRY_ERROR_BACKOFF_SECONDS)
        except asyncio.CancelledError:
            logger.info(f"๐Ÿ›ก๏ธ Sentry for {symbol} stopped.")

    async def _sentry_tick(self, symbol):
        """Run one monitoring pass for ``symbol``.

        Returns the number of seconds to wait before the next pass, or ``None``
        when the position no longer exists and the sentry should stop.
        """
        trade = self.open_positions.get(symbol)
        if trade is None:
            return None

        # 1. Update Market Data
        curr_price = await self.data_manager.get_latest_price_async(symbol)
        if curr_price <= 0:
            return self.SENTRY_DATA_RETRY_SECONDS
        if curr_price > trade['highest_price']:
            trade['highest_price'] = curr_price
        trade['current_price'] = curr_price

        # 2. Check Hard Exits
        hard_exit = None
        if curr_price >= trade['tp_price']:
            hard_exit = "TAKE_PROFIT_HARD"
        elif curr_price <= trade['sl_price']:
            hard_exit = "STOP_LOSS_HARD"
        if hard_exit is not None:
            await self._execute_exit(symbol, curr_price, hard_exit)
            # If the exit could not be completed the position is still open:
            # keep guarding and try again after a back-off.
            if symbol in self.open_positions:
                return self.SENTRY_ERROR_BACKOFF_SECONDS
            return None

        # 3. Consult Guardians (AI Exit Logic)
        # CRITICAL: enough data is requested for EMA_200 (requires >200 candles)
        t1m = await self.data_manager.get_latest_ohlcv(symbol, '1m', 500)
        t5m = await self.data_manager.get_latest_ohlcv(symbol, '5m', 300)
        t15m = await self.data_manager.get_latest_ohlcv(symbol, '15m', 300)
        if not t1m or len(t1m) < 100:
            return self.SENTRY_DATA_RETRY_SECONDS

        start_time = datetime.fromisoformat(trade['entry_time'])
        duration = (datetime.now() - start_time).total_seconds() / 60
        # The guardians get a copy so that they cannot alter the live record.
        trade_context = trade.copy()
        trade_context['time_in_trade_mins'] = duration
        trade_context['volume_30m_usd'] = 0

        guardian_res = self.processor.consult_guardians(
            symbol, t1m, t5m, t15m, trade_context
        )
        action = guardian_res.get('action', 'HOLD')
        if action in ('EXIT_HARD', 'EXIT_SOFT'):
            reason = guardian_res.get('reason', 'Guardian Exit')
            await self._execute_exit(symbol, curr_price, f"GUARDIAN_{action}: {reason}")
            if symbol in self.open_positions:
                return self.SENTRY_ERROR_BACKOFF_SECONDS
            return None

        # Live X-Ray Logging
        scores = guardian_res.get('scores', {})
        probs = guardian_res.get('probs', {})
        v2 = scores.get('v2', 0.0)
        v3 = scores.get('v3', 0.0)
        p_crash = probs.get('crash', 0.0)
        p_giveback = probs.get('giveback', 0.0)
        p_stag = probs.get('stagnation', 0.0)
        pnl_pct = ((curr_price - trade['entry_price']) / trade['entry_price']) * 100
        self.latest_guardian_log = (
            f"๐Ÿ›ก๏ธ {symbol}: {action} ({pnl_pct:+.2f}%) | "
            f"Hybrid[V2:{v2:.2f}|V3:{v3:.2f}] | "
            f"Hydra[๐Ÿ’ฅ:{p_crash:.2f}|๐Ÿ’ธ:{p_giveback:.2f}|๐ŸŒ:{p_stag:.2f}]"
        )
        return self.SENTRY_POLL_SECONDS

    # ==========================================================================
    # SECTION 4: EXITS & POST-ANALYSIS (ATOMIC & MATH-VERIFIED)
    # ==========================================================================
    async def _execute_exit(self, symbol, exit_price, reason):
        """Close the position on ``symbol`` at ``exit_price`` and settle the books.

        Runs under ``execution_lock`` and is a no-op when the symbol has no
        open position.  Fees are charged at ``FEE_RATE`` on both legs.

        Order of operations:
          A. The closed record is written to trade history.  If that fails the
             live record is left untouched and still open, nothing else
             happens, and the exit can be retried.
          B. The position is removed and the wallet is credited in memory.
          C. Active trades, wallet state and global stats are persisted; a
             failure here is logged but does not undo step B.
          D. The sentry task is always stopped, then the close event and the
             learning sample are emitted.
        """
        async with self.execution_lock:
            trade = self.open_positions.get(symbol)
            if trade is None:
                return

            # 1. Retrieve & Calculate
            entry_price = trade['entry_price']
            quantity = trade['quantity']
            entry_capital = trade['entry_capital']
            raw_pnl = (exit_price - entry_price) * quantity
            total_fees = (entry_capital * self.FEE_RATE) + ((exit_price * quantity) * self.FEE_RATE)
            net_pnl = raw_pnl - total_fees
            pnl_pct = (net_pnl / entry_capital) * 100 if entry_capital else 0.0
            print(f"๐Ÿ’ฐ [EXIT] {symbol} | Net PnL: ${net_pnl:.4f} (Fees: ${total_fees:.4f}) | {reason}")

            # 2. Prepare the closing fields; the live record is only updated
            #    once the history write has succeeded.
            exit_fields = {
                'exit_price': exit_price,
                'exit_time': datetime.now().isoformat(),
                'net_pnl': net_pnl,
                'exit_reason': reason,
                'status': "CLOSED",
                'result': "WIN" if net_pnl > 0 else "LOSS",
            }

            # Step A: Save History First
            try:
                await self.r2.save_trade_history_async({**trade, **exit_fields})
            except Exception as e:
                print(f"โŒ Critical Save Error during Exit for {symbol}: {e}")
                traceback.print_exc()
                return

            # Step B: Remove from Active and credit the wallet (both in memory)
            trade.update(exit_fields)
            self.open_positions.pop(symbol)
            try:
                # Internal Wallet is the Source of Truth for Capital
                self.smart_portfolio.register_trade_exit(
                    capital_returned=(entry_capital + net_pnl),
                    net_pnl=net_pnl,
                    is_win=(net_pnl > 0),
                )

                # Step C: Persist active trades, wallet state and global stats
                await self._save_active_trades()
                if hasattr(self.smart_portfolio, '_save_state_to_r2'):
                    await self.smart_portfolio._save_state_to_r2()
                actual_current_capital = await self._reconcile_portfolio_stats(net_pnl)
                print(f"๐Ÿ“Š [Stats] Integrity Verified. Cap: ${actual_current_capital:.2f}")

                self._update_stats(trade, reason, net_pnl)
                self.latest_guardian_log = f"โœ… Closed {symbol} ({reason})"
            except Exception as e:
                print(f"โŒ Critical Save Error during Exit for {symbol}: {e}")
                traceback.print_exc()
            finally:
                # Step D: the position is gone, so its sentry must stop even
                # when persistence failed.  When the caller *is* that sentry the
                # cancellation only takes effect after it has returned.
                sentry = self.sentry_tasks.pop(symbol, None)
                if sentry is not None:
                    sentry.cancel()

            if self.agency_director:
                event_payload = {
                    "type": "TRADE_CLOSED",
                    "symbol": symbol,
                    "pnl_usd": net_pnl,
                    "pnl_pct": pnl_pct,
                    "reason": reason,
                    "duration_mins": (
                        datetime.now() - datetime.fromisoformat(trade['entry_time'])
                    ).total_seconds() / 60,
                }
                self._spawn_background(
                    self.agency_director.process_event("TRADE_CLOSED", event_payload)
                )

            self._launch_post_exit_analysis(
                symbol, exit_price, trade['exit_time'], entry_capital,
                trade.get('ai_scores', {}), trade
            )

    async def _reconcile_portfolio_stats(self, net_pnl):
        """Update the persisted global counters for one closed trade.

        Loads the portfolio stats from R2, bumps the trade counters, copies the
        wallet's ``current_capital`` in, and enforces
        ``current = initial + profits - losses``: any drift larger than 0.0001
        is booked as extra loss (wallet below the books) or extra profit
        (wallet above the books).  Returns the wallet capital that was stored.
        """
        stats = await self.r2.get_portfolio_state_async()

        # 1. Update counters based on current trade
        stats['total_trades'] = int(stats.get('total_trades', 0)) + 1
        if net_pnl > 0:
            stats['winning_trades'] = int(stats.get('winning_trades', 0)) + 1
        else:
            stats['losing_trades'] = int(stats.get('losing_trades', 0)) + 1

        # 2. Get ACTUAL capital from SmartPortfolio (The Truth)
        # NOTE(review): assumes 'current_capital' is total equity, including
        # capital tied up in other open trades -- confirm in SmartPortfolio.
        actual_current_capital = self.smart_portfolio.state.get('current_capital', 10.0)
        stats['current_capital_usd'] = actual_current_capital

        # 3. ACCOUNTING EQUATION CHECK: Current = Initial + Profits - Losses
        initial_cap = stats.get('initial_capital_usd', 10.0)
        total_profit = float(stats.get('total_profit_usd', 0.0)) + (net_pnl if net_pnl > 0 else 0.0)
        total_loss = float(stats.get('total_loss_usd', 0.0)) + (abs(net_pnl) if net_pnl < 0 else 0.0)
        calculated_capital = initial_cap + total_profit - total_loss
        discrepancy = actual_current_capital - calculated_capital

        # 4. SELF-HEALING LOGIC
        if abs(discrepancy) > 0.0001:
            print(f"โš ๏ธ [Math Fix] Drift detected: ${discrepancy:.4f}. Auto-correcting stats...")
            if discrepancy < 0:
                # actual < calculated: missing losses (fees/slippage)
                total_loss += abs(discrepancy)
            else:
                # actual > calculated: missing profits
                total_profit += discrepancy

        stats['total_profit_usd'] = total_profit
        stats['total_loss_usd'] = total_loss

        # Recalculate Win Rate.  'winning_trades' may be absent when every
        # recorded trade so far was a loss.
        total = stats['total_trades']
        wins = stats.get('winning_trades', 0)
        stats['win_rate'] = (wins / total * 100) if total > 0 else 0.0

        await self.r2.save_portfolio_state_async(stats)
        return actual_current_capital

    def _update_stats(self, trade, reason, net_pnl):
        """Fold a closed trade into the per-strategy and per-guardian counters.

        Strategy counters are only updated for strategy names already present
        in ``type_stats``.  Guardian counters are updated when ``reason``
        contains "GUARDIAN"; the category is picked from the words "Crash",
        "Giveback" or "Stagnation" in the reason and defaults to "hybrid".
        The guardian stats are then saved to R2 in the background.
        """
        strat_type = trade.get('strategy', 'NORMAL')
        strat_bucket = self.type_stats.get(strat_type)
        if strat_bucket is not None:
            if net_pnl > 0:
                strat_bucket['wins'] += 1
                strat_bucket['profit_usd'] += net_pnl
            else:
                strat_bucket['losses'] += 1
                strat_bucket['loss_usd'] += abs(net_pnl)

        if "GUARDIAN" in reason:
            key = "hybrid"
            if "Crash" in reason:
                key = "crash"
            elif "Giveback" in reason:
                key = "giveback"
            elif "Stagnation" in reason:
                key = "stagnation"

            # Stats restored from storage may lack a category or a field.
            bucket = self.ai_stats.setdefault(key, {})
            bucket["total"] = bucket.get("total", 0) + 1
            if net_pnl > 0:
                bucket["good"] = bucket.get("good", 0) + 1
                bucket["saved"] = bucket.get("saved", 0.0) + net_pnl
            else:
                bucket["missed"] = bucket.get("missed", 0.0) + abs(net_pnl)
            self._spawn_background(self.r2.save_guardian_stats_async(self.ai_stats))

    # ==========================================================================
    # SECTION 5: LEARNING & SHADOW TRACKING
    # ==========================================================================
    def _launch_post_exit_analysis(self, symbol, exit_price, exit_time, capital, ai_scores, full_trade_record):
        """Hand the finished trade to the learning hub, if one is attached.

        ``exit_price`` and ``capital`` are accepted for interface stability but
        are not part of the sample.  Errors are reported and swallowed so that
        learning can never break an exit.
        """
        if not self.learning_hub:
            return
        try:
            sample = {
                "symbol": symbol,
                "entry_time": full_trade_record['entry_time'],
                "exit_time": exit_time,
                "final_pnl": full_trade_record['net_pnl'],
                "ai_inputs": ai_scores,
                "market_conditions": full_trade_record.get('decision_data', {}),
            }
            if hasattr(self.learning_hub, 'ingest_trade_result'):
                self.learning_hub.ingest_trade_result(sample)
                print(f"๐Ÿง  [Learning] Ingested trade result for {symbol}.")
        except Exception as e:
            print(f"โš ๏ธ Learning Ingest Error: {e}")

    def _add_to_shadow_watchlist(self, candidate, reason):
        """Remember a rejected candidate; only the newest 100 entries are kept."""
        entry = {
            "symbol": candidate['symbol'],
            "time": time.time(),
            "price_at_rejection": candidate.get('current_price', 0),
            "reason": reason,
            "scores": {
                "l2": candidate.get('l2_score', 0),
                "sniper": candidate.get('sniper_score', 0),
            },
        }
        self.shadow_watchlist.append(entry)
        if len(self.shadow_watchlist) > 100:
            self.shadow_watchlist.pop(0)

    # ==========================================================================
    # SECTION 6: STATE PERSISTENCE & UTILS
    # ==========================================================================
    async def _save_active_trades(self):
        """Persist the current open positions to R2."""
        await self.r2.save_active_trades_async(self.open_positions)

    async def sync_internal_state_with_r2(self):
        """Restore open positions, guardian stats and wallet state from R2.

        Every recovered position gets a sentry.  Loaded guardian stats are
        merged over the in-memory defaults so that a category missing from
        storage still exists afterwards.
        """
        print("๐Ÿ›ก๏ธ [TradeManager] Syncing State from R2...")
        saved_trades = await self.r2.load_active_trades_async()
        if saved_trades:
            self.open_positions = saved_trades
            print(f" -> [Sync] Recovered {len(self.open_positions)} active trades.")
            for sym in self.open_positions:
                self._spawn_sentry(sym)
        else:
            self.open_positions = {}
            print(" -> [Sync] No active trades found.")

        g_stats = await self.r2.get_guardian_stats_async()
        if g_stats:
            merged = {name: dict(values) for name, values in self.ai_stats.items()}
            for name, values in g_stats.items():
                if isinstance(values, dict):
                    merged.setdefault(name, {}).update(values)
                else:
                    merged[name] = values
            self.ai_stats = merged
            print(" -> [Sync] Guardian Stats loaded.")

        await self.smart_portfolio.sync_state()

    async def force_exit_by_manager(self, symbol, reason):
        """Close ``symbol`` on request at the latest price.

        When no positive live price is available the last price tracked by the
        sentry is used instead; if there is none either, the exit is refused
        rather than being booked at price 0 (a fabricated total loss).
        """
        p = await self.data_manager.get_latest_price_async(symbol)
        if not p or p <= 0:
            trade = self.open_positions.get(symbol)
            if trade is None:
                return
            p = trade.get('current_price') or 0
            if p <= 0:
                print(f"[TradeManager] {symbol}: forced exit aborted, no valid price available.")
                return
        await self._execute_exit(symbol, p, reason)

    async def ensure_active_guardians(self):
        """Make sure every open position has a live sentry and report the count.

        Sentries are only (re)started while ``self.running`` is set.
        """
        if self.running:
            for sym in list(self.open_positions):
                self._spawn_sentry(sym)
        active = [t for t in self.sentry_tasks.values() if not t.done()]
        return f"Active Guardians: {len(active)}"

    async def initialize_sentry_exchanges(self):
        """Placeholder kept for interface compatibility; does nothing."""
        pass

    async def start_sentry_loops(self):
        """Enable monitoring and start sentries for all open positions."""
        # Re-arm the flag so that a start after stop_sentry_loops() works.
        self.running = True
        await self.ensure_active_guardians()

    async def stop_sentry_loops(self):
        """Disable monitoring and cancel every sentry task."""
        self.running = False
        for t in self.sentry_tasks.values():
            t.cancel()