Tradtesting / smart_portfolio.py
Riy777's picture
Update smart_portfolio.py
01fca6c verified
raw
history blame
14.4 kB
# ==============================================================================
# 💼 smart_portfolio.py (V36.0 - GEM-Architect: Regime-Aligned Risk)
# ==============================================================================
# التحديثات:
# 1. إلغاء تحديد الاتجاه المستقل. الاعتماد الكلي على SystemLimits.CURRENT_REGIME.
# 2. إدارة الخانات الديناميكية (Dynamic Slots) بناءً على حالة السوق.
# 3. تعديل مضاعفات المخاطرة (Risk Multipliers) لتتوافق مع الـ DNA الحالي.
# ==============================================================================
import asyncio
import json
import httpx
import traceback
import pandas as pd
import pandas_ta as ta
from datetime import datetime, timedelta
from typing import Dict, Any, Tuple, Optional
# استيراد الدستور المركزي لقراءة حالة السوق
try:
from ml_engine.processor import SystemLimits
except ImportError:
# Fallback في حال التشغيل المنفصل
class SystemLimits:
CURRENT_REGIME = "RANGE"
class SmartPortfolio:
def __init__(self, r2_service, data_manager):
self.r2 = r2_service
self.data_manager = data_manager
# ⚙️ إعدادات المحفظة الأساسية
self.MIN_CAPITAL_FOR_SPLIT = 20.0
self.DAILY_LOSS_LIMIT_PCT = 0.20
# 📊 أوزان الثقة المركبة
self.WEIGHTS = {
"L2_TECHNICAL": 0.25,
"L3_ORACLE": 0.35,
"L4_SNIPER": 0.20,
"CONTEXT": 0.10,
"MARKET_MOOD": 0.10
}
# حالة السوق (تقرأ الآن من SystemLimits)
self.market_trend = "NEUTRAL" # للعرض فقط
self.fear_greed_index = 50
self.fear_greed_label = "Neutral"
self.capital_lock = asyncio.Lock()
# 📂 حالة المحفظة
self.state = {
"current_capital": 10.0,
"allocated_capital_usd": 0.0,
"session_start_balance": 10.0,
"last_session_reset": datetime.now().isoformat(),
"daily_net_pnl": 0.0,
"is_trading_halted": False,
"halt_reason": None
}
print("💼 [SmartPortfolio V36.0] Regime-Aligned Risk System Initialized.")
async def initialize(self):
await self._sync_state_from_r2()
await self._check_daily_reset()
# تشغيل مراقب الخوف والجشع فقط (أما الاتجاه فيأتي من العقل المركزي)
asyncio.create_task(self._market_monitor_loop())
# ==============================================================================
# 🦅 Market Monitor: Fear & Greed Only (Trend from Central)
# ==============================================================================
async def _market_monitor_loop(self):
"""مراقبة مؤشر الخوف والجشع فقط"""
print("🦅 [SmartPortfolio] Sentiment Sentinel Started.")
async with httpx.AsyncClient() as client:
while True:
try:
# تحديث العرض بناءً على النظام المركزي
regime = getattr(SystemLimits, 'CURRENT_REGIME', 'RANGE')
self.market_trend = regime # تحديث المتغير للعرض في الواجهة
# جلب مؤشر الخوف والجشع
try:
resp = await client.get("https://api.alternative.me/fng/?limit=1", timeout=10)
data = resp.json()
if data['data']:
self.fear_greed_index = int(data['data'][0]['value'])
self.fear_greed_label = data['data'][0]['value_classification']
except Exception:
pass
await asyncio.sleep(300) # كل 5 دقائق
except Exception as e:
print(f"⚠️ [Market Monitor] Error: {e}")
await asyncio.sleep(60)
# ==============================================================================
# 🧮 The Consensus Engine
# ==============================================================================
def _calculate_composite_confidence(self, signal_data: Dict[str, Any]) -> float:
try:
l2_score = float(signal_data.get('enhanced_final_score', 0.5))
oracle_conf = float(signal_data.get('confidence', 0.5))
sniper_score = float(signal_data.get('sniper_score', 0.5))
whale_impact = float(signal_data.get('whale_score', 0.0))
news_impact = float(signal_data.get('news_score', 0.0))
context_val = max(0.0, min(1.0, 0.5 + whale_impact + news_impact))
# حساب سكور المزاج بناءً على Regime المركزي
regime = getattr(SystemLimits, 'CURRENT_REGIME', 'RANGE')
mood_score = 0.5
if regime == "BULL": mood_score += 0.2
elif regime == "BEAR": mood_score -= 0.2
elif regime == "DEAD": mood_score -= 0.1
# دمج F&G (Contrarian Logic)
fg = self.fear_greed_index
if fg < 20: mood_score += 0.1 # خوف شديد = شراء
elif fg > 80: mood_score -= 0.1 # جشع شديد = حذر
mood_score = max(0.0, min(1.0, mood_score))
final_conf = (
(l2_score * self.WEIGHTS["L2_TECHNICAL"]) +
(oracle_conf * self.WEIGHTS["L3_ORACLE"]) +
(sniper_score * self.WEIGHTS["L4_SNIPER"]) +
(context_val * self.WEIGHTS["CONTEXT"]) +
(mood_score * self.WEIGHTS["MARKET_MOOD"])
)
return round(final_conf, 3)
except Exception:
return float(signal_data.get('confidence', 0.5))
# ==============================================================================
# 🧠 Core Logic: Entry Approval (Regime-Adaptive Risk)
# ==============================================================================
async def request_entry_approval(self, signal_data: Dict[str, Any], open_positions_count: int) -> Tuple[bool, Dict[str, Any]]:
"""
تطلب الموافقة مع مراعاة حالة السوق المركزية (Regime).
"""
async with self.capital_lock:
# 1. Circuit Breaker
if self.state["is_trading_halted"]:
return False, {"reason": f"Halted: {self.state['halt_reason']}"}
# 2. Daily Loss Limit
current_cap = float(self.state["current_capital"])
start_cap = float(self.state["session_start_balance"])
drawdown = (start_cap - current_cap) / start_cap if start_cap > 0 else 0
if drawdown >= self.DAILY_LOSS_LIMIT_PCT:
self.state["is_trading_halted"] = True
self.state["halt_reason"] = "Daily Loss Limit (-20%)"
await self._save_state_to_r2()
return False, {"reason": "Daily Limit Hit"}
# 3. Regime-Based Slots & Risk
# قراءة الحالة من النظام المركزي
regime = getattr(SystemLimits, 'CURRENT_REGIME', 'RANGE')
# تحديد الحد الأقصى للصفقات بناءً على الحالة
if regime == "BULL":
max_slots = 6 # هجومي
risk_factor_base = 1.2
elif regime == "BEAR":
max_slots = 3 # دفاعي جداً
risk_factor_base = 0.5
elif regime == "DEAD":
max_slots = 2 # حذر جداً
risk_factor_base = 0.4
else: # RANGE
max_slots = 4 # متوازن
risk_factor_base = 0.8
# تعديل للخزين الصغير
if current_cap < self.MIN_CAPITAL_FOR_SPLIT:
max_slots = min(max_slots, 2) # لا نفتح الكثير إذا الرصيد قليل
if open_positions_count >= max_slots:
return False, {"reason": f"Max slots reached for {regime} ({open_positions_count}/{max_slots})"}
# 4. Free Capital Check
allocated = float(self.state.get("allocated_capital_usd", 0.0))
free_capital = max(0.0, current_cap - allocated)
if free_capital < 5.0:
return False, {"reason": f"Insufficient Free Capital (${free_capital:.2f})"}
# 5. Position Sizing
remaining_slots = max_slots - open_positions_count
base_allocation = 0.0
if current_cap >= self.MIN_CAPITAL_FOR_SPLIT:
# تقسيم ذكي: نحاول الحفاظ على توازن، لكن نأخذ حصة أكبر في الـ Bull
target_size = current_cap / max_slots
base_allocation = min(target_size, free_capital)
else:
base_allocation = free_capital * 0.95 # All-in تقريباً للحسابات الصغيرة
# 6. Consensus & Risk Multiplier
system_confidence = self._calculate_composite_confidence(signal_data)
# تعديل المخاطرة بناءً على الثقة + الحالة
risk_multiplier = 1.0
if system_confidence >= 0.80: risk_multiplier = 1.2
elif system_confidence < 0.60: risk_multiplier = 0.7
# دمج عامل الحالة (Regime Factor)
final_risk_mult = risk_multiplier * risk_factor_base
final_size_usd = base_allocation * final_risk_mult
final_size_usd = min(final_size_usd, free_capital, current_cap * 0.98)
if final_size_usd < 5.0:
return False, {"reason": "Calculated size too small"}
# 7. Dynamic TP Selection
entry_price = float(signal_data.get('sniper_entry_price') or signal_data.get('current_price'))
tp_map = signal_data.get('tp_map', {})
# أهداف طموحة في Bull، ومحافظة في Bear
if regime == "BULL" and system_confidence >= 0.75:
selected_tp = tp_map.get('TP3') or tp_map.get('TP4')
target_label = "TP3/4 (Bull Run)"
elif regime == "BEAR":
selected_tp = tp_map.get('TP1') # خروج سريع في الهبوط
target_label = "TP1 (Scalp)"
else:
selected_tp = tp_map.get('TP2')
target_label = "TP2"
if not selected_tp or selected_tp <= entry_price: selected_tp = entry_price * 1.02
return True, {
"approved_size_usd": float(final_size_usd),
"approved_tp": float(selected_tp),
"target_label": target_label,
"system_confidence": system_confidence,
"risk_multiplier": final_risk_mult,
"market_mood": f"{regime} | FG:{self.fear_greed_index}"
}
# ==============================================================================
# 🔒 Capital Tracking
# ==============================================================================
async def register_new_position(self, size_usd: float):
async with self.capital_lock:
self.state["allocated_capital_usd"] = float(self.state.get("allocated_capital_usd", 0.0)) + float(size_usd)
if self.state["allocated_capital_usd"] > self.state["current_capital"]:
self.state["allocated_capital_usd"] = self.state["current_capital"]
await self._save_state_to_r2()
async def register_closed_position(self, released_capital_usd: float, net_pnl: float, fees: float):
async with self.capital_lock:
current_allocated = float(self.state.get("allocated_capital_usd", 0.0))
self.state["allocated_capital_usd"] = max(0.0, current_allocated - released_capital_usd)
net_impact = net_pnl - fees
self.state["current_capital"] += net_impact
self.state["daily_net_pnl"] += net_impact
# Check Daily Limit after trade
start = self.state["session_start_balance"]
dd = (start - self.state["current_capital"]) / start if start > 0 else 0
if dd >= self.DAILY_LOSS_LIMIT_PCT:
self.state["is_trading_halted"] = True
self.state["halt_reason"] = "Daily Limit Hit After Exit"
await self._save_state_to_r2()
# ==============================================================================
# 💾 Utilities
# ==============================================================================
async def _check_daily_reset(self):
last_reset = datetime.fromisoformat(self.state.get("last_session_reset", datetime.now().isoformat()))
if datetime.now() - last_reset > timedelta(hours=24):
self.state["session_start_balance"] = self.state["current_capital"]
self.state["daily_net_pnl"] = 0.0
self.state["is_trading_halted"] = False
self.state["last_session_reset"] = datetime.now().isoformat()
await self._save_state_to_r2()
async def _sync_state_from_r2(self):
try:
data = await self.r2.get_file_async("smart_portfolio_state.json")
if data:
loaded = json.loads(data)
self.state.update(loaded)
else:
old = await self.r2.get_portfolio_state_async()
if old:
self.state["current_capital"] = float(old.get("current_capital_usd", 10.0))
self.state["session_start_balance"] = self.state["current_capital"]
except: pass
async def _save_state_to_r2(self):
try:
await self.r2.upload_json_async(self.state, "smart_portfolio_state.json")
except: pass