Tradgptbacktest / learning_hub /adaptive_hub.py
Riy777's picture
Update learning_hub/adaptive_hub.py
35067b1 verified
# ==============================================================================
# 🧠 learning_hub/adaptive_hub.py
# (V61.1 - GEM-Architect: Persistence Fix)
# ==============================================================================
import json
import asyncio
import numpy as np
import traceback
from collections import deque
from typing import Dict, Any, List
# Try to import SystemLimits so the live system's limits can be updated
try:
from ml_engine.processor import SystemLimits
except ImportError:
SystemLimits = None
class StrategyDNA:
    """Layered parameter set ("DNA") describing one market regime.

    A final threshold is the sum of a base value and several delta layers,
    clamped into ``[0.1, 0.99]``:

    * ``base_filters`` / ``base_guards`` -- Theta Base, from the strategic
      backtest.
    * ``delta_fast`` / ``delta_weekly`` / ``delta_monthly`` -- per-filter
      corrections from the fast and periodic learners.
    * ``delta_guards_fast`` -- per-guard correction layer.
    """

    # Inclusive bounds applied to every final filter / guard value.
    _LOWER_BOUND = 0.1
    _UPPER_BOUND = 0.99

    # Attributes exported by ``to_dict``; the order is the order of the
    # keys in the resulting dictionary.
    _EXPORTED_FIELDS = (
        "name",
        "base_filters",
        "base_guards",
        "delta_fast",
        "delta_weekly",
        "delta_monthly",
        "delta_guards_fast",
        "model_weights",
        "ob_settings",
        "backtest_performance",
        "stats",
        # Persisted so fast-learner progress survives a restart.
        "trade_buffer",
    )

    def __init__(self, name, model_weights, ob_settings, filters, guard_settings=None, backtest_performance=None):
        """Store the base parameters and start every delta layer at zero.

        ``filters`` and ``guard_settings`` are kept by reference (not
        copied); a falsy ``guard_settings`` is replaced by a new empty dict.
        """
        self.name = name

        # 1. Base parameters (Theta Base).
        self.base_filters = filters
        self.base_guards = guard_settings or {}

        # 2. Delta layers: one zeroed entry per base key.
        filter_keys = filters.keys()
        self.delta_fast = dict.fromkeys(filter_keys, 0.0)
        self.delta_weekly = dict.fromkeys(filter_keys, 0.0)
        self.delta_monthly = dict.fromkeys(filter_keys, 0.0)
        self.delta_guards_fast = dict.fromkeys(self.base_guards.keys(), 0.0)

        self.model_weights = model_weights
        self.ob_settings = ob_settings
        self.backtest_performance = backtest_performance
        self.stats = {"wins": 0, "losses": 0, "win_rate": 0.0}

        # Trades collected for the fast learner until a batch is complete.
        self.trade_buffer = []

    @classmethod
    def _clamp(cls, value):
        """Limit ``value`` to the allowed ``[0.1, 0.99]`` range."""
        return max(cls._LOWER_BOUND, min(cls._UPPER_BOUND, value))

    def get_final_filters(self):
        """Return ``base + fast + weekly + monthly`` per filter, clamped."""
        fast, weekly, monthly = self.delta_fast, self.delta_weekly, self.delta_monthly
        return {
            key: self._clamp(
                base + fast.get(key, 0.0) + weekly.get(key, 0.0) + monthly.get(key, 0.0)
            )
            for key, base in self.base_filters.items()
        }

    def get_final_guards(self):
        """Return ``base + fast delta`` per guard, clamped.

        Guards mostly follow base updates; only the fast delta layer is
        applied on top.
        """
        offsets = self.delta_guards_fast
        return {
            key: self._clamp(base + offsets.get(key, 0.0))
            for key, base in self.base_guards.items()
        }

    def to_dict(self):
        """Return the state as a plain dict (values are shared, not copied)."""
        return {field: getattr(self, field) for field in self._EXPORTED_FIELDS}
class AdaptiveHub:
    """Keeps one ``StrategyDNA`` per market regime, persists them to R2 and
    pushes the active regime's parameters into ``SystemLimits``.

    Learning inputs:

    * ``submit_challenger``        -- strategic backtest replaces the base values.
    * ``register_trade_outcome``   -- fast learner, runs once per full trade batch.
    * ``update_periodic_delta``    -- weekly / monthly delta layers.
    """

    # Number of buffered trades that triggers one fast-learning pass.
    FAST_BATCH_SIZE = 100
    # A model is only adjusted when at least this many trades qualified for it.
    FAST_MIN_SAMPLES = 20

    def __init__(self, r2_service=None):
        """
        Args:
            r2_service: optional storage client. This class awaits its
                ``get_file_async(key)`` and ``upload_json_async(data, key)``
                methods. When falsy, nothing is loaded or saved.
        """
        self.r2 = r2_service
        self.dna_file_key = "learning/strategic_dna_v61_context.json"
        self.current_market_regime = "RANGE"
        self.strategies: Dict[str, "StrategyDNA"] = {}
        # Strong references to in-flight save tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks = set()
        print("🧠 [AdaptiveHub V61.1] Persistence & Context Engine Ready.")

    def get_learning_progress(self) -> str:
        """Return fast-learner progress for the current regime as ``"count/batch"``."""
        dna = self.strategies.get(self.current_market_regime)
        if dna is None:
            return f"0/{self.FAST_BATCH_SIZE}"
        return f"{len(dna.trade_buffer)}/{self.FAST_BATCH_SIZE}"

    async def initialize(self):
        """Load the DNA from R2 (or create defaults) and inject it.

        Any failure while loading falls back to the built-in defaults, which
        are then injected as well so the live limits never stay stale.
        """
        try:
            if self.r2:
                data_bytes = await self.r2.get_file_async(self.dna_file_key)
                if data_bytes:
                    self._load_from_dict(json.loads(data_bytes))
                    print(" 📂 Loaded DNA from R2.")
                else:
                    self._create_default_dna()
                    print(" ⚠️ No R2 file. Created Golden Defaults.")
            else:
                self._create_default_dna()
            if not self.strategies:
                # The stored file was readable but contained no strategies.
                self._create_default_dna()
            self._inject_current_parameters()
        except Exception as e:
            print(f" ❌ Init Error: {e}")
            traceback.print_exc()
            self._create_default_dna()
            self._inject_current_parameters()

    def _create_default_dna(self):
        """
        💎 GEM-Architect: direct injection of the golden values.

        (Re)creates the BULL, BEAR, RANGE and DEAD strategies with their
        built-in defaults, overwriting existing entries with those names.
        """
        # regime -> (model_weights, ob_settings, filters, guards)
        golden = {
            # 1. BULL MARKET (Golden Config)
            "BULL": (
                {"titan": 0.50, "structure": 0.10},
                {"wall_ratio_limit": 0.60},
                {"l1_min_score": 0.60, "l3_oracle_thresh": 0.55, "l4_sniper_thresh": 0.50},
                {"hydra_crash": 0.60, "hydra_giveback": 0.60, "legacy_v2": 0.85},
            ),
            # 2. BEAR MARKET (Golden Config)
            "BEAR": (
                {"titan": 0.10, "structure": 0.10},
                {"wall_ratio_limit": 0.30},
                {"l1_min_score": 0.60, "l3_oracle_thresh": 0.75, "l4_sniper_thresh": 0.30},
                {"hydra_crash": 0.60, "hydra_giveback": 0.60, "legacy_v2": 0.85},
            ),
            # 3. RANGE/DEAD (Conservative Defaults)
            "RANGE": (
                {"titan": 0.30, "structure": 0.30},
                {"wall_ratio_limit": 0.40},
                {"l1_min_score": 0.65, "l3_oracle_thresh": 0.55, "l4_sniper_thresh": 0.40},
                {"hydra_crash": 0.80, "hydra_giveback": 0.70, "legacy_v2": 0.95},
            ),
            "DEAD": (
                {"titan": 0.10, "structure": 0.10},
                {"wall_ratio_limit": 0.20},
                {"l1_min_score": 0.80, "l3_oracle_thresh": 0.80, "l4_sniper_thresh": 0.50},
                {"hydra_crash": 0.50, "hydra_giveback": 0.50, "legacy_v2": 0.80},
            ),
        }
        for regime, (weights, ob_settings, filters, guards) in golden.items():
            self.strategies[regime] = StrategyDNA(regime, weights, ob_settings, filters, guards)

    def _load_from_dict(self, data):
        """Rebuild ``self.strategies`` and the current regime from saved data.

        Accepts the legacy keys ``filters`` / ``guard_settings`` as fallbacks
        for ``base_filters`` / ``base_guards``.

        Raises:
            KeyError: if a strategy entry lacks ``model_weights`` or
                ``ob_settings``.
        """
        for key, val in (data.get("strategies") or {}).items():
            filters = val.get("base_filters", val.get("filters", {}))
            guards = val.get("base_guards", val.get("guard_settings", {}))
            dna = StrategyDNA(
                val.get("name", key), val["model_weights"], val["ob_settings"], filters, guards,
                val.get("backtest_performance", None)
            )
            # Merge saved deltas over the zero-initialised layers so every
            # base key keeps an entry even when the saved file lacks it.
            # delta_guards_fast is restored too, since to_dict() persists it.
            for layer in ("delta_fast", "delta_weekly", "delta_monthly", "delta_guards_fast"):
                saved = val.get(layer)
                if saved:
                    getattr(dna, layer).update(saved)
            dna.stats = val.get("stats") or {"wins": 0, "losses": 0, "win_rate": 0.0}
            # Restore the trade buffer so progress is not lost (0/100);
            # a JSON null is treated as an empty buffer.
            dna.trade_buffer = list(val.get("trade_buffer") or [])
            self.strategies[key] = dna
        self.current_market_regime = data.get("current_regime") or "RANGE"

    # ============================================================
    # 🧬 CONTEXT API (Per-Asset Injection)
    # ============================================================
    def get_regime_config(self, regime_name: str) -> Dict[str, Any]:
        """
        💎 Returns the specific configuration (DNA) for a given market regime.
        Used to inject 'Per-Asset' limits dynamically.

        Unknown regimes fall back to "RANGE". If no strategy exists yet
        (e.g. called before ``initialize``), the defaults are created first.
        """
        if not self.strategies:
            self._create_default_dna()
        target_regime = regime_name if regime_name in self.strategies else "RANGE"
        dna = self.strategies[target_regime]
        filters = dna.get_final_filters()
        guards = dna.get_final_guards()
        mw = dna.model_weights
        return {
            "regime": target_regime,
            # Layer 2 Weights
            "w_titan": mw.get("titan", 0.4),
            "w_patt": mw.get("structure", 0.3),
            # Layer 3 & 4 Thresholds
            "l1_min_score": filters.get("l1_min_score", 0.65),
            "l3_oracle_thresh": filters.get("l3_oracle_thresh", 0.65),
            "l4_sniper_thresh": filters.get("l4_sniper_thresh", 0.40),
            "l4_ob_wall_ratio": dna.ob_settings.get("wall_ratio_limit", 0.4),
            # Guard Thresholds
            "hydra_crash": guards.get('hydra_crash', 0.85),
            "hydra_giveback": guards.get('hydra_giveback', 0.70),
            "legacy_v2": guards.get('legacy_v2', 0.95)
        }

    # ============================================================
    # 🧪 STRATEGIC BACKTEST LINK (Base Updater)
    # ============================================================
    def submit_challenger(self, regime: str, new_config: dict, new_stats: dict) -> bool:
        """Replace the base DNA of ``regime`` with a backtest result.

        Args:
            regime: strategy to update.
            new_config: must contain ``w_titan``, ``w_struct``, ``thresh``,
                ``hydra_thresh`` and ``legacy_thresh``; ``oracle_thresh`` and
                ``sniper_thresh`` are optional (defaults 0.65 / 0.40).
            new_stats: stored as the strategy's ``backtest_performance``.

        Returns:
            False when ``regime`` is unknown, True after a successful update.

        Raises:
            KeyError: if a required config key is missing. The champion is
                left untouched in that case.
        """
        champion = self.strategies.get(regime)
        if champion is None:
            return False

        # Read every required value before mutating anything, so a missing
        # key cannot leave the champion half-updated.
        w_titan = new_config['w_titan']
        w_struct = new_config['w_struct']
        l1_thresh = new_config['thresh']
        hydra_thresh = new_config['hydra_thresh']
        legacy_thresh = new_config['legacy_thresh']

        print(f" ⚖️ [Strategic Update] Updating BASE DNA for {regime}...")
        # Update Base
        champion.model_weights['titan'] = w_titan
        champion.model_weights['structure'] = w_struct
        champion.base_filters['l1_min_score'] = l1_thresh
        champion.base_filters['l3_oracle_thresh'] = new_config.get('oracle_thresh', 0.65)
        champion.base_filters['l4_sniper_thresh'] = new_config.get('sniper_thresh', 0.40)
        # Both hydra guards share the single backtested hydra threshold.
        champion.base_guards['hydra_crash'] = hydra_thresh
        champion.base_guards['hydra_giveback'] = hydra_thresh
        champion.base_guards['legacy_v2'] = legacy_thresh
        champion.backtest_performance = new_stats

        # Reset Deltas: offsets learned against the old base no longer apply.
        # The guard layer is reset as well because the base guards changed.
        for layer in (champion.delta_weekly, champion.delta_monthly,
                      champion.delta_fast, champion.delta_guards_fast):
            for k in layer:
                layer[k] = 0.0

        # Save & Inject
        self._schedule_save()
        self._inject_current_parameters()
        return True

    # ============================================================
    # 🏎️ THE FAST LEARNER (Every 100 Trades)
    # ============================================================
    async def register_trade_outcome(self, trade_data: Dict[str, Any]):
        """Buffer one closed trade for the current regime.

        When the buffer reaches ``FAST_BATCH_SIZE`` the fast learner runs,
        the buffer is cleared, and the state is saved and injected.
        Otherwise a background save is scheduled after every trade so the
        progress survives a restart. Trades are ignored while the current
        regime has no strategy.
        """
        dna = self.strategies.get(self.current_market_regime)
        if dna is None:
            return
        dna.trade_buffer.append(trade_data)
        if len(dna.trade_buffer) >= self.FAST_BATCH_SIZE:
            print(f"🎓 [Fast Learner] Batch of {self.FAST_BATCH_SIZE} trades reached. Analyzing...")
            self._process_fast_learning_batch(dna)
            dna.trade_buffer.clear()
            await self._save_state_to_r2()
            self._inject_current_parameters()
        else:
            # Partial save on every trade so nothing is lost on restart.
            self._schedule_save()

    @staticmethod
    def _to_float(value, default=0.0):
        """Convert ``value`` to float; return ``default`` for None or junk."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _process_fast_learning_batch(self, dna: "StrategyDNA"):
        """Nudge the oracle / sniper fast deltas based on the buffered trades.

        A trade counts towards a model when its score is at least the
        model's current final threshold minus 0.05. With at least
        ``FAST_MIN_SAMPLES`` such trades:

        * win rate < 40% and average PnL < 0    -> delta +0.01 (max +0.05)
        * win rate > 65% and average PnL > 0.5  -> delta -0.01 (min -0.05)

        Missing or non-numeric fields in a trade count as 0 instead of
        aborting the whole batch.
        """
        # filter key -> name of the score field recorded on each trade
        score_fields = {
            'l3_oracle_thresh': 'oracle_conf',
            'l4_sniper_thresh': 'sniper_score',
        }
        # Deltas only change after the scan, so the thresholds are constant
        # for the whole batch and can be computed once.
        thresholds = dna.get_final_filters()
        performance = {
            model: {'wins': 0, 'total': 0, 'pnl_sum': 0.0}
            for model in score_fields
            if model in thresholds
        }

        for trade in dna.trade_buffer:
            if not isinstance(trade, dict):
                continue
            pnl = self._to_float(trade.get('profit_pct'))
            is_win = pnl > 0
            decision = trade.get('decision_data')
            comps = decision.get('components') if isinstance(decision, dict) else None
            if not isinstance(comps, dict):
                comps = {}
            for model, bucket in performance.items():
                field = score_fields[model]
                # Prefer the score recorded in the decision components and
                # fall back to the top-level trade field.
                score = self._to_float(comps.get(field) or trade.get(field))
                if score >= thresholds[model] - 0.05:
                    bucket['total'] += 1
                    if is_win:
                        bucket['wins'] += 1
                    bucket['pnl_sum'] += pnl

        TARGET_HIT_LOW = 0.40
        TARGET_HIT_HIGH = 0.65
        for model, data in performance.items():
            if data['total'] < self.FAST_MIN_SAMPLES:
                continue
            win_rate = data['wins'] / data['total']
            avg_pnl = data['pnl_sum'] / data['total']
            current = dna.delta_fast.get(model, 0.0)
            if win_rate < TARGET_HIT_LOW and avg_pnl < 0:
                dna.delta_fast[model] = min(current + 0.01, 0.05)
            elif win_rate > TARGET_HIT_HIGH and avg_pnl > 0.5:
                dna.delta_fast[model] = max(current - 0.01, -0.05)

    # ============================================================
    # 🗓️ PERIODIC UPDATES (Weekly/Monthly)
    # ============================================================
    def update_periodic_delta(self, regime: str, update_type: str, new_deltas: Dict[str, float]):
        """Overwrite weekly or monthly deltas for ``regime``.

        Args:
            regime: strategy to update; unknown regimes are ignored.
            update_type: ``'weekly'`` selects the weekly layer (limit 0.03);
                any other value selects the monthly layer (limit 0.05).
            new_deltas: key -> delta. Keys that are neither a base filter
                nor a base guard are dropped; values are clamped to
                ``[-limit, +limit]``.
        """
        dna = self.strategies.get(regime)
        if dna is None:
            return
        is_weekly = update_type == 'weekly'
        target_dict = dna.delta_weekly if is_weekly else dna.delta_monthly
        limit = 0.03 if is_weekly else 0.05
        for k, v in new_deltas.items():
            # NOTE(review): guard keys are accepted and stored here, but
            # StrategyDNA.get_final_guards only reads delta_guards_fast, so
            # they currently have no effect on the final guard values.
            if k in dna.base_filters or k in dna.base_guards:
                target_dict[k] = max(-limit, min(limit, v))
        print(f" 🗓️ [AdaptiveHub] {update_type.capitalize()} Deltas updated for {regime}.")
        self._schedule_save()
        self._inject_current_parameters()

    # ============================================================
    # 💉 INJECTION
    # ============================================================
    def _inject_current_parameters(self):
        """Injects Global Fallback Parameters

        Writes the current regime's weights, thresholds and guards onto
        ``SystemLimits``. Does nothing when the current regime has no
        strategy or ``SystemLimits`` could not be imported.
        """
        if self.current_market_regime not in self.strategies or SystemLimits is None:
            return
        dna = self.strategies[self.current_market_regime]
        final_filters = dna.get_final_filters()
        final_guards = dna.get_final_guards()
        mw = dna.model_weights
        # Divide by the weight sum; "or 1.0" avoids dividing by zero when
        # the weights are empty or sum to 0.
        total_w = sum(mw.values()) or 1.0
        SystemLimits.L2_WEIGHT_TITAN = mw.get("titan", 0.4) / total_w
        SystemLimits.L2_WEIGHT_PATTERNS = mw.get("structure", 0.3) / total_w
        SystemLimits.L1_MIN_AFFINITY_SCORE = (final_filters.get("l1_min_score", 0.65) * 100.0) - 20.0
        SystemLimits.L3_CONFIDENCE_THRESHOLD = final_filters.get("l3_oracle_thresh", 0.65)
        SystemLimits.L4_ENTRY_THRESHOLD = final_filters.get("l4_sniper_thresh", 0.40)
        SystemLimits.CURRENT_REGIME = self.current_market_regime
        SystemLimits.L4_OB_WALL_RATIO = dna.ob_settings.get("wall_ratio_limit", 0.4)
        SystemLimits.HYDRA_CRASH_THRESH = final_guards.get('hydra_crash', 0.85)
        SystemLimits.HYDRA_GIVEBACK_THRESH = final_guards.get('hydra_giveback', 0.70)
        SystemLimits.LEGACY_V2_PANIC_THRESH = final_guards.get('legacy_v2', 0.95)
        print(f" 💉 [System Updated] {self.current_market_regime} | Oracle: {SystemLimits.L3_CONFIDENCE_THRESHOLD:.3f} | Sniper: {SystemLimits.L4_ENTRY_THRESHOLD:.3f}")

    def get_status(self) -> str:
        """Return a one-line summary of the current regime's thresholds.

        Missing thresholds are shown with the same fallbacks used by
        ``get_regime_config`` (0.65 / 0.40) instead of failing to format.
        """
        dna = self.strategies.get(self.current_market_regime)
        if not dna:
            return "Initializing..."
        f = dna.get_final_filters()
        oracle = f.get('l3_oracle_thresh', 0.65)
        sniper = f.get('l4_sniper_thresh', 0.40)
        return f"{self.current_market_regime} | Orc:{oracle:.2f} | Snip:{sniper:.2f}"

    def _schedule_save(self):
        """Start a background save on the running event loop.

        Returns the created task, or None when there is no storage service
        or no running loop (in which case nothing is saved).
        """
        if not self.r2:
            return None
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            print(" ⚠️ [AdaptiveHub] No running event loop; DNA state was not saved.")
            return None
        task = loop.create_task(self._save_state_to_r2())
        # Hold a reference until the task completes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _save_state_to_r2(self):
        """Upload the current regime and all strategies to R2 (best effort).

        Failures are reported on stdout and never raised to the caller.
        """
        if not self.r2:
            return
        try:
            data = {
                "current_regime": self.current_market_regime,
                "strategies": {k: v.to_dict() for k, v in self.strategies.items()}
            }
            await self.r2.upload_json_async(data, self.dna_file_key)
        except Exception as e:
            print(f" ❌ [AdaptiveHub] Failed to save DNA to R2: {e}")